
Scrapy Distributed RabbitMQ Scheduler

Installation

Install with pip:

pip install scrapy-rabbitmq-scheduler

Or clone this project and install it via setup.py:

python setup.py install

Usage

Step 1: Add the following options to settings.py in your project

# Use this project's scheduler
SCHEDULER = "scrapy_rabbitmq_scheduler.scheduler.SaaS"

# RabbitMQ connection DSN
RABBITMQ_CONNECTION_PARAMETERS = 'amqp://guest:guest@localhost:5672/'

# HTTP status codes that trigger a retry (the request is pushed back onto the queue)
SCHEDULER_REQUEUE_ON_STATUS = [500]

# Downloader middleware that acknowledges a task once it succeeds
DOWNLOADER_MIDDLEWARES = {
    'scrapy_rabbitmq_scheduler.middleware.RabbitMQMiddleware': 999
}
# Item pipeline: scraped items are published to RabbitMQ
ITEM_PIPELINES = {
    'scrapy_rabbitmq_scheduler.pipelines.RabbitmqPipeline': 300,
}

Step 2: Make your spider inherit from RabbitSpider

import scrapy
from scrapy_rabbitmq_scheduler.spiders import RabbitSpider

class CustomSpider(RabbitSpider):
    name = 'custom_spider'
    queue_name = 'test_urls'  # name of the task (request) queue
    items_key = 'test_item'   # name of the item queue

    def parse(self, response):
        item = ...  # parse the item
        yield item

Step 3: Push tasks (URLs) into the RabbitMQ queue

#!/usr/bin/env python
import pika
import settings

connection = pika.BlockingConnection(pika.URLParameters(settings.RABBITMQ_CONNECTION_PARAMETERS))
channel = connection.channel()

queue_key = 'test_urls'

# Note: the queue must already exist (e.g. declared by the scheduler
# when the spider starts); publishing to the default exchange with an
# unknown routing key silently drops the message.
# Read URLs from a file and publish each one to the queue.
with open('urls.txt') as f:
    for url in f:
        url = url.strip()
        channel.basic_publish(exchange='',
                              routing_key=queue_key,
                              body=url,
                              properties=pika.BasicProperties(
                                  content_type='text/plain',
                                  delivery_mode=2  # persistent message
                              ))

connection.close()

urls.txt

http://www.baidu.com

Advanced Features

1. Message priority

Message priorities range from 0 to 255; the larger the number, the higher the priority. Set the priority directly on the request:

yield scrapy.Request(url, priority=priority)  # priority: an int from 0 to 255

2. Queue durability

# settings.py
RABBITMQ_DURABLE = True  # whether the queue is durable; defaults to True

3. Delivery confirmation

# settings.py
RABBITMQ_CONFIRM_DELIVERY = True  # whether publishes require broker confirmation; defaults to True

4. Delayed messages

Delayed messages in scrapy-rabbitmq-scheduler are implemented with the rabbitmq-delayed-message-exchange plugin, so you must install and enable that plugin before use: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

Enable the delay queue in your spider:

# -*- coding: utf-8 -*-
import scrapy
from scrapy_rabbitmq_scheduler.spiders import RabbitSpider
from example.items import ArticleItem


class CcidcomSpider(RabbitSpider):
    ....
    # queue name
    queue_name = 'ccidcom'
    # whether this is a delay queue
    is_delay_queue = True
    ...

Setting is_delay_queue to True enables delayed delivery automatically.

Using a delay

yield scrapy.Request('http://www.ccidcom.com/', callback=self.parse, meta={'_delay_time': 10000})

Add _delay_time to the request's meta with the delay in milliseconds; it takes effect automatically.

TODO

  • Support delayed requests
  • Add task persistence configuration

scrapy-rabbitmq-scheduler's People

Contributors

aox-lei, dependabot[bot]

scrapy-rabbitmq-scheduler's Issues

pika-related

1. Could the queue priority in connection.py be made a configurable parameter?

2. Publishing from the pipeline sometimes fails because the channel is closed:

Channel is closed

Traceback (most recent call last):
File "c:\users\administrator\appdata\local\programs\python\python38\lib\site-packages\twisted\internet\defer.py", line 654, in _runCallbacks
current.result = callback(current.result, *args, **kw)
File "c:\users\administrator\appdata\local\programs\python\python38\lib\site-packages\scrapy\utils\defer.py", line 154, in f
return deferred_from_coro(coro_f(*coro_args, **coro_kwargs))
File "c:\users\administrator\appdata\local\programs\python\python38\lib\site-packages\scrapy_rabbitmq_scheduler\pipelines.py", line 31, in process_item
self.channel.basic_publish(exchange='',
File "c:\users\administrator\appdata\local\programs\python\python38\lib\site-packages\pika\adapters\blocking_connection.py", line 2205, in basic_publish
self._impl.basic_publish(
File "c:\users\administrator\appdata\local\programs\python\python38\lib\site-packages\pika\channel.py", line 421, in basic_publish
self._raise_if_not_open()
File "c:\users\administrator\appdata\local\programs\python\python38\lib\site-packages\pika\channel.py", line 1389, in _raise_if_not_open
raise exceptions.ChannelWrongStateError('Channel is closed.')
pika.exceptions.ChannelWrongStateError: Channel is closed.

After consuming a message from RabbitMQ, I scrape the page and then publish a message to another queue. This works at first, but once the program has been running for 20 to 30 minutes, the next publish to the queue raises Channel is closed. There is also a second problem:

[pika.adapters.utils.io_services_utils] INFO: _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=ConnectionResetError(10054, '远程主机强迫关闭了一个现有的连接。', None, 10054, None); <socket.socket fd=492, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('10.0.8.200', 50751), raddr=('10.0.253.201', 5672)>

(The Chinese text in the error is Windows error 10054, WSAECONNRESET: "An existing connection was forcibly closed by the remote host.")

RABBITMQ_CONFIRM_DELIVERY parameter issue

settings.py

RABBITMQ_CONFIRM_DELIVERY = True  # whether publishes require broker confirmation; defaults to True

When this parameter is False, confirmations are submitted automatically, but when it is True messages are not confirmed at all, which is the opposite of what the name suggests.

RabbitMQMiddleware does call the confirmation code, but it does not seem to take effect: in SaaS, self.queue holds an object, yet it evaluates to False in a boolean context.

How to requeue a request during scraping

If some fields fail to be scraped during a crawl, what should the spider return so that the request is put back onto the queue? The settings only cover retrying by HTTP status code:

SCHEDULER_REQUEUE_ON_STATUS = [500]
