Code Monkey home page Code Monkey logo

faust's People

Contributors

ask avatar atomsforpeace avatar billaram avatar bobh66 avatar bryantbiggs avatar cdeil avatar cesarpantoja avatar dhruvapatil98 avatar espenalbert avatar fr-ser avatar hustclf avatar jamshedvesuna avatar jerrylinew avatar jsurloppe avatar kataev avatar lsabi avatar lvwerra avatar marcosschroh avatar martinmaillard avatar mihatroha avatar nemosupremo avatar omarrayward avatar patkivikram avatar robertzk avatar rubyw avatar seifertm avatar sheshtawy avatar swist avatar trauter avatar witekest avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

faust's Issues

CommitFailedError when a new generation may have formed

A new generation may have formed in the consumer groups and we see this error:

[2018-05-09 09:01:27,183: ERROR]: [^--Consumer]: Committing raised exception: CommitFailedError('Commit cannot be completed since the group has already rebalanced and may have assigned the partitions to another member',)
Traceback (most recent call last):
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/consumer/group_coordinator.py", line 820, in commit_offsets
    loop=self._loop)
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/consumer/group_coordinator.py", line 940, in _do_commit_offsets
    raise first_error
kafka.errors.RebalanceInProgressError: [Error 27] RebalanceInProgressError: risk-faust-v15

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/drivers/aiokafka.py", line 332, in _commit
    await self._consumer.commit(commitable)
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/consumer/consumer.py", line 436, in commit
    yield from self._coordinator.commit_offsets(assignment, offsets)
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/consumer/group_coordinator.py", line 825, in commit_offsets
    "Commit cannot be completed since the group has already "
kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already rebalanced and may have assigned the partitions to another member

We should die hard on this error.

Error while releasing lock

Saw this issue in one of the production boxes. This is not reproducible.

[2018-05-09 13:51:45,694: INFO]: [^--Consumer]: Waiting for lock to pause partitions
[2018-05-09 13:51:45,694: INFO]: [^--Consumer]: Acquired lock to pause partitions
[2018-05-09 13:51:45,694: INFO]: [^--Consumer]: Released pause partitions lock
[2018-05-09 13:51:45,694: INFO]: [^--Fetcher]: Starting...
[2018-05-09 13:51:45,695: INFO]: [^-App]: Restarted fetcher
[2018-05-09 13:51:45,696: INFO]: [^--TableManager]: Triggered recovery in background
[2018-05-09 13:51:45,697: INFO]: [^--TableManager]: New assignments found
[2018-05-09 13:51:45,697: INFO]: [^--Consumer]: Waiting for lock to pause partitions
[2018-05-09 13:51:45,697: INFO]: [^--Consumer]: Acquired lock to pause partitions
[2018-05-09 13:51:45,697: INFO]: [^--Consumer]: Released pause partitions lock
[2018-05-09 13:51:45,698: INFO]: [^--TableManager]: Restoring state from changelog topics...
[2018-05-09 13:51:45,698: INFO]: [^--TableManager]: Waiting for restore to finish...
[2018-05-09 13:51:45,698: INFO]: [^--TableManager]: Done reading all changelogs
[2018-05-09 13:51:45,698: INFO]: [^--TableManager]: Done reading from changelog topics
[2018-05-09 13:51:45,699: INFO]: [^--TableManager]: Stopped restoring
[2018-05-09 13:51:45,699: INFO]: [^--TableManager]: Restore complete!
[2018-05-09 13:51:45,706: INFO]: [^--TableManager]: Attempting to start standbys
[2018-05-09 13:51:45,706: INFO]: [^--TableManager]: New assignments handled
[2018-05-09 13:51:45,706: INFO]: [^--Consumer]: Waiting for lock to resume partitions
[2018-05-09 13:51:45,707: INFO]: [^--Consumer]: Acquired lock to resume partitions
[2018-05-09 13:51:45,707: INFO]: [^--Consumer]: Released resume partitions lock
[2018-05-09 13:51:45,729: ERROR]: Unexpected error in fetcher routine
Traceback (most recent call last):
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/consumer/fetcher.py", line 328, in _fetch_requests_routine
    yield from task
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/consumer/fetcher.py", line 624, in _update_fetch_positions
    node_id, topic_data)
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/consumer/fetcher.py", line 747, in _proc_offset_request
    response = yield from self._client.send(node_id, request)
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/client.py", line 403, in send
    if not (yield from self.ready(node_id, group=group)):
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/client.py", line 381, in ready
    conn = yield from self._get_conn(node_id, group=group)
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/client.py", line 369, in _get_conn
    max_idle_ms=self._connections_max_idle_ms)
  File "/home/robinhood/python-3.6.3/lib/python3.6/asyncio/locks.py", line 38, in __exit__
    self._lock.release()
  File "/home/robinhood/python-3.6.3/lib/python3.6/asyncio/locks.py", line 201, in release
    raise RuntimeError('Lock is not acquired.')
RuntimeError: Lock is not acquired.

Retarting the app worked.

faust.Record isodates argument doesn't work in 1.0.16

Getting the following error with varys code upon upgrading from faust 1.0.11 to faust 1.0.16:

Traceback (most recent call last):
  File "examples/psql.py", line 7, in <module>
    from varys.shovels.psql import PSQLShovel
  File "/Users/vineet/.virtualenvs/varys/lib/python3.6/site-packages/varys/shovels/psql/__init__.py", line 1, in <module>
    from .shovel import PSQLShovel, PSQLTask
  File "/Users/vineet/.virtualenvs/varys/lib/python3.6/site-packages/varys/shovels/psql/shovel.py", line 94, in <module>
    class PSQLTask(Task, isodates=True):
TypeError: __new__() got an unexpected keyword argument 'isodates'

Add support for SASL authentication

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

N/A

Expected behavior

Ideally, there would be the (documented) ability to connect to a remote Kafka broker via SASL authentication.

Actual behavior

I searched through the docs & spent a while greping, and I found no way to authenticate/connect to a remote Kafka broker.

Versions

N/A

Potential deadlock

I noticed that when new boxes came up for scroll, it when into a heartbeat failure loop which is typical of deadlocks during the rebalance. I was able to confirm that there was a deadlock in the on_partition_revoked as can be seen here:

[2018-06-06 17:38:41,498: INFO]: [^-App]: [Flight Recorder-3] (started at Wed Jun  6 17:37:41 2018) Replaying logs...
[2018-06-06 17:38:41,499: INFO]: [^-App]: [Flight Recorder-3] (Wed Jun  6 17:37:41 2018) fetcher.stop()
[2018-06-06 17:38:41,499: INFO]: [^-App]: [Flight Recorder-3] (Wed Jun  6 17:37:41 2018) topics.on_partitions_revoked()
[2018-06-06 17:38:41,499: INFO]: [^-App]: [Flight Recorder-3] (Wed Jun  6 17:37:41 2018) tables.on_partitions_revoked()
[2018-06-06 17:38:41,499: INFO]: [^-App]: [Flight Recorder-3] (Wed Jun  6 17:37:41 2018) +TABLES: maybe_abort_ongoing_recovery
[2018-06-06 17:38:41,499: INFO]: [^-App]: [Flight Recorder-3] -End of log-

Json load error with string values in topics

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Run the following hello_world app:

import faust

app = faust.App('hello-world', url='kafka://localhost:9092')

greetings_topic = app.topic('greetings')

@app.actor(greetings_topic)
async def print_greetings(greetings):
    async for greeting in greetings:
        print(greeting)

Expected behavior

Print the greetings (strings)

Actual behavior

Got a json.load error

Full traceback

[2017-08-18 15:46:03,252: ERROR] Cannot decode value for key=None (b'sadkjfh'): ValueDecodeError('Expecting value: line 1 column 1 (char 0)',)
Traceback (most recent call last):
  File "/Users/vineet/faust/faust/topics.py", line 353, in deliver
    v = await loads_value(value_type, message.value)
  File "/Users/vineet/faust/faust/serializers/registry.py", line 94, in loads_value
    str(exc)).with_traceback(sys.exc_info()[2]) from None
  File "/Users/vineet/faust/faust/serializers/registry.py", line 88, in loads_value
    await self._loads(serializer, value))
  File "/Users/vineet/faust/faust/serializers/registry.py", line 71, in _loads
    return loads(serializer, data)
  File "/Users/vineet/faust/faust/serializers/codecs.py", line 321, in loads
    return get_codec(codec).loads(s) if codec else s
  File "/Users/vineet/faust/faust/serializers/codecs.py", line 215, in loads
    reversed(self.nodes), s)
  File "/Users/vineet/faust/faust/serializers/codecs.py", line 214, in <lambda>
    lambda s, d: cast(Codec, d)._loads(s),
  File "/Users/vineet/faust/faust/serializers/codecs.py", line 241, in _loads
    return _json.loads(want_str(s))
  File "/Users/vineet/faust/faust/utils/json.py", line 97, in loads
    return json.loads(s, **kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/__init__.py", line 354, in loads
    return _default_decoder.decode(s)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/decoder.py", line 339, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/decoder.py", line 357, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None

Versions

  • Python version - 3.6.1
  • Faust version - master
  • Operating system
  • Kafka version - 0.10.2.1
  • RocksDB version (if applicable)

Hang on stream wait empty

Our apps are currently using stream_wait_empty=False, so for the release we should either fix the problem or make it the default.

Recursion maximum depth reached error resulting in app crash

In trebuchet I found the following stack trace which had made the app crash:

[2018-07-06 22:18:51,007: ERROR]: [^-App]: Crashed reason=RecursionError('maximum recursion depth exceeded in comparison',)
Traceback (most recent call last):
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/app/base.py", line 845, in _on_partitions_revoked
    await self.consumer.wait_empty()
  File "/home/robinhood/env/lib/python3.6/site-packages/mode/services.py", line 417, in _and_transition
    return await fun(self, *args, **kwargs)
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 305, in wait_empty
    await self.commit()
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 345, in commit
    return await self.force_commit(topics)
  File "/home/robinhood/env/lib/python3.6/site-packages/mode/services.py", line 417, in _and_transition
    return await fun(self, *args, **kwargs)
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 374, in force_commit
    did_commit = await self._commit_tps(commit_tps)
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 384, in _commit_tps
    await self._handle_attached(commit_offsets)
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 408, in _handle_attached
    pending = await attachments.publish_for_tp_offset(tp, offset)
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/app/_attached.py", line 140, in publish_for_tp_offset
    for fut in attached
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/app/_attached.py", line 140, in <listcomp>
    for fut in attached
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/topics.py", line 303, in publish_message
    topic, key, value, partition=message.partition)
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/drivers/aiokafka.py", line 668, in send
    topic, value, key=key, partition=partition))
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/producer/producer.py", line 311, in send
    timestamp_ms=timestamp_ms)
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/producer/message_accumulator.py", line 257, in add_message
    tp, key, value, timeout, timestamp_ms))
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/producer/message_accumulator.py", line 257, in add_message
    tp, key, value, timeout, timestamp_ms))
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/producer/message_accumulator.py", line 257, in add_message
    tp, key, value, timeout, timestamp_ms))
  [Previous line repeated 934 more times]
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/producer/message_accumulator.py", line 252, in add_message
    yield from batch.wait_drain(timeout)
  File "/home/robinhood/python-3.6.3/lib/python3.6/asyncio/tasks.py", line 301, in wait
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
  File "/home/robinhood/python-3.6.3/lib/python3.6/asyncio/coroutines.py", line 270, in iscoroutine
    return isinstance(obj, _COROUTINE_TYPES)
  File "/home/robinhood/env/lib/python3.6/abc.py", line 188, in __instancecheck__
    subclass in cls._abc_negative_cache):
  File "/home/robinhood/env/lib/python3.6/_weakrefset.py", line 75, in __contains__
    return wr in self.data
RecursionError: maximum recursion depth exceeded in comparison

Standbys should write to rocksdb and persist offsets

Seems like upon reading standbys we do write to rocksdb but do not persist offsets. This results in us anyway going through the entire thing when promoted to active. We should persist offsets anytime we are writing standbys to rocksdb.

High commit latency + slow web server on faust>=0.9.41

Steps to reproduce

Checkout the trebuchet-local branch in trebuchet. After setting up trebuchet, run it with trebuchet worker -l info. Make sure the faust version is >= 0.9.41.

Then, feed data into Kafka with the following command:
kafka-console-producer --broker-list localhost:9092 --topic adjust_data < tests/sample/adjust_data.
Navigate browser to localhost:6066.

Expected behavior

In faust verions 0.9.39 and below, commit latencies stabilize at around 3-4 seconds when data is being consumed. The web server loads very quickly.

Actual behavior

When faust is consuming data, commit latency spikes to ~30 to 60, and the web server (localhost:6066) takes a long time to load.

Error upon start up on faust==1.0.12

I get the following error when running faust 1.0.12:

Traceback (most recent call last):
  File "/home/robinhood/env/lib/python3.6/site-packages/mode/services.py", line 685, in _execute_task
    await task
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 292, in _commit_handler
    await self.commit()
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 331, in commit
    return await self.force_commit(topics)
  File "/home/robinhood/env/lib/python3.6/site-packages/mode/services.py", line 417, in _and_transition
    return await fun(self, *args, **kwargs)
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 345, in force_commit
    did_commit = await self._commit_tps(commit_tps)
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 355, in _commit_tps
    await self._handle_attached(commit_offsets)
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 389, in _handle_attached
    await producer.wait_many(pending)
  File "/home/robinhood/env/lib/python3.6/site-packages/mode/services.py", line 600, in wait_many
    return await self._wait_one(coro, timeout=timeout)
  File "/home/robinhood/env/lib/python3.6/site-packages/mode/services.py", line 625, in _wait_one
    f.result()  # propagate exceptions
  File "/home/robinhood/python-3.6.3/lib/python3.6/asyncio/tasks.py", line 304, in wait
    raise ValueError('Set of coroutines/Futures is empty.')
ValueError: Set of coroutines/Futures is empty.

The app crashes after this error.

Application keeps crashing causing the group to continuously try and rebalance

Found the following error in the logs:

TypeError: __init__() got an unexpected keyword argument 'resolve'
[2018-06-05 16:33:01,436: ERROR]: ["/home/robinhood/python-3.6.3/lib/python3.6/asyncio/base_events.py:1411: RuntimeWarning: coroutine 'Fetcher._fetcher' was never awaited\n  handle = self._ready.popleft()"]
[2018-06-05 16:33:01,427: WARNING]: /home/robinhood/python-3.6.3/lib/python3.6/asyncio/base_events.py:1411: RuntimeWarning: coroutine 'Fetcher._fetcher' was never awaited
  handle = self._ready.popleft()

The app (trebuchet) was continuously crashing and trying to rebalance resulting in never actually forming a proper group.

Assertion error in aiokafka sender routine causing app to stall

The app seems to stall after the following errors. We should just make the app crash instead of stalling so that it is restarted by supervisor:

[2018-05-29 14:08:03,304: ERROR]: Future exception was never retrieved
future: <Future finished exception=NodeNotReadyError('Attempt to send a request to node which is not ready (node id 42).',)>
Traceback (most recent call last):
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/producer/producer.py", line 419, in _send_produce_req
    response = yield from self.client.send(node_id, request)
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/client.py", line 406, in send
    " which is not ready (node id {}).".format(node_id))
kafka.errors.NodeNotReadyError: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 42).
[2018-05-29 14:08:03,315: ERROR]: Unable connect to node with id 42: [Errno 111] Connect call failed ('10.1.30.204', 9092)
[2018-05-29 14:08:03,315: WARNING]: Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 42).
[2018-05-29 14:08:03,715: ERROR]: Unable connect to node with id 42: [Errno 111] Connect call failed ('10.1.30.204', 9092)
[2018-05-29 14:08:03,716: WARNING]: Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 42).
[2018-05-29 14:08:04,117: ERROR]: Unable connect to node with id 42: [Errno 111] Connect call failed ('10.1.30.204', 9092)
[2018-05-29 14:08:04,117: WARNING]: Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 42).
[2018-05-29 14:08:04,517: ERROR]: Unable connect to node with id 42: [Errno 111] Connect call failed ('10.1.30.204', 9092)
[2018-05-29 14:08:04,518: WARNING]: Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 42).
[2018-05-29 14:08:04,917: ERROR]: Unexpected error in sender routine
Traceback (most recent call last):
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/producer/producer.py", line 374, in _sender_routine
    task.result()
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/producer/producer.py", line 419, in _send_produce_req
    response = yield from self.client.send(node_id, request)
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/client.py", line 403, in send
    if not (yield from self.ready(node_id, group=group)):
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/client.py", line 381, in ready
    conn = yield from self._get_conn(node_id, group=group)
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/client.py", line 354, in _get_conn
    assert broker, 'Broker id %s not in current metadata' % node_id
AssertionError: Broker id 42 not in current metadata

Should not commit offsets upon crash

I added a random KafkaTimeoutError in App._commit_attached to simulate a KafkaTimeoutError while producing. I catched this error and triggered a crash: self.crash(exc). This should ideally crash the application. Looks like upon crash we end up committing offsets which can be seen in the log trace below. This is bad because a crash means something bad happened and we should not commit offsets.

[2018-03-03 17:51:07,840: INFO]: [^--TableManager]: Restore complete!
[2018-03-03 17:51:07,841: INFO]: [^--Consumer]: Waiting for lock to resume partitions
[2018-03-03 17:51:07,842: INFO]: [^--Consumer]: Acquired lock to resume partitions
[2018-03-03 17:51:07,842: INFO]: [^--Consumer]: Released resume partitions lock
[2018-03-03 17:51:07,842: INFO]: [^--TableManager]: Attempting to start standbys
[2018-03-03 17:51:07,842: INFO]: [^--TableManager]: New assignments handled
[2018-03-03 17:51:07,871: INFO]: [^--Table: user_to_total]: Starting...
[2018-03-03 17:51:07,871: INFO]: [^---Store: user_to_total]: Starting...
[2018-03-03 17:51:07,871: INFO]: [^--Table: country_to_total]: Starting...
[2018-03-03 17:51:07,871: INFO]: [^---Store: country_to_total]: Starting...
[2018-03-03 17:51:07,913: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 744: k=b'"country_4"' v=b'{"user": "user_434", "country": "country_4", "amount": 1832.8454878058342, "date": "2018-03-04T01:41:26.137348Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 1428: k=b'"country_1"' v=b'{"user": "user_246", "country": "country_1", "amount": 16163.629349934505, "date": "2018-03-04T01:39:52.217088Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 747: k=b'"country_2"' v=b'{"user": "user_114", "country": "country_2", "amount": 7532.398390989598, "date": "2018-03-04T01:41:42.050016Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 32392: k=b'"country_0"' v=b'{"user": "user_239", "country": "country_0", "amount": 24807.034890744173, "date": "2018-03-04T01:40:31.821356Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 8506: k=b'"user_220"' v=b'{"user": "user_220", "country": "country_1", "amount": 11502.542493932206, "date": "2018-03-04T01:41:49.087564Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 8327: k=b'"user_243"' v=b'{"user": "user_243", "country": "country_0", "amount": 8941.192697487695, "date": "2018-03-04T01:41:51.035743Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 8600: k=b'"user_410"' v=b'{"user": "user_410", "country": "country_0", "amount": 22052.862798686932, "date": "2018-03-04T01:40:21.664204Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 8973: k=b'"user_396"' v=b'{"user": "user_396", "country": "country_2", "amount": 18885.378500303657, "date": "2018-03-04T01:40:22.124953Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,919: INFO]: [^Worker]: Ready
[2018-03-03 17:51:08,770: INFO]: [^--Consumer]: COMMITTING OFFSETS:
┌Commit Offsets──────────────────────────────────────────────────────────────────────────────┬────────┬──────────┐
│ TP                                                                                         │ Offset │ Metadata │
├────────────────────────────────────────────────────────────────────────────────────────────┼────────┼──────────┤
│ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=3) │ 751    │          │
│ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=2) │ 1466   │          │
│ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=1) │ 754    │          │
│ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=0) │ 32546  │          │
│ TopicPartition(topic='withdrawals2', partition=3)                                          │ 8712   │          │
│ TopicPartition(topic='withdrawals2', partition=1)                                          │ 8532   │          │
│ TopicPartition(topic='withdrawals2', partition=2)                                          │ 8805   │          │
│ TopicPartition(topic='withdrawals2', partition=0)                                          │ 9178   │          │
└────────────────────────────────────────────────────────────────────────────────────────────┴────────┴──────────┘
[2018-03-03 17:51:11,787: ERROR]: [^-App]: Crashed reason=KafkaTimeoutError()
Traceback (most recent call last):
  File "/Users/vineet/faust/faust/app.py", line 1047, in _commit_attached
    raise KafkaTimeoutError
kafka.errors.KafkaTimeoutError: KafkaTimeoutError
[2018-03-03 17:51:11,806: INFO]: [^Worker]: Stopping...
[2018-03-03 17:51:11,806: INFO]: [^-App]: Stopping...
[2018-03-03 17:51:11,806: INFO]: [^--Fetcher]: Stopping...
[2018-03-03 17:51:11,807: INFO]: [^--Consumer]: Consumer shutting down for user cancel.
[2018-03-03 17:51:11,817: INFO]: [^--Fetcher]: -Stopped!
[2018-03-03 17:51:11,817: INFO]: [^--TableManager]: Stopping...
[2018-03-03 17:51:11,817: INFO]: [^--TableManager]: Aborting ongoing recovery to start over
[2018-03-03 17:51:11,817: INFO]: [^--Table: user_to_total]: Stopping...
[2018-03-03 17:51:11,818: INFO]: [^---Store: user_to_total]: Stopping...
[2018-03-03 17:51:11,818: INFO]: [^---Store: user_to_total]: -Stopped!
[2018-03-03 17:51:11,819: INFO]: [^--Table: user_to_total]: -Stopped!
[2018-03-03 17:51:11,819: INFO]: [^--Table: country_to_total]: Stopping...
[2018-03-03 17:51:11,819: INFO]: [^---Store: country_to_total]: Stopping...
[2018-03-03 17:51:11,819: INFO]: [^---Store: country_to_total]: -Stopped!
[2018-03-03 17:51:11,828: INFO]: [^--Table: country_to_total]: -Stopped!
[2018-03-03 17:51:11,831: INFO]: [^--TableManager]: -Stopped!
[2018-03-03 17:51:11,832: INFO]: [^--TopicConductor]: Stopping...
[2018-03-03 17:51:11,837: INFO]: [^--TopicConductor]: -Stopped!
[2018-03-03 17:51:11,837: INFO]: [^--Agent]: Stopping...
[2018-03-03 17:51:11,837: INFO]: [^---OneForOneSupervisor]: Stopping...
[2018-03-03 17:51:11,837: INFO]: [^---Agent*: examp[.]track_country_withdrawal]: Stopping...
[2018-03-03 17:51:11,840: INFO]: [^---Agent*: examp[.]track_country_withdrawal]: -Stopped!
[2018-03-03 17:51:11,841: INFO]: [^---OneForOneSupervisor]: -Stopped!
[2018-03-03 17:51:11,841: INFO]: [^--Agent]: -Stopped!
[2018-03-03 17:51:11,841: INFO]: [^--Agent]: Stopping...
[2018-03-03 17:51:11,841: INFO]: [^---OneForOneSupervisor]: Stopping...
[2018-03-03 17:51:11,842: INFO]: [^---Agent*: examples[.]track_user_withdrawal]: Stopping...
[2018-03-03 17:51:11,864: INFO]: [^---Agent*: examples[.]track_user_withdrawal]: -Stopped!
[2018-03-03 17:51:11,867: INFO]: [^---OneForOneSupervisor]: -Stopped!
[2018-03-03 17:51:11,867: INFO]: [^--Agent]: -Stopped!
[2018-03-03 17:51:11,867: INFO]: [^--ReplyConsumer]: Stopping...
[2018-03-03 17:51:11,868: INFO]: [^--ReplyConsumer]: -Stopped!
[2018-03-03 17:51:11,868: INFO]: [^--LeaderAssignor]: Stopping...
[2018-03-03 17:51:11,868: INFO]: [^--LeaderAssignor]: -Stopped!
[2018-03-03 17:51:11,868: INFO]: [^--Consumer]: Stopping...
[2018-03-03 17:51:12,082: INFO]: [^--Consumer]: COMMITTING OFFSETS:
┌Commit Offsets──────────────────────────────────────────────────────────────────────────────┬────────┬──────────┐
│ TP                                                                                         │ Offset │ Metadata │
├────────────────────────────────────────────────────────────────────────────────────────────┼────────┼──────────┤
│ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=3) │ 767    │          │
│ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=2) │ 1506   │          │
│ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=1) │ 776    │          │
│ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=0) │ 33289  │          │
│ TopicPartition(topic='withdrawals2', partition=3)                                          │ 9071   │          │
│ TopicPartition(topic='withdrawals2', partition=1)                                          │ 8833   │          │
│ TopicPartition(topic='withdrawals2', partition=2)                                          │ 9274   │          │
│ TopicPartition(topic='withdrawals2', partition=0)                                          │ 9652   │          │
└────────────────────────────────────────────────────────────────────────────────────────────┴────────┴──────────┘
[2018-03-03 17:51:12,084: INFO]: LeaveGroup request succeeded
[2018-03-03 17:51:12,085: INFO]: [^--Consumer]: -Stopped!
[2018-03-03 17:51:12,085: INFO]: [^--Producer]: Stopping...
[2018-03-03 17:51:12,085: INFO]: [^--Producer]: -Stopped!
[2018-03-03 17:51:12,085: INFO]: [^--MonitorService]: Stopping...
[2018-03-03 17:51:12,086: INFO]: [^--MonitorService]: -Stopped!
[2018-03-03 17:51:12,086: INFO]: [^-App]: -Stopped!
[2018-03-03 17:51:12,086: INFO]: [^-Website]: Stopping...
[2018-03-03 17:51:12,086: INFO]: [^--Web]: Stopping...
[2018-03-03 17:51:12,086: INFO]: [^---ServerThread]: Stopping...
[2018-03-03 17:51:12,087: INFO]: [^--Web]: Closing server
[2018-03-03 17:51:12,087: INFO]: [^--Web]: Waiting for server to close handle
[2018-03-03 17:51:12,087: INFO]: [^--Web]: Shutting down web application
[2018-03-03 17:51:12,087: INFO]: [^--Web]: Waiting for handler to shut down
[2018-03-03 17:51:12,087: INFO]: [^--Web]: Cleanup
[2018-03-03 17:51:12,087: INFO]: [^---ServerThread]: -Stopped!
[2018-03-03 17:51:12,087: INFO]: [^--Web]: -Stopped!
[2018-03-03 17:51:12,088: INFO]: [^-Website]: -Stopped!
[2018-03-03 17:51:12,088: INFO]: [^Worker]: -Stopped!
[2018-03-03 17:51:12,088: INFO]: [^Worker]: Gathering service tasks...
[2018-03-03 17:51:12,088: INFO]: [^Worker]: Gathering all futures...
[2018-03-03 17:51:17,227: INFO]: [^Worker]: Closing event loop
[2018-03-03 17:51:17,228: CRITICAL]: [^Worker]: We experienced a crash! Reraising original exception...
Traceback (most recent call last):
  File "/Users/vineet/.virtualenvs/faust/bin/faust", line 11, in <module>
    load_entry_point('faust', 'console_scripts', 'faust')()
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/click-6.7-py3.6.egg/click/core.py", line 722, in __call__
    return self.main(*args, **kwargs)
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/click-6.7-py3.6.egg/click/core.py", line 697, in main
    rv = self.invoke(ctx)
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/click-6.7-py3.6.egg/click/core.py", line 1066, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/click-6.7-py3.6.egg/click/core.py", line 895, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/click-6.7-py3.6.egg/click/core.py", line 535, in invoke
    return callback(*args, **kwargs)
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/click-6.7-py3.6.egg/click/decorators.py", line 17, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/Users/vineet/faust/faust/cli/base.py", line 317, in _inner
    return cmd()
  File "/Users/vineet/faust/faust/cli/worker.py", line 106, in __call__
    **{**self.kwargs, **kwargs})
  File "/Users/vineet/faust/faust/cli/worker.py", line 128, in start_worker
    return worker.execute_from_commandline()
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/mode/worker.py", line 155, in execute_from_commandline
    self.stop_and_shutdown()
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/mode/worker.py", line 160, in stop_and_shutdown
    self._shutdown_loop()
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/mode/worker.py", line 188, in _shutdown_loop
    raise self._crash_reason from self._crash_reason
  File "/Users/vineet/faust/faust/app.py", line 1047, in _commit_attached
    raise KafkaTimeoutError
kafka.errors.KafkaTimeoutError: KafkaTimeoutError

Hard shutdown app on RebalanceError

Upon receiving a rebalance error, we should hard shutdown the application.

Currently, the application tries to recover which results in a zombie assignments.

The first message read after consumer group rebalance is potentially dropped

Checklist

  • I have included information about relevant versions
    We saw this issue on Faust 1.0.19.

  • I have verified that the issue persists when using the master branch of Faust.
    We have only seen this issue once and haven't reproduced it. But after talking to Vineet, he seems to think that it is possible that with the way that faust uses consumer offsets we may be dropping the first message that we read after a consumer rebalance.

Steps to reproduce

We saw this issue in the combiner which should hold onto messages it receives from a stream for long periods of time without committing it. Unfortunately, we have seen this issue only once but will provide an update if we see it again.

Expected behavior

This agent receives a special type of event which should trigger it to stop consuming and hold onto the event for a long period of time. This event should not be committed so if the consumer group is rebalanced or the agent is restarted, it should receive the message again and continue waiting.

Actual behavior

We saw this special event being dropped with a message "DROPPED MESSAGE ROFF: n". We believe this may be caused by how Faust manages and uses consumer offsets.

Full traceback

2018-06-29 21:04:57,722: INFO]: [^--TableManager]: Stopped restoring
[2018-06-29 21:04:57,723: INFO]: [^--TableManager]: Restore complete!
[2018-06-29 21:04:57,738: INFO]: [^--TableManager]: Attempting to start standbys
[2018-06-29 21:04:57,738: INFO]: [^--TableManager]: New assignments handled
[2018-06-29 21:04:57,739: INFO]: [^--Fetcher]: Starting...
[2018-06-29 21:04:57,739: INFO]: [^--TableManager]: Worker ready
[2018-06-29 21:04:57,740: INFO]: [^Worker]: Ready
[2018-06-29 21:04:57,741: INFO]: No forwarders are waiting. Going to sleep.
[2018-06-29 21:04:57,812: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 2: k=None v=b'{"details": {"date": "2018-06-29"}, "id": "d3a8acf5-eb24-461
0-abc1-ce0b6fff8254", "type": "end_of_day"}'
[2018-06-29 21:04:57,838: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 302: k=None v=b'{"order_id": 168, "id": "3e66bc59-49d3-4388-b9f9-81476f7f6
9bb", "key_field": "0", "batch_id": null, "created_at": "2018-06-30T00:02:59.192929+00:00", "updated_at": "2018-06-30T00:02:59.192943+00:00", "
type": "end_of_day", "sent": false, "account_id": null, "details": {"date": "2018-06-29"}, "meta": null}'
[2018-06-29 21:04:57,838: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 29: k=None v=b'{"type": "end_of_day", "created_at": "2018-06-30T00:03:06.8
22620+00:00", "updated_at": "2018-06-30T00:03:06.822649+00:00", "account": null, "batch_id": null, "id": "5817539f-2f34-46d4-b89b-be7be15cfdba"
, "key_field": "0", "order_id": 42, "trigger_id": null, "sent": false, "details": {"date": "2018-06-29"}}'
[2018-06-29 21:04:57,840: INFO]: Got funding event on topic cashier-transfers_fundingevent
[2018-06-29 21:04:57,841: INFO]: Got funding event on topic cashier-transfers_fundingevent
[2018-06-29 21:04:57,873: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 4069: k=None v=b'{"created_at": "2018-06-30T00:02:59.160290+00:00", "updat
ed_at": "2018-06-30T00:02:59.160321+00:00", "key_field": "8088457b-fdcb-44ed-a829-e59a12100eb0", "batch_id": null, "order_id": 65146, "type": "
end_of_day", "id": "001d8063-fb9d-4325-87c5-87ff11d57d09", "sent": false, "details": {"date": "2018-06-29"}}'
[2018-06-29 21:04:57,875: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 908: k=None v=b'{"type":"end_of_day","details":{"date":"2018-06-29"}}'
[2018-06-29 21:04:57,875: INFO]: Got end_of_day event on topic mainst_pinnacle_order_updates
[2018-06-29 21:04:57,876: INFO]: Received end of day event on topic mainst_pinnacle_order_updates
[2018-06-29 21:04:57,970: INFO]: Waiting for the day cycle to begin.
[2018-06-29 21:04:57,982: INFO]: Fetch offset 0 is out of range for partition TopicPartition(topic='nummus_transfers', partition=18), resetting offset
[2018-06-29 21:04:57,999: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 18: k=None v=b'{"type": "end_of_day", "id": "19c13f26-824b-4b37-8d9e-fd6775e405f1", "details": {"date": "2018-06-29"}}'
[2018-06-29 21:04:58,014: INFO]: [^--Consumer]: COMMITTING OFFSETS:
+Commit Offsets-------------------------------------------------------+--------+----------+
| TP | Offset | Metadata |
+---------------------------------------------------------------------+--------+----------+
| TopicPartition(topic='cashier-transfers_fundingevent', partition=0) | 304 | |
+---------------------------------------------------------------------+--------+----------+
[2018-06-29 21:05:57,973: INFO]: Waiting for the day cycle to begin.
[2018-06-29 21:06:57,976: INFO]: Waiting for the day cycle to begin.
[2018-06-29 21:07:57,978: INFO]: Waiting for the day cycle to begin.
[2018-06-29 21:08:57,981: INFO]: Waiting for the day cycle to begin.
[2018-06-29 21:09:35,888: INFO]: Got funding event on topic cashier-transfers_fundingevent
[2018-06-29 21:09:38,123: INFO]: [^--Consumer]: COMMITTING OFFSETS:
+Commit Offsets-------------------------------------------------------+--------+----------+
| TP | Offset | Metadata |
+---------------------------------------------------------------------+--------+----------+
| TopicPartition(topic='cashier-transfers_fundingevent', partition=0) | 305 | |
+---------------------------------------------------------------------+--------+----------+
[2018-06-29 21:09:57,983: INFO]: Waiting for the day cycle to begin.
[2018-06-29 21:10:57,987: INFO]: Waiting for the day cycle to begin.
[2018-06-29 21:11:57,990: INFO]: Waiting for the day cycle to begin.

Versions

  • Python version
  • Faust version: 1.0.19
  • Operating system:
  • Kafka version
  • RocksDB version (if applicable)

Windowed table expires memory leak

Currently we do not add window ends for keys into the timestamps heap upon changelog recovery. Hence, keys added upon recovery (and never updated) would never get garbage collected by the faust app.

Note: this isn't an issue with the actual kafka topic as it uses kafka's retention.

App cannot recover from NodeNotReadyError

The app is stuck with the following error logging:

9092)
[2018-06-19 10:18:54,652: ERROR]: Failed fetch messages from 102: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 102).
[2018-06-19 10:18:54,654: ERROR]: Unable connect to node with id 102: [Errno 111] Connect call failed ('10.1.65.249', 9092)
[2018-06-19 10:18:54,654: ERROR]: Failed fetch messages from 102: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 102).
[2018-06-19 10:18:54,656: ERROR]: Unable connect to node with id 102: [Errno 111] Connect call failed ('10.1.65.249', 9092)
[2018-06-19 10:18:54,656: ERROR]: Failed fetch messages from 102: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 102).
[2018-06-19 10:18:54,658: ERROR]: Unable connect to node with id 102: [Errno 111] Connect call failed ('10.1.65.249', 9092)
[2018-06-19 10:18:54,658: ERROR]: Failed fetch messages from 102: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 102).

These errors are potentially from aiokafka/fetcher. I see a bunch of these errors.

This is with faust 1.0.14

Table Partition Garbage Collection

Currently we do not garbage collect stale partition (in-memory) once a partition for a table is unassigned from a client. This may cause memory leaks in the event that partitions rotate a lot around different clients.

A proposed approach here is to divide a table internally by partition (implementing the table interface) multiple underlying tables. This might make the O(1) operations of the dictionary O(p) (p = number of partitions) but should make garbage collection fairly quick and simple.

The other approach could be to maintain a partition to key index but that would have a space complexity of O(n) (n = number of keys).

Create agent topics appropriately

Agent topics should have the following properties:

  • Should use the partition count from the app configuration (bug) - We should just create the topic using create_topic.
  • Should use the app id (with version) in the name (enhancement)

Cannot update tables in asyncio.wait

I get the following error if I try to update faust Table from an agent if I use await asyncio.gather(fut) or await asyncio.wait([fut]) when the fut updates the table.

Here is the exception traceback:

[2018-03-04 18:39:34,215: ERROR]: Task exception was never retrieved
future: <Task finished coro=<ActivitiesStore.add() done, defined at /Users/vineet/scroll/scroll/store.py:75> exception=RuntimeError('Cannot modify table outside of agent/stream.',)>
Traceback (most recent call last):
  File "/Users/vineet/scroll/scroll/store.py", line 76, in add
    self._add_activity(activity)
  File "/Users/vineet/scroll/scroll/store.py", line 85, in _add_activity
    self.activities[activity.key] = activity
  File "/Users/vineet/.virtualenvs/scroll/lib/python3.6/site-packages/faust/utils/collections.py", line 296, in __setitem__
    self.on_key_set(key, value)
  File "/Users/vineet/.virtualenvs/scroll/lib/python3.6/site-packages/faust/tables/table.py", line 58, in on_key_set
    self._send_changelog(key, value)
  File "/Users/vineet/.virtualenvs/scroll/lib/python3.6/site-packages/faust/tables/base.py", line 161, in _send_changelog
    raise RuntimeError('Cannot modify table outside of agent/stream.')
RuntimeError: Cannot modify table outside of agent/stream.

Garbage collect old windowed table entries by partition

Currently we garbage collect windowed tables entries based on the latest timestamp we have seen across all partitions. This should be changed to garbage collect entries for a partition based on the latest timestamp seen for that partition. This is useful in the case where a partition may be lagging behind others.

RebalanceInProgressError

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Ran an application that has a relatively large (larger than the simple example) number of messages being transferred. Tried to rebalance the cluster of workers by adding another worker.

Expected behavior

Worker addition/removal should be graceful.

Actual behavior

Getting a RebalanceInProgress Exception from Kafka upon adding/removing workers.

Full traceback

[2017-08-08 22:34:18,945: ERROR] OffsetCommit failed for group referrals-device-check due to group error ([Error 27] RebalanceInProgressError: referrals-device-check), will rejoin
[2017-08-08 22:34:18,945: ERROR] User provided subscription listener <faust.transport.aiokafka.ConsumerRebalanceListener object at 0x1058c5ac8> for group referrals-device-check failed on_partitions_revoked
Traceback (most recent call last):
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/aiokafka/group_coordinator.py", line 308, in _on_join_prepare
    yield from res
  File "/Users/vineet/faust/faust/transport/aiokafka.py", line 77, in on_partitions_revoked
    cast(Iterable[TopicPartition], revoked))
  File "/Users/vineet/faust/faust/transport/base.py", line 146, in on_partitions_revoked
    await self._on_partitions_revoked(revoked)
  File "/Users/vineet/faust/faust/app.py", line 647, in on_partitions_revoked
    await self.consumer.wait_empty()
  File "/Users/vineet/faust/faust/transport/base.py", line 180, in wait_empty
    await self.commit()
  File "/Users/vineet/faust/faust/transport/base.py", line 221, in commit
    await self._do_commit(tp, offset, meta='')
  File "/Users/vineet/faust/faust/transport/base.py", line 263, in _do_commit
    await self._commit({tp: self._new_offsetandmetadata(offset, meta)})
  File "/Users/vineet/faust/faust/transport/aiokafka.py", line 210, in _commit
    await self._consumer.commit(offsets)
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/aiokafka/consumer.py", line 397, in commit
    yield from self._coordinator.commit_offsets(offsets)
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/aiokafka/group_coordinator.py", line 583, in commit_offsets
    raise error
kafka.errors.RebalanceInProgressError: [Error 27] RebalanceInProgressError: referrals-device-check

Versions

  • Python version - 3.6.1
  • Faust version - 1.0.0 (master)
  • Operating system
  • Kafka version - 0.10.2.1

Sentry + Faust not playing well together

On the latest version of Faust, I get the following error upon enabling Sentry:

```[2018-06-05 16:31:39,400: ERROR]: Sentry responded with an error: __init__() got an unexpected keyword argument 'resolve' (url: https://sentry-internal.robinhood.com/api/38/store/)
Traceback (most recent call last):
  File "/home/robinhood/env/lib/python3.6/site-packages/raven/base.py", line 722, in send_remote
    transport = self.remote.get_transport()
  File "/home/robinhood/env/lib/python3.6/site-packages/raven/conf/remote.py", line 71, in get_transport
    self._transport = self._transport_cls(**self.options)
  File "/home/robinhood/env/lib/python3.6/site-packages/raven_aiohttp.py", line 174, in __init__
    super().__init__(*args, **kwargs)
  File "/home/robinhood/env/lib/python3.6/site-packages/raven_aiohttp.py", line 57, in __init__
    self._client_session = self._client_session_factory()
  File "/home/robinhood/env/lib/python3.6/site-packages/raven_aiohttp.py", line 77, in _client_session_factory
    loop=self._loop)
TypeError: __init__() got an unexpected keyword argument 'resolve'```

I am using the following versions:

raven==6.6.0
raven-aiohttp==0.6.0
aiohttp==3.1.3

Changelog should check earliest offset

Since we have tables expiring records in changelog according to log compaction as well as time based expiry, we should also max each offset with earliest available offset

Application unable to recover from Kafka outage/broker failure

This is reproducible by running the simple example locally and taking down a broker and bringing it back.

Looks like the consumer is able to come back up in most cases (except when there is an OffsetCommit error). However the producer stalls. We don't produce any more messages at this point.

These are some of the alternatives in mind:

  • Killing the application when the consumer/producer gets derped
    • For starters we can look at the message sent future for exceptions (these don't get handled by us)
    • See if any catchable exceptions are thrown, I did see a few getting logged
  • Restarting a new producer instance, not sure if that will help
    • In this case it is necessary we still send all the messages in the correct order? (aiokafka seems to have some issue around this)

Modify Offsets committed to next expected offset instead of current consumed offset

Steps to reproduce

Regarding the offset that faust commits to Kafka during consumption. Traditionally, most Kafka by consumers, by convention, write the offset it expects next back to the offset storage (zk or kf). Faust seems to write the last committed offset it has consumed back to offset storage. What we're afraid of here is that most tooling is written with this convention in mind in the open source world. One such tool, released by LinkedIn, (https://github.com/linkedin/burrow)[Burrow] for consumer group monitoring.
For reference from the javadoc of the Java Kafka reference consumer:
https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

Offsets and Consumer Position
Kafka maintains a numerical offset for each record in a partition. 
This offset acts as a kind of unique identifier of a record within that partition, and also denotes the position of the consumer in the partition. 
That is, a consumer which has position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. 
There are actually two notions of position relevant to the user of the consumer.
The position of the consumer gives the offset of the next record that will be given out. 
It will be one larger than the highest offset the consumer has seen in that partition. 
It automatically advances every time the consumer receives data calls poll(long) and receives messages.

The committed position is the last offset that has been saved securely. 
Should the process fail and restart, this is the offset that it will recover to.
 The consumer can either automatically commit offsets periodically; 
or it can choose to control this committed position manually by calling commitSync, which will
 block until the offsets have been successfully committed or fatal error has happened during
 the commit process, or commitAsync which is non-blocking and will trigger 
OffsetCommitCallback upon either successfully committed or fatally failed.

This distinction gives the consumer control over when a record is considered consumed. It is discussed in further detail below.

Expected behavior

Offset written for each partition is the next offset it expects to read from not the offset that it has last consumed.

Actual behavior

Off by one. The current offset consumed is written not the current offset + 1, which equals the next offset the consumer expects to consume.

[Documentation] Overview: Faust vs. Celery

Although I have no experience with Kafka whatsoever, pretty sure the comparison to Celery meant to read

await add.send(value=AddOperation(2, 2))

But I could be wrong...

Make standby buffer size configurable

Currently, while reading from the changelog, we update the faust table only after reading 1000 events. While this is configurable currently, we should be able to have different configurations for changelog reader and standby reader.

Command to delete state for old app version

The app version flag makes it easy to start applications from scratch. However this results in a bunch of state in rocksdb/kafka for older app versions that stick around for a long time. We should try to fix this.

Faust consumes from different topics at different and highly variable rates

Actual Behavior

Trebuchet consumes from both the 'goku' and 'adjust_data' events. When running in production, however, the rate at which faust consumes from these topics vary wildly, sometimes even going to zero. See the graph below:

screen shot 2018-03-06 at 3 55 55 pm

What's notable about these graphs is that they are the exact inverses of each other. When one spikes, the other drops, and vice versa, so Faust is overall consuming at approximately a constant rate. Goku events sometimes even goes to zero for a while.

Expected Behavior

Faust should be consuming from both topics at a reasonable, constant rate.

Steps to reproduce

To reproduce this issue locally, set up trebuchet locally (https://github.com/robinhoodmarkets/trebuchet), and then add print("logging event") to line 14 and print("adjust event") to line 24 of trebuchet/agents.py. Also, change the replication factor in trebuchet/config.py to 1. After building and installing, start trebuchet with trebuchet worker -l info. tests/sample contains sample adjust_data and goku events. To produce sample events, run the following two commands on two separate shells at the same time:
kafka-console-producer --broker-list localhost:9092 --topic adjust_data < tests/local/adjust_data
kafka-console-producer --broker-list localhost:9092 --topic goku < tests/local/goku

This should start producing to kafka, and you will see a stream of print statements on Trebuchet. If reproduced correctly, you should see times when just adjust events (and no goku/logging events) being processed for a while, before processing both topics for a while (with a lower rate for adjust events), then repeating this cycle, similar to what the graphs above are showing.

Versions

  • Python version: 3.6.4
  • Faust version: 0.9.36
  • Operating system: macOS High Sierra
  • Kafka version: 0.11.0.1

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.