return new KafkaTopicOffsetManager(zkc, "si-offsets");
}
@Bean
public KafkaMessageListenerContainer container(OffsetManager offsetManager) throws Exception {
final KafkaMessageListenerContainer kafkaMessageListenerContainer = new KafkaMessageListenerContainer(
kafkaBrokerConnectionFactory(), this.topic/*topics.split(",")*/);
kafkaMessageListenerContainer.setOffsetManager(offsetManager);
kafkaMessageListenerContainer.setMaxFetch(100);
kafkaMessageListenerContainer.setConcurrency(1);
return kafkaMessageListenerContainer;
}
@Bean
public KafkaMessageDrivenChannelAdapter adapter(KafkaMessageListenerContainer container) {
KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter(container);
StringDecoder decoder = new StringDecoder();
kafkaMessageDrivenChannelAdapter.setKeyDecoder(decoder);
kafkaMessageDrivenChannelAdapter.setPayloadDecoder(decoder);
kafkaMessageDrivenChannelAdapter.setOutputChannel(fromKafka());
return kafkaMessageDrivenChannelAdapter;
}
@Bean
public PollableChannel fromKafka() {
QueueChannel queueChannel = new QueueChannel();
return queueChannel;
}
@Bean
public TopicCreator topicCreator() {
return new TopicCreator(this.topic, this.zookeeperConnect);
}
@Override
public void run() {
logger.info("启动kafka消费线程");
while (true) {
receive();
}
}