kafka-dev / kafka
A distributed publish/subscribe messaging service
Home Page: http://sna-projects.com/kafka
License: Apache License 2.0
We have a MirrorMaker 2 (MM2) cluster in our environment (2.7.2) that replicates data from the source cluster to the target cluster. Whenever we add partitions to a topic in the source, MM2 creates the new partitions in the target cluster but does not replicate their data. Sometimes we have to restart MM2 to get the data synced to the target. Is this the expected behavior, or is there a bug? Sometimes the data is replicated, and sometimes it is not. Also, when we check MM2 in jconsole, tasks are not spun up for the new partitions unless we restart MM2.
Hi,
I cloned this repo with
project java-examples
current
Current project is java-examples 0.7
Current Scala version is 2.8.0
Current log level is info
Stack traces are enabled
when I run
run
I am seeing this error
"/root/kafka/core/src/main/scala/kafka/Kafka.scala:20: value log4j is not a member of package org.apache"
and compilation failed,
I installed Kafka through Cloudera Manager; could that cause a problem when running this code?
Could anybody help me get rid of this error?
Thanks.
I have many topics that cannot have more than 1 partition each, and many consumers in the same consumer group. Apparently only one consumer reads messages from all topics; the other consumers do nothing.
How can Kafka weigh the consumer load and distribute the topics evenly among the consumers?
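One likely explanation, offered as a sketch rather than a diagnosis, is the partition assignment strategy. The Java consumer's range assignor (`org.apache.kafka.clients.consumer.RangeAssignor`) works topic by topic, so with one partition per topic the first consumer in sorted order receives partition 0 of every topic. A round-robin assignor (`org.apache.kafka.clients.consumer.RoundRobinAssignor`, selected through the `partition.assignment.strategy` consumer setting) spreads all topic-partitions across the group. The class below is a simplified simulation of the two ideas, not Kafka's actual implementation; the consumer and topic names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class AssignmentSketch {

    // Range-style: every topic is divided on its own, so with a single
    // partition per topic the first consumer (in sorted order) gets them all.
    static Map<String, List<String>> rangeStyle(List<String> consumers,
                                                List<String> topics,
                                                int partitionsPerTopic) {
        Map<String, List<String>> out = emptyAssignment(consumers);
        List<String> sorted = new ArrayList<>(out.keySet());
        int n = sorted.size();
        for (String topic : topics) {
            int base = partitionsPerTopic / n;   // partitions every consumer gets
            int extra = partitionsPerTopic % n;  // first 'extra' consumers get one more
            int next = 0;
            for (int i = 0; i < n; i++) {
                int count = base + (i < extra ? 1 : 0);
                for (int j = 0; j < count; j++) {
                    out.get(sorted.get(i)).add(topic + "-" + next++);
                }
            }
        }
        return out;
    }

    // Round-robin-style: all topic-partitions are laid out in one list and
    // dealt to the consumers in turn.
    static Map<String, List<String>> roundRobinStyle(List<String> consumers,
                                                     List<String> topics,
                                                     int partitionsPerTopic) {
        Map<String, List<String>> out = emptyAssignment(consumers);
        List<String> sorted = new ArrayList<>(out.keySet());
        List<String> sortedTopics = new ArrayList<>(topics);
        Collections.sort(sortedTopics);
        int i = 0;
        for (String topic : sortedTopics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                out.get(sorted.get(i % sorted.size())).add(topic + "-" + p);
                i++;
            }
        }
        return out;
    }

    // TreeMap keeps the consumers in sorted order.
    private static Map<String, List<String>> emptyAssignment(List<String> consumers) {
        Map<String, List<String>> out = new TreeMap<>();
        for (String c : consumers) {
            out.put(c, new ArrayList<>());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> consumers = List.of("c1", "c2", "c3");                   // hypothetical
        List<String> topics = List.of("t1", "t2", "t3", "t4", "t5", "t6");    // hypothetical
        System.out.println("range-style:       " + rangeStyle(consumers, topics, 1));
        System.out.println("round-robin-style: " + roundRobinStyle(consumers, topics, 1));
    }
}
```

In this simulation the range-style result gives all six single-partition topics to `c1`, while the round-robin-style result gives two topics to each consumer. Whether switching the strategy resolves the situation described above depends on the client version and on all group members subscribing to the same topics.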
To start zookeeper, I used the following command in Windows:
kafka>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
But I got the following error:
The filename, directory name, or volume label syntax is incorrect.
What is wrong with that?
Thanks!
I was testing Kafka's load tolerance. I am running Kafka in a multi-node, multi-broker setup. I have 3 machines (with Windows OS) connected in a cluster with one broker in each machine. When I close node 3, it can still send and receive messages, while node 2 cannot receive messages but can send messages. Why is this happening? The leader is node 1.
How to set Kafka's logger directory
I want to read the logs when I run the producer and consumer, but I don't know how to do it.
Issue Summary:
When I send a message within a transaction and the message size exceeds the maximum limit, I cannot catch a RecordTooLargeException! And Kafka Server does receive messages.
What is the Business Impact you are facing?
Observed Behavior:
When I send a message over the size limit, no exception is thrown and the Kafka server receives no message. I expect an exception to be logged, but nothing is printed.
Please include screenshots. Yes
My code is the same as the example of the description on the official website;
version:3.1.0
I have come across this issue in my Apache Kafka environment: if I turn on Kafka Authorizer logging in log4j at level INFO or DEBUG, RequestHandlerPool and NetworkHandlerPool crash. Our monitors show no activity on these handler pools. As soon as Authorizer logging is turned off, the handlers come back up and the monitors start to show activity again.
Please let me know if you need any further information I will be more than happy to provide.
I have a 5-node configuration: 4 combined broker+controller nodes and 1 controller-only node.
I created a topic with replication factor 5. It was created, and describe shows that the topic partition has 5 replicas.
/opt/kafka/latest/bin/kafka-topics.sh --create --bootstrap-server=dc1-prod-sep-kafka-001-vs:9092,dc2-prod-sep-kafka-001-vs:9092 --replication-factor 5 --partitions 1 --topic test5
/opt/kafka/latest/bin/kafka-topics.sh --describe --topic test5 --bootstrap-server=dc1-prod-sep-kafka-001-vs:9092
Topic: test5 TopicId: amuqr8EgRmqeKryUHZwsMA PartitionCount: 1 ReplicationFactor: 5 Configs: segment.bytes=1073741824
Topic: test5 Partition: 0 Leader: 3 Replicas: 3,4,1,2,5 Isr: 3,4,1,2
There are 5 replicas but only 4 in the ISR.
Why does Kafka initially allow a replica to be assigned to the controller node, although in reality the replica is not created there and there are no topic files in its log directory?
Is this expected behavior or not? Thanks.
http://incubator.apache.org/kafka/index.html
not found!
I am trying to upgrade Apache kafka-client and kafka-streams in my application from 2.8.0 to 3.4.0. Please help me with the alternative.
Hello! I need help. We use Kafka 2.5.0 and do not want to upgrade; is there any other way to solve this problem?
Hi
We are regularly receiving the below error in our Kafka consumer. We are unable to fix this.
Local: Maximum application poll interval (max.poll.interval.ms) exceeded
Can you please help us resolve this?
Consumer config:
BootstrapServers = xxxx,
GroupId = xxxxx,
EnableAutoCommit = false,
StatisticsIntervalMs = 5000,
SessionTimeoutMs = 6000,
MaxPollIntervalMs = 30000,
AutoOffsetReset = AutoOffsetReset.Earliest,
EnablePartitionEof = true
Code Snippet
using (var consumer = new ConsumerBuilder<Ignore, string>(KafkaConfig).SetErrorHandler(ErrorHandler)
.Build())
{
consumer.Subscribe(KafkaTopic);
I have a 3-node Kafka cluster (Kafka version 1.1.1) with SASL_SSL security enabled using pluggable JAAS configurations. When Kafka producers and consumers with proper security configurations communicate with the secured broker, write and read operations work without any issue. When I use Kafka Streams, however, I see weird behaviour: even though the stream application works fine on the client side, there is an error log on the server side showing that threads corresponding to the stream application are making DESCRIBE requests for all other topics on the broker. My stream application is very simple: it copies data from topic "testA" to topic "testB". Why are the Kafka Streams threads on the broker side trying to describe other topics that the stream application is not even authorised to describe? If my understanding is correct, the stream application should only describe the topics "testA" and "testB".
The error log is :
[2020-01-16 09:28:02,235] ERROR Requested Operation : DESCRIBE for
Resource : Topic:dmp-tgd8uye1am cannot be authorized due to
invalid/missing claims in the JWT_TOKEN claims : {
Topic:testA=[READ, WRITE, DESCRIBE],
Topic:testB=[READ, WRITE, DESCRIBE],
Group:*=[READ, WRITE, DESCRIBE] } (com.fico.dmp.core.security.providers.kafka.authorization.DmpJwtAuthorizer)
The JWT_TOKEN is set on the client side with permissions for "testA" and "testB". Why, in the error log above, is the stream thread making a DESCRIBE call for the "dmp-tgd8uye1am" topic?
This token is validated on the server side in two steps: a) authentication and b) authorisation.
a) Authentication with the SaslServer class: this step is successful.
b) Authorisation with the DmpJwtAuthorizer class (implementation details below): this is the step where the weird logs show up. In the broker process logs I can see multiple threads corresponding to the stream application performing authorisation. The threads that request DESCRIBE access for "testA" and "testB" pass, while the remaining threads request DESCRIBE access on other topics like "dmp-tgd8uye1am", which the stream application has nothing to do with.
The broker security configurations are as follows :
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka_2.11-1.1.1/kafka-config/kafka_server_jaas.conf"
KafkaServer {
com.fico.dmp.core.security.providers.kafka.authentication.DmpJwtLoginModule required
username="kafkabroker"
password="HWBcKgfL5bQN"
user_kafkabroker="HWBcKgfL5bQN";
};
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="kafka"
password="HWBcKgfL5bQN";
};
listeners=SSL://:9093,SASL_SSL://:9094
advertised.listeners=SSL://:9093,SASL_SSL://:9094
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=com.core.security.providers.kafka.authorization.DmpJwtAuthorizer
principal.builder.class=com.core.security.providers.kafka.authentication.DmpJwtPrincipalBuilder
security.inter.broker.protocol=SSL
ssl.client.auth=required
ssl.truststore.location=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-0.amzn2.0.1.x86_64/jre/lib/security/cacerts
ssl.truststore.password=******
ssl.keystore.location=/opt/kafka_2.11-1.1.1/kafka-config/keystore
ssl.keystore.password=*****
default.replication.factor=3
min.insync.replicas=2
auto.leader.rebalance.enable=true
The KafkaStream client configs and working logic are as follows :
configStreamProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
configStreamProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
configStreamProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * ...);
configStreamProperties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, ...);
configStreamProperties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, ...);
configStreamProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2");
configProperties.put("security.protocol", "SASL_SSL");
configProperties.put("sasl.mechanism", "PLAIN");
configProperties.put("sasl.jaas.config", "com.core.security.providers.kafka.authentication.DmpJwtClient required;");
final StreamsBuilder builder = new StreamsBuilder();
String inputTopic = (String) inputOptions.get("testA");
String outputTopic = (String) outputOptions.get("testB");
builder.stream(inputTopic).to(outputTopic);
final KafkaStreams streams = new KafkaStreams(builder.build(), configStreamProperties);
streams.cleanUp();
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
DmpJwtAuthorizer Class on Server Side :
public class DmpJwtAuthorizer extends kafka.security.auth.SimpleAclAuthorizer {
    // SimpleAclAuthorizer is part of Kafka libraries and extends the kafka.security.auth.Authorizer class
    private static final Logger logger = LoggerFactory.getLogger(DmpJwtAuthorizer.class);

    @Override
    public boolean authorize(Session session, Operation operation, Resource resource) {
        boolean authorized = false;
        DmpKafkaPrincipal principal = (DmpKafkaPrincipal) session.principal();
        String user = principal.getName();
        Map<Resource, List<AclOperation>> allowedResources = principal.getAllowedResources();
        if ((user != null) && (!user.isEmpty())) {
            Iterator<Entry<Resource, List<AclOperation>>> iterator = allowedResources.entrySet().iterator();
            while (iterator.hasNext() && !authorized) {
                Entry<Resource, List<AclOperation>> entry = iterator.next();
                Resource allowedResource = entry.getKey();
                List<AclOperation> allowedOperations = entry.getValue();
                AclOperation requestedACLOperation = operation.toJava();
                if (resource.resourceType().name().equals(allowedResource.resourceType().name())
                        && isMatchingResourceName(resource, allowedResource)
                        && allowedOperations.contains(requestedACLOperation)) {
                    authorized = true;
                }
            }
            if (!authorized) {
                logger.error("User : {} Requested Operation : {} for Resource : {} cannot be authorized due to invalid/missing claims in the JWT_TOKEN claims : {} THREAD NAME : {} THREAD ID : {}", user, operation.toJava(), resource, allowedResources, Thread.currentThread().getName(), Thread.currentThread().getId());
            }
        }
        return authorized;
    }

    public boolean isMatchingResourceName(Resource requestedResource, Resource allowedResource) {
        String requestedResourceName = requestedResource.name();
        String allowedResourceName = allowedResource.name();
        String regex = ("\\Q" + allowedResourceName + "\\E").replace("*", "\\E.*\\Q");
        return requestedResourceName.matches(regex);
    }
}
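To make the name-matching step easier to reason about in isolation, here is a minimal standalone sketch that reuses the same regular expression as isMatchingResourceName, but on plain strings instead of Kafka Resource objects. The class name WildcardMatchSketch and the helper matchesAny are hypothetical and exist only for illustration; the "dmp-*" claim is likewise an invented example, not something present in the token above.

```java
import java.util.List;

public class WildcardMatchSketch {

    // Same expression as isMatchingResourceName: the allowed name is quoted
    // literally with \Q...\E, and every '*' becomes a ".*" wildcard.
    static boolean matches(String requestedName, String allowedName) {
        String regex = ("\\Q" + allowedName + "\\E").replace("*", "\\E.*\\Q");
        return requestedName.matches(regex);
    }

    // True if the requested name matches at least one allowed name.
    static boolean matchesAny(String requestedName, List<String> allowedNames) {
        for (String allowed : allowedNames) {
            if (matches(requestedName, allowed)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Topic claims taken from the error log in the post.
        List<String> topicClaims = List.of("testA", "testB");
        for (String topic : List.of("testA", "testB", "dmp-tgd8uye1am")) {
            System.out.println(topic + " allowed by topic claims: " + matchesAny(topic, topicClaims));
        }
        // Hypothetical wildcard claims, to show how '*' is expanded.
        System.out.println("dmp-tgd8uye1am vs dmp-*: " + matches("dmp-tgd8uye1am", "dmp-*"));
        System.out.println("any-group vs *: " + matches("any-group", "*"));
    }
}
```

With the claims from the log, a request for "dmp-tgd8uye1am" matches neither "testA" nor "testB", so the authorizer denies and logs it. That is consistent with the observed error, but it does not by itself explain why the request is issued in the first place.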
Hi all, we want to monitor the storage used on the Kafka brokers per topic and, if possible, per message. To do that we are creating dashboards in Grafana (with InfluxDB and Prometheus), but we want to be sure the metrics we use show the right values, so we would like to validate that the storage bytes used per Kafka message match the values displayed in the dashboards. Do you know how to check how many storage bytes are used per message? I know some Kafka wrappers add extra bytes. Is there a way to find out the total bytes used per message and the total storage used per topic in Kafka?
For lulz, I'm trying to get the ancient kafka v0.05 running in Docker. When I run more recent versions, like 0.8.1, or 0.5.2, kafka prints some logging messages when it starts.
Strangely, kafka 0.05 shows no such startup messages. Is something wrong in the startup, or did 0.05 simply not have logging?
Source:
Could we publish git tags for legacy release versions, like 0.5.2, to make it easier to find these old versions?
On small container installations (such as alpine), /bin/bash is not installed. It appears the scripts in the /bin directory would mostly work with /bin/sh. Please use a simpler shell for shell scripts so that they are more portable.
I have deployed Kafka 2.5.0 with Strimzi in a Kubernetes cluster.
In the last 4 days I have seen an unexpected memory increase, even though there was very low to no activity at all.
Note that the end-to-end latency increased as well.
I am not certain that the memory increase alone justifies the latency increase and I would also like to hear some other ideas.
Could you please help me out?
ERROR org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1706136118301, tries=1, nextAllowedTryMs=1706136118444) timed out at 1706136118344 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.DisconnectException: Cancelled listTopics request with correlation id 3 due to node 1 being disconnected
(kafka.admin.TopicCommand$)
This is the error I get when I try to list the topics.
Some background: the source is a standalone VM on OCI (Oracle Cloud), and the destination is a cluster of three regular Linux servers running Apache Kafka 3.6. When I try to list topics I get a timeout, but, confusingly, creating topics, producing, and consuming all work well. There is a firewall between OCI and the servers, but the network team confirms that everything is allowed.
Listing topics from the servers to OCI was not working either, but once we changed the /etc/hosts file on the servers, they could list the topics on OCI.
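Since the /etc/hosts change fixed the opposite direction, one thing worth ruling out is name resolution on the client host. After the bootstrap connection, Kafka clients connect to the host names that the brokers advertise (advertised.listeners), so those names must resolve from the client machine. The following is a minimal sketch of such a check, assuming the advertised host names are passed as command-line arguments; the class name BrokerHostCheck is hypothetical and the default of "localhost" is only a placeholder.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class BrokerHostCheck {

    // Returns the resolved IP address, or an empty Optional when the
    // host name cannot be resolved from this machine.
    static Optional<String> resolve(String host) {
        try {
            return Optional.of(InetAddress.getByName(host).getHostAddress());
        } catch (UnknownHostException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Pass the broker host names exactly as they appear in the brokers'
        // advertised.listeners; "localhost" is only a placeholder default.
        List<String> hosts = args.length > 0 ? Arrays.asList(args) : List.of("localhost");
        for (String host : hosts) {
            System.out.println(host + " -> " + resolve(host).orElse("UNRESOLVED"));
        }
    }
}
```

If any advertised name prints UNRESOLVED on the OCI VM, adding it to that machine's hosts file (as was done on the servers) would be the corresponding fix. If all names resolve, the cause lies elsewhere.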
Below is the debug output from running the command:
Registered kafka:type=kafka.Log4jController MBean
AdminClientConfig values:
auto.include.jmx.reporter = true
bootstrap.servers = [xx.x.xxx:9092, xx.x.xxx:9092]
client.dns.lookup = use_all_dns_ips
client.id =
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
[AdminClient clientId=adminclient-1] Setting bootstrap cluster metadata Cluster(id = null, nodes = [xx.x.xxx:9092 (id: -2 rack: null), xx.x.xxx:9092 (id: -1 rack: null)], partitions = [], controller = null).
Registered metric named MetricName [name=count, group=kafka-metrics-count, description=total number of registered metrics, tags={client-id=adminclient-1}]
Added sensor with name connections-closed:
Registered metric named MetricName [name=connection-close-total, group=admin-client-metrics, description=The total number of connections closed, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=connection-close-rate, group=admin-client-metrics, description=The number of connections closed per second, tags={client-id=adminclient-1}]
Added sensor with name connections-created:
Registered metric named MetricName [name=connection-creation-total, group=admin-client-metrics, description=The total number of new connections established, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=connection-creation-rate, group=admin-client-metrics, description=The number of new connections established per second, tags={client-id=adminclient-1}]
Added sensor with name successful-authentication:
Registered metric named MetricName [name=successful-authentication-total, group=admin-client-metrics, description=The total number of connections with successful authentication, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=successful-authentication-rate, group=admin-client-metrics, description=The number of connections with successful authentication per second, tags={client-id=adminclient-1}]
Added sensor with name successful-reauthentication:
Registered metric named MetricName [name=successful-reauthentication-total, group=admin-client-metrics, description=The total number of successful re-authentication of connections, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=successful-reauthentication-rate, group=admin-client-metrics, description=The number of successful re-authentication of connections per second, tags={client-id=adminclient-1}]
Added sensor with name successful-authentication-no-reauth:
Registered metric named MetricName [name=successful-authentication-no-reauth-total, group=admin-client-metrics, description=The total number of connections with successful authentication where the client does not support re-authentication, tags={client-id=adminclient-1}]
Added sensor with name failed-authentication:
Registered metric named MetricName [name=failed-authentication-total, group=admin-client-metrics, description=The total number of connections with failed authentication, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=failed-authentication-rate, group=admin-client-metrics, description=The number of connections with failed authentication per second, tags={client-id=adminclient-1}]
Added sensor with name failed-reauthentication:
Registered metric named MetricName [name=failed-reauthentication-total, group=admin-client-metrics, description=The total number of failed re-authentication of connections, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=failed-reauthentication-rate, group=admin-client-metrics, description=The number of failed re-authentication of connections per second, tags={client-id=adminclient-1}]
Added sensor with name reauthentication-latency:
Registered metric named MetricName [name=reauthentication-latency-max, group=admin-client-metrics, description=The max latency observed due to re-authentication, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=reauthentication-latency-avg, group=admin-client-metrics, description=The average latency observed due to re-authentication, tags={client-id=adminclient-1}]
Added sensor with name bytes-sent-received:
Registered metric named MetricName [name=network-io-total, group=admin-client-metrics, description=The total number of network operations (reads or writes) on all connections, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=network-io-rate, group=admin-client-metrics, description=The number of network operations (reads or writes) on all connections per second, tags={client-id=adminclient-1}]
Added sensor with name bytes-sent:
Registered metric named MetricName [name=outgoing-byte-total, group=admin-client-metrics, description=The total number of outgoing bytes sent to all servers, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=outgoing-byte-rate, group=admin-client-metrics, description=The number of outgoing bytes sent to all servers per second, tags={client-id=adminclient-1}]
Added sensor with name requests-sent:
Registered metric named MetricName [name=request-total, group=admin-client-metrics, description=The total number of requests sent, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=request-rate, group=admin-client-metrics, description=The number of requests sent per second, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=request-size-avg, group=admin-client-metrics, description=The average size of requests sent., tags={client-id=adminclient-1}]
Registered metric named MetricName [name=request-size-max, group=admin-client-metrics, description=The maximum size of any request sent., tags={client-id=adminclient-1}]
Added sensor with name bytes-received:
Registered metric named MetricName [name=incoming-byte-total, group=admin-client-metrics, description=The total number of bytes read off all sockets, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=incoming-byte-rate, group=admin-client-metrics, description=The number of bytes read off all sockets per second, tags={client-id=adminclient-1}]
Added sensor with name responses-received:
Registered metric named MetricName [name=response-total, group=admin-client-metrics, description=The total number of responses received, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=response-rate, group=admin-client-metrics, description=The number of responses received per second, tags={client-id=adminclient-1}]
Added sensor with name select-time:
Registered metric named MetricName [name=select-total, group=admin-client-metrics, description=The total number of times the I/O layer checked for new I/O to perform, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=select-rate, group=admin-client-metrics, description=The number of times the I/O layer checked for new I/O to perform per second, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=io-wait-time-ns-avg, group=admin-client-metrics, description=The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds., tags={client-id=adminclient-1}]
Registered metric named MetricName [name=io-waittime-total, group=admin-client-metrics, description=Deprecated The total time the I/O thread spent waiting, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=io-wait-ratio, group=admin-client-metrics, description=Deprecated The fraction of time the I/O thread spent waiting, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=io-wait-time-ns-total, group=admin-client-metrics, description=The total time the I/O thread spent waiting, tags={client-id=adminclient-1}]
Added sensor with name io-time:
Registered metric named MetricName [name=io-time-ns-avg, group=admin-client-metrics, description=The average length of time for I/O per select call in nanoseconds., tags={client-id=adminclient-1}]
Registered metric named MetricName [name=iotime-total, group=admin-client-metrics, description=Deprecated The total time the I/O thread spent doing I/O, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=io-ratio, group=admin-client-metrics, description=Deprecated The fraction of time the I/O thread spent doing I/O, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=io-time-ns-total, group=admin-client-metrics, description=The total time the I/O thread spent doing I/O, tags={client-id=adminclient-1}]
[AdminClient clientId=adminclient-1] sslCiphers: created new gauge suite with maxEntries = 100.
[AdminClient clientId=adminclient-1] clients: created new gauge suite with maxEntries = 100.
Registered metric named MetricName [name=connection-count, group=admin-client-metrics, description=The current number of active connections., tags={client-id=adminclient-1}]
Kafka version: 3.6.1
Kafka commitId: 5e3c2b738d253ff5
Kafka startTimeMs: 1706133998344
Registered metric named MetricName [name=version, group=app-info, description=Metric indicating version, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=commit-id, group=app-info, description=Metric indicating commit-id, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=start-time-ms, group=app-info, description=Metric indicating start-time-ms, tags={client-id=adminclient-1}]
[AdminClient clientId=adminclient-1] Kafka admin client initialized
[AdminClient clientId=adminclient-1] Thread starting
[AdminClient clientId=adminclient-1] Trying to choose nodes for [] at 1706133998348
[AdminClient clientId=adminclient-1] Found least loaded node xx.x.xxx:9092 (id: -1 rack: null) with no active connection
[AdminClient clientId=adminclient-1] Assigned Call(callName=fetchMetadata, deadlineMs=1706134028348, tries=0, nextAllowedTryMs=0) to node xx.x.xxx:9092 (id: -1 rack: null)
Resolved host xx.x.xxx as xx.x.xxx
[AdminClient clientId=adminclient-1] Initiating connection to node xx.x.xxx:9092 (id: -1 rack: null) using address /xx.x.xxx
[AdminClient clientId=adminclient-1] Queueing Call(callName=listTopics, deadlineMs=1706134058350, tries=0, nextAllowedTryMs=0) with a timeout 30000 ms from now.
[AdminClient clientId=adminclient-1] Client is not ready to send to xx.x.xxx:9092 (id: -1 rack: null). Must delay 11950 ms
[AdminClient clientId=adminclient-1] Entering KafkaClient#poll(timeout=11950)
[AdminClient clientId=adminclient-1] KafkaClient#poll retrieved 0 response(s)
[AdminClient clientId=adminclient-1] Trying to choose nodes for [Call(callName=listTopics, deadlineMs=1706134058350, tries=0, nextAllowedTryMs=0)] at 1706133998358
[AdminClient clientId=adminclient-1] Metadata is not ready: we have not fetched metadata from the bootstrap nodes yet.
[AdminClient clientId=adminclient-1] Unable to assign Call(callName=listTopics, deadlineMs=1706134058350, tries=0, nextAllowedTryMs=0) to a node.
[AdminClient clientId=adminclient-1] Client is not ready to send to xx.x.xxx:9092 (id: -1 rack: null). Must delay 11950 ms
[AdminClient clientId=adminclient-1] Entering KafkaClient#poll(timeout=100)
Added sensor with name node--1.requests-sent
Registered metric named MetricName [name=request-total, group=admin-client-node-metrics, description=The total number of requests sent, tags={client-id=adminclient-1, node-id=node--1}]
Registered metric named MetricName [name=request-rate, group=admin-client-node-metrics, description=The number of requests sent per second, tags={client-id=adminclient-1, node-id=node--1}]
Registered metric named MetricName [name=request-size-avg, group=admin-client-node-metrics, description=The average size of requests sent., tags={client-id=adminclient-1, node-id=node--1}]
Registered metric named MetricName [name=request-size-max, group=admin-client-node-metrics, description=The maximum size of any request sent., tags={client-id=adminclient-1, node-id=node--1}]
Added sensor with name node--1.bytes-sent
Registered metric named MetricName [name=outgoing-byte-total, group=admin-client-node-metrics, description=The total number of outgoing bytes, tags={client-id=adminclient-1, node-id=node--1}]
Registered metric named MetricName [name=outgoing-byte-rate, group=admin-client-node-metrics, description=The number of outgoing bytes per second, tags={client-id=adminclient-1, node-id=node--1}]
Added sensor with name node--1.responses-received
Registered metric named MetricName [name=response-total, group=admin-client-node-metrics, description=The total number of responses received, tags={client-id=adminclient-1, node-id=node--1}]
Registered metric named MetricName [name=response-rate, group=admin-client-node-metrics, description=The number of responses received per second, tags={client-id=adminclient-1, node-id=node--1}]
Added sensor with name node--1.bytes-received
Registered metric named MetricName [name=incoming-byte-total, group=admin-client-node-metrics, description=The total number of incoming bytes, tags={client-id=adminclient-1, node-id=node--1}]
Registered metric named MetricName [name=incoming-byte-rate, group=admin-client-node-metrics, description=The number of incoming bytes per second, tags={client-id=adminclient-1, node-id=node--1}]
Added sensor with name node--1.latency
Registered metric named MetricName [name=request-latency-avg, group=admin-client-node-metrics, description=, tags={client-id=adminclient-1, node-id=node--1}]
Registered metric named MetricName [name=request-latency-max, group=admin-client-node-metrics, description=, tags={client-id=adminclient-1, node-id=node--1}]
[AdminClient clientId=adminclient-1] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
[AdminClient clientId=adminclient-1] Completed connection to node -1. Fetching API versions.
[AdminClient clientId=adminclient-1] Initiating API versions fetch from node -1.
[AdminClient clientId=adminclient-1] No version information found when sending API_VERSIONS with correlation id 0 to node -1. Assuming version 3.
[AdminClient clientId=adminclient-1] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=adminclient-1, correlationId=0, headerVersion=2) and timeout 3600000 to node -1: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.6.1')
[AdminClient clientId=adminclient-1] KafkaClient#poll retrieved 0 response(s)
[AdminClient clientId=adminclient-1] Trying to choose nodes for [Call(callName=listTopics, deadlineMs=1706134058350, tries=0, nextAllowedTryMs=0)] at 1706133998652
[AdminClient clientId=adminclient-1] Metadata is not ready: we have not fetched metadata from the bootstrap nodes yet.
[AdminClient clientId=adminclient-1] Unable to assign Call(callName=listTopics, deadlineMs=1706134058350, tries=0, nextAllowedTryMs=0) to a node.
[AdminClient clientId=adminclient-1] Client is not ready to send to xx.x.xxx:9092 (id: -1 rack: null). Must delay 9223372036854775807 ms
[AdminClient clientId=adminclient-1] Entering KafkaClient#poll(timeout=100)
[AdminClient clientId=adminclient-1] KafkaClient#poll retrieved 0 response(s)
[AdminClient clientId=adminclient-1] Trying to choose nodes for [Call(callName=listTopics, deadlineMs=1706134058350, tries=0, nextAllowedTryMs=0)] at 1706133998654
[AdminClient clientId=adminclient-1] Metadata is not ready: we have not fetched metadata from the bootstrap nodes yet.
[AdminClient clientId=adminclient-1] Unable to assign Call(callName=listTopics, deadlineMs=1706134058350, tries=0, nextAllowedTryMs=0) to a node.
[AdminClient clientId=adminclient-1] Client is not ready to send to xx.x.xxx:9092 (id: -1 rack: null). Must delay 9223372036854775807 ms
[AdminClient clientId=adminclient-1] Entering KafkaClient#poll(timeout=100)
[AdminClient clientId=adminclient-1] Received API_VERSIONS response from node -1 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=adminclient-1, correlationId=0, headerVersion=2): ApiVersionsResponseData(errorCode=0, apiKeys=[ApiVersion(apiKey=0, minVersion=0, maxVersion=9), ApiVersion(apiKey=1, minVersion=0, maxVersion=15), ApiVersion(apiKey=2, minVersion=0, maxVersion=8), ApiVersion(apiKey=3, minVersion=0, maxVersion=12), ApiVersion(apiKey=4, minVersion=0, maxVersion=7), ApiVersion(apiKey=5, minVersion=0, maxVersion=4), ApiVersion(apiKey=6, minVersion=0, maxVersion=8), ApiVersion(apiKey=7, minVersion=0, maxVersion=3), ApiVersion(apiKey=8, minVersion=0, maxVersion=8), ApiVersion(apiKey=9, minVersion=0, maxVersion=8), ApiVersion(apiKey=10, minVersion=0, maxVersion=4), ApiVersion(apiKey=11, minVersion=0, maxVersion=9), ApiVersion(apiKey=12, minVersion=0, maxVersion=4), ApiVersion(apiKey=13, minVersion=0, maxVersion=5), ApiVersion(apiKey=14, minVersion=0, maxVersion=5), ApiVersion(apiKey=15, minVersion=0, maxVersion=5), ApiVersion(apiKey=16, minVersion=0, maxVersion=4), ApiVersion(apiKey=17, minVersion=0, maxVersion=1), ApiVersion(apiKey=18, minVersion=0, maxVersion=3), ApiVersion(apiKey=19, minVersion=0, maxVersion=7), ApiVersion(apiKey=20, minVersion=0, maxVersion=6), ApiVersion(apiKey=21, minVersion=0, maxVersion=2), ApiVersion(apiKey=22, minVersion=0, maxVersion=4), ApiVersion(apiKey=23, minVersion=0, maxVersion=4), ApiVersion(apiKey=24, minVersion=0, maxVersion=4), ApiVersion(apiKey=25, minVersion=0, maxVersion=3), ApiVersion(apiKey=26, minVersion=0, maxVersion=3), ApiVersion(apiKey=27, minVersion=0, maxVersion=1), ApiVersion(apiKey=28, minVersion=0, maxVersion=3), ApiVersion(apiKey=29, minVersion=0, maxVersion=3), ApiVersion(apiKey=30, minVersion=0, maxVersion=3), ApiVersion(apiKey=31, minVersion=0, maxVersion=3), ApiVersion(apiKey=32, minVersion=0, maxVersion=4), ApiVersion(apiKey=33, minVersion=0, maxVersion=2), 
ApiVersion(apiKey=34, minVersion=0, maxVersion=2), ApiVersion(apiKey=35, minVersion=0, maxVersion=4), ApiVersion(apiKey=36, minVersion=0, maxVersion=2), ApiVersion(apiKey=37, minVersion=0, maxVersion=3), ApiVersion(apiKey=38, minVersion=0, maxVersion=3), ApiVersion(apiKey=39, minVersion=0, maxVersion=2), ApiVersion(apiKey=40, minVersion=0, maxVersion=2), ApiVersion(apiKey=41, minVersion=0, maxVersion=3), ApiVersion(apiKey=42, minVersion=0, maxVersion=2), ApiVersion(apiKey=43, minVersion=0, maxVersion=2), ApiVersion(apiKey=44, minVersion=0, maxVersion=1), ApiVersion(apiKey=45, minVersion=0, maxVersion=0), ApiVersion(apiKey=46, minVersion=0, maxVersion=0), ApiVersion(apiKey=47, minVersion=0, maxVersion=0), ApiVersion(apiKey=48, minVersion=0, maxVersion=1), ApiVersion(apiKey=49, minVersion=0, maxVersion=1), ApiVersion(apiKey=50, minVersion=0, maxVersion=0), ApiVersion(apiKey=51, minVersion=0, maxVersion=0), ApiVersion(apiKey=56, minVersion=0, maxVersion=3), ApiVersion(apiKey=57, minVersion=0, maxVersion=1), ApiVersion(apiKey=58, minVersion=0, maxVersion=0), ApiVersion(apiKey=60, minVersion=0, maxVersion=0), ApiVersion(apiKey=61, minVersion=0, maxVersion=0), ApiVersion(apiKey=65, minVersion=0, maxVersion=0), ApiVersion(apiKey=66, minVersion=0, maxVersion=0), ApiVersion(apiKey=67, minVersion=0, maxVersion=0)], throttleTimeMs=0, supportedFeatures=[], finalizedFeaturesEpoch=0, finalizedFeatures=[], zkMigrationReady=false)
[AdminClient clientId=adminclient-1] Node -1 has finalized features epoch: 0, finalized features: [], supported features: [], ZK migration ready: false, API versions: (Produce(0): 0 to 9 [usable: 9], Fetch(1): 0 to 15 [usable: 15], ListOffsets(2): 0 to 8 [usable: 8], Metadata(3): 0 to 12 [usable: 12], LeaderAndIsr(4): 0 to 7 [usable: 7], StopReplica(5): 0 to 4 [usable: 4], UpdateMetadata(6): 0 to 8 [usable: 8], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 8 [usable: 8], FindCoordinator(10): 0 to 4 [usable: 4], JoinGroup(11): 0 to 9 [usable: 9], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 5 [usable: 5], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 4 [usable: 4], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 7], DeleteTopics(20): 0 to 6 [usable: 6], DeleteRecords(21): 0 to 2 [usable: 2], InitProducerId(22): 0 to 4 [usable: 4], OffsetForLeaderEpoch(23): 0 to 4 [usable: 4], AddPartitionsToTxn(24): 0 to 4 [usable: 4], AddOffsetsToTxn(25): 0 to 3 [usable: 3], EndTxn(26): 0 to 3 [usable: 3], WriteTxnMarkers(27): 0 to 1 [usable: 1], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 3 [usable: 3], CreateAcls(30): 0 to 3 [usable: 3], DeleteAcls(31): 0 to 3 [usable: 3], DescribeConfigs(32): 0 to 4 [usable: 4], AlterConfigs(33): 0 to 2 [usable: 2], AlterReplicaLogDirs(34): 0 to 2 [usable: 2], DescribeLogDirs(35): 0 to 4 [usable: 4], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 3 [usable: 3], CreateDelegationToken(38): 0 to 3 [usable: 3], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 3 [usable: 3], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], 
ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0], DescribeClientQuotas(48): 0 to 1 [usable: 1], AlterClientQuotas(49): 0 to 1 [usable: 1], DescribeUserScramCredentials(50): 0 [usable: 0], AlterUserScramCredentials(51): 0 [usable: 0], DescribeQuorum(55): UNSUPPORTED, AlterPartition(56): 0 to 3 [usable: 3], UpdateFeatures(57): 0 to 1 [usable: 1], Envelope(58): 0 [usable: 0], DescribeCluster(60): 0 [usable: 0], DescribeProducers(61): 0 [usable: 0], UnregisterBroker(64): UNSUPPORTED, DescribeTransactions(65): 0 [usable: 0], ListTransactions(66): 0 [usable: 0], AllocateProducerIds(67): 0 [usable: 0], ConsumerGroupHeartbeat(68): UNSUPPORTED).
[AdminClient clientId=adminclient-1] KafkaClient#poll retrieved 0 response(s)
[AdminClient clientId=adminclient-1] Trying to choose nodes for [Call(callName=listTopics, deadlineMs=1706134058350, tries=0, nextAllowedTryMs=0)] at 1706133998742
[AdminClient clientId=adminclient-1] Metadata is not ready: we have not fetched metadata from the bootstrap nodes yet.
[AdminClient clientId=adminclient-1] Unable to assign Call(callName=listTopics, deadlineMs=1706134058350, tries=0, nextAllowedTryMs=0) to a node.
[AdminClient clientId=adminclient-1] Sending MetadataRequestData(topics=[], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to xx.x.xxx:9092 (id: -1 rack: null). correlationId=1, timeoutMs=29606
[AdminClient clientId=adminclient-1] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=12, clientId=adminclient-1, correlationId=1, headerVersion=2) and timeout 29606 to node -1: MetadataRequestData(topics=[], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false)
[AdminClient clientId=adminclient-1] Entering KafkaClient#poll(timeout=100)
[AdminClient clientId=adminclient-1] KafkaClient#poll retrieved 0 response(s)
[AdminClient clientId=adminclient-1] Trying to choose nodes for [Call(callName=listTopics, deadlineMs=1706134058350, tries=0, nextAllowedTryMs=0)] at 1706133998744
[AdminClient clientId=adminclient-1] Metadata is not ready: we have not fetched metadata from the bootstrap nodes yet.
[AdminClient clientId=adminclient-1] Unable to assign Call(callName=listTopics, deadlineMs=1706134058350, tries=0, nextAllowedTryMs=0) to a node.
[AdminClient clientId=adminclient-1] Entering KafkaClient#poll(timeout=100)
[AdminClient clientId=adminclient-1] Received METADATA response from node -1 for request with header RequestHeader(apiKey=METADATA, apiVersion=12, clientId=adminclient-1, correlationId=1, headerVersion=2): MetadataResponseData(throttleTimeMs=0, brokers=[MetadataResponseBroker(nodeId=0, host='xx.x.xxx', port=9092, rack=null), MetadataResponseBroker(nodeId=1, host='xx.x.xxx', port=9092, rack=null)], clusterId='xCDtlxlOT-uxomG0SSnBFQ', controllerId=0, topics=[], clusterAuthorizedOperations=-2147483648)
[AdminClient clientId=adminclient-1] KafkaClient#poll retrieved 1 response(s)
[AdminClient clientId=adminclient-1] Updating cluster metadata to Cluster(id = xCDtlxlOT-uxomG0SSnBFQ, nodes = [xx.x.xxx:9092 (id: 0 rack: null), xx.x.xxx:9092 (id: 1 rack: null)], partitions = [], controller = xx.x.xxx:9092 (id: 0 rack: null))
[AdminClient clientId=adminclient-1] Call(callName=fetchMetadata, deadlineMs=1706134028348, tries=0, nextAllowedTryMs=0) got response MetadataResponseData(throttleTimeMs=0, brokers=[MetadataResponseBroker(nodeId=0, host='xx.x.xxx', port=9092, rack=null), MetadataResponseBroker(nodeId=1, host='xx.x.xxx', port=9092, rack=null)], clusterId='xCDtlxlOT-uxomG0SSnBFQ', controllerId=0, topics=[], clusterAuthorizedOperations=-2147483648)
[AdminClient clientId=adminclient-1] Trying to choose nodes for [Call(callName=listTopics, deadlineMs=1706134058350, tries=0, nextAllowedTryMs=0)] at 1706133998769
[AdminClient clientId=adminclient-1] Metadata is ready to use.
[AdminClient clientId=adminclient-1] Found least loaded node xx.x.xxx:9092 (id: 1 rack: null) with no active connection
[AdminClient clientId=adminclient-1] Assigned Call(callName=listTopics, deadlineMs=1706134058350, tries=0, nextAllowedTryMs=0) to node xx.x.xxx:9092 (id: 1 rack: null)
Resolved host xx.x.xxx as xx.x.xxx
[AdminClient clientId=adminclient-1] Initiating connection to node xx.x.xxx:9092 (id: 1 rack: null) using address /xx.x.xxx
[AdminClient clientId=adminclient-1] Client is not ready to send to xx.x.xxx:9092 (id: 1 rack: null). Must delay 9698 ms
[AdminClient clientId=adminclient-1] Entering KafkaClient#poll(timeout=9698)
Added sensor with name node-1.requests-sent
Registered metric named MetricName [name=request-total, group=admin-client-node-metrics, description=The total number of requests sent, tags={client-id=adminclient-1, node-id=node-1}]
Registered metric named MetricName [name=request-rate, group=admin-client-node-metrics, description=The number of requests sent per second, tags={client-id=adminclient-1, node-id=node-1}]
Registered metric named MetricName [name=request-size-avg, group=admin-client-node-metrics, description=The average size of requests sent., tags={client-id=adminclient-1, node-id=node-1}]
Registered metric named MetricName [name=request-size-max, group=admin-client-node-metrics, description=The maximum size of any request sent., tags={client-id=adminclient-1, node-id=node-1}]
Added sensor with name node-1.bytes-sent
Registered metric named MetricName [name=outgoing-byte-total, group=admin-client-node-metrics, description=The total number of outgoing bytes, tags={client-id=adminclient-1, node-id=node-1}]
Registered metric named MetricName [name=outgoing-byte-rate, group=admin-client-node-metrics, description=The number of outgoing bytes per second, tags={client-id=adminclient-1, node-id=node-1}]
Added sensor with name node-1.responses-received
Registered metric named MetricName [name=response-total, group=admin-client-node-metrics, description=The total number of responses received, tags={client-id=adminclient-1, node-id=node-1}]
Registered metric named MetricName [name=response-rate, group=admin-client-node-metrics, description=The number of responses received per second, tags={client-id=adminclient-1, node-id=node-1}]
Added sensor with name node-1.bytes-received
Registered metric named MetricName [name=incoming-byte-total, group=admin-client-node-metrics, description=The total number of incoming bytes, tags={client-id=adminclient-1, node-id=node-1}]
Registered metric named MetricName [name=incoming-byte-rate, group=admin-client-node-metrics, description=The number of incoming bytes per second, tags={client-id=adminclient-1, node-id=node-1}]
Added sensor with name node-1.latency
Registered metric named MetricName [name=request-latency-avg, group=admin-client-node-metrics, description=, tags={client-id=adminclient-1, node-id=node-1}]
Registered metric named MetricName [name=request-latency-max, group=admin-client-node-metrics, description=, tags={client-id=adminclient-1, node-id=node-1}]
[AdminClient clientId=adminclient-1] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 1
[AdminClient clientId=adminclient-1] Completed connection to node 1. Fetching API versions.
[AdminClient clientId=adminclient-1] Initiating API versions fetch from node 1.
[AdminClient clientId=adminclient-1] No version information found when sending API_VERSIONS with correlation id 2 to node 1. Assuming version 3.
[AdminClient clientId=adminclient-1] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=adminclient-1, correlationId=2, headerVersion=2) and timeout 3600000 to node 1: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.6.1')
[AdminClient clientId=adminclient-1] KafkaClient#poll retrieved 0 response(s)
[AdminClient clientId=adminclient-1] Trying to choose nodes for [] at 1706133998795
[AdminClient clientId=adminclient-1] Client is not ready to send to xx.x.xxx:9092 (id: 1 rack: null). Must delay 9223372036854775807 ms
[AdminClient clientId=adminclient-1] Entering KafkaClient#poll(timeout=29974)
[AdminClient clientId=adminclient-1] KafkaClient#poll retrieved 0 response(s)
[AdminClient clientId=adminclient-1] Trying to choose nodes for [] at 1706133998796
[AdminClient clientId=adminclient-1] Client is not ready to send to xx.x.xxx:9092 (id: 1 rack: null). Must delay 9223372036854775807 ms
[AdminClient clientId=adminclient-1] Entering KafkaClient#poll(timeout=29973)
[AdminClient clientId=adminclient-1] Received API_VERSIONS response from node 1 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=adminclient-1, correlationId=2, headerVersion=2): ApiVersionsResponseData(errorCode=0, apiKeys=[ApiVersion(apiKey=0, minVersion=0, maxVersion=9), ApiVersion(apiKey=1, minVersion=0, maxVersion=15), ApiVersion(apiKey=2, minVersion=0, maxVersion=8), ApiVersion(apiKey=3, minVersion=0, maxVersion=12), ApiVersion(apiKey=4, minVersion=0, maxVersion=7), ApiVersion(apiKey=5, minVersion=0, maxVersion=4), ApiVersion(apiKey=6, minVersion=0, maxVersion=8), ApiVersion(apiKey=7, minVersion=0, maxVersion=3), ApiVersion(apiKey=8, minVersion=0, maxVersion=8), ApiVersion(apiKey=9, minVersion=0, maxVersion=8), ApiVersion(apiKey=10, minVersion=0, maxVersion=4), ApiVersion(apiKey=11, minVersion=0, maxVersion=9), ApiVersion(apiKey=12, minVersion=0, maxVersion=4), ApiVersion(apiKey=13, minVersion=0, maxVersion=5), ApiVersion(apiKey=14, minVersion=0, maxVersion=5), ApiVersion(apiKey=15, minVersion=0, maxVersion=5), ApiVersion(apiKey=16, minVersion=0, maxVersion=4), ApiVersion(apiKey=17, minVersion=0, maxVersion=1), ApiVersion(apiKey=18, minVersion=0, maxVersion=3), ApiVersion(apiKey=19, minVersion=0, maxVersion=7), ApiVersion(apiKey=20, minVersion=0, maxVersion=6), ApiVersion(apiKey=21, minVersion=0, maxVersion=2), ApiVersion(apiKey=22, minVersion=0, maxVersion=4), ApiVersion(apiKey=23, minVersion=0, maxVersion=4), ApiVersion(apiKey=24, minVersion=0, maxVersion=4), ApiVersion(apiKey=25, minVersion=0, maxVersion=3), ApiVersion(apiKey=26, minVersion=0, maxVersion=3), ApiVersion(apiKey=27, minVersion=0, maxVersion=1), ApiVersion(apiKey=28, minVersion=0, maxVersion=3), ApiVersion(apiKey=29, minVersion=0, maxVersion=3), ApiVersion(apiKey=30, minVersion=0, maxVersion=3), ApiVersion(apiKey=31, minVersion=0, maxVersion=3), ApiVersion(apiKey=32, minVersion=0, maxVersion=4), ApiVersion(apiKey=33, minVersion=0, maxVersion=2), 
ApiVersion(apiKey=34, minVersion=0, maxVersion=2), ApiVersion(apiKey=35, minVersion=0, maxVersion=4), ApiVersion(apiKey=36, minVersion=0, maxVersion=2), ApiVersion(apiKey=37, minVersion=0, maxVersion=3), ApiVersion(apiKey=38, minVersion=0, maxVersion=3), ApiVersion(apiKey=39, minVersion=0, maxVersion=2), ApiVersion(apiKey=40, minVersion=0, maxVersion=2), ApiVersion(apiKey=41, minVersion=0, maxVersion=3), ApiVersion(apiKey=42, minVersion=0, maxVersion=2), ApiVersion(apiKey=43, minVersion=0, maxVersion=2), ApiVersion(apiKey=44, minVersion=0, maxVersion=1), ApiVersion(apiKey=45, minVersion=0, maxVersion=0), ApiVersion(apiKey=46, minVersion=0, maxVersion=0), ApiVersion(apiKey=47, minVersion=0, maxVersion=0), ApiVersion(apiKey=48, minVersion=0, maxVersion=1), ApiVersion(apiKey=49, minVersion=0, maxVersion=1), ApiVersion(apiKey=50, minVersion=0, maxVersion=0), ApiVersion(apiKey=51, minVersion=0, maxVersion=0), ApiVersion(apiKey=56, minVersion=0, maxVersion=3), ApiVersion(apiKey=57, minVersion=0, maxVersion=1), ApiVersion(apiKey=58, minVersion=0, maxVersion=0), ApiVersion(apiKey=60, minVersion=0, maxVersion=0), ApiVersion(apiKey=61, minVersion=0, maxVersion=0), ApiVersion(apiKey=65, minVersion=0, maxVersion=0), ApiVersion(apiKey=66, minVersion=0, maxVersion=0), ApiVersion(apiKey=67, minVersion=0, maxVersion=0)], throttleTimeMs=0, supportedFeatures=[], finalizedFeaturesEpoch=0, finalizedFeatures=[], zkMigrationReady=false)
[AdminClient clientId=adminclient-1] Node 1 has finalized features epoch: 0, finalized features: [], supported features: [], ZK migration ready: false, API versions: (Produce(0): 0 to 9 [usable: 9], Fetch(1): 0 to 15 [usable: 15], ListOffsets(2): 0 to 8 [usable: 8], Metadata(3): 0 to 12 [usable: 12], LeaderAndIsr(4): 0 to 7 [usable: 7], StopReplica(5): 0 to 4 [usable: 4], UpdateMetadata(6): 0 to 8 [usable: 8], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 8 [usable: 8], FindCoordinator(10): 0 to 4 [usable: 4], JoinGroup(11): 0 to 9 [usable: 9], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 5 [usable: 5], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 4 [usable: 4], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 7], DeleteTopics(20): 0 to 6 [usable: 6], DeleteRecords(21): 0 to 2 [usable: 2], InitProducerId(22): 0 to 4 [usable: 4], OffsetForLeaderEpoch(23): 0 to 4 [usable: 4], AddPartitionsToTxn(24): 0 to 4 [usable: 4], AddOffsetsToTxn(25): 0 to 3 [usable: 3], EndTxn(26): 0 to 3 [usable: 3], WriteTxnMarkers(27): 0 to 1 [usable: 1], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 3 [usable: 3], CreateAcls(30): 0 to 3 [usable: 3], DeleteAcls(31): 0 to 3 [usable: 3], DescribeConfigs(32): 0 to 4 [usable: 4], AlterConfigs(33): 0 to 2 [usable: 2], AlterReplicaLogDirs(34): 0 to 2 [usable: 2], DescribeLogDirs(35): 0 to 4 [usable: 4], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 3 [usable: 3], CreateDelegationToken(38): 0 to 3 [usable: 3], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 3 [usable: 3], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], 
ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0], DescribeClientQuotas(48): 0 to 1 [usable: 1], AlterClientQuotas(49): 0 to 1 [usable: 1], DescribeUserScramCredentials(50): 0 [usable: 0], AlterUserScramCredentials(51): 0 [usable: 0], DescribeQuorum(55): UNSUPPORTED, AlterPartition(56): 0 to 3 [usable: 3], UpdateFeatures(57): 0 to 1 [usable: 1], Envelope(58): 0 [usable: 0], DescribeCluster(60): 0 [usable: 0], DescribeProducers(61): 0 [usable: 0], UnregisterBroker(64): UNSUPPORTED, DescribeTransactions(65): 0 [usable: 0], ListTransactions(66): 0 [usable: 0], AllocateProducerIds(67): 0 [usable: 0], ConsumerGroupHeartbeat(68): UNSUPPORTED).
[AdminClient clientId=adminclient-1] KafkaClient#poll retrieved 0 response(s)
[AdminClient clientId=adminclient-1] Trying to choose nodes for [] at 1706133998819
[AdminClient clientId=adminclient-1] Sending MetadataRequestData(topics=null, allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to xx.x.xxx:9092 (id: 1 rack: null). correlationId=3, timeoutMs=29950
[AdminClient clientId=adminclient-1] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=12, clientId=adminclient-1, correlationId=3, headerVersion=2) and timeout 29950 to node 1: MetadataRequestData(topics=null, allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false)
[AdminClient clientId=adminclient-1] Entering KafkaClient#poll(timeout=59531)
[AdminClient clientId=adminclient-1] KafkaClient#poll retrieved 0 response(s)
[AdminClient clientId=adminclient-1] Trying to choose nodes for [] at 1706133998820
[AdminClient clientId=adminclient-1] Entering KafkaClient#poll(timeout=59530)
Any help with this, or pointers on what else I could look into, would be appreciated.
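For anyone reading the trace above, two numbers are worth decoding. The sketch below uses only values copied from the log; the class and method names are made up for illustration and are not part of Kafka. It computes how much of the listTopics deadline was left at each scheduling attempt, and checks that the 9223372036854775807 ms "delay" is simply Long.MAX_VALUE. That value most likely means "no timed wait, keep polling until the connection state changes" rather than a real back-off, though that reading is an assumption about the client internals.

```java
// Hypothetical helper for reading AdminClient debug logs; not part of Kafka.
class AdminLogMath {
    // Milliseconds left before a Call's deadline, given the "at" timestamp from the log.
    static long remainingMs(long deadlineMs, long nowMs) {
        return deadlineMs - nowMs;
    }

    // True when a logged "Must delay N ms" value is the Long.MAX_VALUE sentinel.
    static boolean isSentinelDelay(long delayMs) {
        return delayMs == Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        // deadlineMs and "at" values copied from the listTopics log lines.
        System.out.println(remainingMs(1706134058350L, 1706133998652L)); // 59698
        System.out.println(remainingMs(1706134058350L, 1706133998820L)); // 59530
        System.out.println(isSentinelDelay(9223372036854775807L));       // true
    }
}
```

The second result equals the timeout of the last KafkaClient#poll call in the trace, which suggests the client was still inside the listTopics deadline and waiting for the METADATA response from node 1 when the log was cut off.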
I recently hit a problem where the MTU was changed when Kafka started. Is there anything in Kafka that could cause this?
While performing a rolling upgrade of Kafka from 1.1.0 to 2.3.0, I found that the consumer group path in ZooKeeper changed from offset to offsets, which causes all data to be reprocessed from the beginning.
Is there a simple way to resolve this?
Is there a way to hardcode the ZooKeeper consumer path?
The topic you want to automatically rebuild does not have the previous backlog
I am using Kafka 2.0.0
Some partitions of the __consumer_offsets topic are 500-700 GB, with more than 5000-7000 segments. These segments are more than 2-3 months old.
There are no errors in the logs, and the topic uses the default COMPACT cleanup policy.
What could be the problem?
Could it be a config or consumer problem, or a bug in Kafka 2.0.0?
What checks could I do?
My settings:
log.cleaner.enable=true
log.cleanup.policy = [delete]
log.retention.bytes = -1
log.segment.bytes = 268435456
log.retention.hours = 72
log.retention.check.interval.ms = 300000
...
offsets.commit.required.acks = -1
offsets.commit.timeout.ms = 5000
offsets.load.buffer.size = 5242880
offsets.retention.check.interval.ms = 600000
offsets.retention.minutes = 10080
offsets.topic.compression.codec = 0
offsets.topic.num.partitions = 50
offsets.topic.replication.factor = 3
offsets.topic.segment.bytes = 104857600
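A quick sanity check on the numbers above: with offsets.topic.segment.bytes = 104857600 (100 MiB), a 500-700 GB partition corresponds to roughly 5000-7000 full segments. So the reported segment count is what you would expect if segments keep rolling but are never compacted or deleted. A minimal sketch of that arithmetic follows; the class and method names are illustrative, and GB is taken to mean GiB.

```java
// Illustrative arithmetic only; names are not from Kafka.
class OffsetsTopicMath {
    static final long GIB = 1024L * 1024L * 1024L;

    // Number of segments needed to hold partitionBytes, rounding up.
    static long segmentCount(long partitionBytes, long segmentBytes) {
        return (partitionBytes + segmentBytes - 1) / segmentBytes;
    }

    public static void main(String[] args) {
        long segmentBytes = 104857600L; // offsets.topic.segment.bytes from the post
        System.out.println(segmentCount(500 * GIB, segmentBytes)); // 5120
        System.out.println(segmentCount(700 * GIB, segmentBytes)); // 7168
        System.out.println(10080 / (60 * 24)); // offsets.retention.minutes in days: 7
    }
}
```

If the segments really are all full-size and months old, the log cleaner may not be making progress on those partitions, so the cleaner's own log output and thread state are a reasonable first thing to check.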
Hi All,
I have a use case where I want to enforce Kafka quotas per client, but on a per-topic basis. The documentation says that the built-in Kafka quotas cannot be applied per topic; they are applied per broker only.
There is also a custom quota callback mechanism for customizing quota handling, introduced by this KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-257+-+Configurable+Quota+Management but I can't figure out two things:
I am unable to find answers to the two questions above, and any help would be greatly appreciated. Thanks in advance.
Hi,
I am using a Kafka appender library to push logs to Logstash.
I am using Kerberos authentication to connect to the Kafka brokers. When my app starts, everything works fine and I am able to connect to the brokers. Below are a few of the producer configs I am providing.
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule
required
ticketCache="path-to-my-ticket-cache"
useTicketCache="true"
refreshKrb5Config="true"
doNotPrompt="true"
principal="myprincipal@realm";
When I run klist -c path-to-my-ticket-cache, I see that the ticket is renewed every 2 hours, but the renewed time is not reflected in my Java app. The app still shows the initial expiry time loaded at startup; it destroys the ticket after expiry and authentication starts failing.
A few things I tried:
I added renewTgt=true and the property sasl.kerberos.kinit.cmd=/usr/bin/kinit -c path-to-my-ticket-cache -R to the producer config. It tried to renew but failed with the error IOException: error=2, No such file or directory. When I run the same command from a shell, it works fine and renews the ticket. How can I pass the cache location parameter in this command?
How can we force the app to read the ticket from the main cache, which is being renewed but which the Kafka client does not seem to be aware of?
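The error=2 message is what the JVM reports when it is asked to execute a program path that does not exist. A likely explanation, which is an assumption about how the client launches kinit, is that the whole value of sasl.kerberos.kinit.cmd is treated as a single executable path, so the OS looks for a file literally named "/usr/bin/kinit -c path-to-my-ticket-cache -R". The sketch below reproduces that failure mode with plain ProcessBuilder; the KinitCmdDemo class and the cache path are illustrative only.

```java
import java.io.IOException;

// Illustrative only: shows how a command line passed as ONE token fails to launch.
class KinitCmdDemo {
    // Tries to start the given argv; returns the IOException message if the
    // launch fails, or null if the process could be started.
    static String launchError(String... argv) {
        try {
            Process p = new ProcessBuilder(argv).start();
            p.destroy();
            return null;
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // The whole string, spaces included, is looked up as a single file name.
        System.out.println(launchError("/usr/bin/kinit -c /tmp/hypothetical-cache -R"));
    }
}
```

If that is the cause, it may be worth keeping sasl.kerberos.kinit.cmd as just /usr/bin/kinit and pointing the process at the cache through the KRB5CCNAME environment variable, which kinit reads to locate the credential cache.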
I tried the latest tiered storage from the Apache 3.6 branch. One thing I noticed is that broker-local segments are not cleaned up after being uploaded to remote storage. This shows up as local disk usage that keeps increasing; local segments never get deleted. Is anyone seeing similar behavior? I wonder if there are new configurations I should add. Thanks.
topic configuration: bin/kafka-topics.sh --bootstrap-server kafka-test-51-kafka-bootstrap:9092 --topic dfi-test6 --create --config remote.storage.enable=true --config local.retention.bytes=104857600
local disk usage: [kafka@kafka-test-51-kafka-7 /]$ du -sh /var/lib/kafka/data-0/kafka-log7/dfi*
12G /var/lib/kafka/data-0/kafka-log7/dfi-test-10
Offset 0 still exists, and available disk space is decreasing with incoming traffic.
Hi,
I am new to MirrorMaker 2. I ran some tests, but MM2 doesn't seem to work well. I hope someone can help me solve the issue, thanks!
I set up two single-node Kafka clusters, Kafka version 3.4.0.
I created two topics (test1, test3) in the TEST1 cluster and one topic (test3) in the TEST3 cluster.
Then I ran "bin/connect-mirror-maker.sh mm2.properties". I expected one new topic, TEST3.test3, in the TEST1 cluster, and two new topics (TEST1.test1 and TEST1.test3) in the TEST3 cluster.
However, MM2 created many duplicated TEST1.test3 topics in both clusters, which is not correct.
in TEST1 cluster:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
TEST1.TEST1.TEST1.TEST1.test3
TEST1.TEST1.TEST1.heartbeats
TEST1.TEST1.TEST1.test3
TEST1.TEST1.heartbeats
TEST1.TEST1.test3
TEST1.heartbeats
TEST1.test3
__consumer_offsets
heartbeats
mm2-configs.TEST3.internal
mm2-offsets.TEST3.internal
mm2-status.TEST3.internal
test1
test3
in TEST3 cluster:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
TEST1.TEST1.TEST1.TEST1.test3
TEST1.TEST1.TEST1.heartbeats
TEST1.TEST1.TEST1.test3
TEST1.TEST1.heartbeats
TEST1.TEST1.test3
TEST1.checkpoints.internal
TEST1.heartbeats
TEST1.test3
__consumer_offsets
heartbeats
mm2-configs.TEST1.internal
mm2-configs.TEST3.internal
mm2-offset-syncs.TEST3.internal
mm2-offsets.TEST1.internal
mm2-offsets.TEST3.internal
mm2-status.TEST1.internal
mm2-status.TEST3.internal
test3
MM2 kept creating more and more TEST1.test3 topics, so I killed the MM2 process. Can anyone explain why this happened?
Thanks!
========================= mm2 properties file=========================
clusters = TEST3, TEST1
TEST3.bootstrap.servers = 192.168.122.172:9092
TEST1.bootstrap.servers = 192.168.122.126:9092
TEST1->TEST3.enabled = true
TEST1->TEST3.topics = .*
replication.factor=1
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
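One thing that can be checked offline is which topic names the TEST1->TEST3.topics pattern selects. Assuming the pattern is applied as a full-name regular expression match (an assumption about MM2's topic filter), .* also matches already-replicated names such as TEST1.test3, whereas a narrower pattern does not. Whether this explains the runaway TEST1.TEST1... topics is not established here. The sketch uses only java.util.regex, and the class name is illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Illustrative only: checks which names a topics regex would select.
class TopicPatternDemo {
    // Returns the topics whose full name matches the regex.
    static List<String> select(String regex, List<String> topics) {
        Pattern p = Pattern.compile(regex);
        return topics.stream()
                .filter(t -> p.matcher(t).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics =
                Arrays.asList("test1", "test3", "TEST1.test3", "TEST1.TEST1.test3");
        System.out.println(select(".*", topics));     // all four names
        System.out.println(select("test.*", topics)); // [test1, test3]
    }
}
```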
Hi everyone,
I would like to know if there is any way to have a log per connector, meaning each connector writes to its own log file.
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 31
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -7843832
I get this error while deserializing an Avro message into a GenericRecord when the message contains some special characters, for one type of schema. For example:
{
"op": "create",
"id": "test-l广告oc-378621",
"provenance": {
"sourceDatasetId": "TEST"
},
"properties": {
"mg_display_label": "LOCA广告广告TION",
"mg_entity_type": "location"
},
"label": "location"
}
But the same process for another schema does not throw any exception.
For example:
{
"op": "crea广告te",
"id": "test-ind-lc广告-372386213",
"provenance": {
"sourceDatasetId": "TEST"
},
"properties": {
"mg_display_label": "hasLocation",
"type": "home"
},
"src": {
"id": "test-ind-378621",
"label": "individual"
},
"dst": {
"id": "test-loc-378621",
"label": "location"
},
"label": "hasLocation"
}
What could be the reason?
Can anybody help me with this? I have been stuck here for the last couple of days.
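Avro's binary encoding writes a string as a length prefix (the number of UTF-8 bytes) followed by the bytes themselves, so anything that changes the bytes between producer and consumer can make the reader pick up a nonsensical, even negative, length. One thing worth ruling out, purely as a hypothesis, is a step that passes the serialized bytes through a String or a non-UTF-8 charset. The sketch below uses only the JDK and illustrative names. It shows how the character count and the UTF-8 byte count differ for a value from the first example, and how a round trip through US-ASCII alters the data.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative only: character count vs. UTF-8 byte count, and a lossy round trip.
class Utf8LengthDemo {
    static int utf8Length(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }

    // True if the bytes are unchanged after decoding to a String and re-encoding.
    static boolean survivesRoundTrip(byte[] data, Charset cs) {
        return Arrays.equals(data, new String(data, cs).getBytes(cs));
    }

    public static void main(String[] args) {
        // The mg_display_label value from the first example, written with Unicode escapes.
        String label = "LOCA\u5e7f\u544a\u5e7f\u544aTION";
        System.out.println(label.length());    // 12 characters
        System.out.println(utf8Length(label)); // 20 bytes
        byte[] utf8 = label.getBytes(StandardCharsets.UTF_8);
        System.out.println(survivesRoundTrip(utf8, StandardCharsets.UTF_8));    // true
        System.out.println(survivesRoundTrip(utf8, StandardCharsets.US_ASCII)); // false
    }
}
```

Since the second schema also carries such characters without failing, this is only one candidate; comparing the raw bytes on the producer side with the bytes the consumer receives would confirm or rule it out.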
I just downloaded the official 0.6 archive:
https://github.com/downloads/kafka-dev/kafka/kafka-0.6.zip
and tried starting ZooKeeper / Kafka.
The above archive extracts the dependencies into a directory called "libs", but in bin/kafka-run-class.sh there is a loop that adds the jars in "lib" to the classpath:
for file in $base_dir/lib/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
When I use MirrorMaker 2 to sync, there are no issues if the topic count is small,
but if the topic count is 200+, connect.log shows:
Request cannot contain more than 5 topics.
[2023-04-20 10:05:42,382] WARN [MirrorSourceConnector|worker] Could not create topic S17_inner_transcode_response. (org.apache.kafka.connect.mirror.MirrorSourceConnector:371)
java.util.concurrent.CompletionException: org.apache.kafka.common.errors.InvalidRequestException: Request cannot contain more than 5 topics.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at org.apache.kafka.common.internals.KafkaCompletableFuture.kafkaCompleteExceptionally(KafkaCompletableFuture.java:49)
at org.apache.kafka.common.internals.KafkaFutureImpl.completeExceptionally(KafkaFutureImpl.java:130)
at org.apache.kafka.clients.admin.KafkaAdminClient$1.handleResponse(KafkaAdminClient.java:1616)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:1255)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1408)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1331)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.common.errors.InvalidRequestException: Request cannot contain more than 5 topics.
[2023-04-20 10:05:42,382] WARN [MirrorSourceConnector|worker] Could not create topic N9M_0_MEDIASTREAMMODEL_MEDIATASKSTART. (org.apache.kafka.connect.mirror.MirrorSourceConnector:371)
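The InvalidRequestException suggests that whatever serves the target cluster rejects topic-creation requests that name more than 5 topics; where that limit comes from is not stated in the log. If the limit cannot be raised, one possible workaround is to pre-create the target topics yourself in groups of at most 5 before starting MM2. The sketch below shows only the batching step with plain JDK collections. The topic names are placeholders and the actual creation call is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: splits a topic list into groups small enough for the limit.
class TopicBatcher {
    // Splits items into consecutive batches of at most batchSize elements.
    static <T> List<List<T>> batches(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < items.size(); i += batchSize) {
            int end = Math.min(i + batchSize, items.size());
            out.add(new ArrayList<>(items.subList(i, end)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> topics = new ArrayList<>();
        for (int i = 0; i < 203; i++) {
            topics.add("topic-" + i); // placeholder names
        }
        List<List<String>> groups = batches(topics, 5);
        System.out.println(groups.size());                        // 41
        System.out.println(groups.get(groups.size() - 1).size()); // 3
    }
}
```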
I was attempting to use the StringDecoder to pull some JSON strings off a queue. During testing I am sending the same message through 1k times and see the following exception about 80% of the time:
java.nio.BufferUnderflowException
at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:127)
at kafka.serializer.StringDecoder.toEvent(Decoder.scala:34)
at com.tumblr.motherboy.workers.Consumer$$anonfun$act$1$$anonfun$apply$2$$anonfun$apply$3.apply(Main.scala:17)
at com.tumblr.motherboy.workers.Consumer$$anonfun$act$1$$anonfun$apply$2$$anonfun$apply$3.apply(Main.scala:15)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:29)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
at kafka.consumer.KafkaMessageStream.foreach(KafkaMessageStream.scala:28)
at com.tumblr.motherboy.workers.Consumer$$anonfun$act$1$$anonfun$apply$2.apply(Main.scala:15)
at com.tumblr.motherboy.workers.Consumer$$anonfun$act$1$$anonfun$apply$2.apply(Main.scala:13)
at scala.actors.ReactorTask.run(ReactorTask.scala:34)
at scala.actors.ReactorTask.compute(ReactorTask.scala:66)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:147)
at scala.concurrent.forkjoin.ForkJoinTask.quietlyExec(ForkJoinTask.java:422)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.mainLoop(ForkJoinWorkerThread.java:340)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:325)
The remaining 20% of the time, the string deserializes just fine (again, all messages are identical). Discussion on IRC led me to implement my own StringDecoder as follows:
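import kafka.message.Message
import kafka.serializer.Decoder

// Copies exactly the bytes remaining in the payload buffer before building the string.
class StringDecoder extends Decoder[String] {
  def toEvent(message: Message): String = {
    val buf = message.payload
    val arr = new Array[Byte](buf.remaining)
    buf.get(arr)
    new String(arr)
  }
}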
The above code does not throw an exception.
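For readers wondering why sizing the array from `buf.remaining` matters, here is a minimal Java sketch that uses only `java.nio`. It is not the Kafka decoder: the class name `PayloadDecodeSketch` and the 4-byte header layout are made up for illustration. It shows that when a buffer's position is not zero, copying `capacity()` bytes underflows while copying `remaining()` bytes works. Whether this is what happened inside `kafka.serializer.StringDecoder` at the time is an assumption, not something this report confirms.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class PayloadDecodeSketch {
    // Copies only the unread bytes, mirroring the workaround above.
    static String decodeRemaining(ByteBuffer payload) {
        ByteBuffer view = payload.duplicate(); // leave the caller's position untouched
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Deliberately wrong: sizes the array from capacity(), which ignores position.
    static String decodeCapacity(ByteBuffer payload) {
        ByteBuffer view = payload.duplicate();
        byte[] bytes = new byte[view.capacity()];
        view.get(bytes); // underflows whenever fewer than capacity() bytes remain
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Hypothetical message layout: a 4-byte length header followed by a UTF-8 body.
    static ByteBuffer payloadAfterHeader(String body) {
        byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + bodyBytes.length);
        buf.putInt(bodyBytes.length);
        buf.put(bodyBytes);
        buf.flip();
        buf.getInt(); // consume the header so the position is 4, not 0
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer payload = payloadAfterHeader("{\"k\":1}");
        System.out.println(decodeRemaining(payload));
        try {
            decodeCapacity(payload);
        } catch (BufferUnderflowException e) {
            System.out.println("underflow when the position is ignored");
        }
    }
}
```

Working on a `duplicate()` keeps the caller's position intact, so the same payload can be decoded more than once.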
Platform: OS X 10.6.8
Java Version:
java version "1.6.0_26"
Java(TM) SE Runtime Environment (build 1.6.0_26-b03-384-10M3425)
Java HotSpot(TM) 64-Bit Server VM (build 20.1-b02-384, mixed mode)
Scala Version: Scala code runner version 2.8.1.final -- Copyright 2002-2010, LAMP/EPFL
Kafka Version: master/trunk
Issue Summary:
The warning in the Dataflow worker is misleading. I tried to dig deep into Beam KafkaIO, and it looks like this is a Dataflow runner
What is the Business Impact you are facing?
Observed Behavior:
I'm able to read the data from Confluent Kafka with the provided consumer group development.opp-ds.svc-app-event-ingest.omni-cg. The warning I'm getting is misleading, because the Kafka consumer lag is going down as intended.
I was able to read the 98,637+1,190,217 events from two different Dataflow jobs, and the consumer group lag went down to zero.
But the warning in the Dataflow worker is misleading. I tried to dig deep into Beam KafkaIO.
Warning:
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: Reader-7_offset_consumer_136115595_development.opp-ds.svc-app-event-ingest.omni-cg-kafka-test-14
Expected Behavior (jobID if applicable): We don't have a JobID but I should not be seeing these unauthorized exceptions.
Please include screenshots. Yes
If Applicable:
Hi,
After deploying a 3-node Kafka 2.4.0 cluster with the Strimzi operator in Kubernetes, we observed that memory consumption increases for a while and then stabilizes at that higher level, even though no traffic is sent to the cluster. The screenshot below shows this.
And another observation is that,
Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
In my application, incoming data is queued in Kafka and saved to disk, and a consumer reads this data from Kafka and processes it. When my consumer tries to read data from Kafka, I get the exceptions below:
2017-06-09 10:57:24,733 ERROR NetworkClient Uncaught error in request completion:
org.apache.kafka.common.KafkaException: Error deserializing key/value for partition TcpMessage-1 at offset 155884487
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:628) ~[kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.consumer.internals.Fetcher.handleFetchResponse(Fetcher.java:566) ~[kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$000(Fetcher.java:69) ~[kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:139) ~[kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:136) ~[kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) ~[kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) ~[kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380) ~[kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274) [kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) [kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) [kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) [kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908) [kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853) [kafka-clients-0.9.0.1.jar:?]
at com.affirmed.mediation.edr.kafka.tcpMessage.TcpMessageConsumer.doWork(TcpMessageConsumer.java:190) [EdrServer.jar:?]
at com.affirmed.mediation.edr.kafka.tcpMessage.TcpMessageConsumer.run(TcpMessageConsumer.java:248) [EdrServer.jar:?]
Caused by: org.apache.kafka.common.record.InvalidRecordException: Record is corrupt (stored crc = 2016852547, computed crc = 1399853379)
at org.apache.kafka.common.record.Record.ensureValid(Record.java:226) ~[kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:617) ~[kafka-clients-0.9.0.1.jar:?]
... 15 more
Could anyone please help me with this? I am stuck and unable to figure out the root cause.
When this occurs, is there any way to catch the exception and move the offset forward? Currently the consumer keeps polling the same range of records on every poll, so it never moves forward.
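One way to approach the question above, sketched in Java with only the standard library, is to pull the topic, partition, and offset out of the exception message and then seek one past the bad offset. The class `CorruptRecordLocator` and its regular expression are illustrative assumptions based on the message text in the log above, and the wording may differ between client versions. The log also shows the error reported by `NetworkClient` as "Uncaught error in request completion", so this sketch does not establish whether the exception reaches the caller of `poll()` in 0.9.0.1. The `consumer.seek(...)` call appears only as a comment, and skipping a record this way means that record is lost.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class CorruptRecordLocator {
    // Where the record that failed to parse lives.
    static final class Location {
        final String topic;
        final int partition;
        final long offset;

        Location(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // The greedy (.+) backtracks to the last "-<digits>", so topic names
    // that themselves contain hyphens are still split correctly.
    private static final Pattern LOCATION =
            Pattern.compile("for partition (.+)-(\\d+) at offset (\\d+)");

    // Returns the location named in the message, or empty if the text does not match.
    static Optional<Location> parse(String message) {
        if (message == null) {
            return Optional.empty();
        }
        Matcher m = LOCATION.matcher(message);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(new Location(
                m.group(1), Integer.parseInt(m.group(2)), Long.parseLong(m.group(3))));
    }

    public static void main(String[] args) {
        String msg = "Error deserializing key/value for partition TcpMessage-1 at offset 155884487";
        parse(msg).ifPresent(loc -> {
            // With a real consumer, the skip would look roughly like:
            //   consumer.seek(new TopicPartition(loc.topic, loc.partition), loc.offset + 1);
            System.out.println(loc.topic + " partition " + loc.partition
                    + " resume at " + (loc.offset + 1));
        });
    }
}
```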
The Kafka version is 0.10.0.1. There are two ways to store offsets, ZooKeeper and Kafka, depending on your Kafka client. If you choose to connect to the Kafka broker, the consumer group name is actually saved in the default Kafka topic '__consumer_offsets', at least I think so. Deleting or resetting offsets in ZooKeeper is easy, but deleting them from '__consumer_offsets' is difficult. How can I delete them from '__consumer_offsets'?
It has been observed that after a machine/VM restart, the Kafka pods crash and the ZooKeeper pod reports an Unresolved address exception when we have a single ZooKeeper replica and a single Kafka replica.
This is occurring after we moved from Kafka version 3.4.0 to 3.7.0, which is supported with Strimzi Operator version 0.40 (0.40.0-kafka-3.7.0). In the Strimzi operator log we can see a Session lost/Expired exception.
This issue is not very consistent; it happens, say, 6 out of 10 times after a machine restart. We have seen it only after moving Kafka from 3.4.0 to a higher version, and we had to do this upgrade because Strimzi operator 0.40.0-kafka-3.7.0 doesn't support the previous Kafka version 3.4.0. As this issue is more prominent with the newer version of Strimzi/Kafka, could this please be looked into?
The workaround we used was to restart the ZooKeeper pods and then the Kafka pods (if they did not come up automatically). We had to restart the ZooKeeper pod multiple times. Another workaround was to uninstall and reinstall Kafka.
Kafka pods should not crash, and ZooKeeper should not report an unresolved address exception, after a machine/VM restart.
Kafka version: 3.7.0
Strimzi version: 0.40
Kubernetes version: v1.29.1+rke2r1
Installation method: Helm Chart
Infrastructure: AWS EC2
Zookeeper Exception:
2024-05-31 06:22:46,870 INFO Created server with tickTime 500 ms minSessionTimeout 1000 ms maxSessionTimeout 10000 ms clientPortListenBacklog -1 datadir /var/lib/zookeeper/data/version-2 snapdir /var/lib/zookeeper/data/version-2 (org.apache.zookeeper.server.ZooKeeperServer) [QuorumPeermyid=1(secure=[0:0:0:0:0:0:0:0]:2181)]
2024-05-31 06:22:46,870 ERROR Couldn't bind to kafka-cluster-zookeeper-0.kafka-cluster-zookeeper-nodes.foundation-env-default.svc/:2888 (org.apache.zookeeper.server.quorum.Leader) [QuorumPeermyid=1(secure=[0:0:0:0:0:0:0:0]:2181)]
java.net.SocketException: Unresolved address
at java.base/java.net.ServerSocket.bind(ServerSocket.java:380)
at java.base/java.net.ServerSocket.bind(ServerSocket.java:342)
at org.apache.zookeeper.server.quorum.Leader.createServerSocket(Leader.java:322)
at org.apache.zookeeper.server.quorum.Leader.lambda$new$0(Leader.java:301)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.concurrent.ConcurrentHashMap$KeySpliterator.forEachRemaining(ConcurrentHashMap.java:3573)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
at org.apache.zookeeper.server.quorum.Leader.<init>(Leader.java:304)
at org.apache.zookeeper.server.quorum.QuorumPeer.makeLeader(QuorumPeer.java:1340)
at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:1551)
2024-05-31 06:22:46,870 WARN Unexpected exception (org.apache.zookeeper.server.quorum.QuorumPeer) [QuorumPeermyid=1(secure=[0:0:0:0:0:0:0:0]:2181)]
java.io.IOException: Leader failed to initialize any of the following sockets: [kafka-cluster-zookeeper-0.kafka-cluster-zookeeper-nodes.foundation-env-default.svc/:2888]
at org.apache.zookeeper.server.quorum.Leader.<init>(Leader.java:307)
at org.apache.zookeeper.server.quorum.QuorumPeer.makeLeader(QuorumPeer.java:1340)
at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:1551)
2024-05-31 06:22:46,870 INFO Peer state changed: looking (org.apache.zookeeper.server.quorum.QuorumPeer) [QuorumPeermyid=1(secure=[0:0:0:0:0:0:0:0]:2181)]
The snippet below runs without issues with scala-sdk-2.13.3, but throws NoSuchMethodError for scala-sdk-2.13.4:
val authorize = new AclAuthorizer()
val acls = authorize.acls(AclBindingFilter.ANY)
The error is:
Exception in thread "main" java.lang.NoSuchMethodError: 'scala.collection.immutable.RedBlackTree$Tree scala.collection.immutable.TreeMap.scala$collection$immutable$TreeMap$$tree()'
at kafka.security.authorizer.AclAuthorizer.acls(AclAuthorizer.scala:293)
Context :
Kafka Producer 0.8.x
Problem:
The Kafka producer emits metrics for request size, request latency, and request rate.
But the meaning of these metrics is not clear. What exactly do they measure?
Are they per producer send request (which contains batches of messages per broker), or per batch of messages as defined by the user's batching policy? What happens when application code has multiple async producers to increase performance (how are rates and percentiles measured)?
We have implemented the fluentd-kafka plugin to collect information from Kafka topics and post the formatted data to an Elasticsearch database.
Recently, after upgrading from kafka-6.0.0-1090 to kafka-7.0.0-1250 (these RPMs are part of our organization's internal packaging), we observed that our custom microservice alarm-collector, built using fluentd-kafka plugin version 0.16.0, was able to process many indexes, but one of them, alarmhistoryindex, kept failing to connect to Kafka.
After restarting the microservice pods, the connection was successful.
We would appreciate a possible explanation for this.
This looks like a bug to us. If it is, is there a possible fix?
Thank you all in advance.
Here is the log from our microservice...
alarm-collector-belk-fluentd-statefulset-0(1).log
Hi,
I am using uReplicator with Kafka client version 0.9.0.1 in production and hit the error "Memory records is not writable" in MirrorMaker.
I understand that this issue was resolved by the fix for KAFKA-3147, released in a new version of the Kafka client jar, "0.9.0-kafka-2.0.2".
I have updated uReplicator to Kafka client jar version "0.9.0-kafka-2.0.2", but I am still facing the issue.
Could you please help me resolve it?
Exception Details:
java.lang.IllegalStateException: Memory records is not writable
org.apache.kafka.common.record.MemoryRecords.append(MemoryRecords.java:93)
org.apache.kafka.clients.producer.internals.RecordBatch.tryAppend(RecordBatch.java:69)
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:168)
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:435)
kafka.mirrormaker.MirrorMakerWorker$MirrorMakerProducer.send(MirrorMakerWorker.scala:357)
kafka.mirrormaker.MirrorMakerWorker$MirrorMakerThread.run(MirrorMakerWorker.scala:298)
Hi,
I am using kafka 0.8 in an application where the client initiates kafka message consumption. The client does not have a handle to the consumerconnector instance created in the server application. I need a way for the client to initiate a shutdown call to the server, which then tries to look up the connector by consumergroup id and invoke shutdown on the consumerconnector. Is this possible with kafka 0.8 APIs?
Hi,
I am using Kafka 0.8.0 (kafka_2.8.0-0.8.0) and ZooKeeper 3.4.5. If Kafka and ZooKeeper are on the same machine, it works.
[2014-01-17 17:55:58,468] ERROR fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:192.168.195.177,port:9092)] failed (kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:192.168.195.177,port:9092)] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
at kafka.utils.Utils$.swallow(Utils.scala:187)
at kafka.utils.Logging$class.swallowError(Logging.scala:105)
at kafka.utils.Utils$.swallowError(Utils.scala:46)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:254)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect(Native Method)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
... 12 more
[2014-01-17 17:55:58,471] ERROR Producer connection to 192.168.195.177:9092 unsuccessful (kafka.producer.SyncProducer)
java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect(Native Method)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:254)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2014-01-17 17:55:58,472] WARN Fetching topic metadata with correlation id 1 for topics [Set(test)] from broker [id:0,host:192.168.195.177,port:9092] failed (kafka.client.ClientUtils$)
java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect(Native Method)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:254)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2014-01-17 17:55:58,474] ERROR Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:192.168.195.177,port:9092)] failed (kafka.producer.async.DefaultEventHandler)
[2014-01-17 17:55:58,605] ERROR Producer connection to 192.168.195.177:9092 unsuccessful (kafka.producer.SyncProducer)
java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect(Native Method)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
at kafka.utils.Utils$.swallow(Utils.scala:187)
at kafka.utils.Logging$class.swallowError(Logging.scala:105)
at kafka.utils.Utils$.swallowError(Utils.scala:46)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:254)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2014-01-17 17:55:58,606] WARN Fetching topic metadata with correlation id 2 for topics [Set(test)] from broker [id:0,host:192.168.195.177,port:9092] failed (kafka.client.ClientUtils$)
java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect(Native Method)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
at kafka.utils.Utils$.swallow(Utils.scala:187)
at kafka.utils.Logging$class.swallowError(Logging.scala:105)
at kafka.utils.Utils$.swallowError(Utils.scala:46)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:254)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
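Every trace above ends in `java.net.ConnectException: Connection refused` for `192.168.195.177:9092`, which means the TCP connection itself was rejected before any Kafka protocol exchange took place. A quick way to confirm this from the client machine, independent of Kafka, is a plain socket probe. The sketch below is Java with only the standard library. The class name `BrokerPortProbe` and the 2-second timeout are arbitrary choices, and the default host and port are simply the ones shown in the log.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class BrokerPortProbe {
    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "192.168.195.177";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable=" + canConnect(host, port, 2000));
    }
}
```

If the probe succeeds on the broker host but fails from the remote machine, possible suspects include the broker listening only on a local interface (the `host.name` setting in `server.properties`) or a firewall in between. These are guesses to check, not a diagnosis of this report.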
Trying to run a Kafka client from a Windows client throws an exception
I am trying to run a Kafka client from a Windows machine, with both Java 7 and Java 6, and it throws this exception:
Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 10000
at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:876)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
at kafka.producer.ZKBrokerPartitionInfo.<init>(ZKBrokerPartitionInfo.scala:62)
at kafka.producer.Producer.<init>(Producer.scala:47)
at kafka.javaapi.producer.Producer.<init>(Producer.scala:33)
at kafka.javaapi.producer.Producer.<init>(Producer.scala:40)
at kafka.examples.Test.main(Test.java:16)
Our application is a Flink application that uses Flink's Kafka connector internally. We are using the dependency below:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>1.15.0</version>
</dependency>
It contains Kafka client 3.0.0. The Flink sink sends messages to the topic through the Kafka producer internally, and in this case we get the error below. Because of it, we see a delay of 30-60 seconds in processing, which causes a performance problem. The error is intermittent; it does not happen every time.
Please help if you have a solution for this.
2022-10-19 07:33:14.240 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Got error produce response with correlation id 38 on topic-partition ccs-fsm-ingress-0, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION. Error Message: Disconnected from node 0
I was testing Kafka on Windows and found a limitation / problem in how the classpath is constructed by the batch file "kafka-run-class.bat" (which is used when starting zookeeper/kafka). In this batch file the classpath is constructed jar by jar with the absolute path for each jar. Because of the large number of jar files, the Windows cmd.exe limit of 8191 characters is reached very quickly in real-life installations (see: https://support.microsoft.com/de-de/help/830473/command-prompt-cmd-exe-command-line-string-limitation).
If this batch file used the Java classpath syntax with wildcards, the problem should be solvable.
E.g. changing the following lines in "kafka-run-class.bat" from
rem Classpath addition for release
for %%i in ("%BASE_DIR%\libs\*") do (
call :concat "%%i"
)
to
rem Classpath addition for release
call :concat "%BASE_DIR%\libs\*;"
would solve the problem.
Thanks,
Frank
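To make the arithmetic behind this suggestion concrete, here is a small Java sketch (standard library only) that compares the length of a classpath built jar by jar with the length of the wildcard form. The class name `ClasspathLengthSketch`, the install directory, and the jar names are invented for illustration, and quoting added by the batch file is ignored. Only the 8191-character limit comes from the Microsoft article linked above.

```java
import java.util.ArrayList;
import java.util.List;

class ClasspathLengthSketch {
    static final int CMD_LIMIT = 8191; // cmd.exe limit cited in the report

    // Length of "dir\a.jar;dir\b.jar;..." as produced by a per-jar loop.
    static int perJarLength(String libsDir, List<String> jarNames) {
        int total = 0;
        for (String jar : jarNames) {
            total += libsDir.length() + 1 + jar.length() + 1; // dir + '\' + name + ';'
        }
        return total;
    }

    // Length of the single wildcard entry "dir\*;".
    static int wildcardLength(String libsDir) {
        return libsDir.length() + 3; // '\' + '*' + ';'
    }

    public static void main(String[] args) {
        String libs = "C:\\tools\\kafka\\libs"; // hypothetical install path
        List<String> jars = new ArrayList<>();
        for (int i = 0; i < 150; i++) {
            jars.add("some-dependency-" + i + "-1.0.0.jar"); // made-up jar names
        }
        int perJar = perJarLength(libs, jars);
        int wildcard = wildcardLength(libs);
        System.out.println("per-jar classpath length:  " + perJar
                + (perJar > CMD_LIMIT ? " (over the limit)" : " (within the limit)"));
        System.out.println("wildcard classpath length: " + wildcard);
    }
}
```

The per-jar length grows with both the number of jars and the depth of the install path, while the wildcard entry depends only on the path.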