jayzeng / scrapy-elasticsearch
This project was forked from julien-duponchelle/scrapy-elasticsearch.
A Scrapy pipeline which sends items to an Elasticsearch server.
I successfully installed it according to the documentation at https://pypi.python.org/pypi/ScrapyElasticSearch. However, when I try
scrapy crawl myspider
on Python 3.5.2 (v3.5.2:4def2a2901a5, Jun 25 2016, 22:18:55) [MSC v.1900 64 bit (AMD64)] on win32, I get this error:
File "E:\Python35\lib\site-packages\scrapyelasticsearch\scrapyelasticsearch.py", line 25, in <module>
    from transportNTLM import TransportNTLM
ImportError: No module named 'transportNTLM'
I checked in the folder and the transportNTLM.py module is there.
Hi,
I would like to suggest changing the operation of the pipeline so that the items to be indexed are built by the user at the parser level, and not by the pipeline via the 'ELASTICSEARCH_INDEX' and 'ELASTICSEARCH_TYPE' parameters.
Advantages:
- The user can specify different Elasticsearch indices for different parsers
- The user can control the '_op_type' setting of the bulk method, for example changing it from 'index' to 'update'
Cheers,
Julian
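A rough sketch of how the suggestion could look (hypothetical '_index', '_type', and '_op_type' fields on the item; not the pipeline's current API): the spider attaches routing metadata to each item, and the pipeline prefers it over the global settings when building the bulk action.

```python
def build_bulk_action(item, default_index="scrapy", default_type="items"):
    """Build an elasticsearch-py bulk action, letting per-item metadata
    override the global ELASTICSEARCH_INDEX / ELASTICSEARCH_TYPE defaults."""
    item = dict(item)  # work on a copy so the caller's item is untouched
    return {
        "_op_type": item.pop("_op_type", "index"),    # e.g. 'update' instead of 'index'
        "_index": item.pop("_index", default_index),  # per-parser index
        "_type": item.pop("_type", default_type),
        "_source": item,                              # remaining fields are the document
    }
```

A parser could then yield {'_index': 'news', '_op_type': 'update', ...} and have the item land in a different index than the default.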
With Elasticsearch it's common practice to add a date suffix to an index. If your index name is "test", there should be a way to automatically create monthly indices (example: test-2016-6, test-2016-7, etc.)
I am willing to submit a Pull Request to this repo with the changes I made to my local copy, just need permission.
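For reference, the suffixing itself is a few lines; the plugin's ELASTICSEARCH_INDEX_DATE_FORMAT setting (mentioned in other issues here) covers this case, and the helper below is only an illustration:

```python
from datetime import datetime, timezone

def monthly_index(base):
    """Append a year-month suffix to an index name, e.g. test-2016-06.
    (The issue shows unpadded months like test-2016-6; zero-padded %m is
    used here so index names sort lexically.)"""
    return "{}-{}".format(base, datetime.now(timezone.utc).strftime("%Y-%m"))
```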
I am using scrapy-redis.
My spider is RedisSpider
(from docs)
The class scrapy_redis.spiders.RedisSpider enables a spider to read the urls from redis. The urls in the redis queue will be processed one after another, if the first request yields more requests, the spider will process those requests before fetching another url from redis.
If I am right, scrapy-elasticsearch sends data to Elasticsearch once the number of buffered items reaches the ELASTICSEARCH_BUFFER_LENGTH setting:
if len(self.items_buffer) >= self.settings.get('ELASTICSEARCH_BUFFER_LENGTH', 500):
    self.send_items()
    self.items_buffer = []
RedisSpider waits when idle, so if we send 600 urls to redis and our ELASTICSEARCH_BUFFER_LENGTH is 500, 100 urls won't ever be saved, because RedisSpider never closes.
So I overrode the spider_idle method. Now the spider closes when it's idle. It works.
But with that code I keep running the spider in a loop that never ends: when it closes, it runs again. If there are urls in the redis queue they are crawled, the spider closes, the data is sent to Elasticsearch, and then the spider restarts.
It works, but now there's the loop.
Here's a log of the loop:
https://gist.github.com/pythoncontrol/4e88f5de253ca406b24885af0b4673fd
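The failure mode above can be modeled without Scrapy or Redis (a toy sketch; BufferedSender and its sent list are illustrative names, not the pipeline's real API). The restart loop could be avoided by exposing a flush() that a spider_idle handler calls, instead of closing the spider:

```python
class BufferedSender:
    """Toy model of the pipeline's buffering: flush automatically when the
    buffer reaches buffer_length, plus an explicit flush() that a
    spider_idle signal handler could call for the leftover items."""
    def __init__(self, buffer_length=500):
        self.buffer_length = buffer_length
        self.items_buffer = []
        self.sent = []  # stands in for documents indexed in Elasticsearch

    def process_item(self, item):
        self.items_buffer.append(item)
        if len(self.items_buffer) >= self.buffer_length:
            self.flush()

    def flush(self):
        self.sent.extend(self.items_buffer)
        self.items_buffer = []
```

With 600 urls and a buffer of 500, exactly 100 items sit in the buffer until flush() runs, which matches the behaviour described above.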
I'm using S3 feed export to download images during scraping. I'm able to download the images to my S3 bucket but it happens after scrapy-elasticsearch has already completed. How can I index the response from S3 to include the image s3 url along with my item to be indexed?
Example:
item['thumbnail'] = 'https://s3-eu-west-1.amazonaws.com/image-url-response-from-s3'
You can have an item without a single-field primary key, so this functionality is useless (or even dangerous!). Sure, you can compute and add a new, truly unique field to the item, e.g. from a concatenation of fields, but then it will be indexed along with the others. Maybe ELASTICSEARCH_UNIQ_KEY could accept a list of field keys and concatenate their values (forced to strings) before computing the hash.
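A sketch of the proposed multi-field ELASTICSEARCH_UNIQ_KEY (hypothetical helper; the separator and the str coercion are assumptions):

```python
import hashlib

def composite_unique_key(item, keys):
    """Concatenate several item fields (forced to str) and hash the result,
    so no synthetic field needs to be added to the indexed document."""
    raw = "|".join(str(item[k]) for k in keys).encode("utf-8")
    return hashlib.sha1(raw).hexdigest()
```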
meh ignore =p
Configuration for the Elasticsearch index and type is done statically in settings.py. Is there a recommended approach to setting this at runtime, perhaps based on the item that is being piped?
I think pyes won't be migrating to Elasticsearch 2.x any time soon, so the only serious alternative I see is elasticsearch-py.
I am in the middle of changing from Python 2 to 3, so I'm not sure if this is a valid issue.
When I configured my ELASTICSEARCH_UNIQ_KEY value, I ran into a problem: if my unique ID is a str, hashlib.sha1() complains:
TypeError: Unicode-objects must be encoded before hashing
If I .encode('utf-8') the ID before putting it in the field, line 73 in scrapyelasticsearch.py complains 'unique key must be str'.
To work around it, I have to put the ID in a list!
What's the purpose of if isinstance(unique_key, list): in the def get_unique_key(self, unique_key) method?
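A str/bytes-tolerant sketch of what get_unique_key could do on Python 3 (an assumption: the installed version accepted only bytes or a list, which would explain both errors above):

```python
import hashlib

def get_unique_key_bytes(unique_key):
    """Accept a list/tuple (taking the first element), str, or bytes,
    normalize to bytes, then hash."""
    if isinstance(unique_key, (list, tuple)):
        unique_key = unique_key[0]
    if isinstance(unique_key, str):
        unique_key = unique_key.encode("utf-8")
    if not isinstance(unique_key, bytes):
        raise TypeError("unique key must be str or bytes")
    return hashlib.sha1(unique_key).hexdigest()
```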
Hi,
I'm testing this plugin with the new Elasticsearch 6.x version.
The Content-Type header (json) is now required. I get the following error:
[elasticsearch] DEBUG: < {"error":"Content-Type header [] is not supported","status":406}
Is there a way to set the Content-Type to json?
Thanks !
Hello,
How can I solve issues related to encoding/decoding? Below is the traceback:
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py", line 654, in _runCallbacks
    current.result = callback(current.result, *args, **kw)
  File "/usr/local/lib/python2.7/dist-packages/scrapyelasticsearch/scrapyelasticsearch.py", line 159, in close_spider
    self.send_items()
  File "/usr/local/lib/python2.7/dist-packages/scrapyelasticsearch/scrapyelasticsearch.py", line 146, in send_items
    helpers.bulk(self.es, self.items_buffer)
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/actions.py", line 304, in bulk
    for ok, item in streaming_bulk(client, actions, *args, **kwargs):
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/actions.py", line 216, in streaming_bulk
    actions, chunk_size, max_chunk_bytes, client.transport.serializer
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/actions.py", line 75, in _chunk_actions
    cur_size += len(data.encode("utf-8")) + 1
UnicodeDecodeError: 'ascii' codec can't decode byte 0xc2 in position 58: ordinal not in range(128)
Thanks,
Hi,
I'm trying to integrate Elasticsearch with Scrapy. I've followed the steps from https://github.com/knockrentals/scrapy-elasticsearch, but it's not loading the pipeline. I'm using Scrapy 1.3.3 with Elasticsearch 5.2.
Logging:
INFO: Enabled item pipelines: []
ITEM_PIPELINES = {
'scrapyelasticsearch.scrapyelasticsearch.ElasticSearchPipeline': 500
}
ELASTICSEARCH_SERVERS = ['http://172.17.0.2:9200']
ELASTICSEARCH_INDEX = 'scrapy'
#ELASTICSEARCH_INDEX_DATE_FORMAT = '%Y-%m'
ELASTICSEARCH_INDEX_DATE_FORMAT = '%A %d %B %Y'
ELASTICSEARCH_TYPE = 'items'
ELASTICSEARCH_UNIQ_KEY = 'url' # Custom unique key
Am I missing something? Do you need to define the Pipeline in pipelines.py?
The “dirbot” example didn’t.
Usage (configure settings.py):
ITEM_PIPELINES = [
'scrapyelasticsearch.scrapyelasticsearch.ElasticSearchPipeline',
]
seems to be deprecated in newer Scrapy versions; now use:
ITEM_PIPELINES = {
'scrapyelasticsearch.scrapyelasticsearch.ElasticSearchPipeline': 100,
}
(the number defines the order of the Pipelines, if you have more than one)
see: http://doc.scrapy.org/en/latest/topics/item-pipeline.html#activating-an-item-pipeline-component
The ELASTICSEARCH_INDEX_DATE_FORMAT parameter derives the index suffix from the scraping timestamp using the specified format (i.e. '-%Y%m%d'). But I need to set it to a string or a datetime (of a given format) taken from the scraped item. Here is a simple solution with two more parameters (ELASTICSEARCH_INDEX_DATE_KEY and ELASTICSEARCH_INDEX_DATE_KEY_FORMAT): jenkin@e834082.
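The two proposed settings could work roughly like this (illustrative helper; parameter names mirror the proposal, not the linked commit):

```python
from datetime import datetime

def index_with_item_date(base, item, date_key, date_key_format, suffix_format="%Y-%m"):
    """Derive the index suffix from a date stored on the scraped item
    (ELASTICSEARCH_INDEX_DATE_KEY / ELASTICSEARCH_INDEX_DATE_KEY_FORMAT in
    the proposal) instead of the scraping timestamp."""
    dt = datetime.strptime(item[date_key], date_key_format)
    return "{}-{}".format(base, dt.strftime(suffix_format))
```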
I would like to know how to debug my situation: I have a PostgreSQL pipeline that's working flawlessly, adding 2k items to my relational database when I run Scrapy. I've installed scrapy-elasticsearch as well, to use Elasticsearch alongside PostgreSQL, but after scraping, when I get into Kibana, I have... 36 items. My index is the day the item was scraped, and even selecting "years ago" in the Kibana interface I only get 36 hits.
How and where do I debug to check where things are going wrong?
Hi!
How can I define a mapping for each field?
I want all my index fields to be mapped as "not_analyzed" so I can get exact values in search results.
Thank you,
If I am using Items for parsing Scrapy responses, the unique key retrieved from the item is a tuple, and process_unique_key() will raise an exception. This can easily be fixed by changing line 94 in scrapyelasticsearch.py
From
if isinstance(unique_key, list):
To
if isinstance(unique_key, (list, tuple)):
Elasticsearch has two default values for host and port: localhost and 9200. So the ELASTICSEARCH_SERVERS parameter is not really required, because you can fall back to those defaults. See jenkin@e834082.
Thank you all for putting together this great tool. I was thrilled to find this.
I am currently getting an error as follows:
{"error":"Content-Type header [] is not supported","status":406}
According to elasticsearch-dump/elasticsearch-dump#350, additional headers need to be passed for Elasticsearch 6.x, as follows:
--headers='{"Content-Type": "application/json"}'
Could this be added as a new configuration option?
We have recently upgraded to Elasticsearch 6.2.x, which does not require a type. Is there a way to remove the requirement for ELASTICSEARCH_TYPE in the code?
I'll work on this; it's pretty basic - expect a pull request. If there's a way and I've missed it, let me know.
Is there any way to provide NTLM credentials to the Elasticsearch server?
After adding ElasticSearchPipeline to my ITEM_PIPELINES array I see this error:
Traceback (most recent call last):
  File "/home/spl/Code/python_env/myenv/lib/python3.5/site-packages/twisted/internet/defer.py", line 587, in _runCallbacks
    current.result = callback(current.result, *args, **kw)
  File "/home/spl/Code/python_env/myenv/lib/python3.5/site-packages/scrapyelasticsearch/scrapyelasticsearch.py", line 108, in process_item
    if isinstance(item, types.GeneratorType) or isinstance(item, types.ListType):
AttributeError: module 'types' has no attribute 'ListType'
All involved packages are installed in the most recent versions.
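A hypothetical Python 3 fix for the failing line: types.ListType was removed in Python 3, so the check has to use the builtin list instead.

```python
import types

def is_multi_item(item):
    """Python 2/3-compatible replacement for the failing check:
    types.ListType no longer exists in Python 3, but the builtin list works
    with isinstance in both versions."""
    return isinstance(item, (types.GeneratorType, list))
```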
Is it for adding the _id value? I added some documents and it didn't seem like that. If so, is there any way to add an id value? I want to use scraped urls as ids.
BulkIndexError: (u'1 document(s) failed to index.', [{u'create': {u'status': 400, u'_type': u'jd_comment_test', u'_index': u'jd_comment-2018-05-26', u'error': {u'reason': u'Field [_id] is defined twice in [jd_comment_test]', u'type': u'illegal_argument_exception'}}}])
How can I solve this problem?
Hi,
I have the following issue when running the spider below, before its items are added to ES.
The ES unique key is set to "link".
Any help would be greatly appreciated.
import scrapy
import uuid
from compass.items import CompassItem

class DarkReadingSpider(scrapy.Spider):
    name = "darkreading"
    allowed_domains = ["darkreading.com"]
    start_urls = (
        'http://www.darkreading.com/rss_simple.asp',
    )

    def parse(self, response):
        for sel in response.xpath('//item'):
            item = CompassItem()
            item['id'] = uuid.uuid4()
            item['title'] = sel.xpath('title/text()').extract()
            item['link'] = sel.xpath('link/text()').extract()
            item['desc'] = sel.xpath('description/text()').extract()
            print item
            yield item
Output/Error:
{'desc': [u'Two-thirds of IT security professionals say that network security has become more difficult over the last two years with growing complexity in managing heterogeneous network environments.'],
'id': UUID('0112e36e-50ce-4660-9072-da2a5fec09e6'),
'link': [u'http://www.darkreading.com/cloud/survey-shows-cloud-infrastructure-security-a-major-challenge-/d/d-id/1324872?_mc=RSS_DR_EDT'],
'title': [u'Survey Shows Cloud Infrastructure Security A Major Challenge ']}
2016-04-01 15:15:34 [scrapy] ERROR: Error processing {'desc': [u'Two-thirds of IT security professionals say that network security has become more difficult over the last two years with growing complexity in managing heterogeneous network environments.'],
'id': UUID('0112e36e-50ce-4660-9072-da2a5fec09e6'),
'link': [u'http://www.darkreading.com/cloud/survey-shows-cloud-infrastructure-security-a-major-challenge-/d/d-id/1324872?_mc=RSS_DR_EDT'],
'title': [u'Survey Shows Cloud Infrastructure Security A Major Challenge ']}
Traceback (most recent call last):
  File "/usr/local/lib64/python2.7/site-packages/twisted/internet/defer.py", line 588, in _runCallbacks
    current.result = callback(current.result, *args, **kw)
  File "/usr/local/lib/python2.7/site-packages/scrapyelasticsearch/scrapyelasticsearch.py", line 70, in process_item
    self.index_item(item)
  File "/usr/local/lib/python2.7/site-packages/scrapyelasticsearch/scrapyelasticsearch.py", line 52, in index_item
    local_id = hashlib.sha1(item[uniq_key]).hexdigest()
TypeError: sha1() argument 1 must be string or buffer, not list
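The TypeError comes from xpath(...).extract() returning a list while sha1 needs a string. A sketch of a pipeline-side fix (the spider-side fix would be to use extract_first() for the 'link' field; key_from_field is an illustrative name, not the pipeline's API):

```python
import hashlib

def key_from_field(value):
    """Accept either a scalar or the list that .extract() returns,
    then hash a UTF-8 encoding of it."""
    if isinstance(value, (list, tuple)):
        value = value[0] if value else ""
    if isinstance(value, str):
        value = value.encode("utf-8")
    return hashlib.sha1(value).hexdigest()
```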
I can't see how I can specify an ingest pipeline on the elasticsearch bulk request:
https://www.elastic.co/guide/en/elasticsearch/reference/5.2/ingest.html
A longer-term fix would be the ability to pass these settings as variables from the Scrapy settings.py.
A shorter-term fix would be documenting which methods can be overridden to produce this behaviour.
Hi,
Every time I save a text field to ES, the mapping has the following structure:
"text": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
Meaning I cannot search for any text that appears after 256 characters.
Is there any way of avoiding this? Thanks very much in advance!
It's the pip package requests_ntlm. I'm using Python 2.7.3 on Debian 7.11. Maybe you could import the ntlm module only if the ELASTICSEARCH_AUTH parameter is set to NTLM?
I needed to run:
pip install requests_ntlm
for this pipeline to work
Is it possible to update an item if an item with the same id already exists in Elasticsearch, instead of adding a new one? I mean:
{
itemId: 1,
color: ['red', 'blue']
}
{
itemId: 1,
color: ['green']
}
result:
{
itemId: 1,
color: ['red', 'blue', 'green']
}
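One way to express that merge with elasticsearch-py's bulk helpers is an 'update' action with a scripted upsert (a sketch under assumptions: Painless scripting, a 'color' array field, and 5.x+ script syntax; not something the pipeline supports today):

```python
def merge_colors_action(index, doc_type, item):
    """Bulk action that appends new colors to an existing document's
    'color' array (deduplicated via the script), or creates the document
    with the upsert body if it does not exist yet."""
    return {
        "_op_type": "update",
        "_index": index,
        "_type": doc_type,
        "_id": item["itemId"],
        "script": {
            "source": ("for (def c : params.colors) "
                       "{ if (!ctx._source.color.contains(c)) { ctx._source.color.add(c) } }"),
            "params": {"colors": item["color"]},
        },
        "upsert": {"itemId": item["itemId"], "color": item["color"]},
    }
```

Passing such actions to helpers.bulk would yield the merged result shown above instead of overwriting the document.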
I use scrapy-redis, my spider is waiting for input from redis queue.
If I send fewer urls than the buffer length, they won't ever be pushed into Elasticsearch.
Do you have any workarounds?
Hello, I'm trying to insert data into the Bonsai.io ES cloud and getting this error:
File "/usr/local/lib/python3.5/dist-packages/elasticsearch/client/init.py", line 1155, in bulk
headers={'content-type': 'application/x-ndjson'})
TypeError: perform_request() got an unexpected keyword argument 'headers'
How can I solve it?
Thanks
@jayzeng Thanks for your effort. I have done exactly what you instructed and installed ScrapyElasticSearch 0.8.3.
But now I get this error:
.....
  File "C:\Miniconda2\lib\site-packages\scrapy\utils\misc.py", line 44, in load_object
    mod = import_module(module)
  File "C:\Miniconda2\lib\importlib\__init__.py", line 37, in import_module
    __import__(name)
  File "C:\Miniconda2\lib\site-packages\scrapyelasticsearch\scrapyelasticsearch.py", line 21, in <module>
    from .transportNTLM import TransportNTLM
  File "C:\Miniconda2\lib\site-packages\scrapyelasticsearch\transportNTLM.py", line 7, in <module>
    from requests_ntlm import HttpNtlmAuth
ImportError: No module named requests_ntlm
As the title says, is there any inbuilt command in this plugin that clears existing data in ES before populating it again? Thanks
In settings you can set ELASTICSEARCH_SERVER only as a simple string. Pyes supports multiple hosts in ES initialization, and this string is passed wrapped in a single-element list (see L50 in scrapyelasticsearch.py).
It would be useful to allow ELASTICSEARCH_SERVER to be set to a list of hosts as well, perhaps checking whether the setting is a string or a list, e.g. via isinstance().
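The suggested check is a one-liner (a sketch; normalize_servers is an illustrative name):

```python
def normalize_servers(setting):
    """Accept either a single host string or a list of hosts for the
    ELASTICSEARCH_SERVER setting, always returning a list."""
    return [setting] if isinstance(setting, str) else list(setting)
```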
Suggest making item_id = hashlib.sha1(unique_key).hexdigest() in def get_id optional, so users can set the Elasticsearch _id without hashing.
Hi there,
This plugin works great with the latest version of Scrapy (1.3) and Elasticsearch (5.1.1) on Ubuntu 16. Great work, Thanks.
There is a little problem. I have already set up an 'index' and 'mappings' in Elasticsearch. How do I configure this plugin to insert data into that existing index rather than creating a new one?
I used the settings below (where the 'news' index and the 'allNews' type already exist). They create a new index called "news-2017-01" and insert all the data there. I don't want that; I want this plugin to populate the already existing index. How do I do that?
ELASTICSEARCH_SERVERS = ['localhost']
ELASTICSEARCH_INDEX = 'news'
ELASTICSEARCH_INDEX_DATE_FORMAT = '%Y-%m'
ELASTICSEARCH_TYPE = 'allNews'
#ELASTICSEARCH_UNIQ_KEY = 'url' # Custom unique key
Please help.
Thanks
Good work on the extension; I appreciate the help being provided to the community. Though I haven't personally used your extension in production, the extension I wrote is very similar to your code.
I wanted to bring to your attention an issue I faced which may improve your extension. With long-running scrapers (we have had scrapers run for 20 hours sometimes), it is possible that your machine will run out of memory if all the items are appended to the items_buffer like that. My scrapers have failed to insert items after raising a MemoryError. The workaround I use is to set a max_insert_limit_counter in the extension class and bulk insert items into ES once the max limit is hit. Others who use this extension might run into this issue in the future.
If you would like me to create a PR for this, let me know.
Maybe it would be possible to ingest a mapping into elasticsearch before writing items? E.g.
ELASTICSEARCH_MAPPING = { ... some json ...}
or
ELASTICSEARCH_MAPPING = "/file/to/mapping.json"
It'd be a nice flow to have it be included at this stage.
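Accepting both forms of the hypothetical ELASTICSEARCH_MAPPING setting could look like this (a sketch, not an existing feature):

```python
import json

def load_mapping(setting):
    """Accept either an inline mapping (a dict) or a path to a JSON file,
    as the two proposed forms of ELASTICSEARCH_MAPPING."""
    if isinstance(setting, dict):
        return setting
    with open(setting) as f:
        return json.load(f)
```

The pipeline's open_spider would then be the natural place to push the loaded mapping to Elasticsearch before any items are written.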
Hi there, I am using Elasticsearch in AWS, where authentication is different.
I added my own pipeline for this, maybe you want to have a look:
https://github.com/philippbussche/scrapy-tooling/tree/master/src/elasticsearchAWSpipeline
Maybe we want to incorporate this into the official pipeline?
Disclaimer: I am using a rather old version of Scrapy, so obviously I would have to change things.
Working in a pipeline, every item is indexed separately, with many requests to ES (one per item). In addition, in some cases you want to break the pipeline concept and apply a global transformation to items before indexing (e.g. by overloading the open_spider and close_spider methods in a pipeline class).
Using the ES bulk API, you can temporarily add items to an item buffer (with a length controlled by a setting) and then index them in batches rather than one request per item.
Hi, I need to tweak the file "scrapyelasticsearch.py", but I'm not sure where it's located. My OS is Ubuntu 16.04. Can anyone please help?
Thanks
We have an Elasticsearch cluster stood up using an internal CA. This plugin does not like that. Is there a way we can pass in the cert, or have it ignore verification?
I had set up Scrapy on my local machine with a CrawlSpider to index a local static html site. So far so good: I get a valid json file as output.
Next I installed ScrapyElasticSearch, configured settings.py with the correct ITEM_PIPELINES, and ran scrapy crawl on my site.
If I look at the logs, I get this:
2017-08-01 13:25:56 [root] DEBUG: Generated unique key bbd9eba5e56d510757eb42eed3b130520b7b1958
2017-08-01 13:25:56 [root] DEBUG: Item sent to Elastic Search scrapy
But when I look at my Elasticsearch server, no data has been entered. Even worse, I can shut down my Elasticsearch engine entirely and the log entries stay the same, so no error message is thrown.
I've also tested this in a clean Vagrant machine with virtualenv enabled, but the problem is the same. I tried logging network traffic with tcpdump: not a single byte is passed. I have no clue what I did wrong, other than that something is broken.
Below is my pip list:
argparse (1.2.1)
asn1crypto (0.22.0)
attrs (17.2.0)
Automat (0.6.0)
cffi (1.10.0)
constantly (15.1.0)
cryptography (2.0.2)
cssselect (1.0.1)
elasticsearch (5.4.0)
enum34 (1.1.6)
hyperlink (17.3.0)
idna (2.5)
incremental (17.5.0)
ipaddress (1.0.18)
lxml (3.8.0)
parsel (1.2.0)
pip (1.5.6)
pyasn1 (0.3.1)
pyasn1-modules (0.0.10)
pycparser (2.18)
PyDispatcher (2.0.5)
pyOpenSSL (17.2.0)
queuelib (1.4.2)
Scrapy (1.4.0)
ScrapyElasticSearch (0.8.9)
service-identity (17.0.0)
setuptools (5.5.1)
six (1.10.0)
Twisted (17.5.0)
urllib3 (1.22)
w3lib (1.17.0)
wsgiref (0.1.2)
zope.interface (4.4.2)
My relevant settings.py entries:
ITEM_PIPELINES = {
'scrapyelasticsearch.scrapyelasticsearch.ElasticSearchPipeline': 100
}
ELASTICSEARCH_SERVERS = ['http://127.0.0.1:9200']
ELASTICSEARCH_INDEX = 'scrapy'
ELASTICSEARCH_INDEX_DATE_FORMAT = '%Y-%m'
ELASTICSEARCH_TYPE = 'items'
ELASTICSEARCH_UNIQ_KEY = 'url'
Thank you for your help
I am also storing the raw html along with the items, but do not want to send that to the ES index. Can we specify which fields should be sent to ES for indexing?
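A sketch of how a field blacklist could work before the item is buffered (strip_fields and ELASTICSEARCH_EXCLUDE_FIELDS are hypothetical names, not existing plugin features):

```python
def strip_fields(item, exclude=("raw_html",)):
    """Drop heavyweight fields such as raw HTML before sending to ES.
    'exclude' would come from a hypothetical ELASTICSEARCH_EXCLUDE_FIELDS
    setting; the original item is left untouched."""
    return {k: v for k, v in dict(item).items() if k not in exclude}
```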
Example: I'm crawling an API with JSON documents. From time to time some documents get removed from the source database and go missing from the API. How do I handle this with scrapy-elasticsearch to keep ES up to date?
From the source code, I can see that there's no _op_type parameter specified in the bulk call, so it probably resorts to the default 'index'.