aio-libs / aiokafka
asyncio client for kafka
Home Page: http://aiokafka.readthedocs.io/
License: Apache License 2.0
Currently committing is a bit complicated because we need to increment the offset. Instead of writing something like: consumer.commit({tp: msg.offset+1})
it would make sense to add another, more human-friendly API. For example:
consumer.commit(msg1, msg2) # messages are from different partitions.
We can see that it's a message and handle the offset increment properly.
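A minimal sketch of how such a helper could compute the offsets, assuming messages expose topic, partition and offset attributes. The names offsets_to_commit, TopicPartition and Message below are stand-ins defined only for this illustration; they are not part of the aiokafka API.

```python
from collections import namedtuple

# Stand-ins for the real message and partition types, so the sketch runs
# without a broker. Only the fields used here are modelled.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
Message = namedtuple("Message", ["topic", "partition", "offset"])


def offsets_to_commit(*messages):
    """Map each partition to the offset that should be committed.

    Kafka expects the offset of the *next* message to read, so the
    highest processed offset per partition is incremented by one.
    """
    offsets = {}
    for msg in messages:
        tp = TopicPartition(msg.topic, msg.partition)
        next_offset = msg.offset + 1
        # Keep only the largest offset seen for each partition
        if next_offset > offsets.get(tp, -1):
            offsets[tp] = next_offset
    return offsets
```

With something like this, the call could stay close to the existing form: consumer.commit(offsets_to_commit(msg1, msg2)).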
I'm running into an issue when running the tests on OSX; they always fail with:
@pytest.fixture(scope='session')
def docker_ip_address(docker):
    """Returns IP address of the docker daemon service."""
    # Fallback docker daemon bridge name
    ifname = 'docker0'
    try:
        for network in docker.networks():
            _ifname = network['Options'].get(
                'com.docker.network.bridge.name')
            if _ifname is not None:
                ifname = _ifname
                break
    except libdocker.errors.InvalidVersion:
        pass
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
E   OSError: [Errno 6] Device not configured
The code that causes this is:
fcntl.ioctl(
    s.fileno(),
    0x8915,  # SIOCGIFADDR
    struct.pack('256s', ifname[:15].encode('utf-8')))
From what I can tell, the struct in my case is b'docker0' followed by 249 \x00 padding bytes (256 bytes in total), and fileno is 11. (Not sure if this helps to debug this.)
I am able to run the tests when I change the method to just return 127.0.0.1.
I'm using native Docker for OSX.
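One possible shape for a fix is to keep the ioctl lookup but fall back to 127.0.0.1 when the interface cannot be queried. This is only a sketch: the function name interface_ip and its fallback parameter are made up for illustration, and 0x8915 is the Linux value of SIOCGIFADDR taken from the snippet above.

```python
import socket
import struct

try:
    import fcntl  # POSIX only
except ImportError:  # e.g. Windows
    fcntl = None

SIOCGIFADDR = 0x8915  # Linux-specific ioctl request number


def interface_ip(ifname, fallback="127.0.0.1"):
    """Return the IPv4 address of `ifname`, or `fallback` if the lookup fails."""
    if fcntl is None:
        return fallback
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            packed = fcntl.ioctl(
                s.fileno(),
                SIOCGIFADDR,
                struct.pack("256s", ifname[:15].encode("utf-8")),
            )
    except OSError:
        # The interface does not exist or cannot be queried, for example
        # when there is no docker0 bridge on the host.
        return fallback
    # Bytes 20..23 of the returned ifreq struct hold the IPv4 address
    return socket.inet_ntoa(packed[20:24])
```

The fixture could then return interface_ip(ifname) instead of calling fcntl.ioctl directly.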
Hi,
I've encountered strange behavior. When I create an AIOKafkaConsumer and do not subscribe to any topic, it fails during the next heartbeat with the following error:
Traceback (most recent call last):
File "/home/artimi/.PyCharmCE2016.3/config/scratches/scratch_4.py", line 32, in <module>
main()
File "/home/artimi/.PyCharmCE2016.3/config/scratches/scratch_4.py", line 18, in main
loop.run_until_complete(subscribe_later(consumer))
File "/usr/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
return future.result()
File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
File "/home/artimi/.PyCharmCE2016.3/config/scratches/scratch_4.py", line 28, in subscribe_later
await consumer.stop()
File "/home/artimi/.virtualenvs/messaging/lib/python3.5/site-packages/aiokafka/consumer.py", line 272, in stop
yield from self._coordinator.close()
File "/home/artimi/.virtualenvs/messaging/lib/python3.5/site-packages/aiokafka/group_coordinator.py", line 134, in close
yield from self.heartbeat_task
File "/usr/lib/python3.5/asyncio/futures.py", line 363, in __iter__
return self.result() # May raise too.
File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
File "/home/artimi/.virtualenvs/messaging/lib/python3.5/site-packages/aiokafka/group_coordinator.py", line 806, in _heartbeat_task_routine
yield from self._send_req(self.coordinator_id, request)
File "/home/artimi/.virtualenvs/messaging/lib/python3.5/site-packages/aiokafka/group_coordinator.py", line 159, in _send_req
resp = yield from self._client.send(node_id, request)
File "/home/artimi/.virtualenvs/messaging/lib/python3.5/site-packages/aiokafka/client.py", line 309, in send
if not (yield from self.ready(node_id)):
File "/home/artimi/.virtualenvs/messaging/lib/python3.5/site-packages/aiokafka/client.py", line 287, in ready
conn = yield from self._get_conn(node_id)
File "/home/artimi/.virtualenvs/messaging/lib/python3.5/site-packages/aiokafka/client.py", line 268, in _get_conn
assert broker, 'Broker id %s not in current metadata' % node_id
AssertionError: Broker id None not in current metadata
You can try it with following piece of code:
import asyncio
import aiokafka
import uuid

HEARTBEAT_INTERVAL = 1.0


def main():
    loop = asyncio.get_event_loop()
    consumer = aiokafka.AIOKafkaConsumer(
        loop=loop,
        bootstrap_servers='localhost:9092',
        group_id=uuid.uuid4().hex,
        enable_auto_commit=False,
        auto_offset_reset='earliest',
        heartbeat_interval_ms=HEARTBEAT_INTERVAL * 1000,
        metadata_max_age_ms=30000,
    )
    loop.run_until_complete(subscribe_later(consumer))


async def subscribe_later(consumer: aiokafka.AIOKafkaConsumer):
    await consumer.start()
    await asyncio.sleep(1.1 * HEARTBEAT_INTERVAL)
    await consumer.stop()


if __name__ == '__main__':
    main()
This happens even if I subscribe to and then unsubscribe from some topic before the heartbeat is performed.
I tried this on aiokafka 0.1.2 and 0.1.4 with the Kafka docker container quantlane/kafka:0.9.
We should use AbstractEventLoop.create_future() to create futures bound to the event loop, since a third-party event loop may provide a custom asyncio.Future implementation. uvloop has its own implementation in C.
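A small sketch of what the change could look like at a call site. The helper name new_future is hypothetical; the fallback branch is there because create_future() only exists on event loops from Python 3.5.2 onwards.

```python
import asyncio


def new_future(loop):
    """Create a future bound to `loop`, preferring the loop's own factory."""
    create = getattr(loop, "create_future", None)
    if create is not None:
        # Lets uvloop and other loops return their own Future type
        return create()
    # Older event loops have no create_future()
    return asyncio.Future(loop=loop)


async def demo():
    loop = asyncio.get_running_loop()
    fut = new_future(loop)
    loop.call_soon(fut.set_result, "done")
    return await fut
```

Call sites that currently use asyncio.Future(loop=self._loop) could be switched to such a helper one by one.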
There are a few broken tests on this version
@classmethod
def encode(cls, items):
    # RecordAccumulator encodes messagesets internally
    if isinstance(items, io.BytesIO):
        size = Int32.decode(items)
        # rewind and return all the bytes
        items.seek(-4, 1)
        return items.read(size + 4)
    encoded_values = []
>   for (offset, message) in items:
E   ValueError: too many values to unpack (expected 2)
> self.assertEqual(sorted(brokers), sorted(list(c_brokers)))
E AssertionError: Lists differ: [(0, 'broker_1', 4567), (1, 'broker_2', 5678)] != [BrokerMetadata(nodeId=0, host='broker_1',[83 chars]one)]
E
E First differing element 0:
E (0, 'broker_1', 4567)
E BrokerMetadata(nodeId=0, host='broker_1', port=4567, rack=None)
E
E - [(0, 'broker_1', 4567), (1, 'broker_2', 5678)]
E + [BrokerMetadata(nodeId=0, host='broker_1', port=4567, rack=None),
E + BrokerMetadata(nodeId=1, host='broker_2', port=5678, rack=None)]
As mentioned in KIP-62, Kafka will (as of 0.10.1.0) be able to send heartbeats from a background thread. aiokafka has done this from the start by design, as long as the event loop itself is spinning, but this KIP also proposes to limit the processing time with a separate parameter, max.poll.interval.ms. I think it is good to adopt the same idea for aiokafka so that we ensure message consumption progress on the client itself.
When I try to write to a topic that doesn't exist yet, both vanilla kafka-python and AIOKafkaProducer.send() silently create the topic before writing to it, which is how it should be.
However, when I try to do the same with a call to send_and_wait, I get an error; see below:
AIOKafkaProducer
async def produce(loop):
    # Just adds message to sending queue
    future = await producer.send(topic+'1', b'some_message_bytes')
    # waiting for message to be delivered
    resp = await future
    print("Message produced: partition {}; offset {}".format(
        resp.partition, resp.offset))
    # Also can use a helper to send and wait in 1 call
    resp = await producer.send_and_wait(
        topic + '2', key=b'foo', value=b'bar')
    #resp = yield from producer.send_and_wait(
    #    'foobar', b'message for partition 1', partition=1)

loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(loop=loop, bootstrap_servers=kafka_host, api_version="0.9")
# Bootstrap client, will get initial cluster metadata
loop.run_until_complete(producer.start())
loop.run_until_complete(produce(loop))
# Wait for all pending messages to be delivered or expire
loop.run_until_complete(producer.stop())
#loop.close()
Running that produces
Message produced: partition 0; offset 3
---------------------------------------------------------------------------
UnknownTopicOrPartitionError Traceback (most recent call last)
<ipython-input-7-d00f93f5f3ce> in <module>()
16 # Bootstrap client, will get initial cluster metadata
17 loop.run_until_complete(producer.start())
---> 18 loop.run_until_complete(produce(loop))
19 # Wait for all pending messages to be delivered or expire
20 loop.run_until_complete(producer.stop())
C:\Users\Egor\Anaconda2\envs\py3k\lib\asyncio\base_events.py in run_until_complete(self, future)
385 raise RuntimeError('Event loop stopped before Future completed.')
386
--> 387 return future.result()
388
389 def stop(self):
C:\Users\Egor\Anaconda2\envs\py3k\lib\asyncio\futures.py in result(self)
272 self._tb_logger = None
273 if self._exception is not None:
--> 274 raise self._exception
275 return self._result
276
C:\Users\Egor\Anaconda2\envs\py3k\lib\asyncio\tasks.py in _step(***failed resolving arguments***)
237 # We use the `send` method directly, because coroutines
238 # don't have `__iter__` and `__next__` methods.
--> 239 result = coro.send(None)
240 else:
241 result = coro.throw(exc)
<ipython-input-7-d00f93f5f3ce> in produce(loop)
8 # Also can use a helper to send and wait in 1 call
9 resp = await producer.send_and_wait(
---> 10 topic + '2', key=b'foo', value=b'bar')
11 #resp = yield from producer.send_and_wait(
12 # 'foobar', b'message for partition 1', partition=1)
C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\producer.py in send_and_wait(self, topic, value, key, partition)
302 def send_and_wait(self, topic, value=None, key=None, partition=None):
303 """Publish a message to a topic and wait the result"""
--> 304 future = yield from self.send(topic, value, key, partition)
305 return (yield from future)
306
C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\producer.py in send(self, topic, value, key, partition)
286
287 # first make sure the metadata for the topic is available
--> 288 yield from self._wait_on_metadata(topic)
289
290 key_bytes, value_bytes = self._serialize(topic, key, value)
C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\producer.py in _wait_on_metadata(self, topic)
238 yield from self.client.force_metadata_update()
239 if topic not in self.client.cluster.topics():
--> 240 raise UnknownTopicOrPartitionError()
241
242 return self._metadata.partitions_for_topic(topic)
UnknownTopicOrPartitionError: [Error 3] UnknownTopicOrPartitionError:
Java's API docs contain some useful examples of working with the consumer (https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html). Since aiokafka does some things a bit differently, it would be useful to provide an equivalent example for each in our docs.
Old compacted topics may have holes in their offsets, i.e. the broker can return offsets larger than the current position. As far as I remember we don't handle such a case at all, so this needs to be rechecked.
Kafka-python added it with 1.1.1. After migrating we should add it too.
Update tests and docs
Hi,
we've again encountered the problem with a stale connection that does not reconnect. First I get an error in AIOKafkaClient.send() coming from the AIOKafkaConnection.send() method:
Got error produce response: ConnectionError: Connection at kafka.example.com closed
This may happen and we should be able to recover. However, we do not reconnect, and some time after this we start to get exceptions from MessageAccumulator, which is full and not being drained:
Traceback (most recent call last):
File "/home/artimi/env/lib/python3.5/site-packages/messaging_client/mqclient/kafka_producer.py", line 72, in _run
await self._producer.send(topic, payload)
File "/home/artimi/env/lib/python3.5/site-packages/aiokafka/producer.py", line 279, in send
tp, key_bytes, value_bytes, self._request_timeout_ms / 1000)
File "/home/artimi/env/lib/python3.5/site-packages/aiokafka/message_accumulator.py", line 208, in add_message
raise KafkaTimeoutError()
kafka.errors.KafkaTimeoutError: KafkaTimeoutError
I was thinking that we could do the same thing that was done in #149: when there is any error from AIOKafkaConnection.send(), close the connection, which is then reconnected. The change would wrap the whole AIOKafkaConnection.send() method, and if any exception was raised from it we would close the connection. This approach would solve the problem for every usage of AIOKafkaConnection.send() in the code (I found 2 in Fetcher, 1 in GroupCoordinator and 1 in AIOKafkaClient). However, I'm not sure whether there could be some case where a recurrent error makes us reopen the connection in an infinite loop.
What do you think about that? I'm willing to implement this on my own if you think it is a good idea.
aiokafka version: 0.2.2
kafka version: 0.9.0
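To make the proposal concrete, here is a rough sketch of the "close on any send error" idea. ClosingSender and FakeConn are hypothetical classes written for this illustration; they only assume that a connection has an awaitable send() and a synchronous close(), and they are not the real aiokafka implementation.

```python
import asyncio


class ClosingSender:
    """Hypothetical wrapper: closes the connection whenever send() fails."""

    def __init__(self, conn):
        self._conn = conn

    async def send(self, request):
        try:
            return await self._conn.send(request)
        except Exception:
            # Drop the broken connection so that the next request opens a
            # fresh one instead of reusing a stale socket.
            self._conn.close()
            raise


class FakeConn:
    """Test double that always fails, standing in for a stale connection."""

    def __init__(self):
        self.closed = False

    async def send(self, request):
        raise ConnectionError("Connection closed")

    def close(self):
        self.closed = True


async def demo():
    conn = FakeConn()
    sender = ClosingSender(conn)
    try:
        await sender.send(b"request")
    except ConnectionError:
        pass
    return conn.closed
```

The exception is re-raised after closing, so callers still see the failure and decide whether to retry; that leaves the question of a reconnect loop on a recurrent error to the caller's retry policy.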
There is recursion in Fetcher.next_record() that, left unhandled, may eventually lead to a RecursionError. In our environment we use a single AIOKafkaConsumer to consume multiple topics, but each coroutine wants messages only from its desired topic. That's why we use AIOKafkaConsumer.getone(*partitions), with partitions consisting of all TopicPartitions that correspond to the topic we want to consume in the given coroutine. AIOKafkaConsumer.getone() calls Fetcher.next_record(), where I encountered the problem. We may consume multiple partitions in Fetcher, but it is synchronized by only one Fetcher._wait_empty_future (which serves for synchronization on new data). Here is the next_record() method:
@asyncio.coroutine
def next_record(self, partitions):
    for tp in list(self._records.keys()):
        if partitions and tp not in partitions:
            continue
        res_or_error = self._records[tp]
        if type(res_or_error) == FetchResult:
            message = res_or_error.getone()
            if message is None:
                # We already processed all messages, request new ones
                del self._records[tp]
                self._notify(self._wait_consume_future)
            else:
                return message
        else:
            # Remove error, so we can fetch on partition again
            del self._records[tp]
            self._notify(self._wait_consume_future)
            res_or_error.check_raise()

    # No messages ready. Wait for some to arrive
    if self._wait_empty_future is None or self._wait_empty_future.done():
        self._wait_empty_future = asyncio.Future(loop=self._loop)
    yield from asyncio.shield(self._wait_empty_future, loop=self._loop)
    return (yield from self.next_record(partitions))
Imagine that this coroutine is awaited multiple times, once with the partitions of each topic we have. Each of these coroutines waits on the second-to-last line, at asyncio.shield. When one topic, e.g. the first, receives data, self._wait_empty_future is set and all of the waiting coroutines recurse into next_record and wait again on the same line until the first topic receives more data. When this happens 1000 times (the standard recursion limit) and no message comes for the other topics, all those coroutines fail with RecursionError, because they go one level deeper each time the first topic receives data.
One thing that can solve this bug is removing the recursion from Fetcher.next_record(). Maybe it would be even better to synchronize the coroutines per partition instead of all on one _wait_empty_future, but that might be too complex and expensive.
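A sketch of the non-recursive variant, using a deliberately simplified stand-in for Fetcher. MiniFetcher and its feed() method are invented for this illustration: per-partition records are plain lists, and error handling and the _wait_consume_future notification are left out.

```python
import asyncio


class MiniFetcher:
    """Simplified stand-in for Fetcher: a list of pending messages per partition."""

    def __init__(self):
        self._records = {}
        self._wait_empty_future = None

    def feed(self, tp, message):
        self._records.setdefault(tp, []).append(message)
        # Wake every waiter; each one re-checks its own partitions
        fut = self._wait_empty_future
        if fut is not None and not fut.done():
            fut.set_result(None)

    async def next_record(self, partitions):
        while True:  # loop instead of recursing after each wake-up
            for tp in list(self._records):
                if partitions and tp not in partitions:
                    continue
                if self._records[tp]:
                    return self._records[tp].pop(0)
                # All messages of this partition were consumed
                del self._records[tp]
            # No messages ready. Wait for some to arrive
            fut = self._wait_empty_future
            if fut is None or fut.done():
                fut = asyncio.get_running_loop().create_future()
                self._wait_empty_future = fut
            await asyncio.shield(fut)


async def demo(wakeups=2000):
    fetcher = MiniFetcher()
    waiter = asyncio.ensure_future(fetcher.next_record(["b"]))
    for i in range(wakeups):
        fetcher.feed("a", i)      # data for another partition only
        await asyncio.sleep(0)    # give the waiter a chance to run
    fetcher.feed("b", "hello")
    return await waiter
```

Because each wake-up only runs another iteration of the while loop, the waiter for "b" keeps a constant call depth no matter how often "a" receives data.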
Hi,
I was digging into our messaging system and noticed that consumer.commit() takes quite a lot of time. In my informal benchmarking it takes about 100 ms, but I also saw delays of 500 ms. I compared it with kafka-python on the same topic at the same time, and there it took about 1 ms. Do you have any idea why it takes so long? Does commit in aiokafka do more than commit in kafka-python? We want to call commit after every message, and this is really damaging performance. I would like to confirm every message and not just use auto commit.
While researching this I discovered that the Java client and kafka-python have a so-called ConsumerCoordinator.commit_offsets_async() https://github.com/dpkp/kafka-python/blob/master/kafka/coordinator/consumer.py#L334 . As I understand it, this is a fire-and-forget commit that does not wait for the response from the Kafka server. This might help our performance. Why is it not present in aiokafka?
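Until something like that exists, one way to approximate a fire-and-forget commit from user code might be to schedule the commit as a background task. This is only a sketch: commit_in_background is a hypothetical helper, FakeConsumer is a test double, and the only assumption about the consumer is that commit() is a coroutine accepting an optional offsets argument.

```python
import asyncio
import logging

log = logging.getLogger(__name__)


def commit_in_background(consumer, offsets=None):
    """Schedule consumer.commit() without awaiting the broker's response.

    Must be called while the event loop is running (e.g. from a coroutine).
    """
    task = asyncio.ensure_future(consumer.commit(offsets))

    def _log_failure(t):
        # Retrieve the exception so it is logged instead of being lost
        if not t.cancelled() and t.exception() is not None:
            log.warning("Background commit failed: %r", t.exception())

    task.add_done_callback(_log_failure)
    return task


class FakeConsumer:
    """Test double that records committed offsets."""

    def __init__(self):
        self.committed = []

    async def commit(self, offsets=None):
        await asyncio.sleep(0)  # pretend to wait for the broker
        self.committed.append(offsets)


async def demo():
    consumer = FakeConsumer()
    task = commit_in_background(consumer, {"tp": 1})
    pending_before = list(consumer.committed)  # nothing committed yet
    await task
    return pending_before, consumer.committed
```

The trade-off is that the caller no longer knows whether a given commit succeeded before processing the next message, so this weakens the "confirm every message" guarantee.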
While working on #88 I noticed a strange bug. We monitor metadata changes in Coordinator, and if the metadata changes we request a rejoin. What we don't do, though, is clear the metadata snapshot (_partitions_per_topic) on rebalance. This can lead to the following situation:
1. The subscription is topic1. _partitions_per_topic = {'topic1': {0, 1}}. 1 rebalance.
2. The subscription changes to topic2. Metadata is updated before the rebalance is completed, and so _partitions_per_topic = {'topic1': {0, 1}, 'topic2': {0, 1}}. This is caused by self._group_subscription in Subscription not replacing, but updating the set.
3. After the rebalance, _partitions_per_topic = {'topic2': {0, 1}}, triggering an unneeded rebalance.
Seems like KIP-35 has landed. Once dpkp/kafka-python#678 is released we can pull it in and support v10 brokers by using ApiVersionRequest instead of the strange version discovery.
While testing our code we tweaked some parameters of AIOKafkaConsumer. One of them is heartbeat_interval_ms, which we set to 100 ms. We then ran a test that waited 100 ms between subscribing to more topics. Sometimes (at first in 1 of 20 cases, then in 1 of 200 cases), when I subscribed to and unsubscribed from a topic too quickly, kafka/aiokafka raised this exception:
raise self._exception
/usr/lib/python3.5/asyncio/tasks.py:239: in _step
result = coro.send(None)
messaging_client/mqclient/kafka_consumer.py:227: in stop
await self._consumer.stop()
../../.virtualenvs/messaging/lib/python3.5/site-packages/aiokafka/consumer.py:279: in stop
yield from self._coordinator.close()
../../.virtualenvs/messaging/lib/python3.5/site-packages/aiokafka/group_coordinator.py:132: in close
yield from self.heartbeat_task
/usr/lib/python3.5/asyncio/futures.py:363: in __iter__
return self.result() # May raise too.
/usr/lib/python3.5/asyncio/futures.py:274: in result
raise self._exception
/usr/lib/python3.5/asyncio/tasks.py:239: in _step
result = coro.send(None)
../../.virtualenvs/messaging/lib/python3.5/site-packages/aiokafka/group_coordinator.py:789: in _heartbeat_task_routine
yield from self.ensure_active_group()
../../.virtualenvs/messaging/lib/python3.5/site-packages/aiokafka/group_coordinator.py:593: in ensure_active_group
yield from self._perform_group_join()
../../.virtualenvs/messaging/lib/python3.5/site-packages/aiokafka/group_coordinator.py:677: in _perform_group_join
self.protocol, member_assignment_bytes)
../../.virtualenvs/messaging/lib/python3.5/site-packages/aiokafka/group_coordinator.py:266: in _on_join_complete
self._subscription.assign_from_subscribed(assignment.partitions())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <kafka.consumer.subscription_state.SubscriptionState object at 0x7f62fcb32240>, assignments = [TopicPartition(topic='TestSchema.c6d8cc1a2fb242b38bee95a4a189e1fa.1', partition=0)]
    def assign_from_subscribed(self, assignments):
        """Update the assignment to the specified partitions

        This method is called by the coordinator to dynamically assign
        partitions based on the consumer's topic subscription. This is different
        from assign_from_user() which directly sets the assignment from a
        user-supplied TopicPartition list.

        Arguments:
            assignments (list of TopicPartition): partitions to assign to this
                consumer instance.
        """
        if self.subscription is None:
            raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
        for tp in assignments:
            if tp.topic not in self.subscription:
>               raise ValueError("Assigned partition %s for non-subscribed topic." % str(tp))
E               ValueError: Assigned partition TopicPartition(topic='TestSchema.c6d8cc1a2fb242b38bee95a4a189e1fa.1', partition=0) for non-subscribed topic.
I captured some logs; the important part is shown below:
[1480672858.4598098][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.group_coordinator][DEBUG] Received successful heartbeat response.
[1480672858.5570168][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][kafka.consumer.subscription_state][INFO] Updating subscribed topics to: {'TestSchema.aaa34fbd914a4d90ad20335be9061c77.1', 'TestSchema.aaa34fbd914a4d90ad20335be9061c77.2', 'TestSchema.aaa34fbd914a4d90ad20335be9061c77.3'}
[1480672858.557299][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.consumer][DEBUG] Subscribed to topic(s): {'TestSchema.aaa34fbd914a4d90ad20335be9061c77.1', 'TestSchema.aaa34fbd914a4d90ad20335be9061c77.2', 'TestSchema.aaa34fbd914a4d90ad20335be9061c77.3'}
[1480672858.558063][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka][DEBUG] Sending metadata request MetadataRequest(topics=['TestSchema.aaa34fbd914a4d90ad20335be9061c77.1', 'TestSchema.aaa34fbd914a4d90ad20335be9061c77.2', 'TestSchema.aaa34fbd914a4d90ad20335be9061c77.3']) to 0
[1480672858.559933][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.group_coordinator][DEBUG] Revoking previously assigned partitions set()
[1480672858.560181][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.group_coordinator][DEBUG] (Re-)joining group 794401da87a540daba226dee38a81b1c
[1480672858.5605147][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.group_coordinator][DEBUG] Issuing request (JoinGroupRequest(group='794401da87a540daba226dee38a81b1c', session_timeout=30000, member_id='aiokafka-0.1.2-030f6183-dbbb-4733-895a-8f8fe4527dfa', protocol_type='consumer', group_protocols=[(protocol_name='roundrobin', protocol_metadata=b'\x00\x00\x00\x00\x00\x03\x00-TestSchema.aaa34fbd914a4d90ad20335be9061c77.1\x00-TestSchema.aaa34fbd914a4d90ad20335be9061c77.2\x00-TestSchema.aaa34fbd914a4d90ad20335be9061c77.3\x00\x00\x00\x00')])) to coordinator 0
[1480672858.669306][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][kafka.consumer.subscription_state][INFO] Updating subscribed topics to: {'TestSchema.aaa34fbd914a4d90ad20335be9061c77.2', 'TestSchema.aaa34fbd914a4d90ad20335be9061c77.3'}
[1480672858.6696045][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.consumer][DEBUG] Subscribed to topic(s): {'TestSchema.aaa34fbd914a4d90ad20335be9061c77.2', 'TestSchema.aaa34fbd914a4d90ad20335be9061c77.3'}
[1480672858.760019][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.conn][DEBUG] <AIOKafkaConnection host=10.107.20.205 port=9092> Response 7: MetadataResponse(brokers=[(node_id=0, host='10.107.20.205', port=9092)], topics=[(error_code=0, topic='TestSchema.aaa34fbd914a4d90ad20335be9061c77.1', partitions=[(error_code=0, partition=0, leader=0, replicas=[0], isr=[0])]), (error_code=5, topic='TestSchema.aaa34fbd914a4d90ad20335be9061c77.2', partitions=[]), (error_code=5, topic='TestSchema.aaa34fbd914a4d90ad20335be9061c77.3', partitions=[])])
[1480672858.7602575][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][kafka.cluster][WARNING] Topic TestSchema.aaa34fbd914a4d90ad20335be9061c77.2 is not available during auto-create initialization
[1480672858.7603266][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][kafka.cluster][WARNING] Topic TestSchema.aaa34fbd914a4d90ad20335be9061c77.3 is not available during auto-create initialization
[1480672858.773429][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][kafka.cluster][DEBUG] Updated cluster metadata to Cluster(brokers: 1, topics: 1, groups: 0)
[1480672858.7737765][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.conn][DEBUG] <AIOKafkaConnection host=10.107.20.205 port=9092> Response 8: JoinGroupResponse(error_code=0, generation_id=2, group_protocol='roundrobin', leader_id='aiokafka-0.1.2-030f6183-dbbb-4733-895a-8f8fe4527dfa', member_id='aiokafka-0.1.2-030f6183-dbbb-4733-895a-8f8fe4527dfa', members=[(member_id='aiokafka-0.1.2-030f6183-dbbb-4733-895a-8f8fe4527dfa', member_metadata=b'\x00\x00\x00\x00\x00\x03\x00-TestSchema.aaa34fbd914a4d90ad20335be9061c77.1\x00-TestSchema.aaa34fbd914a4d90ad20335be9061c77.2\x00-TestSchema.aaa34fbd914a4d90ad20335be9061c77.3\x00\x00\x00\x00')])
[1480672858.7740512][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.group_coordinator][DEBUG] Join group response JoinGroupResponse(error_code=0, generation_id=2, group_protocol='roundrobin', leader_id='aiokafka-0.1.2-030f6183-dbbb-4733-895a-8f8fe4527dfa', member_id='aiokafka-0.1.2-030f6183-dbbb-4733-895a-8f8fe4527dfa', members=[(member_id='aiokafka-0.1.2-030f6183-dbbb-4733-895a-8f8fe4527dfa', member_metadata=b'\x00\x00\x00\x00\x00\x03\x00-TestSchema.aaa34fbd914a4d90ad20335be9061c77.1\x00-TestSchema.aaa34fbd914a4d90ad20335be9061c77.2\x00-TestSchema.aaa34fbd914a4d90ad20335be9061c77.3\x00\x00\x00\x00')])
[1480672858.7741585][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.group_coordinator][INFO] Joined group '794401da87a540daba226dee38a81b1c' (generation 2) with member_id aiokafka-0.1.2-030f6183-dbbb-4733-895a-8f8fe4527dfa
[1480672858.774216][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.group_coordinator][INFO] Elected group leader -- performing partition assignments using roundrobin
[1480672858.7743132][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.group_coordinator][DEBUG] Performing roundrobin assignment for subscriptions {'aiokafka-0.1.2-030f6183-dbbb-4733-895a-8f8fe4527dfa': ConsumerProtocolMemberMetadata(version=0, subscription=['TestSchema.aaa34fbd914a4d90ad20335be9061c77.1', 'TestSchema.aaa34fbd914a4d90ad20335be9061c77.2', 'TestSchema.aaa34fbd914a4d90ad20335be9061c77.3'], user_data=b'')}
[1480672858.7743835][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][kafka.coordinator.assignors.roundrobin][WARNING] No partition metadata for topic TestSchema.aaa34fbd914a4d90ad20335be9061c77.2
[1480672858.774436][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][kafka.coordinator.assignors.roundrobin][WARNING] No partition metadata for topic TestSchema.aaa34fbd914a4d90ad20335be9061c77.3
[1480672858.774536][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.group_coordinator][DEBUG] Finished assignment: {'aiokafka-0.1.2-030f6183-dbbb-4733-895a-8f8fe4527dfa': ConsumerProtocolMemberAssignment(version=0, assignment=[(topic='TestSchema.aaa34fbd914a4d90ad20335be9061c77.1', partitions=[0])], user_data=b'')}
[1480672858.774634][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.group_coordinator][DEBUG] Issuing leader SyncGroup (SyncGroupRequest(group='794401da87a540daba226dee38a81b1c', generation_id=2, member_id='aiokafka-0.1.2-030f6183-dbbb-4733-895a-8f8fe4527dfa', group_assignment=[(member_id='aiokafka-0.1.2-030f6183-dbbb-4733-895a-8f8fe4527dfa', member_metadata=b'\x00\x00\x00\x00\x00\x01\x00-TestSchema.aaa34fbd914a4d90ad20335be9061c77.1\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00')])) to coordinator 0
[1480672858.7753685][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.conn][DEBUG] <AIOKafkaConnection host=10.107.20.205 port=9092> Response 9: SyncGroupResponse(error_code=0, member_assignment=b'\x00\x00\x00\x00\x00\x01\x00-TestSchema.aaa34fbd914a4d90ad20335be9061c77.1\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00')
[1480672858.77553][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.group_coordinator][DEBUG] Received successful sync group response for group 794401da87a540daba226dee38a81b1c: SyncGroupResponse(error_code=0, member_assignment=b'\x00\x00\x00\x00\x00\x01\x00-TestSchema.aaa34fbd914a4d90ad20335be9061c77.1\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00')
You can see on line 2 of the log that we are subscribing to the topics ending with 1, 2 and 3. Then on line 4 a MetadataRequest is sent. On line 8 the subscription is updated to leave out 1 and keep only 2 and 3. But on line 10 the MetadataResponse finally arrives with topic 1 as successful and topics 2 and 3 with error 5 (which does not bother us right now). So we are now in a situation where subscriptions == {2, 3} and assignments == {1}. When the kafka method SubscriptionState.assign_from_subscribed() is then called, it discovers that the partition of topic 1 is not in the subscription and raises the error described above.
Basically, the assigned partitions are in a state that does not correspond to the current subscription because the metadata is not updated fast (often?) enough. We hopefully found a workaround for this by calling force_metadata_update after every unsubscription (unsubscribing is quite uncommon in our workflow, so we can afford it).
This happens rarely and unfortunately I could not create a reliable minimal working example of this bug. I hope this helps somebody, because I spent some time tracking this one down.
This happened with aiokafka == 0.1.2 and Kafka in the docker container quantlane/kafka:0.9.
Hello, I've got to a point where events are processed with no problem until I assign my consumer to a group.
When I assign my consumer to a group, the events are processed for only about 2 minutes; after that it constantly logs an ERROR and does not process any messages:
storage_in_1 | 2016-05-27 12:58:06,471 - aiokafka.group_coordinator - ERROR - Skipping heartbeat: no active group: GroupCoordinatorNotAvailableError - 15 - The broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active.
storage_in_1 | 2016-05-27 12:58:06,573 - aiokafka.group_coordinator - ERROR - Skipping heartbeat: no active group: GroupCoordinatorNotAvailableError - 15 - The broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active.
Just before this happens, group_coordinator outputs:
storage_in_1 | 2016-05-27 12:58:05,778 - aiokafka.group_coordinator - INFO - Heartbeat failed: local member_id was not recognized; resetting and re-joining group
storage_in_1 | 2016-05-27 12:58:05,779 - aiokafka.group_coordinator - ERROR - Heartbeat session expired - marking coordinator dead
storage_in_1 | 2016-05-27 12:58:05,779 - aiokafka.group_coordinator - INFO - Marking the coordinator dead (node 0): None.
storage_in_1 | 2016-05-27 12:58:05,779 - aiokafka.group_coordinator - ERROR - OffsetCommit failed for group test-consumer-group due to group error (IllegalGenerationError - 22 - Returned from group membership requests (such as heartbeats) when the generation id provided in the request is not the current generation.), will rejoin
storage_in_1 | 2016-05-27 12:58:05,780 - aiokafka.group_coordinator - WARNING - Auto offset commit failed: IllegalGenerationError - 22 - Returned from group membership requests (such as heartbeats) when the generation id provided in the request is not the current generation.
The consumer is configured as:
AIOKafkaConsumer(
    'some-topic',
    loop=loop,
    bootstrap_servers='my-local-kafka-url',
    group_id='some-group-name'
)
Kafka version: flozano/kafka:0.9.0.1
aiokafka version: aiokafka (0.1.2)
Any idea how we can resolve this issue?
Thanks
edit: I just tried versions 0.1.0, 0.1.1, 0.1.2 and the latest master, and I have the same issue on all of them.
README.rst says that all you need is:
docker
libsnappy-dev
make setup
make test
However, after a few hours I discovered that you also need to have the keytool utility installed https://docs.oracle.com/javase/6/docs/technotes/tools/solaris/keytool.html , which is part of Java (not installed on my computer before). It is used in the script gen-ssl-certs.sh to generate a few files, one of which is br_server.keystore.jks. When the tests started, gen-ssl-certs.sh failed silently and did not create those files. Kafka in docker then failed every time it tried to start, with this exception:
org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: /ssl_cert/br_server.keystore.jks (No such file or directory)
I recommend updating the README with the keytool requirement and raising an exception in the ssl_folder.py fixture if gen-ssl-certs.sh does not return 0.
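A minimal sketch of the check recommended above, assuming the fixture runs the script through subprocess. The helper name run_cert_script and its required_tools parameter are hypothetical and not part of the project; only shutil.which and subprocess.run(..., check=True) are standard-library calls.

```python
import shutil
import subprocess


def run_cert_script(cmd, required_tools=("keytool",)):
    """Run a certificate-generation command and fail loudly on errors.

    Hypothetical helper: `cmd` is the argument list for the script.
    """
    # Fail early with a clear message if a required tool is not on PATH.
    for tool in required_tools:
        if shutil.which(tool) is None:
            raise RuntimeError(
                "{} not found on PATH; it is needed to generate the SSL "
                "certificates".format(tool))
    # check=True raises CalledProcessError on a non-zero exit status,
    # so a failing script can no longer pass silently.
    subprocess.run(cmd, check=True)
```

A fixture could call this helper before starting the Kafka container, so a missing keytool shows up as a test setup error rather than as a FileNotFoundException inside the broker.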
Hi,
I'm running the latest PyPI release, aiokafka 0.2.2, with kafka-python 1.3.1. When I run the following code I get the error below, which disappears when I downgrade to 0.2.1. Can you please take a look?
from kafka.common import KafkaError
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
import asyncio, random

topic = 'test_topic_' + str(random.randint(1, 100000))

async def produce():
    # Just adds message to sending queue
    future = await producer.send(topic, b'some_message_bytes')
    resp = await future
    print("Message produced: partition {}; offset {}".format(
        resp.partition, resp.offset))

async def consume_task(consumer):
    try:
        msg = await consumer.getone()
        print("consumed: ", msg.topic, msg.partition, msg.offset,
              msg.key, msg.value, msg.timestamp)
    except KafkaError as err:
        print("error while consuming message: ", err)

loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(loop=loop, bootstrap_servers='192.168.99.100:9092')
# Bootstrap client, will get initial cluster metadata
loop.run_until_complete(producer.start())
loop.run_until_complete(produce())
loop.run_until_complete(producer.stop())
consumer = AIOKafkaConsumer(topic, loop=loop, bootstrap_servers='192.168.99.100:9092')
# Bootstrap client, will get initial cluster metadata
loop.run_until_complete(consumer.start())
loop.run_until_complete(consume_task(consumer))
# Will gracefully leave consumer group; perform autocommit if enabled
loop.run_until_complete(consumer.stop())
Error:
Task exception was never retrieved
future: <Task finished coro=<AIOKafkaConsumer._update_fetch_positions() done, defined at C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\consumer.py:576> exception=ValueError(<class 'struct.error'>,)>
Traceback (most recent call last):
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 10, in _pack
return pack(f, value)
struct.error: required argument is not an integer
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\asyncio\tasks.py", line 239, in _step
result = coro.send(None)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\consumer.py", line 595, in _update_fetch_positions
yield from self._fetcher.update_fetch_positions(partitions)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\fetcher.py", line 514, in update_fetch_positions
x.result()
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\asyncio\futures.py", line 274, in result
raise self._exception
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\asyncio\tasks.py", line 239, in _step
result = coro.send(None)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\fetcher.py", line 535, in _reset_offset
offset = yield from self._offset(partition, timestamp)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\fetcher.py", line 560, in _offset
partition, timestamp)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\fetcher.py", line 598, in _proc_offset_request
response = yield from self._client.send(node_id, request)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\client.py", line 375, in send
request, expect_response=expect_response)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\conn.py", line 141, in send
message = header.encode() + request.encode()
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\struct.py", line 34, in _encode_self
[self.__dict__[name] for name in self.SCHEMA.names]
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 124, in encode
for i, field in enumerate(self.fields)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 124, in <listcomp>
for i, field in enumerate(self.fields)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 162, in encode
[self.array_of.encode(item) for item in items]
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 162, in <listcomp>
[self.array_of.encode(item) for item in items]
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 124, in encode
for i, field in enumerate(self.fields)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 124, in <listcomp>
for i, field in enumerate(self.fields)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 162, in encode
[self.array_of.encode(item) for item in items]
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 162, in <listcomp>
[self.array_of.encode(item) for item in items]
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 124, in encode
for i, field in enumerate(self.fields)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 124, in <listcomp>
for i, field in enumerate(self.fields)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 56, in encode
return _pack('>q', value)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 12, in _pack
raise ValueError(error)
ValueError: <class 'struct.error'>
Message produced: partition 0; offset 0
Hi,
we have had a few network interruptions lately, so I'm digging into reconnecting. I was not able to find a reconnect feature in aiokafka, neither in conn.py nor in client.py. Is it implemented elsewhere in aiokafka, or is it missing?
I looked at the Kafka documentation and there is a reconnect.backoff.ms option for the consumer and producer (https://kafka.apache.org/0102/documentation/#newconsumerconfigs) that prevents reconnecting too fast after a failure. In kafka-python it is implemented in KafkaClient._maybe_refresh_metadata, with another part in KafkaClient._bootstrap.
I tried dropping the connection using tcpkill and discovered that it blocks connections from a rising sequence of ports:
10.107.20.205:39614 > 172.17.0.2:9092: R 2713530037:2713530037(0) win 0
...
10.107.20.205:39616 > 172.17.0.2:9092: R 3723147408:3723147408(0) win 0
...
10.107.20.205:39618 > 172.17.0.2:9092: R 830983828:830983828(0) win 0
Do you have any idea how this can happen? Is there some reconnect after all? However, when I stopped tcpkill no new messages went through, whereas kafka-python reconnected and started accepting messages again.
I'm willing to implement this feature, but I would welcome any advice on where best to put it (I was thinking of conn.py, for the case where sending a message fails).
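As a rough illustration of what a reconnect.backoff.ms style delay means, here is a sketch of a retry loop that pauses between failed connection attempts. The function connect_with_backoff, its parameters and the connect callable are all hypothetical; this is not how aiokafka or kafka-python implement reconnecting.

```python
import asyncio


async def connect_with_backoff(connect, backoff_ms=50, max_attempts=5):
    """Call `connect()` until it succeeds, sleeping between failed attempts.

    `connect` is any coroutine function that raises OSError on failure
    (a stand-in for opening a broker connection).
    """
    last_error = None
    for attempt in range(max_attempts):
        try:
            return await connect()
        except OSError as err:
            last_error = err
            # Wait before the next attempt so a dead broker is not hammered.
            if attempt + 1 < max_attempts:
                await asyncio.sleep(backoff_ms / 1000)
    raise ConnectionError(
        "gave up after {} attempts".format(max_attempts)) from last_error
```

Whether such a loop belongs in conn.py or in client.py is exactly the open question of this issue; the sketch only shows the backoff behaviour itself.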
I'm trying to get the test suite running, but no luck. I'm on OSX and use Python 3.5.
When I run make cov as suggested, I get this error:
$ make cov
extra=$(python -c "import sys;sys.stdout.write('--exclude tests/test_pep492.py') if sys.version_info[:3] < (3, 5, 0) else sys.stdout.write('')"); \
flake8 aiokafka tests $extra
tests/test_pep492.py:23:17: F999 'break' outside loop
tests/test_pep492.py:46:21: F999 'break' outside loop
If I fix those by adding # noqa behind them, flake8 passes, but the tests can't seem to communicate with Docker:
$ KAFKA_VERSION=0.10.0.0 SCALA_VERSION=2.11 make cov
extra=$(python -c "import sys;sys.stdout.write('--exclude tests/test_pep492.py') if sys.version_info[:3] < (3, 5, 0) else sys.stdout.write('')"); \
flake8 aiokafka tests $extra
=========================================================================================================== test session starts ============================================================================================================
platform darwin -- Python 3.5.0, pytest-2.9.2, py-1.4.31, pluggy-0.3.1
rootdir: /Users/fkochem/workspace/code/aiokafka, inifile:
plugins: catchlog-1.2.2, cov-2.3.0
collected 55 items
tests/test_client.py EEEEEEEEE
tests/test_conn.py EEEEEEEE
tests/test_consumer.py EEEEEEEEEEEEEE
tests/test_coordinator.py EEEEEEEEE
tests/test_fetcher.py ..
tests/test_message_accumulator.py ..
tests/test_pep492.py EEE
tests/test_producer.py EEEEEEEE
---------- coverage: platform darwin, python 3.5.0-final-0 -----------
Coverage HTML written to dir htmlcov
================================================================================================================== ERRORS ==================================================================================================================
_________________________________________________________________________________________ ERROR at setup of TestAIOKafkaClient.test_init_with_csv __________________________________________________________________________________________
docker = <docker.client.Client object at 0x10a6decf8>
@pytest.fixture(scope='session')
def docker_ip_address(docker):
"""Returns IP address of the docker daemon service."""
# Fallback docker daemon bridge name
ifname = 'docker0'
try:
for network in docker.networks():
_ifname = docker.networks()[0]['Options'].get(
'com.docker.network.bridge.name')
if _ifname is not None:
ifname = _ifname
break
except libdocker.errors.InvalidVersion:
pass
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
return socket.inet_ntoa(fcntl.ioctl(
s.fileno(),
0x8915, # SIOCGIFADDR
> struct.pack('256s', ifname[:15].encode('utf-8')))[20:24])
E OSError: [Errno 6] Device not configured
tests/conftest.py:42: OSError
All tests error out with the same exception.
Hi,
when I try to run both examples from the main page of this project, I get the following error on both the producer and the consumer:
future: <Task finished coro=<wait_for() done, defined at /home/egor/anaconda3/lib/python3.5/asyncio/tasks.py:355> exception=CorrelationIdError('Correlation ids do not match: sent 1, recv 2',)>
Traceback (most recent call last):
File "/home/egor/anaconda3/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
File "/home/egor/anaconda3/lib/python3.5/asyncio/tasks.py", line 392, in wait_for
return fut.result()
File "/home/egor/anaconda3/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
kafka.errors.CorrelationIdError: CorrelationIdError: Correlation ids do not match: sent 1, recv 2
Any idea how to fix it? Notebook attached, am using Kafka 0.9.0.1 and Python 3.5 on Ubuntu 16.04
Thanks a lot!
E.
Currently Consumer does something like:
assert all(map(lambda k: isinstance(k, TopicPartition), partitions))
The next release (0.2.0) will feature full compatibility with the latest Kafka and kafka-python. This is just a simple TODO for the compatibility review (mostly built from the kafka-python changelog):
This one's not so tricky, but can be a pain. If we start a rebalance right after the subscription changes and the MetadataResponse doesn't arrive before the leader performs partition assignment, we can end up with an assignment of 0 partitions, because the leader just does not know about the new topic yet.
Any plans to make docs and examples?
AIOKafkaClient stores a list of topics and passes it into MetadataRequest. So if a new topic appears in Kafka, the consumer does not know anything about it. Do we really need to store the topic list inside the client? Or can we just always send a MetadataRequest with an empty topic list to get all information about the current state of Kafka?
Java client implemented this as of 0.10. See https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes(java.util.Map)
asyncio context (like number of select calls).
There is a problem with Fetcher. I don't know if it's bad design or simply a "ported as is" issue, but anyway: the fetched_records and next_record methods use _wait_empty_future to halt until results are read, but this only works if you have a single consumer task running. If you need to run several asyncio tasks for consuming and processing messages, you'll definitely end up with _wait_empty_future being overwritten with another one from another task, leaving the first task waiting for the future's result until the loop is closed.
https://github.com/aio-libs/aiokafka/blob/master/aiokafka/fetcher.py#L604
Basically there is no check whether _wait_empty_future is already set (not None).
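To make the missing check concrete, here is a toy sketch in which a pending future is reused instead of replaced, so a second task cannot orphan the first. The EmptySignal class and its method names are invented for illustration; only the idea of testing whether the future is already set comes from the report, and this is not the Fetcher's actual code.

```python
import asyncio


class EmptySignal:
    """Toy stand-in for the handling of a shared `_wait_empty_future`."""

    def __init__(self):
        self._waiter = None

    def wait(self):
        # Reuse the pending future instead of overwriting it, so every
        # waiting task ends up waiting on the same object.
        if self._waiter is None or self._waiter.done():
            self._waiter = asyncio.get_running_loop().create_future()
        # shield() keeps one cancelled waiter from cancelling the shared future.
        return asyncio.shield(self._waiter)

    def notify(self):
        # Wake every task currently waiting on the shared future.
        if self._waiter is not None and not self._waiter.done():
            self._waiter.set_result(None)
```

With this shape, two tasks that both call await signal.wait() are released by a single notify(), whereas unconditionally assigning a new future in wait() would leave the earlier task pending.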
offset_reset_strategy is not enough if we have already committed some offset. We should probably bring back the seek_to_* APIs.
Nothing critical, but annoying during tests. If you execute consumer.getmany(timeout_ms=500), it can return {} before the timeout expires when the assignment changes, as it only waits on data once and that data is discarded as not assigned.
This should never be a problem for an actual application, as it will be running in a while True: loop and will only cause an unneeded loop iteration.
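For tests that need the full timeout to be honoured, one possible workaround is to wrap the call in a deadline loop. The sketch below assumes a hypothetical helper getmany_until, where fetch stands in for consumer.getmany and is expected to accept a timeout_ms keyword as in the call above.

```python
import asyncio
import time


async def getmany_until(fetch, timeout_ms):
    """Call `fetch(timeout_ms=...)` until it returns data or time runs out.

    `fetch` is a stand-in for `consumer.getmany` (hypothetical wrapper).
    """
    deadline = time.monotonic() + timeout_ms / 1000
    while True:
        # Give each call only the time that is left before the deadline.
        remaining_ms = max(0, int((deadline - time.monotonic()) * 1000))
        records = await fetch(timeout_ms=remaining_ms)
        # Return on data, or once the last call ran with no time left.
        if records or remaining_ms == 0:
            return records
```

A test would then call await getmany_until(consumer.getmany, 500) and could rely on an empty result meaning that the whole 500 ms passed without data.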
kafka-python already has support for that, so it shouldn't be that hard.
The RPM build fails because it cannot find CHANGES.rst:
python3 setup.py bdist_rpm
- STATUS=0
- '[' 0 -ne 0 ']'
- cd aiokafka-0.0.1
- /usr/bin/chmod -Rf a+rX,u+w,g-w,o-w .
- exit 0
Executing(%build): /bin/sh -e /var/tmp/rpm-tmp.iKQpUb
- umask 022
- cd /home/salehi/projects/need-seek/backend/aiokafka-master/build/bdist.linux-x86_64/rpm/BUILD
- cd aiokafka-0.0.1
- python3 setup.py build
Traceback (most recent call last):
File "setup.py", line 53, in <module>
long_description='\n\n'.join((read('README.rst'), read('CHANGES.rst'))),
File "setup.py", line 20, in read
return open(os.path.join(os.path.dirname(__file__), f)).read().strip()
FileNotFoundError: [Errno 2] No such file or directory: 'CHANGES.rst'
error: Bad exit status from /var/tmp/rpm-tmp.iKQpUb (%build)
RPM build errors:
Bad exit status from /var/tmp/rpm-tmp.iKQpUb (%build)
error: command 'rpmbuild' failed with exit status 1
Currently the async iterator continues waiting for data even after a consumer.stop() call. I think this is not correct behaviour, as it makes cancellation the only way to stop our consumers.
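The behaviour asked for here can be illustrated with a toy stream whose async iteration ends once stop() is called. StoppableStream is an invented class built on asyncio.Queue and asyncio.Event; it only sketches the desired semantics and says nothing about how AIOKafkaConsumer is implemented.

```python
import asyncio


class StoppableStream:
    """Toy message stream whose async iteration ends after stop()."""

    def __init__(self):
        self._queue = asyncio.Queue()
        self._closed = asyncio.Event()

    def put(self, msg):
        self._queue.put_nowait(msg)

    def stop(self):
        self._closed.set()

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Wait for either a new message or the stop signal.
        get = asyncio.ensure_future(self._queue.get())
        closed = asyncio.ensure_future(self._closed.wait())
        done, pending = await asyncio.wait(
            {get, closed}, return_when=asyncio.FIRST_COMPLETED)
        for task in pending:
            task.cancel()
        if get in done:
            return get.result()
        # Stopped with no message available: end the `async for` loop.
        raise StopAsyncIteration
```

A consumer written as async for msg in stream: then finishes normally after stream.stop(), without the surrounding task having to be cancelled.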
Kafka cluster: 0.10.2.1
aiokafka: 0.2.2 (kafka-python 1.3.1)
Python: 3.5.2
import asyncio
from aiokafka import AIOKafkaProducer

@asyncio.coroutine
def produce(loop):
    while True:
        resp = yield from producer.send_and_wait('my-topic', value=b'some_message_bytes')
        print("Message produced: partition {}; offset {}".format(resp.partition, resp.offset))
        yield from asyncio.sleep(1)

loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(loop=loop, bootstrap_servers='bootstrap-server1:9092,bootstrap-server2:9092,bootstrap-server3:9092')
loop.run_until_complete(producer.start())
loop.run_until_complete(produce(loop))
loop.run_until_complete(producer.stop())
loop.close()
If you send a kill command to the script, you will see this exception:
Task exception was never retrieved
future: <Task finished coro=<AIOKafkaProducer._send_produce_req() done, defined at lib/python3.5/site-packages/aiokafka/producer.py:331> exception=ValueError(<class 'struct.error'>,)>
Traceback (most recent call last):
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 17, in _unpack
(value,) = unpack(f, data)
struct.error: unpack requires a bytes object of length 4
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
File "lib/python3.5/site-packages/aiokafka/producer.py", line 365, in _send_produce_req
response = yield from self.client.send(node_id, request)
File "lib/python3.5/site-packages/aiokafka/client.py", line 375, in send
request, expect_response=expect_response)
File "lib/python3.5/site-packages/aiokafka/conn.py", line 141, in send
message = header.encode() + request.encode()
File "lib/python3.5/site-packages/kafka/protocol/struct.py", line 34, in _encode_self
[self.__dict__[name] for name in self.SCHEMA.names]
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 124, in encode
for i, field in enumerate(self.fields)
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 124, in <listcomp>
for i, field in enumerate(self.fields)
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 162, in encode
[self.array_of.encode(item) for item in items]
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 162, in <listcomp>
[self.array_of.encode(item) for item in items]
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 124, in encode
for i, field in enumerate(self.fields)
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 124, in <listcomp>
for i, field in enumerate(self.fields)
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 162, in encode
[self.array_of.encode(item) for item in items]
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 162, in <listcomp>
[self.array_of.encode(item) for item in items]
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 124, in encode
for i, field in enumerate(self.fields)
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 124, in <listcomp>
for i, field in enumerate(self.fields)
File "lib/python3.5/site-packages/kafka/protocol/message.py", line 154, in encode
size = Int32.decode(items)
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 50, in decode
return _unpack('>i', data.read(4))
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 20, in _unpack
raise ValueError(error)
ValueError: <class 'struct.error'>
We are looking into this, but feel free to comment.
We already have the iterator interface, so it's not that critical, but Kafka's consumers introduced it, and we could do it too. Details in KIP-41
Is there a plan for when aiokafka will support the newly added exactly-once semantics using idempotent and transactional producers?
One of the ideas of the listener's API is to allow setUp and tearDown of topic or partition resources, such as invalidating or populating caches, committing the last batch of messages, etc.
In the current implementation on_partitions_revoked and on_partitions_assigned can only be plain functions. Calling them will not stall group management (new messages will be fetched right away, Join Group will happen right away), defeating the purpose.
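A small sketch of how a callback could be awaited when it is a coroutine function, so that group management waits for it to finish. The helper call_listener is hypothetical, and it uses inspect.iscoroutinefunction from the standard library, which recognises async def functions.

```python
import asyncio
import inspect


async def call_listener(callback, *args):
    """Invoke a rebalance callback, awaiting it if it is a coroutine function."""
    if inspect.iscoroutinefunction(callback):
        # The caller is suspended until the callback has finished, which is
        # what lets a listener commit offsets or flush caches in time.
        return await callback(*args)
    return callback(*args)
```

The coordinator would then use await call_listener(listener.on_partitions_revoked, revoked) before rejoining the group; the names listener and revoked are placeholders.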
TODO:
Check whether on_partitions_assigned and on_partitions_revoked are coroutine functions (asyncio.iscoroutinefunction) and await them if they are.
consumer.rst docs. Fix subscribe docstring.
If I let aiokafka connect to Kafka 0.10 right after it has started, it keeps spamming these error messages and won't stop:
[2016-07-21 08:00:04,646] ERROR [aiokafka.group_coordinator]: Skipping heartbeat: no active group:
[2016-07-21 08:00:04,747] ERROR [aiokafka.group_coordinator]: Skipping heartbeat: no active group:
[2016-07-21 08:00:04,849] ERROR [aiokafka.group_coordinator]: Skipping heartbeat: no active group:
[2016-07-21 08:00:04,950] ERROR [aiokafka.group_coordinator]: Skipping heartbeat: no active group:
If I stop my app, keep Kafka running and start my app again, it connects just fine.
This forces me to add sleep() calls to my code and hope that everything works after an arbitrary delay.
Hi,
when I run an aiokafka consumer with consumer_timeout=100, it should terminate 100 ms after reaching the end of the topic. Instead (as demonstrated in the attached notebook) it hangs, presumably waiting for the next record to show up, until terminated from the keyboard.
Is it a bug or am I doing something wrong?
Thanks a lot!
consumer_timeout ignored.zip
Hi,
when I run the following code using kafka-python 1.3.1, it runs fine.
from kafka.common import KafkaError
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
import asyncio

topic = 'test_topic'

def produce(loop):
    # Just adds message to sending queue
    future = yield from producer.send(topic, b'some_message_bytes')
    resp = yield from future
    print("Message produced: partition {}; offset {}".format(
        resp.partition, resp.offset))

loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(loop=loop, bootstrap_servers='192.168.99.100:9092')
# Bootstrap client, will get initial cluster metadata
loop.run_until_complete(producer.start())
loop.run_until_complete(produce(loop))
loop.run_until_complete(producer.stop())
Yet when I update to kafka-python 1.3.3, it produces the error below.
Could you please take a look?
Thanks a lot!
AttributeError Traceback (most recent call last)
<ipython-input-1-81a83ca85581> in <module>()
14 # Bootstrap client, will get initial cluster metadata
15 loop.run_until_complete(producer.start())
---> 16 loop.run_until_complete(produce(loop))
17 loop.run_until_complete(producer.stop())
C:\Users\Egor\Anaconda2\envs\py3k\lib\asyncio\base_events.py in run_until_complete(self, future)
385 raise RuntimeError('Event loop stopped before Future completed.')
386
--> 387 return future.result()
388
389 def stop(self):
C:\Users\Egor\Anaconda2\envs\py3k\lib\asyncio\futures.py in result(self)
272 self._tb_logger = None
273 if self._exception is not None:
--> 274 raise self._exception
275 return self._result
276
C:\Users\Egor\Anaconda2\envs\py3k\lib\asyncio\tasks.py in _step(***failed resolving arguments***)
237 # We use the `send` method directly, because coroutines
238 # don't have `__iter__` and `__next__` methods.
--> 239 result = coro.send(None)
240 else:
241 result = coro.throw(exc)
<ipython-input-1-81a83ca85581> in produce(loop)
5 def produce(loop):
6 # Just adds message to sending queue
----> 7 future = yield from producer.send(topic, b'some_message_bytes')
8 resp = yield from future
9 print("Message produced: partition {}; offset {}".format(
C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\producer.py in send(self, topic, value, key, partition)
277
278 fut = yield from self._message_accumulator.add_message(
--> 279 tp, key_bytes, value_bytes, self._request_timeout_ms / 1000)
280 return fut
281
C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\message_accumulator.py in add_message(self, tp, key, value, timeout)
197 self._wait_data_future.set_result(None)
198
--> 199 future = batch.append(key, value)
200 if future is None:
201 # Batch is full, can't append data atm,
C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\message_accumulator.py in append(self, key, value)
83 return None
84
---> 85 encoded = Message(value, key=key, magic=self._version_id).encode()
86 msg = Int64.encode(self._relative_offset) + Int32.encode(len(encoded))
87 msg += encoded
C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\util.py in __call__(self, *args, **kwargs)
151 Calls the method on target with args and kwargs.
152 """
--> 153 return self.method()(self.target(), *args, **kwargs)
154
155 def __hash__(self):
C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\message.py in _encode_self(self, recalc_crc)
71
72 def _encode_self(self, recalc_crc=True):
---> 73 version = self.magic
74 if version == 1:
75 fields = (self.crc, self.magic, self.attributes, self.timestamp, self.key, self.value)
AttributeError: 'NoneType' object has no attribute 'magic'
As mypy is becoming more widely discussed, it would be great to have a defined schema for the whole public API of aiokafka. We might as well check whether we can benefit from using it internally.
In the process of porting kafka-python code to asyncio I see many places that require version-dependent handling of the Kafka cluster. The author of kafka-python fixed those along the way, but I have no idea why or what the issues were. Some concepts are hard to understand, and I think blindly porting them will result in quite a lot of support work.
For now I think we should focus on the latest Kafka release, 0.9. I can understand the need to support old clusters, as most existing ones are older, and I doubt there are a lot of 0.9 production environments (if any). But the driver was never released, and 0.9 has very big changes in the consumer's cluster management.
We can always add backward compatibility after we have reviewed the 0.9 code well enough.
Any objections?
This is quite a handy configuration. Clients must handle big clusters, but if we don't drop connections, long-running consumers may be troublesome for brokers.
Followup dev ticket for #128. See also https://issues.apache.org/jira/browse/KAFKA-1925