Comments (4)
You can start consuming without group from the beginning (see seek_to_beginning
method) or any other offset (e.g. see offsets_for_times
and seek
methods) in the past.
from aiokafka.
Thanks for your reply! Actually, I`ve tried this method, but I have stuck using it. I create test code snippet like this:
import asyncio
from aiokafka import AIOKafkaConsumer, TopicPartition
async def test():
consumer = AIOKafkaConsumer("test", bootstrap_servers="localhost:9092")
try:
tp = TopicPartition(topic="test", partition=0)
consumer.assign([tp])
await consumer.seek_to_beginning(tp)
finally:
await consumer.stop()
if __name__ == "__main__":
asyncio.run(test())
Here I manually create partition, because consumer.assignment()
returns an empty set(I don`t know because)
And this snippet returns the following error:
raise IllegalStateError(
aiokafka.errors.IllegalStateError: IllegalStateError: Subscription to topics, partitions and pattern are mutually exclusive
I don`t understand what I do incorrectly
from aiokafka.
You don't need assignment if you don't use groups and not going to distribute messages among workers. But you need to seek for all partitions you have. Use partitions_for_topic
method to get list of all available partitions.
from aiokafka.
I have also tried this method. It returns None, but my test topic has been created
from aiokafka.
Related Issues (20)
- [QUESTION] How to check readiness of kafka to receive msgs from producer?
- AIOKafkaAdminClient.create_topics fails randomly because it selects a node at random HOT 1
- performance degradation of producer when having many topics HOT 1
- Can I use Azure Event Hubs with `aiokafka`? HOT 1
- 0.9.0 zstd codec depends on cramjam but missing in documentation HOT 1
- Let's put `aiokafka` under the `aio-libs` org on PyPI HOT 2
- Add create_acls function for kafka admin client
- AIOKafkaProducer failed to produce message with headers HOT 3
- I keep getting MessageSizeTooLargeError, error message gives size much bigger than actual message were given to producer. HOT 9
- Add delete_records to the admin client HOT 5
- Can't connect to kafka docker HOT 1
- asyncio.exceptions.CancelledError
- Regarding Kafka Connection
- invalid Type AioKafkaAdminClient create_partitions
- Proposal to Add Type Hints HOT 12
- Consumer stopped consuming, task Fetcher._fetch_task has finished HOT 8
- High Incoming request sum on Azure Event Hub
- [QUESTION] Unable connect to node with id: X: [Errno 111]: Connection refused
- admin client - failure to create topics (error code 41) HOT 3
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from aiokafka.