from pykafka import KafkaClient
import json
import logging as log
# Emit DEBUG-level logging so pykafka's connection and produce attempts are visible.
log.basicConfig(level=log.DEBUG)

# NOTE(review): the 'xx' hosts look like redacted placeholders -- substitute the
# real broker addresses before running.
kafka_client = KafkaClient(hosts='xx:9092,xx:9092,xx:9092')
kafka_msg_topic = kafka_client.topics['product_incoming']

# Update notification for a single product row.
payload = {
    'tbl': 'product',
    'ids': [239981],
    'action': 'u',
    'data': {'raw_price': 100, 'zk_price': 90},
}

# Use the producer as a context manager so it is stopped deterministically,
# even when the synchronous produce() call raises on a failed delivery,
# instead of relying on the interpreter's finalizer at exit.
with kafka_msg_topic.get_sync_producer() as producer:
    # pykafka expects the message body as bytes. json.dumps() emits ASCII-only
    # text by default, so encoding to UTF-8 leaves the bytes unchanged on
    # Python 2 and yields the required bytes object on Python 3.
    producer.produce(json.dumps(payload).encode('utf-8'))
----------------- 以下是报错日志 -----------------
INFO:pykafka.cluster:Requesting API version information
DEBUG:pykafka.connection:Connecting to 172.17.112.191:9092
DEBUG:pykafka.connection:Successfully connected to 172.17.112.191:9092
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.cluster:Got api version info: {0: ApiVersionsSpec(key=0, min=0, max=2), 1: ApiVersionsSpec(key=1, min=0, max=2), 2: ApiVersionsSpec(key=2, min=0, max=0), 3: ApiVersionsSpec(key=3, min=0, max=1), 4: ApiVersionsSpec(key=4, min=0, max=0), 5: ApiVersionsSpec(key=5, min=0, max=0), 6: ApiVersionsSpec(key=6, min=0, max=2), 7: ApiVersionsSpec(key=7, min=1, max=1), 8: ApiVersionsSpec(key=8, min=0, max=2), 9: ApiVersionsSpec(key=9, min=0, max=1), 10: ApiVersionsSpec(key=10, min=0, max=0), 11: ApiVersionsSpec(key=11, min=0, max=0), 12: ApiVersionsSpec(key=12, min=0, max=0), 13: ApiVersionsSpec(key=13, min=0, max=0), 14: ApiVersionsSpec(key=14, min=0, max=0), 15: ApiVersionsSpec(key=15, min=0, max=0), 16: ApiVersionsSpec(key=16, min=0, max=0), 17: ApiVersionsSpec(key=17, min=0, max=0), 18: ApiVersionsSpec(key=18, min=0, max=0), 19: ApiVersionsSpec(key=19, min=0, max=0)}
DEBUG:pykafka.cluster:Updating cluster, attempt 1/3
DEBUG:pykafka.connection:Connecting to 172.17.112.191:9092
DEBUG:pykafka.connection:Successfully connected to 172.17.112.191:9092
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.cluster:Discovered 3 brokers
DEBUG:pykafka.cluster:Discovered broker id 101: 172.17.112.191:9092
DEBUG:pykafka.connection:Connecting to 172.17.112.191:9092
DEBUG:pykafka.connection:Successfully connected to 172.17.112.191:9092
DEBUG:pykafka.cluster:Discovered broker id 102: 172.17.112.192:9092
DEBUG:pykafka.connection:Connecting to 172.17.112.192:9092
DEBUG:pykafka.connection:Successfully connected to 172.17.112.192:9092
DEBUG:pykafka.cluster:Discovered broker id 103: 172.17.112.193:9092
DEBUG:pykafka.connection:Connecting to 172.17.112.193:9092
DEBUG:pykafka.connection:Successfully connected to 172.17.112.193:9092
INFO:pykafka.cluster:Discovered 2 topics
DEBUG:pykafka.cluster:Discovered topic 'product_incoming'
DEBUG:pykafka.connection:Connecting to 172.17.112.191:9092
DEBUG:pykafka.connection:Successfully connected to 172.17.112.191:9092
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.topic:Adding 24 partitions
DEBUG:pykafka.topic:Adding partition product_incoming/0
DEBUG:pykafka.topic:Adding partition product_incoming/1
DEBUG:pykafka.topic:Adding partition product_incoming/2
DEBUG:pykafka.topic:Adding partition product_incoming/3
DEBUG:pykafka.topic:Adding partition product_incoming/4
DEBUG:pykafka.topic:Adding partition product_incoming/5
DEBUG:pykafka.topic:Adding partition product_incoming/6
DEBUG:pykafka.topic:Adding partition product_incoming/7
DEBUG:pykafka.topic:Adding partition product_incoming/8
DEBUG:pykafka.topic:Adding partition product_incoming/9
DEBUG:pykafka.topic:Adding partition product_incoming/10
DEBUG:pykafka.topic:Adding partition product_incoming/11
DEBUG:pykafka.topic:Adding partition product_incoming/12
DEBUG:pykafka.topic:Adding partition product_incoming/13
DEBUG:pykafka.topic:Adding partition product_incoming/14
DEBUG:pykafka.topic:Adding partition product_incoming/15
DEBUG:pykafka.topic:Adding partition product_incoming/16
DEBUG:pykafka.topic:Adding partition product_incoming/17
DEBUG:pykafka.topic:Adding partition product_incoming/18
DEBUG:pykafka.topic:Adding partition product_incoming/19
DEBUG:pykafka.topic:Adding partition product_incoming/20
DEBUG:pykafka.topic:Adding partition product_incoming/21
DEBUG:pykafka.topic:Adding partition product_incoming/22
DEBUG:pykafka.topic:Adding partition product_incoming/23
<pykafka.topic.Topic at 0x2593c90 (name=product_incoming)>
INFO:pykafka.producer:Starting new produce worker for broker 101
INFO:pykafka.producer:Starting new produce worker for broker 102
INFO:pykafka.producer:Starting new produce worker for broker 103
DEBUG:pykafka.producer:Sending 1 messages to broker 101
WARNING:pykafka.producer:Produce request for product_incoming/1 to 172.17.112.191:9092 failed with error code -1.
DEBUG:pykafka.producer:Successfully sent 0/1 messages to broker 101
DEBUG:pykafka.producer:Sending 1 messages to broker 101
WARNING:pykafka.producer:Produce request for product_incoming/1 to 172.17.112.191:9092 failed with error code -1.
DEBUG:pykafka.producer:Successfully sent 0/1 messages to broker 101
DEBUG:pykafka.producer:Sending 1 messages to broker 101
WARNING:pykafka.producer:Produce request for product_incoming/1 to 172.17.112.191:9092 failed with error code -1.
DEBUG:pykafka.producer:Successfully sent 0/1 messages to broker 101
DEBUG:pykafka.producer:Sending 1 messages to broker 101
WARNING:pykafka.producer:Produce request for product_incoming/1 to 172.17.112.191:9092 failed with error code -1.
DEBUG:pykafka.producer:Successfully sent 0/1 messages to broker 101
ERROR:pykafka.producer:Message not delivered!! UnknownError('Produce request for product_incoming/1 to 172.17.112.191:9092 failed with error code -1.',)
Traceback (most recent call last):
File "pykafka_test.py", line 13, in <module>
producer.produce(json.dumps({'tbl': 'product', 'ids': [239981], 'action': 'u', 'data': {'raw_price': 100, 'zk_price': 90}}))
File "/usr/lib64/python2.7/site-packages/pykafka/producer.py", line 354, in produce
raise exc
pykafka.exceptions.UnknownError: Produce request for product_incoming/1 to 172.17.112.191:9092 failed with error code -1.
DEBUG:pykafka.producer:Finalising <pykafka.producer.Producer at 0x259a310>
INFO:pykafka.producer:Blocking until all messages are sent
INFO:pykafka.handlers:RequestHandler worker: exiting cleanly
INFO:pykafka.handlers:RequestHandler worker: exiting cleanly
INFO:pykafka.handlers:RequestHandler worker: exiting cleanly
INFO:pykafka.producer:Worker exited for broker 172.17.112.192:9092
INFO:pykafka.producer:Worker exited for broker 172.17.112.193:9092
INFO:pykafka.producer:Worker exited for broker 172.17.112.191:9092
INFO:pykafka.producer:Blocking until all messages are sent
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue