
Comments (21)

edenhill commented on September 13, 2024

Can you provide a backtrace of the crash by running it in gdb?

Are you really supposed to destroy me->kafka_handler in sendmsg_kafka()? I.e., is sendmsg only called once?
If the function is called again it will use the destroyed kafka_handler pointer, which will cause a crash.
Also, before destroying your kafka handle you will need to make sure that all messages have been sent, by checking that rd_kafka_outq_len(rk) == 0, something like:

while (rd_kafka_outq_len(rk) > 0)
    rd_kafka_poll(rk, 10);
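
For completeness, a full teardown along those lines might look like the sketch below; the shutdown_kafka helper name and its two-handle signature are my own illustration, not something from this thread:

    /* Drain all outstanding messages, then tear down topic and handle.
     * Destroying the handle while the queue is non-empty loses messages. */
    static void shutdown_kafka(rd_kafka_t *rk, rd_kafka_topic_t *rkt)
    {
        while (rd_kafka_outq_len(rk) > 0)
            rd_kafka_poll(rk, 10);   /* serve delivery-report callbacks */
        if (rkt)
            rd_kafka_topic_destroy(rkt);
        rd_kafka_destroy(rk);
    }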


xianping commented on September 13, 2024

Based on your suggestion, I modified the sendmsg code as below (using gdb I found a silly mistake that made the process exit; I have fixed that). The new issue is that nothing is sent out with the code below. Could you give more advice?

int mercury_sendmsg_kafka(mercury_t *me, const char *topic, const char *msg)
{
    if (!me || !msg) return -1;

    init_error();

    size_t msg_len = strlen(msg);

    char *opbuf = malloc(msg_len + 1);
    strcpy(opbuf, msg);

    if (NULL == me->kafka_topic_handler) {
        rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();
        rd_kafka_topic_t *rkt = rd_kafka_topic_new(me->kafka_handler, topic, topic_conf);
        me->kafka_topic_handler = rkt;
    }

    while (rd_kafka_produce(me->kafka_topic_handler, RD_KAFKA_PARTITION_UA,
                            RD_KAFKA_MSG_F_FREE, opbuf, msg_len,
                            NULL, 0, NULL) == -1) {
        fprintf(stderr, "produce error; resend!");
    }

    /* Poll to handle delivery reports -- added per suggestion */
    while (rd_kafka_outq_len(me->kafka_handler) > 0) {
        /* the program can hit this point repeatedly... */
        rd_kafka_poll(me->kafka_handler, 10);
    }

    return 0;
}
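
As an aside on the retry loop above (this is not necessarily the cause of the "nothing sent out" problem discussed here): if rd_kafka_produce() fails because the local queue is full, errno is ENOBUFS, and retrying without polling can spin without making progress, since delivery reports are only served from rd_kafka_poll(). A hedged variant:

    #include <errno.h>
    #include <string.h>

    while (rd_kafka_produce(me->kafka_topic_handler, RD_KAFKA_PARTITION_UA,
                            RD_KAFKA_MSG_F_FREE, opbuf, msg_len,
                            NULL, 0, NULL) == -1) {
        fprintf(stderr, "produce error: %s: resend\n", strerror(errno));
        rd_kafka_poll(me->kafka_handler, 100);  /* let the queue drain */
    }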


edenhill commented on September 13, 2024

Could you print errno if rd_kafka_produce() fails?

fprintf(stderr, "produce error: %s: resend\n", strerror(errno));

Also, try enabling rdkafka debugging by setting the conf property "debug" to "all":

rd_kafka_conf_set(conf, "debug", "all", errstr, sizeof(errstr));
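
rd_kafka_conf_set() also returns a result code worth checking; a small sketch, assuming errstr is a local buffer:

    char errstr[512];
    if (rd_kafka_conf_set(conf, "debug", "all",
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
        fprintf(stderr, "%% failed to enable debug: %s\n", errstr);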


xianping commented on September 13, 2024

edenhill,
There is no 'produce error' message, but I found the following error messages; they say the partition can't be found.
Error messages in error.log:
%7|1389320111.305|TOPIC|rdkafka#producer-0| New local topic: item.010001
%7|1389320111.305|PART|rdkafka#producer-0| item.010001 has no partitions
adidfdf
%7|1389320111.305|QSERVE|rdkafka#producer-0| Serving 1 ops


xianping commented on September 13, 2024

edenhill,
I wonder if my usage in nginx is right, or whether you have examples. I tested using one Kafka server with the partition fixed to 0, and I still encounter the 'no partitions' error.
Thanks for the help.


edenhill commented on September 13, 2024

Can you check with the bin/kafka-topics.sh tool (from the official Apache Kafka package) that the topic item.010001 exists?

Can you provide more/all debugging output?


xianping commented on September 13, 2024

As it's the weekend, I will retest next Monday and report back here.


xianping commented on September 13, 2024

There was no corresponding topic on the Kafka server. I retested, and the following error messages were generated by nginx (I believe they come from the librdkafka client):
%7|1389599212.380|TOPIC|rdkafka#producer-0| New local topic: m.100000
%7|1389599212.380|METADATA|rdkafka#producer-0| 192.168.195.178:9092/bootstrap: Request metadata for m.100000: leader query
%7|1389599212.380|METADATA|rdkafka#producer-0| 192.168.195.178:9092/bootstrap: Request metadata: scheduled: not in broker thread
%7|1389599212.380|PART|rdkafka#producer-0| m.100000 has no partitions
%equest metadata: scheduled: not in broker threadhas no partitions

It seems the client can't get the partition leader.

Alternative experiment:
For each request in nginx we create a new kafka handle, then add brokers and send the message when a new topic arrives. With this approach we can produce messages to the topic, but memory usage keeps climbing and is never released. So I wonder whether initialising the handle and adding brokers once is the right way to do this in nginx. Could you give more guidance?


edenhill commented on September 13, 2024

Do you have auto.create.topics.enable=true configured on your broker(s)?
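
For reference, topic auto-creation is a broker-side setting in server.properties; a sketch (the property names are standard Kafka, the values are just an example):

    # server.properties (on each broker)
    auto.create.topics.enable=true
    num.partitions=1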

Can you provide some more debug log output?


edenhill commented on September 13, 2024

Creating a new kafka handle for each new connection in nginx seems kind of expensive since it will:

  • connect to each of the provided/configured bootstrap brokers
  • request metadata from each bootstrap broker
  • connect once to each broker reported in the metadata info.

So there are at least B+N (B = bootstrap brokers, N = brokers) connections made, and a similar number of metadata requests, before it is able to start producing messages.

After that your message will be sent to one of the brokers and it will wait for a reply (if request.required.acks is not 0).

I don't know the internals of nginx, but would it be possible to share the kafka and topic handles between requests?
The rdkafka API is thread-safe, so you can easily produce from different threads.
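
A minimal sketch of such sharing, with illustrative names that are not from this thread, would be to create the handles once at worker init and let every request reuse them:

    /* Created once at worker/process init; reused by every request.
     * The handles are thread-safe, so requests need no copies of their own. */
    typedef struct {
        rd_kafka_t       *rk;   /* one producer handle per process */
        rd_kafka_topic_t *rkt;  /* one handle per topic, cached */
    } kafka_shared_t;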


xianping commented on September 13, 2024

Only the first record can be written to the topic.
In the Kafka server.log, a BufferUnderflowException keeps being logged; see below:

[2014-01-14 17:06:54,232] INFO New leader is 2 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2014-01-14 17:06:54,235] INFO [ReplicaFetcherManager on broker 2] Removing fetcher for partition test,0
[2014-01-14 17:06:54,283] INFO [Kafka Server 2], Started (kafka.server.KafkaServer)
[2014-01-14 17:06:54,297] INFO [ReplicaFetcherManager on broker 2] Removing fetcher for partition m.100000,0
[2014-01-14 17:06:54,299] INFO [Replica Manager on Broker 2]: Handled leader and isr request Name:LeaderAndIsrRequest;Version:0;Controller:2;ControllerEpoch:3;Correlati
onId:0;ClientId:id_2-host_127.0.0.1-port_9092;PartitionState:(test,0) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllRepl
icas:2),(m.100000,0) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:2);Leaders:id:2,host:127.0.0.1,port:9092 (ka
fka.server.ReplicaManager)
[2014-01-14 17:09:27,015] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
java.nio.BufferUnderflowException
at java.nio.Buffer.nextGetIndex(Buffer.java:480)
at java.nio.HeapByteBuffer.getShort(HeapByteBuffer.java:285)
at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:34)
at kafka.api.TopicMetadataRequest$$anonfun$readFrom$1.apply(TopicMetadataRequest.scala:44)
at kafka.api.TopicMetadataRequest$$anonfun$readFrom$1.apply(TopicMetadataRequest.scala:43)
at scala.collection.immutable.Range$ByOne$class.foreach(Range.scala:285)
at scala.collection.immutable.Range$$anon$2.foreach(Range.scala:265)
at kafka.api.TopicMetadataRequest$.readFrom(TopicMetadataRequest.scala:43)
at kafka.api.RequestKeys$$anonfun$4.apply(RequestKeys.scala:37)
at kafka.api.RequestKeys$$anonfun$4.apply(RequestKeys.scala:37)
at kafka.network.RequestChannel$Request.(RequestChannel.scala:49)
at kafka.network.Processor.read(SocketServer.scala:353)
at kafka.network.Processor.run(SocketServer.scala:245)
at java.lang.Thread.run(Thread.java:662)

In the nginx logs there are still many 'has no partitions' errors for newly arriving topics; only the first record (with its topic) is written correctly. See below:
%7|1389690565.986|BROKER|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Added new broker with NodeId -1
%7|1389690565.986|BRKMAIN|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Enter main broker thread
mercury_sendmsg_kafka:m.100000
%7|1389690565.986|CONNECT|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: broker in state DOWN connecting
%7|1389690565.986|TOPIC|rdkafka#producer-0| New local topic: m.100000
%7|1389690565.986|PART|rdkafka#producer-0| m.100000 has no partitions
%7|1389690565.986|QSERVE|rdkafka#producer-0| Serving 1 ops
%7|1389690565.986|STATE|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Broker changed state DOWN -> CONNECTING
%7|1389690565.987|CONNECTED|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: connected to localhost:9092
%7|1389690565.987|STATE|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Broker changed state CONNECTING -> UP
%7|1389690565.987|METADATA|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Request metadata for all topics: connected
%7|1389690565.987|METADATA|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Requesting metadata for all topics
%7|1389690565.987|SEND|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Sent 1 bufs
%7|1389690565.987|SEND|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Sent 0 bufs
%7|1389690566.011|METADATA|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: ===== Received metadata from 127.0.0.1:9092/bootstrap =====
%7|1389690566.011|METADATA|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: 1 brokers
%7|1389690566.011|METADATA|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Broker #0/1: 127.0.0.1:9092 NodeId 2
%7|1389690566.012|BROKER|rdkafka#producer-0| 127.0.0.1:9092/2: Added new broker with NodeId 2
%7|1389690566.012|BRKMAIN|rdkafka#producer-0| 127.0.0.1:9092/2: Enter main broker thread
%7|1389690566.012|PARTCNT|rdkafka#producer-0| Topic m.100000 partition count changed from 0 to 1
%7|1389690566.012|CONNECT|rdkafka#producer-0| 127.0.0.1:9092/2: broker in state DOWN connecting
%7|1389690566.012|METADATA|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Topic #0/2: m.100000 partition 0 Leader 2
%7|1389690566.012|BRKDELGT|rdkafka#producer-0| Broker 127.0.0.1:9092/2 is now leader for topic m.100000 [0] with 0 messages (0 bytes) queued
%7|1389690566.012|PARTCNT|rdkafka#producer-0| Partitioning 1 unassigned messages in topic m.100000 to 1 partitions
%7|1389690566.012|UAS|rdkafka#producer-0| 1/1 messages were partitioned
%7|1389690566.012|PARTCNT|rdkafka#producer-0| Ignore unknown topic test
%7|1389690566.012|METADATA|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Topic #1/2: test partition 0 Leader 2
%7|1389690566.012|STATE|rdkafka#producer-0| 127.0.0.1:9092/2: Broker changed state DOWN -> CONNECTING
%7|1389690566.012|CONNECTED|rdkafka#producer-0| 127.0.0.1:9092/2: connected to localhost:9092
%7|1389690566.012|STATE|rdkafka#producer-0| 127.0.0.1:9092/2: Broker changed state CONNECTING -> UP
%7|1389690566.012|METADATA|rdkafka#producer-0| 127.0.0.1:9092/2: Request metadata for all topics: connected
%7|1389690566.012|METADATA|rdkafka#producer-0| 127.0.0.1:9092/2: Requesting metadata for all topics
%7|1389690566.012|TOPPAR|rdkafka#producer-0| 127.0.0.1:9092/2: m.100000 [0] 1+0 msgs
.............................
.................................
%7|1389690566.040|TOPIC|rdkafka#producer-0| New local topic: www.100000
%7|1389690566.040|METADATA|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Request metadata for www.100000: leader query
%7|1389690566.040|METADATA|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Request metadata: scheduled: not in broker thread
%7|1389690566.040|PART|rdkafka#producer-0| www.100000 has no partitions
%7|1389690566.986|METADATA|rdkafka#producer-0| rd_kafka_topic_scan_all partition_cnt = 0
%7|1389690566.986|METADATA|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Request metadata for www.100000: leader query
%7|1389690566.986|METADATA|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Request metadata: scheduled: not in broker thread
%7|1389690566.986|QSERVE|rdkafka#producer-0| Serving 1 ops
%7|1389690567.013|BRKOP|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Serve broker op type 4
%7|1389690567.013|METADATA|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Request metadata for www.100000:
%7|1389690567.013|METADATA|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Requesting metadata for known topics
%7|1389690567.013|BRKOP|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Serve broker op type 4
%7|1389690567.013|METADATA|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Request metadata for www.100000:
%7|1389690567.013|METADATA|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Requesting metadata for known topics
%7|1389690567.013|SEND|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Sent 2 bufs
%7|1389690567.013|SEND|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Sent 0 bufs
%7|1389690567.017|BROKERFAIL|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: failed: err: Local: Broker transport failure: (errno: Resource temporarily unavailable)
%7|1389690567.017|STATE|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Broker changed state UP -> DOWN
%3|1389690567.017|FAIL|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: 127.0.0.1:9092/bootstrap: Receive failed: Disconnected
%7|1389690567.018|BUFQ|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Purging bufq
%7|1389690567.018|METADATA|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: ===== Received metadata from 127.0.0.1:9092/bootstrap =====
%4|1389690567.018|METADATA|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Metadata request failed: -195
%7|1389690567.018|METADATA|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: ===== Received metadata from 127.0.0.1:9092/bootstrap =====
%4|1389690567.018|METADATA|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Metadata request failed: -195
%7|1389690567.018|BUFQ|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Purging bufq
%7|1389690567.018|METADATA|rdkafka#producer-0| 127.0.0.1:9092/2: Request metadata for locally known topics: leader query
%7|1389690567.018|METADATA|rdkafka#producer-0| 127.0.0.1:9092/2: Request metadata: scheduled: not in broker thread
%7|1389690567.018|CONNECT|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: broker in state DOWN connecting
%7|1389690567.018|STATE|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Broker changed state DOWN -> CONNECTING
%7|1389690567.018|QSERVE|rdkafka#producer-0| Serving 1 ops
ERROR CALLBACK: rdkafka#producer-0: Local: Communication failure with broker: 127.0.0.1:9092/bootstrap: Receive failed: Disconnected
%7|1389690567.018|CONNECTED|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: connected to localhost:9092
%7|1389690567.018|STATE|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Broker changed state CONNECTING -> UP
%7|1389690567.018|METADATA|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Request metadata for all topics: connected
%7|1389690567.018|METADATA|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Requesting metadata for all topics
%7|1389690567.018|SEND|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Sent 1 bufs
%7|1389690567.018|SEND|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Sent 0 bufs
%7|1389690567.022|METADATA|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: ===== Received metadata from 127.0.0.1:9092/bootstrap =====
%7|1389690567.022|METADATA|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: 1 brokers
%7|1389690567.022|METADATA|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Broker #0/1: 127.0.0.1:9092 NodeId 2
%7|1389690567.022|PARTCNT|rdkafka#producer-0| No change in partition count for topic m.100000
%7|1389690567.022|METADATA|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Topic #0/2: m.100000 partition 0 Leader 2
%7|1389690567.022|TOPICUPD|rdkafka#producer-0| No leader change for topic m.100000 [0] with leader 2
%7|1389690567.022|PARTCNT|rdkafka#producer-0| Partitioning 0 unassigned messages in topic m.100000 to 1 partitions
%7|1389690567.022|UAS|rdkafka#producer-0| 0/0 messages were partitioned

BTW, I use a single nginx process for this test, for simplicity. Why can't the remaining records get topic metadata?


xianping commented on September 13, 2024

I modified my code for the above test as follows:
int mercury_sendmsg_kafka(mercury_t *me, const char *topic, const char *msg)
{
    if (!me || !msg) return -1;

    mercury_add_kafkabroker(me, "127.0.0.1:9092");
    fprintf(stderr, "mercury_sendmsg_kafka:%s\n", topic);

    init_error();

    size_t msg_len = strlen(msg);

    char *opbuf = malloc(msg_len + 1);
    strcpy(opbuf, msg);

    rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();
    rd_kafka_topic_t *rkt = rd_kafka_topic_new(me->kafka_handler, topic, topic_conf);
    me->kafka_topic_handler = rkt;

    while (rd_kafka_produce(me->kafka_topic_handler, RD_KAFKA_PARTITION_UA,
                            RD_KAFKA_MSG_F_FREE, opbuf, msg_len,
                            NULL, 0, NULL) == -1) {
        fprintf(stderr, "produce error: %s: resend\n", strerror(errno));
    }

    /* Poll to handle delivery reports */
    while (rd_kafka_outq_len(me->kafka_handler) > 0) {
        rd_kafka_poll(me->kafka_handler, 10);
    }

    if (NULL != me->kafka_topic_handler) {
        rd_kafka_topic_destroy(me->kafka_topic_handler);
        /* rd_kafka_destroy(me->kafka_handler); */
    }

    return 0;
}

int mercury_add_kafkabroker(mercury_t *me, const char *kafka_brokers)
{
    if (!me || !kafka_brokers) return -1;  /* check before dereferencing me */

    if (NULL != me->kafka_handler) {
        return 0;  /* handle already created; reuse it */
    }

    init_error();

    char errstr[100];
    char test_errstr[200];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rd_kafka_conf_set_error_cb(conf, err_cb);
    rd_kafka_conf_set_dr_cb(conf, msg_delivered);
    rd_kafka_conf_set(conf, "debug", "all", test_errstr, sizeof(test_errstr));

    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (rk == NULL) {
        set_error("rd_kafka_new fail");
        return -1;
    }
    me->kafka_handler = rk;

    /* Add brokers */
    if (rd_kafka_brokers_add(me->kafka_handler, kafka_brokers) == 0) {
        fprintf(stderr, "%% No valid brokers specified\n");
        return -1;
    }
    return 0;
}


edenhill commented on September 13, 2024

Thanks for the extended debug info.
What version of librdkafka are you using? The latest master?
There was a bug in 0.8.1 where some metadata requests would be badly formatted and cause an exception on the broker.

It was fixed in 16915a2

In any case, using the latest master is preferable.


xianping commented on September 13, 2024

We use kafka_2.8.0-0.8.0 and librdkafka 0.8. Which version should be OK?


xianping commented on September 13, 2024

When using the master code, after 'make' there is no *.so in the current path. Is there an error in the Makefile?


edenhill commented on September 13, 2024

I would like you to try the master branch.

Uhm, there should be .so files built.
What platform are you building on?
Can you provide the output of 'make clean all'?


xianping commented on September 13, 2024

CentOS release 6.3 (Final)
-rw-r--r--. 1 root root 723784 Jan 15 09:49 librdkafka.a
-rw-r--r--. 1 root root 83344 Jan 15 09:49 snappy.o
-rw-r--r--. 1 root root 2849 Jan 15 09:49 snappy.d
-rw-r--r--. 1 root root 12592 Jan 15 09:49 rdlog.o
-rw-r--r--. 1 root root 3931 Jan 15 09:49 rdlog.d
-rw-r--r--. 1 root root 18680 Jan 15 09:49 rdqueue.o
-rw-r--r--. 1 root root 3873 Jan 15 09:49 rdqueue.d
-rw-r--r--. 1 root root 22400 Jan 15 09:49 rdthread.o
-rw-r--r--. 1 root root 4026 Jan 15 09:49 rdthread.d
-rw-r--r--. 1 root root 7736 Jan 15 09:49 rdrand.o
-rw-r--r--. 1 root root 3243 Jan 15 09:49 rdrand.d
-rw-r--r--. 1 root root 19288 Jan 15 09:49 rdaddr.o
-rw-r--r--. 1 root root 4105 Jan 15 09:49 rdaddr.d
-rw-r--r--. 1 root root 11272 Jan 15 09:49 rdgz.o
-rw-r--r--. 1 root root 3321 Jan 15 09:49 rdgz.d
-rw-r--r--. 1 root root 6560 Jan 15 09:49 rdcrc32.o
-rw-r--r--. 1 root root 1501 Jan 15 09:49 rdcrc32.d
-rw-r--r--. 1 root root 63656 Jan 15 09:49 rdkafka_defaultconf.o
-rw-r--r--. 1 root root 4820 Jan 15 09:49 rdkafka_defaultconf.d
-rw-r--r--. 1 root root 81232 Jan 15 09:49 rdkafka_topic.o
-rw-r--r--. 1 root root 4881 Jan 15 09:49 rdkafka_topic.d
-rw-r--r--. 1 root root 44280 Jan 15 09:49 rdkafka_msg.o
-rw-r--r--. 1 root root 4861 Jan 15 09:49 rdkafka_msg.d
-rw-r--r--. 1 root root 234704 Jan 15 09:49 rdkafka_broker.o
-rw-r--r--. 1 root root 5757 Jan 15 09:49 rdkafka_broker.d
-rw-r--r--. 1 root root 114056 Jan 15 09:49 rdkafka.o
-rw-r--r--. 1 root root 4933 Jan 15 09:49 rdkafka.d


edenhill commented on September 13, 2024

Thanks.
Can you provide the output from 'uname -s'?

And also, preferably, the output from 'make clean all'.


edenhill commented on September 13, 2024

Issue #61 regarding .so generation is now fixed.


xianping commented on September 13, 2024

Thanks for your patience; it works now using the master branch.
One more question: the 'mercury_sendmsg_kafka' function above is called once for each arriving message. I think creating and destroying the topic handle for every message is heavyweight. Since we have a limited number of topics, is there any way we can reuse the topic handle? I think that would be more efficient, right?

Thanks


edenhill commented on September 13, 2024

You should typically hold on to the rkt (rd_kafka_topic_t *) and reuse it for producing your messages, so I would store it with the rk pointer (rd_kafka_t *) in your own struct and then reuse it as necessary.
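
A minimal sketch of such reuse, assuming a small fixed set of topics; the names here (topic_cache, get_topic, MAX_TOPICS) are illustrative rather than from this thread, and a multi-worker setup would need a lock around the cache:

    #include <stdio.h>
    #include <string.h>
    #include <librdkafka/rdkafka.h>

    #define MAX_TOPICS 16

    static struct {
        char name[64];
        rd_kafka_topic_t *rkt;
    } topic_cache[MAX_TOPICS];
    static int topic_cache_cnt = 0;

    /* Return a cached topic handle, creating it on first use. */
    static rd_kafka_topic_t *get_topic(rd_kafka_t *rk, const char *topic)
    {
        int i;
        for (i = 0; i < topic_cache_cnt; i++)
            if (strcmp(topic_cache[i].name, topic) == 0)
                return topic_cache[i].rkt;

        if (topic_cache_cnt == MAX_TOPICS)
            return NULL;  /* cache full; treat as an error */

        rd_kafka_topic_t *rkt =
            rd_kafka_topic_new(rk, topic, rd_kafka_topic_conf_new());
        if (!rkt)
            return NULL;

        snprintf(topic_cache[topic_cache_cnt].name,
                 sizeof(topic_cache[topic_cache_cnt].name), "%s", topic);
        topic_cache[topic_cache_cnt].rkt = rkt;
        topic_cache_cnt++;
        return rkt;
    }

The cached handles would then be destroyed only once, at shutdown, after draining the queue as described earlier in this thread.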

