librdkafka's Issues

Question about error call backs

Hi,

I've registered an error callback (as early in the program as possible), but if I give an intentionally wrong broker hostname, I am not called back for it until I attempt to publish. However, with debug turned on I can see that the library knows the host is wrong much earlier (and reports this in the log). Is there a way to have this delivered to my error handler so that I can find out earlier and exit/complain?

Thanks.

Program received signal SIGPIPE, Broken pipe.

I'm getting the following when trying the example on Ubuntu 12.10:
Kafka 0.7.2 (installed from https://github.com/downloads/wmf-analytics/kafka-debian/kafka_0.7.2_all.deb)

Program received signal SIGPIPE, Broken pipe.
[Switching to Thread 0xb7dc8b40 (LWP 8168)]
0xb7fdd424 in __kernel_vsyscall ()
(gdb) bt
#0  0xb7fdd424 in __kernel_vsyscall ()
#1  0xb7fabf08 in sendmsg () from /lib/i386-linux-gnu/libpthread.so.0
#2  0x0804a7ca in rd_kafka_send (rk=0x80518f8, msg=<optimized out>)
    at rdkafka.c:413
#3  0x0804a95a in rd_kafka_send_request (rk=rk@entry=0x80518f8, 
    msgtype=msgtype@entry=0, topicpart=0xb7dc7124) at rdkafka.c:477
#4  0x0804aca6 in rd_kafka_produce_send (rko=0x8051d70, rk=0x80518f8)
    at rdkafka.c:862
#5  rd_kafka_wait_op (rk=<optimized out>) at rdkafka.c:909
#6  rd_kafka_thread_main (arg=0x80518f8) at rdkafka.c:984
#7  0xb7fa4d4c in start_thread () from /lib/i386-linux-gnu/libpthread.so.0
#8  0xb7ec0d3e in clone () from /lib/i386-linux-gnu/libc.so.6
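
The backtrace shows the process being killed inside sendmsg() while writing to a connection that the peer has apparently closed. A common application-side mitigation, offered here as an assumption rather than as the library's own fix, is to ignore SIGPIPE so that the write fails with EPIPE instead of terminating the process. The sketch below uses a pipe as a stand-in for the broker socket; the function name is hypothetical.

```c
#include <errno.h>
#include <signal.h>
#include <unistd.h>

/* Hypothetical helper: writes to a pipe whose read end is already closed,
 * standing in for a socket whose peer has gone away.
 * Returns the errno from write(), or 0 if the write unexpectedly succeeded. */
int write_to_closed_pipe(void)
{
    int fds[2];
    int saved_errno = 0;

    /* Without this, the write below would raise SIGPIPE and kill the process. */
    signal(SIGPIPE, SIG_IGN);

    if (pipe(fds) == -1)
        return errno;

    close(fds[0]);                      /* simulate the peer closing */

    if (write(fds[1], "x", 1) == -1)
        saved_errno = errno;            /* EPIPE once SIGPIPE is ignored */

    close(fds[1]);
    return saved_errno;
}
```

On Linux, passing MSG_NOSIGNAL to send() or sendmsg() has a similar effect for a single call without changing the process-wide signal disposition.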

librdkafka 0.8 logger callback error

Hi, Magnus.
librdkafka 0.8 provides a callback for logger. The comment of function "rd_kafka_set_logger" mentions that "Alternatively the application may provide its own logger callback".
However, when I provide my logger callback, some compiler errors occur.

error: invalid use of incomplete type ‘const rd_kafka_t {aka const struct rd_kafka_s}’
/usr/local/include/librdkafka/rdkafka.h:59:16: error: forward declaration of ‘const rd_kafka_t {aka const struct rd_kafka_s}’

I wonder whether I am using the logger callback correctly.

Thanks.
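
The compiler message says that rd_kafka_t is only forward-declared in the public header, so application code cannot look inside it. The following is a minimal sketch of that situation using entirely hypothetical names (demo_handle_t, demo_handle_name, my_logger): the callback can receive the handle and pass it to accessor functions, but it cannot dereference it.

```c
#include <stdio.h>
#include <string.h>

/* "Public header" part: the handle is only forward-declared, so code that
 * sees just this cannot dereference it. All names here are hypothetical. */
typedef struct demo_handle_s demo_handle_t;
const char *demo_handle_name(const demo_handle_t *h);

/* Application logger callback: it may pass the handle around and call
 * accessors, but writing h->name here would fail with
 * "invalid use of incomplete type". */
static char last_log_line[128];

void my_logger(const demo_handle_t *h, int level, const char *fac,
               const char *buf)
{
    snprintf(last_log_line, sizeof(last_log_line), "%d|%s|%s|%s",
             level, fac, demo_handle_name(h), buf);
}

/* "Library internals" part: only here is the struct layout known. */
struct demo_handle_s {
    char name[32];
};

const char *demo_handle_name(const demo_handle_t *h)
{
    return h->name;
}

/* Hypothetical driver that formats one log line and returns it. */
const char *demo_log_once(void)
{
    struct demo_handle_s h;

    strcpy(h.name, "demo#producer-0");
    my_logger(&h, 7, "TEST", "hello");
    return last_log_line;
}
```

If the callback in question reads a field of the handle directly, switching to whatever accessor the library provides for that field should remove the error.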

Is it possible to compile the library under Cygwin?

After removing the -Werror option from the Makefiles so that warnings are not treated as errors (see below), I get the following error when trying to compile the library under Cygwin:

$ make all
cc -MD -MP -O2 -Wall -Wfloat-equal -Wpointer-arith -fPIC -I. -g -Wno-gnu-designator -DSG -c rdkafka.c
rdkafka.c:1:0: warning: -fPIC ignored for target (all code is position independent) [enabled by default]
rdkafka.c: In function 'rd_kafka_q_serve':
rdkafka.c:356:2: error: 'rd_kafka_op_t' has no member named 'tqh_first'
rdkafka.c:356:2: error: 'rko_link' undeclared (first use in this function)
rdkafka.c:356:2: note: each undeclared identifier is reported only once for each function it appears in
rdkafka.c:356:2: error: expected identifier before '&' token
rdkafka.c:356:2: warning: left-hand operand of comma expression has no effect [-Wunused-value]
rdkafka.c: In function 'rd_kafka_destroy':
rdkafka.c:518:2: error: 'rd_kafka_topic_t' has no member named 'tqh_first'
rdkafka.c:518:2: error: 'rkt_link' undeclared (first use in this function)
rdkafka.c:518:2: error: expected identifier before '&' token
rdkafka.c:518:2: warning: left-hand operand of comma expression has no effect [-Wunused-value]
rdkafka.c: At top level:
cc1: warning: unrecognized command line option "-Wno-gnu-designator" [enabled by default]
Makefile:39: recipe for target `rdkafka.o' failed
make: *** [rdkafka.o] Error 1

Tool versions are:

  • CYGWIN_NT-6.1 1.7.25(0.270/5/3) i686 Cygwin
  • gcc (GCC) 4.7.3

Minor fix to sample program

The error message should say 'Failed to create new consumer' rather than 'producer':

/* Create Kafka handle */
if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
                        errstr, sizeof(errstr)))) {
        fprintf(stderr,
                "%% Failed to create new producer: %s\n",
                errstr);
        exit(1);
}

Librdkafka does not compile on OSX 10.8.5

Hi,
I tried to build the library on my Mac, but it failed. My compiler info is:

Configured with: --prefix=/Applications/Xcode.app/Contents/Developer/usr --with-gxx-include-dir=/usr/include/c++/4.2.1
Apple LLVM version 5.0 (clang-500.2.79) (based on LLVM 3.3svn)
Target: x86_64-apple-darwin12.5.0
Thread model: posix

The error message is:

cc -MD -MP -O2 -Wall -Werror -Wfloat-equal -Wpointer-arith -fPIC -I. -g -DSG -c rdkafka.c
rdkafka.c:1064:29: error: use of GNU old-style field designator extension [-Werror,-Wgnu-designator]
        struct consume_ctx ctx = { consume_cb: consume_cb, opaque: opaque };
                                   ^~~~~~~~~~~
                                   .consume_cb = 
rdkafka.c:1064:53: error: use of GNU old-style field designator extension [-Werror,-Wgnu-designator]
        struct consume_ctx ctx = { consume_cb: consume_cb, opaque: opaque };
                                                           ^~~~~~~
                                                           .opaque = 
2 errors generated.
make: *** [rdkafka.o] Error 1
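
Clang's own fix-it hints above point at the standard C99 designated-initializer syntax. A minimal sketch of that form follows; the struct is a hypothetical stand-in in which only the two field names come from the compiler output, and the callback signature is assumed for illustration.

```c
#include <stddef.h>

/* Hypothetical stand-in for the struct named in the compiler output;
 * only the two field names are taken from the error message. */
struct consume_ctx {
    void (*consume_cb)(void *payload, void *opaque);
    void *opaque;
};

static void count_cb(void *payload, void *opaque)
{
    (void)payload;
    ++*(int *)opaque;      /* count each invocation */
}

/* Builds the context with C99 designators and invokes the callback twice. */
int demo_designated_init(void)
{
    int calls = 0;

    /* C99 form: ".field = value" instead of the GNU "field: value" */
    struct consume_ctx ctx = { .consume_cb = count_cb, .opaque = &calls };

    ctx.consume_cb(NULL, ctx.opaque);
    ctx.consume_cb(NULL, ctx.opaque);
    return calls;
}
```

The ".field = value" form is accepted by both GCC and Clang, so it avoids the -Wgnu-designator diagnostic without having to drop -Werror.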

Can you explain a bit about the 'key' passed by the producer?

I know it can be used by the partitioner, but I'm also wondering what other value it has - I read that it is also passed to the consumer. Could this be considered like a 'header field'? (thinking like a JMS Header field). Is there only a single key that can be passed? If so, I guess I would have to make it somewhat of a compound/concatenated/delimited one if I want to have a bunch of properties passed?
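
If only a single key is available, one way to carry several properties is to pack them into one delimited string, as the question suggests. The sketch below is only an illustration of that idea: the function name, the "name=value" layout and the ';' delimiter are all assumptions, and a consumer would need a matching parser.

```c
#include <stdio.h>

/* Hypothetical helper: packs "name=value" pairs into one ';'-delimited key.
 * Returns the key length, or -1 if the buffer is too small. */
int build_compound_key(char *dst, size_t dst_size,
                       const char *const names[], const char *const values[],
                       size_t count)
{
    size_t used = 0;
    size_t i;

    if (dst_size == 0)
        return -1;
    dst[0] = '\0';

    for (i = 0; i < count; i++) {
        int n = snprintf(dst + used, dst_size - used, "%s%s=%s",
                         i ? ";" : "", names[i], values[i]);
        if (n < 0 || (size_t)n >= dst_size - used)
            return -1;              /* output would have been truncated */
        used += (size_t)n;
    }
    return (int)used;
}
```

Keep in mind that a partitioner hashing the key would then hash the whole compound string, so messages that differ in any one property may land on different partitions.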

Failed to decompress gzip

I'm setting gzip in my publisher and am receiving this in rdkafka_example:

1389132516.572 RDKAFKA-7-GZIP: rdkafka#consumer-0: d145931-002.masked.com:5757/0: Failed to decompress Gzip message at offset 3145 of 216 bytes: ignoring message

I get this whether -z gzip is set or not. Any idea?

make error: rdkafka_example

The make rdkafka_performance works fine, but
make rdkafka_example gives this:
cc1: warnings being treated as errors
rdkafka_example.c: In function ‘hexdump’:
rdkafka_example.c:57: error: format ‘%lu’ expects type ‘long unsigned int’, but argument 4 has type ‘size_t’
make: *** [rdkafka_example] Error 1

Suggestion? (I don't do a lot of C work)

Thanks,
Chris
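
The warning arises because size_t is not unsigned long on every platform. In C99 the matching conversion is "%zu". A minimal sketch with a hypothetical helper name:

```c
#include <stdio.h>
#include <stddef.h>

/* Formats a buffer length the way a hexdump header might.
 * "%zu" is the C99 conversion for size_t; "%lu" is only correct on
 * platforms where size_t happens to be unsigned long. */
int format_len(char *dst, size_t dst_size, const char *name, size_t len)
{
    return snprintf(dst, dst_size, "%s (%zu bytes)", name, len);
}
```

For compilers without C99 support, casting the argument to unsigned long and keeping "%lu" is the usual alternative.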

Is compression in fact supported?

The introduction seems to indicate yes, but the rdkafka_int.h indicates no/not supported in the enum definition of rd_kafka_compression_t. Hoping that it's simply outdated?

metadata req flood if no leader

If there is no leader for a wanted partition a new metadata request will be sent immediately, which will detect there is no leader for the wanted partition and send a new metadata request... goto 10.

This causes a steady flood of metadata requests to the broker.
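
One way to break such a loop is to enforce a minimum interval between metadata requests. The sketch below illustrates that idea only; it is not necessarily how the library addresses the problem, and the struct, function name and microsecond timestamps are assumptions.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical throttle state; timestamps are in microseconds. */
struct metadata_throttle {
    int64_t last_request_us;   /* time of the last request, 0 = never */
    int64_t min_interval_us;   /* minimum spacing between requests */
};

/* Returns true and records the time if a request may be sent now;
 * returns false if the previous request was too recent. */
bool metadata_request_allowed(struct metadata_throttle *t, int64_t now_us)
{
    if (t->last_request_us != 0 &&
        now_us - t->last_request_us < t->min_interval_us)
        return false;

    t->last_request_us = now_us;
    return true;
}
```

With a one-second interval, a partition that stays leaderless triggers at most one request per second instead of one per response.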

RPM packaging

Proper RPM packaging for librdkafka would be good.

kafka v0.6 support

Hi edenhill.

I'm trying to send messages between a producer and a consumer instance of rdkafka_example through a Kafka server, but no messages reach the consumer. Kafka's version is 0.6, and I think that is the problem.

Is there a way to send messages to this version of Kafka using your library? If not, can I download a previous version, or another C library, to do so? The language is very important because I want to integrate Kafka into an existing project.

Thanks and regards

Error when compiling against librdkafka with g++ (as opposed to gcc), even with extern "C"

Hi,

I had to make a change to get a C++ program to compile against librdkafka, even with the header file wrapped in extern "C". I think the error is less about C vs. C++, though, and perhaps related to stricter error checking.

rd_kafka_message_errstr returns a const char *, but it returns rkmessage->payload, which is a void *. I had to cast it to const char * in the return statement to get it to compile.
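
The underlying rule is that C converts void * to other object pointer types implicitly, while C++ requires an explicit cast, and extern "C" changes linkage only, not the type rules. A minimal sketch of an inline accessor that compiles under both languages follows; the struct and function names are hypothetical stand-ins.

```c
#include <stddef.h>

/* Hypothetical message type with an untyped payload, as described above. */
struct demo_message {
    int   err;
    void *payload;
};

/* C converts void* to const char* implicitly, but C++ does not, so a header
 * meant for both needs the explicit cast. */
static inline const char *demo_message_errstr(const struct demo_message *m)
{
    if (!m->err)
        return NULL;
    return (const char *)m->payload;
}
```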

Build Fails on Centos6.4

Is librdkafka ready for use w/ Kafka 0.8.x?

I'm unable to build from master which is currently at: a29956d

[david@ops-1 librdkafka{master}]$ make clean ;  make all 
rm -f rdkafka.o rdkafka_broker.o rdkafka_msg.o rdkafka_topic.o rdkafka_defaultconf.o rdcrc32.o rdgz.o rdaddr.o rdrand.o rdthread.o rdqueue.o rdlog.o rdkafka.d rdkafka_broker.d rdkafka_msg.d rdkafka_topic.d rdkafka_defaultconf.d rdcrc32.d rdgz.d rdaddr.d rdrand.d rdthread.d rdqueue.d rdlog.d \
        librdkafka*.a librdkafka*.so librdkafka*.so.1
cc -MD -MP -O2 -Wall -Werror -Wfloat-equal -Wpointer-arith -fPIC -I. -g -rdynamic -c rdkafka.c
In file included from rdkafka.c:37:
rdkafka_int.h:288: error: redefinition of typedef ‘rd_kafka_topic_t’
rdkafka.h:60: note: previous declaration of ‘rd_kafka_topic_t’ was here
rdkafka_int.h:383: error: redefinition of typedef ‘rd_kafka_t’
rdkafka.h:59: note: previous declaration of ‘rd_kafka_t’ was here
make: *** [rdkafka.o] Error 1

When I check out the 0.7 branch, I can build without errors.

Does the library support broker recovery?

Hi,
In my case, when the Kafka cluster is shut down and a producer using librdkafka still tries to send logs to Kafka, the send fails.
But when I start the Kafka cluster again, the producer still can't send logs to Kafka, and I found that the sockets in the producer are in the "CLOSE_WAIT" state:

[[email protected] logs]$ netstat -an|grep 9093
tcp        1      0 10.96.250.211:43934         10.99.116.53:9093           CLOSE_WAIT  
tcp        1      0 10.96.250.211:50729         10.99.116.54:9093           CLOSE_WAIT  
tcp        1      0 10.96.250.211:45626         10.99.116.55:9093           CLOSE_WAIT  
tcp        1      0 10.96.250.211:45625         10.99.116.55:9093           CLOSE_WAIT  
tcp        1      0 10.96.250.211:43951         10.99.116.53:9093           CLOSE_WAIT  

Does librdkafka have a feature for checking whether a broker is alive? Thanks!
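
A socket sits in CLOSE_WAIT when the remote end has closed the connection but the local end has not yet called close(). The sketch below shows one way an application can notice that condition on a plain socket and release it. It uses a local socket pair in place of a broker connection, the function names are hypothetical, and it says nothing about how the library itself handles reconnects.

```c
#include <errno.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Returns 1 if the peer has closed the connection (recv() reports EOF),
 * 0 if the connection still looks open, -1 on error.
 * MSG_PEEK leaves any pending data in place; MSG_DONTWAIT avoids blocking. */
int peer_has_closed(int fd)
{
    char byte;
    ssize_t n = recv(fd, &byte, 1, MSG_PEEK | MSG_DONTWAIT);

    if (n == 0)
        return 1;                       /* orderly shutdown by the peer */
    if (n > 0)
        return 0;                       /* data pending, still connected */
    return (errno == EAGAIN || errno == EWOULDBLOCK) ? 0 : -1;
}

/* Hypothetical demo using a local socket pair in place of a broker link. */
int demo_detect_peer_close(void)
{
    int sv[2];
    int before, after;

    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1)
        return -1;

    before = peer_has_closed(sv[0]);    /* 0: both ends open */
    close(sv[1]);                       /* the "broker" goes away */
    after = peer_has_closed(sv[0]);     /* 1: EOF seen */

    close(sv[0]);                       /* closing our end releases the socket */
    return (before == 0 && after == 1) ? 1 : 0;
}
```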

rd_kafka_destroy doesn't seem to close the socket properly

I'm using this library in the end of message callback of a milter application. I've got a JSON object of the email message, and I need to hand it off to Kafka before I delete it from the Postfix queue. Here is the code:

  rk=rd_kafka_new(RD_KAFKA_PRODUCER, kafka_url, NULL);
  if(rk != NULL) {
    rd_kafka_produce(rk,"Trout-test",0,RD_KAFKA_OP_F_FREE,scratch,strlen(scratch));
    fprintf(logfile,"TID: %ld :: %ld :: scmfi_eom - Sent %d bytes to Trout-test:0\n",(long int)syscall(SYS_gettid),time(NULL),strlen(scratch));
    while (rd_kafka_outq_len(rk) > 0) {                                    /* Pulled this from the rdkafka_example */
            usleep(50000);
    }
    fprintf(logfile,"TID: %ld :: %ld :: scmfi_eom - kafka outq is now 0\n",(long int)syscall(SYS_gettid),time(NULL));
    //free(scratch);                                                         /* the rd_kafka_produce call is freeing scratch (RD_KAFKA_OP_F_FREE) */
    usleep(500000);
    rd_kafka_destroy(rk);                                                  /* Destroy the Kafka handle */
  }

When I run this code, everything works fine, until I've sent in about 1000 messages through the MTA. At that point, the rd_kafka_new started to fail with this message: Failed to create inet socket: Too many open files

So I upped my open files with ulimit to a number greater than 200000 (I was sending in batches of 100000 messages), and then it started failing at around 30000 messages because there were no more ephemeral sockets available to make connections to the broker.

When I look at the source, I see the close call on the socket, but when I follow the execution with lsof, or just netstat, the sockets are all established. Am I using the rd_kafka_new, rd_kafka_produce, rd_kafka_destroy improperly (once for each message), or is this an actual problem?

Thank you,
Paul

No error callback and hanging when publishing to non existent partition

My topic is defined as having 10 partitions (0-9). When I attempt to send to partition 10, I expect to get an error callback. However, the poll (whose timeout I set to -1) just hangs. Shouldn't I get an error callback? I know the API is aware this is a bad partition because my log callback is being called and includes this detail:
27-Dec-2013 11:44:21:918 [T07][3828] kafka/kafka.cc:23 fac:'PART' name:'LARA#producer-0' log:'LaraReplicator_kafkacluster2 partition [10] not currently available'

static linking of librdkafka.a needs dynamic pthreads linking

Seeing this on Red Hat Linux 6. It's possible there's something in my code leading up to it, but the stack trace seems to show only librdkafka code:
#0 0x0000003456832885 in raise () from /lib64/libc.so.6

Missing separate debuginfos, use: debuginfo-install glibc-2.12-1.47.el6_2.9.x86_64 libgcc-4.4.6-3.el6.x86_64 libstdc++-4.4.6-3.el6.x86_64 zlib-1.2.3-27.el6.x86_64
(gdb) where
#0 0x0000003456832885 in raise () from /lib64/libc.so.6
#1 0x0000003456834065 in abort () from /lib64/libc.so.6
#2 0x000000345686f977 in __libc_message () from /lib64/libc.so.6
#3 0x0000003456875296 in malloc_printerr () from /lib64/libc.so.6
#4 0x0000003456865a1d in fclose@@GLIBC_2.2.5 () from /lib64/libc.so.6
#5 0x00000034584047fa in _nss_files_gethostbyname2_r () from /lib64/libnss_files.so.2
#6 0x0000003456900a39 in gethostbyname2_r@@GLIBC_2.2.5 () from /lib64/libc.so.6
#7 0x00000034568ce9f6 in gaih_inet () from /lib64/libc.so.6
#8 0x00000034568d0170 in getaddrinfo () from /lib64/libc.so.6
#9 0x00007f288c761d8c in rd_getaddrinfo (nodesvc=, defsvc=0x7f287c0008e0 "5757", flags=32, family=, socktype=<value optimized out>, protocol=<value optimized out>, errstr=0x7f2877ffeda8) at rdaddr.c:161
#10 0x00007f288c757d46 in rd_kafka_broker_resolve (rkb=0x1d15fe0) at rdkafka_broker.c:432
#11 0x00007f288c75e03b in rd_kafka_broker_connect (arg=0x1d15fe0) at rdkafka_broker.c:1292
#12 rd_kafka_broker_thread_main (arg=0x1d15fe0) at rdkafka_broker.c:3041
#13 0x00000034570077f1 in start_thread () from /lib64/libpthread.so.0
#14 0x00000034568e5ccd in clone () from /lib64/libc.so.6

Valgrind shows:

==24258== Thread 3:
==24258== Invalid read of size 8
==24258== at 0x345687112E: _IO_file_underflow@@GLIBC_2.2.5 (in /lib64/libc-2.12.so)
==24258== by 0x3456872C2D: _IO_default_uflow (in /lib64/libc-2.12.so)
==24258== by 0x3456867419: _IO_getline_info (in /lib64/libc-2.12.so)
==24258== by 0x3456870140: fgets_unlocked (in /lib64/libc-2.12.so)
==24258== by 0x34584046CE: _nss_files_gethostbyname2_r (in /lib64/libnss_files-2.12.so)
==24258== by 0x3456900A38: gethostbyname2_r@@GLIBC_2.2.5 (in /lib64/libc-2.12.so)
==24258== by 0x34568CE9F5: gaih_inet (in /lib64/libc-2.12.so)
==24258== by 0x34568D016F: getaddrinfo (in /lib64/libc-2.12.so)
==24258== by 0x529DD8B: rd_getaddrinfo (rdaddr.c:161)
==24258== by 0x5293D45: rd_kafka_broker_resolve (rdkafka_broker.c:432)
==24258== by 0x529A03A: rd_kafka_broker_thread_main (rdkafka_broker.c:1292)
==24258== by 0x34570077F0: start_thread (in /lib64/libpthread-2.12.so)
==24258== Address 0x4ef3d40 is 144 bytes inside a block of size 568 free'd
==24258== at 0x4A06300: free (vg_replace_malloc.c:446)
==24258== by 0x3456865A1C: fclose@@GLIBC_2.2.5 (in /lib64/libc-2.12.so)
==24258== by 0x34584047F9: _nss_files_gethostbyname2_r (in /lib64/libnss_files-2.12.so)
==24258== by 0x3456900A38: gethostbyname2_r@@GLIBC_2.2.5 (in /lib64/libc-2.12.so)
==24258== by 0x34568CE9F5: gaih_inet (in /lib64/libc-2.12.so)
==24258== by 0x34568D016F: getaddrinfo (in /lib64/libc-2.12.so)
==24258== by 0x529DD8B: rd_getaddrinfo (rdaddr.c:161)
==24258== by 0x5293D45: rd_kafka_broker_resolve (rdkafka_broker.c:432)
==24258== by 0x529A03A: rd_kafka_broker_thread_main (rdkafka_broker.c:1292)
==24258== by 0x34570077F0: start_thread (in /lib64/libpthread-2.12.so)
==24258== by 0x34568E5CCC: clone (in /lib64/libc-2.12.so)

With debug turned on, this is what I see:
1388099207.266 RDKAFKA-7-BROKER: LARA#producer-0: kafkadevcluster1-1.maskedout.com:5757/bootstrap: Added new broker with NodeId -1
1388099207.266 RDKAFKA-7-BRKMAIN: LARA#producer-0: kafkadevcluster1-1.maskedout.com:5757/bootstrap: Enter main broker thread
1388099207.266 RDKAFKA-7-CONNECT: LARA#producer-0: kafkadevcluster1-1.maskedout.com:5757/bootstrap: broker in state DOWN connecting
1388099207.266 RDKAFKA-7-BROKER: LARA#producer-0: kafkadevcluster1-2.maskedout.com:5757/bootstrap: Added new broker with NodeId -1
1388099207.266 RDKAFKA-7-BRKMAIN: LARA#producer-0: kafkadevcluster1-2.maskedout.com:5757/bootstrap: Enter main broker thread
1388099207.266 RDKAFKA-7-CONNECT: LARA#producer-0: kafkadevcluster1-2.maskedout.com:5757/bootstrap: broker in state DOWN connecting
1388099207.267 RDKAFKA-7-BROKER: LARA#producer-0: kafkadevcluster1-3.maskedout.com:5757/bootstrap: Added new broker with NodeId -1
AFTER TRYING BROKERS:kafkadevcluster1-1.maskedout.com:5757,kafkadevcluster1-2.maskedout.com:5757,kafkadevcluster1-3.maskedout.com:5757
1388099207.267 RDKAFKA-7-BRKMAIN: LARA#producer-0: kafkadevcluster1-3.maskedout.com:5757/bootstrap: Enter main broker thread
1388099207.267 RDKAFKA-7-CONNECT: LARA#producer-0: kafkadevcluster1-3.maskedout.com:5757/bootstrap: broker in state DOWN connecting
1388099207.267 RDKAFKA-7-TOPIC: LARA#producer-0: New local topic: Lara
*** glibc detected *** ./replicator: double free or corruption (out): 0x00007fd1f0001200 ***

excessive memory allocation == poor performance?

I'm testing with code based on the example to stream data from rsyslog to Kafka, and I'm hit by poor performance: where I would expect 5000 messages/s, I'm only getting ~300 messages/s.

Reading the code, I'm concerned that there are at least two mallocs in the producer path:

First, in the client:

        char *opbuf = malloc(len + 1);
        strncpy(opbuf, buf, len + 1);

...then the actual producer:

void rd_kafka_produce (rd_kafka_t *rk, char *topic, uint32_t partition,
               int msgflags,
               char *payload, size_t len) {
    rd_kafka_op_t *rko;

    rko = calloc(1, sizeof(*rko));

It seems that it would be best for librdkafka to allocate, track usage and reuse of rd_kafka_op_t and rd_kafka_op_t->rko_payload for optimal performance.
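
The reuse idea described above can be sketched as a simple free list. This is an illustration only: the demo_op struct is a hypothetical stand-in for rd_kafka_op_t, the pool is not thread-safe, and whether allocation is actually the bottleneck here would need to be confirmed by profiling.

```c
#include <stdlib.h>

/* Hypothetical op record; the real rd_kafka_op_t has other fields. */
struct demo_op {
    struct demo_op *next_free;   /* link used only while on the free list */
    char           *payload;
    size_t          len;
};

struct demo_op_pool {
    struct demo_op *free_list;
    size_t          allocs;      /* how many times calloc() was needed */
};

/* Takes an op from the free list, falling back to calloc() when empty. */
struct demo_op *demo_op_get(struct demo_op_pool *pool)
{
    struct demo_op *op = pool->free_list;

    if (op) {
        pool->free_list = op->next_free;
        op->next_free = NULL;
        return op;
    }
    pool->allocs++;
    return calloc(1, sizeof(*op));
}

/* Returns an op to the pool instead of freeing it. */
void demo_op_put(struct demo_op_pool *pool, struct demo_op *op)
{
    op->payload = NULL;
    op->len = 0;
    op->next_free = pool->free_list;
    pool->free_list = op;
}

/* Releases everything still held by the pool. */
void demo_op_pool_destroy(struct demo_op_pool *pool)
{
    while (pool->free_list) {
        struct demo_op *op = pool->free_list;

        pool->free_list = op->next_free;
        free(op);
    }
}
```

If producers and the sender run on different threads, the get and put operations would need a lock or a per-thread pool.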

Compile on Mac

Can you give any insight on overcoming errors below when trying to make on Mac?

cc -MD -MP -O2 -Wall -Werror -Wfloat-equal -Wpointer-arith -fPIC -I. -g -DSG -c rdkafka.c
rdkafka.c:1065:29: error: use of GNU old-style field designator extension [-Werror,-Wgnu-designator]
        struct consume_ctx ctx = { consume_cb: consume_cb, opaque: opaque };
                                   ^~~~~~~~~~~
                                   .consume_cb = 
rdkafka.c:1065:53: error: use of GNU old-style field designator extension [-Werror,-Wgnu-designator]
        struct consume_ctx ctx = { consume_cb: consume_cb, opaque: opaque };
                                                           ^~~~~~~
                                                           .opaque = 
2 errors generated.
make: *** [rdkafka.o] Error 1

Broke Kafka replication when requesting ack != -1 and a message larger than 1000000 bytes

I'm not sure if this is a librdkafka issue or a kafka general one. I did the following steps:

  • Changed Kafka broker to have max message size of 4000000
  • Ran the rdkafka_performance binary and specified a size of 3000 bytes and an ack of 1. Ran successfully
  • Ran the rdkafka_performance binary and specified a size of 2000000 bytes and an ack of 1. Ran successfully
  • Ran the rdkafka_performance binary and specified a size of 2000000 and an ack of -1. The binary reports a failure and times out after 5000 ms.
    Any attempt to run the rdkafka_performance binary (regardless of msg size) using an ack of anything other than 1 fails thereafter with a timeout of 5000 ms. The replication appears to be broken hereafter.

Was able to reproduce this at will, using a new topic each time. I wasn't able to reproduce this when using a cluster max message size of 1000000 (the default). So I suspect there's some sort of issue when the msg is > 1000000 but less than the max msg size configured.

Was wondering if you could reproduce this and if so had any thoughts?

nginx worker processes exit with signal 11 when publishing messages in nginx

Hi edenhill,
I wrote C code, called from Lua, that produces messages to a Kafka server. The Lua code runs inside an nginx server.
I create one Kafka handle and add a list of brokers once during nginx initialisation. Then, when nginx receives a request, it calls the Lua code to publish messages.
But the nginx worker processes exit with signal 11 as soon as requests are sent.
I also wonder whether it is right to hold one Kafka handle for writing to all topics.

As I don't know how to attach the code, I've pasted the main code snippet below:
//the code try to create one kafka handle and add brokers
int mercury_add_kafkabroker(mercury_t *me, const char *kafka_brokers)
{

    if (!me || !kafka_brokers) return -1;

    init_error();

    char errstr[100];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rd_kafka_conf_set_error_cb(conf, err_cb);
    rd_kafka_conf_set_dr_cb(conf, msg_delivered);

    char err[100];
    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    me->kafka_handler = rk; 

    if (rk == NULL) {
        set_error("rd_kafka_new fail");
        return -1;
    }

    /* Add brokers */
    if (rd_kafka_brokers_add(me->kafka_handler, kafka_brokers) == 0) {
        fprintf(stderr, "%% No valid brokers specified\n");
        return -1;
    }
    return 0;

}

//the code is called repeatedly, once per request received by the nginx server
int mercury_sendmsg_kafka(mercury_t *me, const char * topic, const char *msg)
{
    if (!me || !msg) return -1;

    init_error();

    size_t msg_len = strlen(msg);

    char *opbuf = malloc(msg_len + 1);
    strcpy(opbuf, msg);

    if (NULL == me->kafka_topic_handler)  {
        rd_kafka_topic_conf_t * topic_conf = rd_kafka_topic_conf_new();
        rd_kafka_topic_t * rkt = rd_kafka_topic_new(me->kafka_handler, topic, topic_conf);
        me->kafka_topic_handler = rkt;
    }
    while(rd_kafka_produce(me->kafka_topic_handler, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_FREE, opbuf, msg_len, NULL, 0, NULL) == -1) {
        fprintf(stderr, "produce error; resend!");
        /* Poll to handle delivery reports */
        rd_kafka_poll(me->kafka_handler, 10);
    }
    rd_kafka_poll(me->kafka_handler, 0);

    if (NULL != me->kafka_handler)  {
        rd_kafka_destroy(me->kafka_handler);
    }

    return 0;
}

Thanks
Aaron
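
One thing that stands out in the snippet is that the per-request send function destroys the handle that was created once at initialisation, so a second request would operate on a handle that has already been released. Whether that is the cause of the reported crash is an assumption. The sketch below shows the create-once, use-many, destroy-once lifecycle with a hypothetical stand-in type instead of the real library handle.

```c
#include <stdlib.h>

/* Hypothetical stand-in for a long-lived client handle. */
struct demo_client {
    int sent;      /* messages accepted so far */
};

struct demo_ctx {
    struct demo_client *client;
};

/* Called once at startup. */
int demo_ctx_init(struct demo_ctx *ctx)
{
    ctx->client = calloc(1, sizeof(*ctx->client));
    return ctx->client ? 0 : -1;
}

/* Called per request: uses the handle but never destroys it. */
int demo_ctx_send(struct demo_ctx *ctx, const char *msg)
{
    if (!ctx->client || !msg)
        return -1;
    ctx->client->sent++;
    return 0;
}

/* Called once at shutdown; clearing the pointer makes late sends fail
 * cleanly instead of touching freed memory. */
void demo_ctx_destroy(struct demo_ctx *ctx)
{
    free(ctx->client);
    ctx->client = NULL;
}
```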

crashes on too large messages

I want to use librdkafka in my project, but during testing it causes my app to crash.
In my setup, the main thread creates the necessary objects, and multiple threads call rd_kafka_produce to send messages to the Kafka server.

After the application has been running for a while, it crashes. The call stack is as follows:

Core was generated by `nelo2-thrift                                          '.
Program terminated with signal 11, Segmentation fault.
#0  0x00000000005ec47d in rd_kafka_broker_produce_toppar (rkb=0x7f8124000ef0) at rdkafka_broker.c:1944
1944            rkbuf->rkbuf_ts_timeout =
Missing separate debuginfos, use: debuginfo-install glibc-2.12-1.80.el6_3.3.x86_64 libgcc-4.4.6-4.el6.x86_64 libgcrypt-1.4.5-9.el6_2.2.x86_64 libgpg-error-1.7-4.el6.x86_64 libstdc++-4.4.6-4.el6.x86_64 ncurses-libs-5.7-3.20090208.el6.x86_64 zlib-1.2.3-27.el6.x86_64
(gdb) bt
#0  0x00000000005ec47d in rd_kafka_broker_produce_toppar (rkb=0x7f8124000ef0) at rdkafka_broker.c:1944
#1  rd_kafka_broker_producer_serve (rkb=0x7f8124000ef0) at rdkafka_broker.c:2105
#2  0x00000000005ed1d8 in rd_kafka_broker_thread_main (arg=0x7f8124000ef0) at rdkafka_broker.c:3002
#3  0x0000003e86e07851 in start_thread () from /lib64/libpthread.so.0
#4  0x0000003e86ae76dd in clone () from /lib64/libc.so.6

No errors when produce to non-existent external broker

Hi,

Here is an example where I try to produce a message to a non-existent broker at localhost:

examples/rdkafka_example -P -b localhost:9092 -t test00
% Type stuff and hit enter to send
1389364025.904 RDKAFKA-3-FAIL: rdkafka#producer-0: localhost:9092/bootstrap:
localhost:9092/bootstrap: Failed to connect to broker at localhost:9092: Connection refused

And that is fine, since I'm not running a Kafka broker on my localhost. But if I specify an external host, it doesn't show any error:

examples/rdkafka_example -P -b 2.2.2.2:9092 -t test00
% Type stuff and hit enter to send

% Sent 1 bytes to topic test00 partition -1

In this case, I would expect a "Failed to connect" message or similar.

So, how can I check whether the library can make a connection and produce messages?

Regards

PHP extension on top of librdkafka - Undefined symbol: snappy_compress_iov

Hello, I'm trying to build a PHP extension on top of this librdkafka C library. Unfortunately, after linking, compiling and installing I'm getting this error:

PHP Warning: PHP Startup: Unable to load dynamic library '/usr/lib/php5/20100525+lfs/kafka.so' - /usr/lib/php5/20100525+lfs/kafka.so: undefined symbol: snappy_compress_iov in Unknown on line 0

Because I'm not so good with C, maybe someone can help me to figure out what is happening. Here is my repo: https://github.com/salebab/phpkafka and here is compile log http://pastebin.com/FZnzz7JU

Thanks!

Librdkafka does not compile on OSX 10.7.5

Hey Magnus,

I am trying to compile Librdkafka on OSX with the following platform env:

Apple LLVM version 4.2 (clang-425.0.28) (based on LLVM 3.2svn)
Target: x86_64-apple-darwin11.4.2
Thread model: posix

A couple of issues showed up:

  1. the -rdynamic switch is not supported on OSX
  2. byteswap.h is not available on OSX, that can be fixed using the following:

#if defined(__APPLE__)
#include <libkern/OSByteOrder.h>
#define le32toh OSSwapLittleToHostInt32
#define htole32 OSSwapHostToLittleInt32
#define bswap_32 OSSwapInt32
#else
#include <byteswap.h>
#endif

  3. The TIMEVAL_TO_TIMESPEC from rdtime.h can be removed
  4. OSX has an incompatible sys queue.h
  5. replace strndupa with strndup

Best,
D
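
An alternative to per-platform macros for the byte-order point above is to read and write the bytes explicitly, which needs neither byteswap.h nor OSByteOrder.h. The helpers below are a hypothetical sketch, not code from the library.

```c
#include <stdint.h>

/* Reads a 32-bit little-endian value from a byte buffer. Works the same on
 * big- and little-endian hosts, with no byteswap.h or OSByteOrder.h. */
uint32_t read_le32(const unsigned char *p)
{
    return (uint32_t)p[0]
         | ((uint32_t)p[1] << 8)
         | ((uint32_t)p[2] << 16)
         | ((uint32_t)p[3] << 24);
}

/* Writes a 32-bit value to a byte buffer in little-endian order. */
void write_le32(unsigned char *p, uint32_t v)
{
    p[0] = (unsigned char)(v & 0xff);
    p[1] = (unsigned char)((v >> 8) & 0xff);
    p[2] = (unsigned char)((v >> 16) & 0xff);
    p[3] = (unsigned char)((v >> 24) & 0xff);
}

/* Unconditional byte swap, equivalent in effect to bswap_32(). */
uint32_t swap32(uint32_t v)
{
    return ((v & 0x000000ffu) << 24)
         | ((v & 0x0000ff00u) << 8)
         | ((v & 0x00ff0000u) >> 8)
         | ((v & 0xff000000u) >> 24);
}
```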

Support sync interface

Hi,
Currently librdkafka only supports an async interface, but in our case we would prefer to use a sync interface.
Do you have plans to support this? Thanks!

Do you have to specify a callback function for the rd_kafka_poll() to work?

I indicated request.required.acks=1 and am polling in a loop until I get a return value of 1. I have not specified a callback function (I may do this later), but I find that it doesn't ever seem to return 1? Is a callback function required?

In a different program I do have a callback function and it seems to work as expected.

Topic+partition specific produce errors from the broker not properly handled

This may be a Kafka bug rather than a librdkafka bug, but if I send messages larger than the value, I get no indication of failure. I get successful delivery callbacks. My consumers simply receive no messages and the only way I knew was that the JMX console showed failedproduce counter increases. Any chance the API could provide an error? Even if the broker doesn't, not sure if the API knows the max and can locally complain?

High concurrency usage

Hi,

I'm writing an nginx module that parses a message and sends it to Kafka using librdkafka. But under a high level of concurrency, the API locks and says it cannot create more threads to send the message. Is there a way to overcome this, for example by using it in a synchronous way?

thanks in advance,

andre

issue with rdkafka_example when auto.create.topics.enable=true in kafka8

Kafka 8 offers a feature where new topics are automatically created by the broker if a producer attempts to send messages to a non-existent topic. When 'auto.create.topics.enable=true' is set in Kafka 8, messages sent by the 'rdkafka_example producer' get black holed by the broker. FWIW: messages sent to existing topics when 'auto.create.topics.enable=true' is set work just fine.

To reproduce this condition, add 'auto.create.topics.enable=true' to the server.properties file and start the broker. You should find that kafka-consumer-producer.sh triggers the broker to create the new topics and works as advertised. But something goes wrong with rdkafka_example.

Thanks for continuing to maintain this project and looking into this issue.

-- Adam

Message timeout scans

The unlikely condition for last_timeout_scan in rd_kafka_broker_serve that has * 1000000 is 'unlikely' to ever trigger timeout scans. Is this intended, or should it be + 1000000 (1 second) so that scans are triggered a bit more frequently?

librdkafka 0.8 can not produce messages when the leader of the topic partition failed

Hi, Magnus.
I find that librdkafka 0.8 cannot produce messages to the Kafka brokers after the leader of the topic partition has failed. Even after I confirm that a new leader has been elected, the problem persists.
I use rdkafka_example.c from the examples folder as the producer, with a topic that has 3 replicas and 1 partition.
After I restart the producer, the problem is solved.
I am not sure whether this is a bug in librdkafka 0.8.
Thanks.

Cannot send messages when the topic is set long time after the broker

The message producer only works if you set the topic a short time after adding the broker. If you just put a sleep(1) between rd_brokers_add and rd_kafka_topic_new, rd_kafka_example cannot send messages anymore.

Here is the debug output: Without the sleep:
%7|1376053795.949|PARTCNT|0x7ff2168bf700|default#producer-0| Ignore unknown topic rb_event
%7|1376053795.949|METADATA|0x7ff2168bf700|default#producer-0| Topic #2/3: rb_event partition 0 Leader 0
%7|1376053795.949|TOPICUPD|0x7ff2168bf700|default#producer-0| Ignoring topic rb_event: not found locally
%7|1376053795.949|METADATA|0x7ff2168bf700|default#producer-0| Topic #2/3: rb_event partition 1 Leader 0
%7|1376053795.949|TOPICUPD|0x7ff2168bf700|default#producer-0| Ignoring topic rb_event: not found locally
%7|1376053795.949|METADATA|0x7ff2168bf700|default#producer-0| Topic #2/3: rb_event partition 2 Leader 0
%7|1376053795.949|TOPICUPD|0x7ff2168bf700|default#producer-0| Ignoring topic rb_event: not found locally
%7|1376053795.949|PARTCNT|0x7ff2168bf700|default#producer-0| Ignore unknown topic rb_event
%7|1376053799.159|TOPIC|0x7ff2184d1020|default#producer-0| new topic: rb_event

and when I add the sleep:
%7|1376053831.161|PARTCNT|0x7f762d015700|default#producer-0| No change in partition count for topic rb_event
%7|1376053831.161|METADATA|0x7f762d015700|default#producer-0| Topic #2/3: rb_event partition 0 Leader 0
%7|1376053831.161|TOPICUPD|0x7f762d015700|default#producer-0| No leader change for topic rb_event [0] with leader 0
%7|1376053831.161|METADATA|0x7f762d015700|default#producer-0| Topic #2/3: rb_event partition 1 Leader 0
%7|1376053831.161|TOPICUPD|0x7f762d015700|default#producer-0| No leader change for topic rb_event [1] with leader 0
%7|1376053831.161|METADATA|0x7f762d015700|default#producer-0| Topic #2/3: rb_event partition 2 Leader 0
%7|1376053831.161|TOPICUPD|0x7f762d015700|default#producer-0| No leader change for topic rb_event [2] with leader 0
%7|1376053831.161|PARTCNT|0x7f762d015700|default#producer-0| Partitioning 0 unassigned messages in topic rb_event to 3 partitions
%7|1376053831.161|UAS|0x7f762d015700|default#producer-0| 0/0 messages were partitioned

Error during make examples

During a "make" or "make examples", I get the following compiler error:

#
# More usage options:
./rdkafka_example --help
cc1: warnings being treated as errors
rdkafka_performance.c: In function ‘main’:
rdkafka_performance.c:640: error: ‘rkmessages’ may be used uninitialized in this function
make[1]: *** [rdkafka_performance] Error 1
make[1]: Leaving directory `/tmp/librdkafka-master/examples'
make: *** [examples] Error 2

Changing line 640 of rdkafka_performance.c to the following fixes my problem:

                rd_kafka_message_t **rkmessages=NULL;

Information about my system if it helps you:

$ cat /etc/redhat-release 
CentOS release 6.4 (Final)
$ uname -a
Linux XXXXXXXX 2.6.32-279.11.1.el6.x86_64 #1 SMP Tue Oct 16 15:57:10 UTC 2012 x86_64 x86_64 x86_64 GNU/Linux
$ cc --version
cc (GCC) 4.4.7 20120313 (Red Hat 4.4.7-3)
$ make --version
GNU Make 3.81

0.8.1 master branch makefile can't generate librdkafka.so.1

edenhill,
I use CentOS 6.3 (Final). The 0.8.1 master branch of librdkafka doesn't generate librdkafka.so.1. The bug is described below:

There is an issue in the Makefile, line 50:
50 @(if [ $(UNAME_S) = "Linux" -o $(CYGWIN) = CYGWIN ]; then \

It should use
@(if [ $(UNAME_S) = "Linux" ]; then \

Otherwise no so.1 is generated. I think this was introduced by your recent support for Cygwin.
