robinhood / faust
Python Stream Processing
License: Other
It would be nice to be able to configure a Faust app to send logging to Sentry simply by adding a Sentry DSN.
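A minimal sketch of what this could look like, using only the standard library: a logging handler that forwards ERROR-and-above records when a DSN is configured. The transport object here is a hypothetical stand-in for a real Sentry client; an actual implementation would use the official SDK.

```python
import logging


class SentryHandler(logging.Handler):
    """Forward ERROR-and-above records to Sentry when a DSN is set.

    ``transport`` is a hypothetical stand-in for a real Sentry client.
    """

    def __init__(self, dsn: str, transport) -> None:
        super().__init__(level=logging.ERROR)
        self.dsn = dsn
        self.transport = transport

    def emit(self, record: logging.LogRecord) -> None:
        # Ship the formatted record to Sentry via the injected transport.
        self.transport.send(self.dsn, self.format(record))


def setup_sentry_logging(dsn: str, transport) -> None:
    # Only attach the handler if the app was configured with a DSN.
    if dsn:
        logging.getLogger().addHandler(SentryHandler(dsn, transport))
```

With this shape, the Faust-level change would reduce to reading a DSN from the app's settings and calling the setup function once at worker startup.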
A new generation may have formed in the consumer group, and we see this error:
[2018-05-09 09:01:27,183: ERROR]: [^--Consumer]: Committing raised exception: CommitFailedError('Commit cannot be completed since the group has already rebalanced and may have assigned the partitions to another member',)
Traceback (most recent call last):
File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/consumer/group_coordinator.py", line 820, in commit_offsets
loop=self._loop)
File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/consumer/group_coordinator.py", line 940, in _do_commit_offsets
raise first_error
kafka.errors.RebalanceInProgressError: [Error 27] RebalanceInProgressError: risk-faust-v15
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/drivers/aiokafka.py", line 332, in _commit
await self._consumer.commit(commitable)
File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/consumer/consumer.py", line 436, in commit
yield from self._coordinator.commit_offsets(assignment, offsets)
File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/consumer/group_coordinator.py", line 825, in commit_offsets
"Commit cannot be completed since the group has already "
kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already rebalanced and may have assigned the partitions to another member
We should die hard on this error.
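A sketch of the proposed behavior, with placeholder names (`CommitFailedError`, `crash`) standing in for the real aiokafka/mode types: instead of merely logging the commit failure, the consumer should crash so the worker exits and the supervisor restarts it.

```python
class CommitFailedError(Exception):
    """Placeholder for kafka.errors.CommitFailedError."""


class Consumer:
    def __init__(self, commit_impl):
        self._commit_impl = commit_impl
        self.crashed_with = None

    def crash(self, exc: BaseException) -> None:
        # Stand-in for mode's Service.crash(): mark the service as failed
        # so the worker shuts down and the process supervisor restarts it.
        self.crashed_with = exc

    def commit(self) -> bool:
        try:
            self._commit_impl()
        except CommitFailedError as exc:
            # Don't swallow the error: the group has rebalanced under us,
            # so continuing could process partitions we no longer own.
            self.crash(exc)
            return False
        return True
```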
Saw this issue on one of the production boxes. It is not reproducible.
[2018-05-09 13:51:45,694: INFO]: [^--Consumer]: Waiting for lock to pause partitions
[2018-05-09 13:51:45,694: INFO]: [^--Consumer]: Acquired lock to pause partitions
[2018-05-09 13:51:45,694: INFO]: [^--Consumer]: Released pause partitions lock
[2018-05-09 13:51:45,694: INFO]: [^--Fetcher]: Starting...
[2018-05-09 13:51:45,695: INFO]: [^-App]: Restarted fetcher
[2018-05-09 13:51:45,696: INFO]: [^--TableManager]: Triggered recovery in background
[2018-05-09 13:51:45,697: INFO]: [^--TableManager]: New assignments found
[2018-05-09 13:51:45,697: INFO]: [^--Consumer]: Waiting for lock to pause partitions
[2018-05-09 13:51:45,697: INFO]: [^--Consumer]: Acquired lock to pause partitions
[2018-05-09 13:51:45,697: INFO]: [^--Consumer]: Released pause partitions lock
[2018-05-09 13:51:45,698: INFO]: [^--TableManager]: Restoring state from changelog topics...
[2018-05-09 13:51:45,698: INFO]: [^--TableManager]: Waiting for restore to finish...
[2018-05-09 13:51:45,698: INFO]: [^--TableManager]: Done reading all changelogs
[2018-05-09 13:51:45,698: INFO]: [^--TableManager]: Done reading from changelog topics
[2018-05-09 13:51:45,699: INFO]: [^--TableManager]: Stopped restoring
[2018-05-09 13:51:45,699: INFO]: [^--TableManager]: Restore complete!
[2018-05-09 13:51:45,706: INFO]: [^--TableManager]: Attempting to start standbys
[2018-05-09 13:51:45,706: INFO]: [^--TableManager]: New assignments handled
[2018-05-09 13:51:45,706: INFO]: [^--Consumer]: Waiting for lock to resume partitions
[2018-05-09 13:51:45,707: INFO]: [^--Consumer]: Acquired lock to resume partitions
[2018-05-09 13:51:45,707: INFO]: [^--Consumer]: Released resume partitions lock
[2018-05-09 13:51:45,729: ERROR]: Unexpected error in fetcher routine
Traceback (most recent call last):
File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/consumer/fetcher.py", line 328, in _fetch_requests_routine
yield from task
File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/consumer/fetcher.py", line 624, in _update_fetch_positions
node_id, topic_data)
File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/consumer/fetcher.py", line 747, in _proc_offset_request
response = yield from self._client.send(node_id, request)
File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/client.py", line 403, in send
if not (yield from self.ready(node_id, group=group)):
File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/client.py", line 381, in ready
conn = yield from self._get_conn(node_id, group=group)
File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/client.py", line 369, in _get_conn
max_idle_ms=self._connections_max_idle_ms)
File "/home/robinhood/python-3.6.3/lib/python3.6/asyncio/locks.py", line 38, in __exit__
self._lock.release()
File "/home/robinhood/python-3.6.3/lib/python3.6/asyncio/locks.py", line 201, in release
raise RuntimeError('Lock is not acquired.')
RuntimeError: Lock is not acquired.
Restarting the app worked.
Changelog topics should be created as log compacted topics.
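As a sketch of the intent (the helper below is illustrative, not Faust's actual internals), a changelog topic would be created with `cleanup.policy=compact` so Kafka retains at least the latest value per key, which is exactly the recovery guarantee a table changelog needs:

```python
from typing import Dict, Optional


def changelog_topic_config(retention_ms: Optional[int] = None) -> Dict[str, str]:
    """Broker-side configs a changelog topic should be created with."""
    config = {"cleanup.policy": "compact"}
    if retention_ms is not None:
        # Windowed tables can combine compaction with time-based deletion
        # so expired windows are eventually removed.
        config["cleanup.policy"] = "compact,delete"
        config["retention.ms"] = str(retention_ms)
    return config
```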
Getting the following error with the varys code upon upgrading from faust 1.0.11 to faust 1.0.16:
Traceback (most recent call last):
File "examples/psql.py", line 7, in <module>
from varys.shovels.psql import PSQLShovel
File "/Users/vineet/.virtualenvs/varys/lib/python3.6/site-packages/varys/shovels/psql/__init__.py", line 1, in <module>
from .shovel import PSQLShovel, PSQLTask
File "/Users/vineet/.virtualenvs/varys/lib/python3.6/site-packages/varys/shovels/psql/shovel.py", line 94, in <module>
class PSQLTask(Task, isodates=True):
TypeError: __new__() got an unexpected keyword argument 'isodates'
Got the following error:
KafkaError from aiokafka.consumer.group_coordinator in _send_sync_group_request
Looks like the faust workers were not able to rebalance the consumer group after the above error.
The error was thrown here: https://github.com/aio-libs/aiokafka/blob/v0.4.1/aiokafka/consumer/group_coordinator.py#L1217-L1221
I have verified that the issue persists when using the master branch of Faust.
N/A
Ideally, there would be a (documented) ability to connect to a remote Kafka broker via SASL authentication.
I searched through the docs and spent a while grepping, and I found no way to authenticate with or connect to a remote Kafka broker.
N/A
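For reference, the underlying aiokafka client already accepts SASL settings, so a Faust-level option would mostly need to pass them through to the transport. A sketch of the parameters involved (the helper function itself is hypothetical):

```python
from typing import Dict


def sasl_client_params(username: str, password: str,
                       mechanism: str = "PLAIN",
                       use_ssl: bool = True) -> Dict[str, str]:
    """Connection parameters for SASL/PLAIN authentication.

    These mirror the keyword arguments aiokafka's consumer and producer
    accept; a Faust setting would forward them when creating the client.
    """
    return {
        "security_protocol": "SASL_SSL" if use_ssl else "SASL_PLAINTEXT",
        "sasl_mechanism": mechanism,
        "sasl_plain_username": username,
        "sasl_plain_password": password,
    }
```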
I have the log, but it's too big to post here.
I noticed that when new boxes came up for scroll, it went into a heartbeat failure loop, which is typical of deadlocks during the rebalance. I was able to confirm that there was a deadlock in on_partitions_revoked, as can be seen here:
[2018-06-06 17:38:41,498: INFO]: [^-App]: [Flight Recorder-3] (started at Wed Jun 6 17:37:41 2018) Replaying logs...
[2018-06-06 17:38:41,499: INFO]: [^-App]: [Flight Recorder-3] (Wed Jun 6 17:37:41 2018) fetcher.stop()
[2018-06-06 17:38:41,499: INFO]: [^-App]: [Flight Recorder-3] (Wed Jun 6 17:37:41 2018) topics.on_partitions_revoked()
[2018-06-06 17:38:41,499: INFO]: [^-App]: [Flight Recorder-3] (Wed Jun 6 17:37:41 2018) tables.on_partitions_revoked()
[2018-06-06 17:38:41,499: INFO]: [^-App]: [Flight Recorder-3] (Wed Jun 6 17:37:41 2018) +TABLES: maybe_abort_ongoing_recovery
[2018-06-06 17:38:41,499: INFO]: [^-App]: [Flight Recorder-3] -End of log-
Take example apps from Spark/Kafka Streams and write them in Faust.
group_by doesn't use it.
I have verified that the issue persists when using the master branch of Faust.
Run the following hello_world app:
import faust

app = faust.App('hello-world', url='kafka://localhost:9092')
greetings_topic = app.topic('greetings')

@app.actor(greetings_topic)
async def print_greetings(greetings):
    async for greeting in greetings:
        print(greeting)
Expected: the greetings (strings) are printed.
Actual: got a JSON decode error:
[2017-08-18 15:46:03,252: ERROR] Cannot decode value for key=None (b'sadkjfh'): ValueDecodeError('Expecting value: line 1 column 1 (char 0)',)
Traceback (most recent call last):
File "/Users/vineet/faust/faust/topics.py", line 353, in deliver
v = await loads_value(value_type, message.value)
File "/Users/vineet/faust/faust/serializers/registry.py", line 94, in loads_value
str(exc)).with_traceback(sys.exc_info()[2]) from None
File "/Users/vineet/faust/faust/serializers/registry.py", line 88, in loads_value
await self._loads(serializer, value))
File "/Users/vineet/faust/faust/serializers/registry.py", line 71, in _loads
return loads(serializer, data)
File "/Users/vineet/faust/faust/serializers/codecs.py", line 321, in loads
return get_codec(codec).loads(s) if codec else s
File "/Users/vineet/faust/faust/serializers/codecs.py", line 215, in loads
reversed(self.nodes), s)
File "/Users/vineet/faust/faust/serializers/codecs.py", line 214, in <lambda>
lambda s, d: cast(Codec, d)._loads(s),
File "/Users/vineet/faust/faust/serializers/codecs.py", line 241, in _loads
return _json.loads(want_str(s))
File "/Users/vineet/faust/faust/utils/json.py", line 97, in loads
return json.loads(s, **kwargs)
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/__init__.py", line 354, in loads
return _default_decoder.decode(s)
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/decoder.py", line 339, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/decoder.py", line 357, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
Our apps are currently using stream_wait_empty=False, so for the release we should either fix the problem or make it the default.
In trebuchet I found the following stack trace which had made the app crash:
[2018-07-06 22:18:51,007: ERROR]: [^-App]: Crashed reason=RecursionError('maximum recursion depth exceeded in comparison',)
Traceback (most recent call last):
File "/home/robinhood/env/lib/python3.6/site-packages/faust/app/base.py", line 845, in _on_partitions_revoked
await self.consumer.wait_empty()
File "/home/robinhood/env/lib/python3.6/site-packages/mode/services.py", line 417, in _and_transition
return await fun(self, *args, **kwargs)
File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 305, in wait_empty
await self.commit()
File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 345, in commit
return await self.force_commit(topics)
File "/home/robinhood/env/lib/python3.6/site-packages/mode/services.py", line 417, in _and_transition
return await fun(self, *args, **kwargs)
File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 374, in force_commit
did_commit = await self._commit_tps(commit_tps)
File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 384, in _commit_tps
await self._handle_attached(commit_offsets)
File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 408, in _handle_attached
pending = await attachments.publish_for_tp_offset(tp, offset)
File "/home/robinhood/env/lib/python3.6/site-packages/faust/app/_attached.py", line 140, in publish_for_tp_offset
for fut in attached
File "/home/robinhood/env/lib/python3.6/site-packages/faust/app/_attached.py", line 140, in <listcomp>
for fut in attached
File "/home/robinhood/env/lib/python3.6/site-packages/faust/topics.py", line 303, in publish_message
topic, key, value, partition=message.partition)
File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/drivers/aiokafka.py", line 668, in send
topic, value, key=key, partition=partition))
File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/producer/producer.py", line 311, in send
timestamp_ms=timestamp_ms)
File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/producer/message_accumulator.py", line 257, in add_message
tp, key, value, timeout, timestamp_ms))
File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/producer/message_accumulator.py", line 257, in add_message
tp, key, value, timeout, timestamp_ms))
File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/producer/message_accumulator.py", line 257, in add_message
tp, key, value, timeout, timestamp_ms))
[Previous line repeated 934 more times]
File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/producer/message_accumulator.py", line 252, in add_message
yield from batch.wait_drain(timeout)
File "/home/robinhood/python-3.6.3/lib/python3.6/asyncio/tasks.py", line 301, in wait
if futures.isfuture(fs) or coroutines.iscoroutine(fs):
File "/home/robinhood/python-3.6.3/lib/python3.6/asyncio/coroutines.py", line 270, in iscoroutine
return isinstance(obj, _COROUTINE_TYPES)
File "/home/robinhood/env/lib/python3.6/abc.py", line 188, in __instancecheck__
subclass in cls._abc_negative_cache):
File "/home/robinhood/env/lib/python3.6/_weakrefset.py", line 75, in __contains__
return wr in self.data
RecursionError: maximum recursion depth exceeded in comparison
It seems that while reading standbys we do write to RocksDB, but we do not persist offsets. As a result, we replay the entire changelog anyway when promoted to active. We should persist offsets any time we write standbys to RocksDB.
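A sketch of the fix, using an in-memory dict in place of RocksDB and an assumed key layout: apply the standby batch and record the changelog offset in the same write, so promotion to active can resume from the persisted offset instead of replaying from the start.

```python
# Key under which the offset is stored alongside the data (assumed layout).
OFFSET_KEY = "__changelog_offset__"


def apply_standby_batch(store: dict, records, tp_offset: int) -> None:
    """Write a batch of standby records AND the offset read so far.

    Persisting both together means that when this replica is promoted to
    active, recovery resumes from the stored offset rather than offset 0.
    """
    for key, value in records:
        store[key] = value
    store[OFFSET_KEY] = tp_offset


def recovery_start_offset(store: dict) -> int:
    # Resume after the last persisted offset; replay from 0 if none saved.
    return store.get(OFFSET_KEY, -1) + 1
```

With RocksDB the two writes would go into one atomic write batch so the offset can never get ahead of the data.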
Check out the trebuchet-local branch in trebuchet. After setting up trebuchet, run it with trebuchet worker -l info. Make sure the faust version is >= 0.9.41.
Then, feed data into Kafka with the following command:
kafka-console-producer --broker-list localhost:9092 --topic adjust_data < tests/sample/adjust_data
Navigate the browser to localhost:6066.
In faust versions 0.9.39 and below, commit latencies stabilize at around 3-4 seconds while data is being consumed, and the web server loads very quickly.
When faust is consuming data, commit latency spikes to ~30-60 seconds, and the web server (localhost:6066) takes a long time to load.
I get the following error when running faust 1.0.12:
Traceback (most recent call last):
File "/home/robinhood/env/lib/python3.6/site-packages/mode/services.py", line 685, in _execute_task
await task
File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 292, in _commit_handler
await self.commit()
File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 331, in commit
return await self.force_commit(topics)
File "/home/robinhood/env/lib/python3.6/site-packages/mode/services.py", line 417, in _and_transition
return await fun(self, *args, **kwargs)
File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 345, in force_commit
did_commit = await self._commit_tps(commit_tps)
File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 355, in _commit_tps
await self._handle_attached(commit_offsets)
File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 389, in _handle_attached
await producer.wait_many(pending)
File "/home/robinhood/env/lib/python3.6/site-packages/mode/services.py", line 600, in wait_many
return await self._wait_one(coro, timeout=timeout)
File "/home/robinhood/env/lib/python3.6/site-packages/mode/services.py", line 625, in _wait_one
f.result() # propagate exceptions
File "/home/robinhood/python-3.6.3/lib/python3.6/asyncio/tasks.py", line 304, in wait
raise ValueError('Set of coroutines/Futures is empty.')
ValueError: Set of coroutines/Futures is empty.
The app crashes after this error.
Add an option to Record.asdict() to try to type-cast values to a specific primitive type: https://github.com/robinhoodmarkets/faust/blob/master/faust/models/record.py#L249
I would like to use asdict to convert values to strings. In my specific case, I'd like to cast datetime objects to strings by passing an option to asdict.
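A sketch of the requested option, written outside of Faust's Record class (the `coercions` parameter is the hypothetical addition): values whose type appears in the coercion map are cast on the way out.

```python
def asdict_coerced(data: dict, coercions: dict) -> dict:
    """Return a copy of ``data`` with values cast per ``coercions``.

    ``coercions`` maps a type to a converter, e.g.
    ``{datetime: lambda v: v.isoformat()}`` to turn datetimes into strings.
    """
    out = {}
    for key, value in data.items():
        for typ, convert in coercions.items():
            if isinstance(value, typ):
                value = convert(value)
                break
        out[key] = value
    return out
```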
Found the following error in the logs:
TypeError: __init__() got an unexpected keyword argument 'resolve'
[2018-06-05 16:33:01,436: ERROR]: ["/home/robinhood/python-3.6.3/lib/python3.6/asyncio/base_events.py:1411: RuntimeWarning: coroutine 'Fetcher._fetcher' was never awaited\n handle = self._ready.popleft()"]
[2018-06-05 16:33:01,427: WARNING]: /home/robinhood/python-3.6.3/lib/python3.6/asyncio/base_events.py:1411: RuntimeWarning: coroutine 'Fetcher._fetcher' was never awaited
handle = self._ready.popleft()
The app (trebuchet) was continuously crashing and trying to rebalance, never actually forming a proper group.
The app seems to stall after the following errors. We should make the app crash instead of stalling, so that the supervisor restarts it:
[2018-05-29 14:08:03,304: ERROR]: Future exception was never retrieved
future: <Future finished exception=NodeNotReadyError('Attempt to send a request to node which is not ready (node id 42).',)>
Traceback (most recent call last):
File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/producer/producer.py", line 419, in _send_produce_req
response = yield from self.client.send(node_id, request)
File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/client.py", line 406, in send
" which is not ready (node id {}).".format(node_id))
kafka.errors.NodeNotReadyError: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 42).
[2018-05-29 14:08:03,315: ERROR]: Unable connect to node with id 42: [Errno 111] Connect call failed ('10.1.30.204', 9092)
[2018-05-29 14:08:03,315: WARNING]: Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 42).
[2018-05-29 14:08:03,715: ERROR]: Unable connect to node with id 42: [Errno 111] Connect call failed ('10.1.30.204', 9092)
[2018-05-29 14:08:03,716: WARNING]: Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 42).
[2018-05-29 14:08:04,117: ERROR]: Unable connect to node with id 42: [Errno 111] Connect call failed ('10.1.30.204', 9092)
[2018-05-29 14:08:04,117: WARNING]: Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 42).
[2018-05-29 14:08:04,517: ERROR]: Unable connect to node with id 42: [Errno 111] Connect call failed ('10.1.30.204', 9092)
[2018-05-29 14:08:04,518: WARNING]: Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 42).
[2018-05-29 14:08:04,917: ERROR]: Unexpected error in sender routine
Traceback (most recent call last):
File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/producer/producer.py", line 374, in _sender_routine
task.result()
File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/producer/producer.py", line 419, in _send_produce_req
response = yield from self.client.send(node_id, request)
File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/client.py", line 403, in send
if not (yield from self.ready(node_id, group=group)):
File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/client.py", line 381, in ready
conn = yield from self._get_conn(node_id, group=group)
File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/client.py", line 354, in _get_conn
assert broker, 'Broker id %s not in current metadata' % node_id
AssertionError: Broker id 42 not in current metadata
Faust breaks if input messages have extra fields that we don't specify in our model. This means that if our upstream adds an extra field, our app breaks.
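One tolerant-deserialization approach, sketched here without Faust (using class annotations as the field registry, which is an assumption about the model layout): drop unknown keys before constructing the model, so an upstream adding a field doesn't break consumers.

```python
def from_payload(model_cls, payload: dict):
    """Construct ``model_cls`` from ``payload``, ignoring unknown fields.

    Only keys matching the model's declared fields are passed through,
    so new upstream fields are silently dropped instead of raising.
    """
    known = set(getattr(model_cls, "__annotations__", {}))
    return model_cls(**{k: v for k, v in payload.items() if k in known})


class Withdrawal:
    # Minimal stand-in for a Faust Record with two declared fields.
    user: str
    amount: float

    def __init__(self, user: str, amount: float) -> None:
        self.user = user
        self.amount = amount
```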
I added a random KafkaTimeoutError in App._commit_attached to simulate a KafkaTimeoutError while producing. I caught this error and triggered a crash: self.crash(exc). This should ideally crash the application, but it looks like upon crashing we end up committing offsets, as can be seen in the log trace below. This is bad: a crash means something went wrong, and we should not commit offsets.
[2018-03-03 17:51:07,840: INFO]: [^--TableManager]: Restore complete!
[2018-03-03 17:51:07,841: INFO]: [^--Consumer]: Waiting for lock to resume partitions
[2018-03-03 17:51:07,842: INFO]: [^--Consumer]: Acquired lock to resume partitions
[2018-03-03 17:51:07,842: INFO]: [^--Consumer]: Released resume partitions lock
[2018-03-03 17:51:07,842: INFO]: [^--TableManager]: Attempting to start standbys
[2018-03-03 17:51:07,842: INFO]: [^--TableManager]: New assignments handled
[2018-03-03 17:51:07,871: INFO]: [^--Table: user_to_total]: Starting...
[2018-03-03 17:51:07,871: INFO]: [^---Store: user_to_total]: Starting...
[2018-03-03 17:51:07,871: INFO]: [^--Table: country_to_total]: Starting...
[2018-03-03 17:51:07,871: INFO]: [^---Store: country_to_total]: Starting...
[2018-03-03 17:51:07,913: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 744: k=b'"country_4"' v=b'{"user": "user_434", "country": "country_4", "amount": 1832.8454878058342, "date": "2018-03-04T01:41:26.137348Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 1428: k=b'"country_1"' v=b'{"user": "user_246", "country": "country_1", "amount": 16163.629349934505, "date": "2018-03-04T01:39:52.217088Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 747: k=b'"country_2"' v=b'{"user": "user_114", "country": "country_2", "amount": 7532.398390989598, "date": "2018-03-04T01:41:42.050016Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 32392: k=b'"country_0"' v=b'{"user": "user_239", "country": "country_0", "amount": 24807.034890744173, "date": "2018-03-04T01:40:31.821356Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 8506: k=b'"user_220"' v=b'{"user": "user_220", "country": "country_1", "amount": 11502.542493932206, "date": "2018-03-04T01:41:49.087564Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 8327: k=b'"user_243"' v=b'{"user": "user_243", "country": "country_0", "amount": 8941.192697487695, "date": "2018-03-04T01:41:51.035743Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 8600: k=b'"user_410"' v=b'{"user": "user_410", "country": "country_0", "amount": 22052.862798686932, "date": "2018-03-04T01:40:21.664204Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 8973: k=b'"user_396"' v=b'{"user": "user_396", "country": "country_2", "amount": 18885.378500303657, "date": "2018-03-04T01:40:22.124953Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,919: INFO]: [^Worker]: Ready
[2018-03-03 17:51:08,770: INFO]: [^--Consumer]: COMMITTING OFFSETS:
┌Commit Offsets──────────────────────────────────────────────────────────────────────────────┬────────┬──────────┐
│ TP │ Offset │ Metadata │
├────────────────────────────────────────────────────────────────────────────────────────────┼────────┼──────────┤
│ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=3) │ 751 │ │
│ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=2) │ 1466 │ │
│ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=1) │ 754 │ │
│ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=0) │ 32546 │ │
│ TopicPartition(topic='withdrawals2', partition=3) │ 8712 │ │
│ TopicPartition(topic='withdrawals2', partition=1) │ 8532 │ │
│ TopicPartition(topic='withdrawals2', partition=2) │ 8805 │ │
│ TopicPartition(topic='withdrawals2', partition=0) │ 9178 │ │
└────────────────────────────────────────────────────────────────────────────────────────────┴────────┴──────────┘
[2018-03-03 17:51:11,787: ERROR]: [^-App]: Crashed reason=KafkaTimeoutError()
Traceback (most recent call last):
File "/Users/vineet/faust/faust/app.py", line 1047, in _commit_attached
raise KafkaTimeoutError
kafka.errors.KafkaTimeoutError: KafkaTimeoutError
[2018-03-03 17:51:11,806: INFO]: [^Worker]: Stopping...
[2018-03-03 17:51:11,806: INFO]: [^-App]: Stopping...
[2018-03-03 17:51:11,806: INFO]: [^--Fetcher]: Stopping...
[2018-03-03 17:51:11,807: INFO]: [^--Consumer]: Consumer shutting down for user cancel.
[2018-03-03 17:51:11,817: INFO]: [^--Fetcher]: -Stopped!
[2018-03-03 17:51:11,817: INFO]: [^--TableManager]: Stopping...
[2018-03-03 17:51:11,817: INFO]: [^--TableManager]: Aborting ongoing recovery to start over
[2018-03-03 17:51:11,817: INFO]: [^--Table: user_to_total]: Stopping...
[2018-03-03 17:51:11,818: INFO]: [^---Store: user_to_total]: Stopping...
[2018-03-03 17:51:11,818: INFO]: [^---Store: user_to_total]: -Stopped!
[2018-03-03 17:51:11,819: INFO]: [^--Table: user_to_total]: -Stopped!
[2018-03-03 17:51:11,819: INFO]: [^--Table: country_to_total]: Stopping...
[2018-03-03 17:51:11,819: INFO]: [^---Store: country_to_total]: Stopping...
[2018-03-03 17:51:11,819: INFO]: [^---Store: country_to_total]: -Stopped!
[2018-03-03 17:51:11,828: INFO]: [^--Table: country_to_total]: -Stopped!
[2018-03-03 17:51:11,831: INFO]: [^--TableManager]: -Stopped!
[2018-03-03 17:51:11,832: INFO]: [^--TopicConductor]: Stopping...
[2018-03-03 17:51:11,837: INFO]: [^--TopicConductor]: -Stopped!
[2018-03-03 17:51:11,837: INFO]: [^--Agent]: Stopping...
[2018-03-03 17:51:11,837: INFO]: [^---OneForOneSupervisor]: Stopping...
[2018-03-03 17:51:11,837: INFO]: [^---Agent*: examp[.]track_country_withdrawal]: Stopping...
[2018-03-03 17:51:11,840: INFO]: [^---Agent*: examp[.]track_country_withdrawal]: -Stopped!
[2018-03-03 17:51:11,841: INFO]: [^---OneForOneSupervisor]: -Stopped!
[2018-03-03 17:51:11,841: INFO]: [^--Agent]: -Stopped!
[2018-03-03 17:51:11,841: INFO]: [^--Agent]: Stopping...
[2018-03-03 17:51:11,841: INFO]: [^---OneForOneSupervisor]: Stopping...
[2018-03-03 17:51:11,842: INFO]: [^---Agent*: examples[.]track_user_withdrawal]: Stopping...
[2018-03-03 17:51:11,864: INFO]: [^---Agent*: examples[.]track_user_withdrawal]: -Stopped!
[2018-03-03 17:51:11,867: INFO]: [^---OneForOneSupervisor]: -Stopped!
[2018-03-03 17:51:11,867: INFO]: [^--Agent]: -Stopped!
[2018-03-03 17:51:11,867: INFO]: [^--ReplyConsumer]: Stopping...
[2018-03-03 17:51:11,868: INFO]: [^--ReplyConsumer]: -Stopped!
[2018-03-03 17:51:11,868: INFO]: [^--LeaderAssignor]: Stopping...
[2018-03-03 17:51:11,868: INFO]: [^--LeaderAssignor]: -Stopped!
[2018-03-03 17:51:11,868: INFO]: [^--Consumer]: Stopping...
[2018-03-03 17:51:12,082: INFO]: [^--Consumer]: COMMITTING OFFSETS:
┌Commit Offsets──────────────────────────────────────────────────────────────────────────────┬────────┬──────────┐
│ TP │ Offset │ Metadata │
├────────────────────────────────────────────────────────────────────────────────────────────┼────────┼──────────┤
│ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=3) │ 767 │ │
│ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=2) │ 1506 │ │
│ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=1) │ 776 │ │
│ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=0) │ 33289 │ │
│ TopicPartition(topic='withdrawals2', partition=3) │ 9071 │ │
│ TopicPartition(topic='withdrawals2', partition=1) │ 8833 │ │
│ TopicPartition(topic='withdrawals2', partition=2) │ 9274 │ │
│ TopicPartition(topic='withdrawals2', partition=0) │ 9652 │ │
└────────────────────────────────────────────────────────────────────────────────────────────┴────────┴──────────┘
[2018-03-03 17:51:12,084: INFO]: LeaveGroup request succeeded
[2018-03-03 17:51:12,085: INFO]: [^--Consumer]: -Stopped!
[2018-03-03 17:51:12,085: INFO]: [^--Producer]: Stopping...
[2018-03-03 17:51:12,085: INFO]: [^--Producer]: -Stopped!
[2018-03-03 17:51:12,085: INFO]: [^--MonitorService]: Stopping...
[2018-03-03 17:51:12,086: INFO]: [^--MonitorService]: -Stopped!
[2018-03-03 17:51:12,086: INFO]: [^-App]: -Stopped!
[2018-03-03 17:51:12,086: INFO]: [^-Website]: Stopping...
[2018-03-03 17:51:12,086: INFO]: [^--Web]: Stopping...
[2018-03-03 17:51:12,086: INFO]: [^---ServerThread]: Stopping...
[2018-03-03 17:51:12,087: INFO]: [^--Web]: Closing server
[2018-03-03 17:51:12,087: INFO]: [^--Web]: Waiting for server to close handle
[2018-03-03 17:51:12,087: INFO]: [^--Web]: Shutting down web application
[2018-03-03 17:51:12,087: INFO]: [^--Web]: Waiting for handler to shut down
[2018-03-03 17:51:12,087: INFO]: [^--Web]: Cleanup
[2018-03-03 17:51:12,087: INFO]: [^---ServerThread]: -Stopped!
[2018-03-03 17:51:12,087: INFO]: [^--Web]: -Stopped!
[2018-03-03 17:51:12,088: INFO]: [^-Website]: -Stopped!
[2018-03-03 17:51:12,088: INFO]: [^Worker]: -Stopped!
[2018-03-03 17:51:12,088: INFO]: [^Worker]: Gathering service tasks...
[2018-03-03 17:51:12,088: INFO]: [^Worker]: Gathering all futures...
[2018-03-03 17:51:17,227: INFO]: [^Worker]: Closing event loop
[2018-03-03 17:51:17,228: CRITICAL]: [^Worker]: We experienced a crash! Reraising original exception...
Traceback (most recent call last):
File "/Users/vineet/.virtualenvs/faust/bin/faust", line 11, in <module>
load_entry_point('faust', 'console_scripts', 'faust')()
File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/click-6.7-py3.6.egg/click/core.py", line 722, in __call__
return self.main(*args, **kwargs)
File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/click-6.7-py3.6.egg/click/core.py", line 697, in main
rv = self.invoke(ctx)
File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/click-6.7-py3.6.egg/click/core.py", line 1066, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/click-6.7-py3.6.egg/click/core.py", line 895, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/click-6.7-py3.6.egg/click/core.py", line 535, in invoke
return callback(*args, **kwargs)
File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/click-6.7-py3.6.egg/click/decorators.py", line 17, in new_func
return f(get_current_context(), *args, **kwargs)
File "/Users/vineet/faust/faust/cli/base.py", line 317, in _inner
return cmd()
File "/Users/vineet/faust/faust/cli/worker.py", line 106, in __call__
**{**self.kwargs, **kwargs})
File "/Users/vineet/faust/faust/cli/worker.py", line 128, in start_worker
return worker.execute_from_commandline()
File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/mode/worker.py", line 155, in execute_from_commandline
self.stop_and_shutdown()
File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/mode/worker.py", line 160, in stop_and_shutdown
self._shutdown_loop()
File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/mode/worker.py", line 188, in _shutdown_loop
raise self._crash_reason from self._crash_reason
File "/Users/vineet/faust/faust/app.py", line 1047, in _commit_attached
raise KafkaTimeoutError
kafka.errors.KafkaTimeoutError: KafkaTimeoutError
Upon receiving a rebalance error, we should hard-shutdown the application.
Currently, the application tries to recover, which results in zombie assignments.
I have included information about relevant versions: we saw this issue on Faust 1.0.19.
I have verified that the issue persists when using the master branch of Faust.
We have only seen this issue once and haven't reproduced it. But after talking to Vineet, he seems to think that it is possible that with the way that faust uses consumer offsets we may be dropping the first message that we read after a consumer rebalance.
We saw this issue in the combiner, which holds onto messages it receives from a stream for long periods of time without committing them. Unfortunately, we have seen this issue only once, but we will provide an update if we see it again.
This agent receives a special type of event which should trigger it to stop consuming and hold onto the event for a long period of time. This event should not be committed so if the consumer group is rebalanced or the agent is restarted, it should receive the message again and continue waiting.
We saw this special event being dropped with a message "DROPPED MESSAGE ROFF: n". We believe this may be caused by how Faust manages and uses consumer offsets.
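One plausible mechanism for dropping the first message, sketched as plain offset arithmetic (this is a hypothesis, not confirmed Faust behavior): Kafka's convention is that the committed offset is the *next* offset to consume. If the last processed offset is already committed as `last + 1` and the consumer then resumes at `committed + 1`, exactly one message is skipped after a rebalance.

```python
def commit_offset(last_processed: int) -> int:
    # Kafka convention: commit the NEXT offset to read.
    return last_processed + 1


def resume_offset(committed: int) -> int:
    # Correct resume point is the committed offset itself...
    return committed


def buggy_resume_offset(committed: int) -> int:
    # ...whereas adding one here (on top of the +1 at commit time)
    # skips the first unprocessed message after a rebalance.
    return committed + 1
```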
[2018-06-29 21:04:57,722: INFO]: [^--TableManager]: Stopped restoring
[2018-06-29 21:04:57,723: INFO]: [^--TableManager]: Restore complete!
[2018-06-29 21:04:57,738: INFO]: [^--TableManager]: Attempting to start standbys
[2018-06-29 21:04:57,738: INFO]: [^--TableManager]: New assignments handled
[2018-06-29 21:04:57,739: INFO]: [^--Fetcher]: Starting...
[2018-06-29 21:04:57,739: INFO]: [^--TableManager]: Worker ready
[2018-06-29 21:04:57,740: INFO]: [^Worker]: Ready
[2018-06-29 21:04:57,741: INFO]: No forwarders are waiting. Going to sleep.
[2018-06-29 21:04:57,812: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 2: k=None v=b'{"details": {"date": "2018-06-29"}, "id": "d3a8acf5-eb24-461
0-abc1-ce0b6fff8254", "type": "end_of_day"}'
[2018-06-29 21:04:57,838: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 302: k=None v=b'{"order_id": 168, "id": "3e66bc59-49d3-4388-b9f9-81476f7f6
9bb", "key_field": "0", "batch_id": null, "created_at": "2018-06-30T00:02:59.192929+00:00", "updated_at": "2018-06-30T00:02:59.192943+00:00", "
type": "end_of_day", "sent": false, "account_id": null, "details": {"date": "2018-06-29"}, "meta": null}'
[2018-06-29 21:04:57,838: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 29: k=None v=b'{"type": "end_of_day", "created_at": "2018-06-30T00:03:06.8
22620+00:00", "updated_at": "2018-06-30T00:03:06.822649+00:00", "account": null, "batch_id": null, "id": "5817539f-2f34-46d4-b89b-be7be15cfdba"
, "key_field": "0", "order_id": 42, "trigger_id": null, "sent": false, "details": {"date": "2018-06-29"}}'
[2018-06-29 21:04:57,840: INFO]: Got funding event on topic cashier-transfers_fundingevent
[2018-06-29 21:04:57,841: INFO]: Got funding event on topic cashier-transfers_fundingevent
[2018-06-29 21:04:57,873: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 4069: k=None v=b'{"created_at": "2018-06-30T00:02:59.160290+00:00", "updat
ed_at": "2018-06-30T00:02:59.160321+00:00", "key_field": "8088457b-fdcb-44ed-a829-e59a12100eb0", "batch_id": null, "order_id": 65146, "type": "
end_of_day", "id": "001d8063-fb9d-4325-87c5-87ff11d57d09", "sent": false, "details": {"date": "2018-06-29"}}'
[2018-06-29 21:04:57,875: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 908: k=None v=b'{"type":"end_of_day","details":{"date":"2018-06-29"}}'
[2018-06-29 21:04:57,875: INFO]: Got end_of_day event on topic mainst_pinnacle_order_updates
[2018-06-29 21:04:57,876: INFO]: Received end of day event on topic mainst_pinnacle_order_updates
[2018-06-29 21:04:57,970: INFO]: Waiting for the day cycle to begin.
[2018-06-29 21:04:57,982: INFO]: Fetch offset 0 is out of range for partition TopicPartition(topic='nummus_transfers', partition=18), resetting offset
[2018-06-29 21:04:57,999: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 18: k=None v=b'{"type": "end_of_day", "id": "19c13f26-824b-4b37-8d9e-fd6775e405f1", "details": {"date": "2018-06-29"}}'
[2018-06-29 21:04:58,014: INFO]: [^--Consumer]: COMMITTING OFFSETS:
+Commit Offsets-------------------------------------------------------+--------+----------+
| TP | Offset | Metadata |
+---------------------------------------------------------------------+--------+----------+
| TopicPartition(topic='cashier-transfers_fundingevent', partition=0) | 304 | |
+---------------------------------------------------------------------+--------+----------+
[2018-06-29 21:05:57,973: INFO]: Waiting for the day cycle to begin.
[2018-06-29 21:06:57,976: INFO]: Waiting for the day cycle to begin.
[2018-06-29 21:07:57,978: INFO]: Waiting for the day cycle to begin.
[2018-06-29 21:08:57,981: INFO]: Waiting for the day cycle to begin.
[2018-06-29 21:09:35,888: INFO]: Got funding event on topic cashier-transfers_fundingevent
[2018-06-29 21:09:38,123: INFO]: [^--Consumer]: COMMITTING OFFSETS:
+Commit Offsets-------------------------------------------------------+--------+----------+
| TP | Offset | Metadata |
+---------------------------------------------------------------------+--------+----------+
| TopicPartition(topic='cashier-transfers_fundingevent', partition=0) | 305 | |
+---------------------------------------------------------------------+--------+----------+
[2018-06-29 21:09:57,983: INFO]: Waiting for the day cycle to begin.
[2018-06-29 21:10:57,987: INFO]: Waiting for the day cycle to begin.
[2018-06-29 21:11:57,990: INFO]: Waiting for the day cycle to begin.
Currently we do not add window ends for keys into the timestamps heap upon changelog recovery. Hence, keys added during recovery (and never updated afterwards) will never be garbage collected by the faust app.
Note: this isn't an issue with the actual kafka topic as it uses kafka's retention.
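The fix described above can be sketched roughly as follows. This is an illustrative model, not Faust's actual internals: the class, `on_recover_key`, and the `_timestamps` heap are hypothetical names standing in for the table's recovery hook and timestamp bookkeeping.

```python
import heapq

class WindowedTableSketch:
    """Toy windowed table: re-adds window ends to the timestamps heap
    during changelog recovery so recovered keys are eligible for expiry."""

    def __init__(self, window_size: float, expires: float) -> None:
        self.window_size = window_size
        self.expires = expires
        self.data = {}            # (key, window_start) -> value
        self._timestamps = []     # min-heap of window close times

    def on_recover_key(self, key, window_start: float, value) -> None:
        # Called for every key read back from the changelog: store the
        # value AND push the window end onto the heap, so keys that never
        # get another update are still garbage collected later.
        self.data[(key, window_start)] = value
        heapq.heappush(self._timestamps, window_start + self.window_size)

    def gc(self, now: float) -> None:
        # Pop windows whose close time is past the expiry horizon.
        while self._timestamps and self._timestamps[0] < now - self.expires:
            close = heapq.heappop(self._timestamps)
            start = close - self.window_size
            for key, ws in [k for k in self.data if k[1] == start]:
                del self.data[(key, ws)]
```

Without the `heappush` in `on_recover_key`, `gc` would never see recovered windows and they would live forever in memory, which is exactly the leak described.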
The app is stuck with the following error logging:
[2018-06-19 10:18:54,652: ERROR]: Failed fetch messages from 102: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 102).
[2018-06-19 10:18:54,654: ERROR]: Unable connect to node with id 102: [Errno 111] Connect call failed ('10.1.65.249', 9092)
[2018-06-19 10:18:54,654: ERROR]: Failed fetch messages from 102: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 102).
[2018-06-19 10:18:54,656: ERROR]: Unable connect to node with id 102: [Errno 111] Connect call failed ('10.1.65.249', 9092)
[2018-06-19 10:18:54,656: ERROR]: Failed fetch messages from 102: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 102).
[2018-06-19 10:18:54,658: ERROR]: Unable connect to node with id 102: [Errno 111] Connect call failed ('10.1.65.249', 9092)
[2018-06-19 10:18:54,658: ERROR]: Failed fetch messages from 102: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 102).
These errors appear to come from the aiokafka fetcher; I see a bunch of them.
This is with Faust 1.0.14.
Currently, serializing models with attributes that are `Iterable[ModelT]` does not work.
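What such serialization needs to do can be sketched with plain dataclasses (not Faust models — `to_representation` is a hypothetical helper, not the Faust API): the serializer must recurse into each element of the iterable rather than trying to serialize the container in one shot.

```python
from dataclasses import dataclass, fields, is_dataclass
from typing import List

def to_representation(obj):
    """Recursively convert nested models (here: dataclasses) and
    iterables of models into plain dicts/lists."""
    if is_dataclass(obj):
        return {f.name: to_representation(getattr(obj, f.name))
                for f in fields(obj)}
    if isinstance(obj, (list, tuple)):
        # The Iterable[ModelT] case: serialize element by element.
        return [to_representation(x) for x in obj]
    return obj

@dataclass
class Point:
    x: int
    y: int

@dataclass
class Shape:
    points: List[Point]   # the attribute type that currently breaks
```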
Currently we do not garbage collect stale partition state (in-memory) once a partition for a table is unassigned from a client. This may cause memory leaks when partitions rotate a lot between clients.
One proposed approach is to divide a table internally by partition: implement the table interface over multiple underlying tables, one per partition. This might make the O(1) dictionary operations O(p) (p = number of partitions), but should make garbage collection fairly quick and simple.
The other approach is to maintain a partition-to-key index, but that would have a space complexity of O(n) (n = number of keys).
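The first proposal can be sketched as follows (an illustrative toy, not the Faust table implementation; the method names are hypothetical):

```python
from collections import defaultdict
from typing import Any, Dict, Hashable

class PartitionShardedTable:
    """Table-like interface over one underlying dict per partition.
    Lookups become O(p) worst case (p = assigned partitions), but
    dropping all state for a revoked partition is a single dict pop."""

    def __init__(self) -> None:
        self._shards: Dict[int, Dict[Hashable, Any]] = defaultdict(dict)

    def set(self, partition: int, key: Hashable, value: Any) -> None:
        self._shards[partition][key] = value

    def get(self, key: Hashable, default: Any = None) -> Any:
        # O(p): the owning partition is not derivable from the key here,
        # so every shard must be probed.
        for shard in self._shards.values():
            if key in shard:
                return shard[key]
        return default

    def on_partition_revoked(self, partition: int) -> None:
        # Garbage collection for a revoked partition: drop the whole
        # shard in one operation instead of scanning all keys.
        self._shards.pop(partition, None)
```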
Agent topics should have the following properties:
I get the following error if I try to update a faust Table from an agent using `await asyncio.gather(fut)` or `await asyncio.wait([fut])`, where `fut` updates the table.
Here is the exception traceback:
[2018-03-04 18:39:34,215: ERROR]: Task exception was never retrieved
future: <Task finished coro=<ActivitiesStore.add() done, defined at /Users/vineet/scroll/scroll/store.py:75> exception=RuntimeError('Cannot modify table outside of agent/stream.',)>
Traceback (most recent call last):
File "/Users/vineet/scroll/scroll/store.py", line 76, in add
self._add_activity(activity)
File "/Users/vineet/scroll/scroll/store.py", line 85, in _add_activity
self.activities[activity.key] = activity
File "/Users/vineet/.virtualenvs/scroll/lib/python3.6/site-packages/faust/utils/collections.py", line 296, in __setitem__
self.on_key_set(key, value)
File "/Users/vineet/.virtualenvs/scroll/lib/python3.6/site-packages/faust/tables/table.py", line 58, in on_key_set
self._send_changelog(key, value)
File "/Users/vineet/.virtualenvs/scroll/lib/python3.6/site-packages/faust/tables/base.py", line 161, in _send_changelog
raise RuntimeError('Cannot modify table outside of agent/stream.')
RuntimeError: Cannot modify table outside of agent/stream.
Currently we garbage collect windowed table entries based on the latest timestamp we have seen across all partitions. This should be changed to garbage collect entries for a partition based on the latest timestamp seen for that partition. This matters in the case where a partition is lagging behind the others.
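The per-partition bookkeeping could look roughly like this (a sketch with hypothetical names, not the Faust implementation): each partition gets its own "clock", and a window is expired only against the clock of its own partition, so a lagging partition's windows are not collected prematurely.

```python
from typing import Dict

class PerPartitionExpiry:
    """Track the latest event timestamp per partition and decide expiry
    per partition instead of against a single global watermark."""

    def __init__(self, expires: float) -> None:
        self.expires = expires
        self._latest: Dict[int, float] = {}

    def on_event(self, partition: int, timestamp: float) -> None:
        prev = self._latest.get(partition, float('-inf'))
        self._latest[partition] = max(prev, timestamp)

    def is_expired(self, partition: int, window_end: float) -> bool:
        latest = self._latest.get(partition)
        if latest is None:
            # No events seen for this partition yet: never expire.
            return False
        return window_end < latest - self.expires
```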
Ran an application on the master branch of Faust that has a relatively large (larger than the simple example) number of messages being transferred. Tried to rebalance the cluster of workers by adding another worker.
Worker addition/removal should be graceful.
Instead, we get a RebalanceInProgressError from Kafka upon adding/removing workers:
[2017-08-08 22:34:18,945: ERROR] OffsetCommit failed for group referrals-device-check due to group error ([Error 27] RebalanceInProgressError: referrals-device-check), will rejoin
[2017-08-08 22:34:18,945: ERROR] User provided subscription listener <faust.transport.aiokafka.ConsumerRebalanceListener object at 0x1058c5ac8> for group referrals-device-check failed on_partitions_revoked
Traceback (most recent call last):
File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/aiokafka/group_coordinator.py", line 308, in _on_join_prepare
yield from res
File "/Users/vineet/faust/faust/transport/aiokafka.py", line 77, in on_partitions_revoked
cast(Iterable[TopicPartition], revoked))
File "/Users/vineet/faust/faust/transport/base.py", line 146, in on_partitions_revoked
await self._on_partitions_revoked(revoked)
File "/Users/vineet/faust/faust/app.py", line 647, in on_partitions_revoked
await self.consumer.wait_empty()
File "/Users/vineet/faust/faust/transport/base.py", line 180, in wait_empty
await self.commit()
File "/Users/vineet/faust/faust/transport/base.py", line 221, in commit
await self._do_commit(tp, offset, meta='')
File "/Users/vineet/faust/faust/transport/base.py", line 263, in _do_commit
await self._commit({tp: self._new_offsetandmetadata(offset, meta)})
File "/Users/vineet/faust/faust/transport/aiokafka.py", line 210, in _commit
await self._consumer.commit(offsets)
File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/aiokafka/consumer.py", line 397, in commit
yield from self._coordinator.commit_offsets(offsets)
File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/aiokafka/group_coordinator.py", line 583, in commit_offsets
raise error
kafka.errors.RebalanceInProgressError: [Error 27] RebalanceInProgressError: referrals-device-check
On the latest version of Faust, I get the following error upon enabling Sentry:
```
[2018-06-05 16:31:39,400: ERROR]: Sentry responded with an error: __init__() got an unexpected keyword argument 'resolve' (url: https://sentry-internal.robinhood.com/api/38/store/)
Traceback (most recent call last):
File "/home/robinhood/env/lib/python3.6/site-packages/raven/base.py", line 722, in send_remote
transport = self.remote.get_transport()
File "/home/robinhood/env/lib/python3.6/site-packages/raven/conf/remote.py", line 71, in get_transport
self._transport = self._transport_cls(**self.options)
File "/home/robinhood/env/lib/python3.6/site-packages/raven_aiohttp.py", line 174, in __init__
super().__init__(*args, **kwargs)
File "/home/robinhood/env/lib/python3.6/site-packages/raven_aiohttp.py", line 57, in __init__
self._client_session = self._client_session_factory()
File "/home/robinhood/env/lib/python3.6/site-packages/raven_aiohttp.py", line 77, in _client_session_factory
loop=self._loop)
TypeError: __init__() got an unexpected keyword argument 'resolve'
```
I am using the following versions:
raven==6.6.0
raven-aiohttp==0.6.0
aiohttp==3.1.3
When reading from the changelog, a message with value `None` should delete the key from the table. Instead, we set the key to `None` and keep it around.
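The desired read-side behavior is simple to state in code (a sketch; `apply_changelog_event` is a hypothetical name for the table's changelog-apply hook):

```python
def apply_changelog_event(table: dict, key, value) -> None:
    """Apply one changelog record to an in-memory table.
    A None value is a tombstone and must delete the key,
    not store None under it."""
    if value is None:
        table.pop(key, None)   # tombstone: remove the key entirely
    else:
        table[key] = value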
Since table changelogs expire records via both log compaction and time-based expiry, we should also take the max of each stored offset and the earliest available offset.
To delete keys from tables and have them deleted in the log-compacted topic, we need to set the key's value to an empty value (null). However, currently we end up serializing `None`, so the key is never deleted from the log-compacted topic.
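The write-side distinction is between "the serialized form of None" and "no value bytes at all". A sketch of the fix (assuming JSON serialization; `serialize_changelog_value` is a hypothetical helper, not Faust's API):

```python
import json
from typing import Any, Optional

def serialize_changelog_value(value: Any) -> Optional[bytes]:
    """Serialize a changelog value. A deleted key must be produced with
    a null payload (value bytes absent -> Kafka tombstone), NOT the
    serialized bytes b'null', otherwise log compaction never removes
    the key from the topic."""
    if value is None:
        return None               # tombstone: produce with no value bytes
    return json.dumps(value).encode()
```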
This is reproducible by running the simple example locally and taking down a broker and bringing it back.
Looks like the consumer is able to come back up in most cases (except when there is an OffsetCommit error). However, the producer stalls: we don't produce any more messages at this point.
These are some of the alternatives in mind:
Regarding the offset that Faust commits to Kafka during consumption: traditionally, most Kafka consumers, by convention, write the offset they expect next back to the offset storage (ZooKeeper or Kafka). Faust seems to write the last offset it has consumed back to offset storage. What we're afraid of here is that most open-source tooling is written with this convention in mind. One such tool, released by LinkedIn for consumer group monitoring, is [Burrow](https://github.com/linkedin/burrow).
For reference from the javadoc of the Java Kafka reference consumer:
https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
Offsets and Consumer Position
Kafka maintains a numerical offset for each record in a partition.
This offset acts as a kind of unique identifier of a record within that partition, and also denotes the position of the consumer in the partition.
That is, a consumer which has position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5.
There are actually two notions of position relevant to the user of the consumer.
The position of the consumer gives the offset of the next record that will be given out.
It will be one larger than the highest offset the consumer has seen in that partition.
It automatically advances every time the consumer receives messages in a call to poll(long).
The committed position is the last offset that has been saved securely.
Should the process fail and restart, this is the offset that it will recover to.
The consumer can either automatically commit offsets periodically;
or it can choose to control this committed position manually by calling commitSync, which will
block until the offsets have been successfully committed or fatal error has happened during
the commit process, or commitAsync which is non-blocking and will trigger
OffsetCommitCallback upon either successfully committed or fatally failed.
This distinction gives the consumer control over when a record is considered consumed. It is discussed in further detail below.
The offset written for each partition should be the next offset the consumer expects to read, not the offset it last consumed.
Off by one: the last consumed offset is written, rather than the last consumed offset + 1, which is the next offset the consumer expects to consume.
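The off-by-one and its restart consequence can be made concrete with two tiny helper functions (illustrative only):

```python
def offset_to_commit(last_processed: int) -> int:
    """By convention (Java reference consumer, Burrow, most tooling),
    commit the offset of the NEXT record the consumer expects,
    i.e. last processed + 1."""
    return last_processed + 1

def records_reprocessed_after_restart(committed: int, last_processed: int) -> int:
    """On restart the consumer resumes AT the committed offset, so every
    record from `committed` through `last_processed` is read again."""
    return max(0, last_processed - committed + 1)
```

With the standard convention nothing is redone: after processing offset 4 you commit 5 and resume at 5. Committing the last consumed offset (4) instead means offset 4 is reprocessed after every restart, and offset-lag tooling reads the group as permanently one message behind.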
Since #35 won't be solved for 1.0, we need to document how users can work around it.
The app currently never starts consuming if the source topics aren't available upon app startup.
We should ideally refresh subscriptions to check for newly available topics that we are subscribed to.
Although I have no experience with Kafka whatsoever, I'm pretty sure the comparison to Celery was meant to read:
await add.send(value=AddOperation(2, 2))
But I could be wrong...
Currently, while reading from the changelog, we update the faust table only after reading 1000 events. While this is configurable, we should be able to use different configurations for the changelog reader and the standby reader.
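A sketch of what splitting the two configurations could look like (the class and the `flush_every` setting are hypothetical, not Faust config options):

```python
class BufferedChangelogReader:
    """Buffer changelog events and flush them to the table every
    `flush_every` events, with the batch size chosen independently
    for recovery vs. standby reading."""

    def __init__(self, table: dict, flush_every: int) -> None:
        self.table = table
        self.flush_every = flush_every
        self._buffer = []

    def on_event(self, key, value) -> None:
        self._buffer.append((key, value))
        if len(self._buffer) >= self.flush_every:
            self.flush()

    def flush(self) -> None:
        for key, value in self._buffer:
            self.table[key] = value
        self._buffer.clear()

# Recovery can batch aggressively; standbys can flush continuously
# to stay close to the head of the changelog.
recovery_reader = BufferedChangelogReader(table={}, flush_every=1000)
standby_reader = BufferedChangelogReader(table={}, flush_every=1)
```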
We might want to allow custom configs to table changelogs for advanced usage. For example, more aggressive log compaction etc.
The app version flag makes it easy to start applications from scratch. However this results in a bunch of state in rocksdb/kafka for older app versions that stick around for a long time. We should try to fix this.
Trebuchet consumes from both the 'goku' and 'adjust_data' topics. When running in production, however, the rate at which Faust consumes from these topics varies wildly, sometimes even going to zero. See the graph below:
What's notable about these graphs is that they are the exact inverses of each other. When one spikes, the other drops, and vice versa, so Faust is overall consuming at approximately a constant rate. Goku events sometimes even goes to zero for a while.
Faust should be consuming from both topics at a reasonable, constant rate.
To reproduce this issue locally, set up trebuchet locally (https://github.com/robinhoodmarkets/trebuchet), then add `print("logging event")` to line 14 and `print("adjust event")` to line 24 of trebuchet/agents.py. Also, change the replication factor in trebuchet/config.py to 1. After building and installing, start trebuchet with `trebuchet worker -l info`. tests/sample contains sample adjust_data and goku events. To produce sample events, run the following two commands in two separate shells at the same time:
kafka-console-producer --broker-list localhost:9092 --topic adjust_data < tests/local/adjust_data
kafka-console-producer --broker-list localhost:9092 --topic goku < tests/local/goku
This should start producing to kafka, and you will see a stream of print statements on Trebuchet. If reproduced correctly, you should see times when just adjust events (and no goku/logging events) being processed for a while, before processing both topics for a while (with a lower rate for adjust events), then repeating this cycle, similar to what the graphs above are showing.