
erlkaf's People

Contributors

byronpc, cheng81, escobera, henryj, hikui, jsvitolo, macasado86, matreyes, pankajsoni19, ramonpin, robsonpeixoto, silviucpp, sulphur, sztheory, tpitale, tsloughter, v-kat, vitortrin

erlkaf's Issues

Compile failure on M1 MacBook Pro

MacOS Monterey 12.6.1

I'm trying to use erlkaf in an Elixir project. When I try to compile it, however, it fails, and the output contains a number of "interesting" entries that point at why:

Creating static library librdkafka.a
...
ld: warning: ignoring file /opt/homebrew/opt/[email protected]/lib/libssl.dylib, building for macOS-x86_64 but attempting to link with file built for macOS-arm64
ld: warning: ignoring file /opt/homebrew/opt/[email protected]/lib/libcrypto.dylib, building for macOS-x86_64 but attempting to link with file built for macOS-arm64
ld: warning: ignoring file /opt/homebrew/Cellar/lz4/1.9.4/lib/liblz4.dylib, building for macOS-x86_64 but attempting to link with file built for macOS-arm64
cp librdkafka.a librdkafka-dbg.a
Undefined symbols for architecture x86_64:
...

And then further down:

WARNING: librdkafka-static.a: No static libraries available/enabled for inclusion in self-contained static library librdkafka-static.a: this library will be identical to librdkafka.a

And:

WARNING: librdkafka-static.a: The following libraries were not available as static libraries and need to be linked dynamically: -llz4 -lm -lcurl -lsasl2 -lssl -lcrypto -lz -ldl -lpthread

And finally:

ld: symbol(s) not found for architecture x86_64
cp librdkafka-static.a librdkafka-static-dbg.a
clang: error: linker command failed with exit code 1 (use -v to see invocation)
make[2]: *** [librdkafka.1.dylib] Error 1
make[2]: *** Waiting for unfinished jobs....
make[1]: *** [libs] Error 2
error with make
make: *** [get_deps] Error 1
===> Hook for compile failed!

Looking at the generated configuration, I do indeed see ARCH x86_64, and I have been unable to determine why it is not set to arm64 as expected. A coworker has a setup similar to mine, and his configuration is set correctly.
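A quick way to check which architecture a given compiler actually targets is to compile a tiny translation unit with it and inspect the predefined macros. This sketch assumes clang or gcc, which define `__aarch64__`/`__arm64__` for Apple silicon targets and `__x86_64__` for Intel targets. `target_arch` is a hypothetical helper, not part of erlkaf.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: reports the architecture this translation unit was
// compiled for, using clang/gcc predefined macros. It reflects the compiler's
// target, not the CPU the binary later runs on.
std::string target_arch() {
#if defined(__aarch64__) || defined(__arm64__)
    return "arm64";
#elif defined(__x86_64__)
    return "x86_64";
#else
    return "unknown";
#endif
}
```

Build it with the same compiler and flags as the NIF. If it reports x86_64 on an M1 machine, the toolchain is targeting Intel. One thing worth checking (a guess, not a confirmed cause) is whether the terminal or Erlang installation that launched the build runs under Rosetta.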

Some info:

m1-mbp:payment-processor $ brew info librdkafka
==> librdkafka: stable 1.9.2 (bottled), HEAD
Apache Kafka C/C++ library
https://github.com/edenhill/librdkafka
/opt/homebrew/Cellar/librdkafka/1.9.2 (38 files, 7.6MB) *
Poured from bottle on 2023-01-11 at 15:31:59
From: https://github.com/Homebrew/homebrew-core/blob/HEAD/Formula/librdkafka.rb
License: BSD-2-Clause
==> Dependencies
Build: pkg-config ✔, [email protected]
Required: lz4 ✔, lzlib ✔, [email protected] ✔, zstd ✔
==> Options
--HEAD
Install HEAD version

m1-mbp:payment-processor $ openssl version
LibreSSL 2.8.3

I admit that I am a bit out of my element here; however, I have spent hours digging through GitHub issues, blog posts, StackOverflow, etc. trying to figure this out, and I am out of ideas. I am happy to provide any more info that might be helpful. Thank you.

Dialyzer shows an error for usages of `create_consumer_group`

The contract for the create_consumer_group function says:

-spec create_consumer_group(client_id(), binary(), [binary()], [client_option()], [topic_option()]) -> ok | {error, reason()}.

create_consumer_group(ClientId, GroupId, Topics, ClientConfig0, DefaultTopicsConfig)

It seems that Topics is expected to be a [binary()], but later in the code there is the following validation:

valid_consumer_topics([H|T]) ->
    case H of
        {K, V} when is_binary(K) and is_list(V) ->
            Mod = erlkaf_utils:lookup(callback_module, V),
            case Mod =/= undefined andalso is_atom(Mod) of
                true ->
                    valid_consumer_topics(T);
                _ ->
                    {error, {invalid_topic, H}}
            end;
        _ ->
            {error, {invalid_topic, H}}
    end;

That seems to force Topics to be a list of pairs {binary(), [{callback_module, atom()}]} or similar.

Dialyzer reports that the contract is broken for a valid Topics argument:

lib/broadway_erlkaf/v1/api/topic.ex:0: The call erlkaf:create_consumer_group
         (_worker_name@1 :: atom(),
          _group_name@1 :: any(),
          [{_,
            [{'callback_args', [any(), ...]} |
             {'callback_module', 'Elixir.BroadwayErlkaf.V1.Api.Topic'} |
             {'dispatch_mode', {'batch', _}},
             ...]},
           ...],
          _client@1 :: any(),
          _topics_opts@1 :: [{'auto_offset_reset', 'smallest'}, ...]) breaks the contract 
          (client_id(),
          binary(),
          [binary()],
          [client_option()],
          [topic_option()]) ->
             'ok' | {'error', reason()}

I believe we only need to define the spec for the Topics parameter of create_consumer_group properly, but I don't know which type is the right one.

Build error

/home/admin/data/new_ejjabered/deps/erlkaf/c_src/erlkaf_consumer.cc: In function ‘ERL_NIF_TERM enif_consumer_cleanup(ErlNifEnv*, int, const ERL_NIF_TERM*)’:
/home/admin/data/new_ejjabered/deps/erlkaf/c_src/erlkaf_consumer.cc:456:5: error: this ‘if’ clause does not guard... [-Werror=misleading-indentation]
     if(!enif_get_resource(env, argv[0], data->res_consumer, reinterpret_cast<void**>(&consumer)))
     ^~
/home/admin/data/new_ejjabered/deps/erlkaf/c_src/erlkaf_consumer.cc:459:2: note: ...this statement, but the latter is misleadingly indented as if it is guarded by the ‘if’
  consumer->running = false;
  ^~~~~~~~
 CPP    erlkaf_nif.cc
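GCC's -Wmisleading-indentation fires because the statement at line 459 is indented as if it belonged to the `if` at line 456, and -Werror turns the warning into a build failure. Bracing the guard makes the intent explicit. Below is a standalone sketch with hypothetical stand-ins (`Consumer`, `get_resource`, `consumer_cleanup`) for the NIF types and `enif_get_resource`. It is not erlkaf's actual code.

```cpp
#include <cassert>

// Hypothetical stand-in for the NIF consumer resource.
struct Consumer {
    bool running = true;
};

// Hypothetical lookup mirroring enif_get_resource: fails when there is no
// resource, otherwise writes it to *out.
bool get_resource(Consumer* candidate, Consumer** out) {
    if (candidate == nullptr)
        return false;
    *out = candidate;
    return true;
}

// Returns false for "badarg", true on success. The braces state exactly what
// the if guards, so -Wmisleading-indentation has nothing to flag, and the
// state change only runs after a successful lookup.
bool consumer_cleanup(Consumer* candidate) {
    Consumer* consumer = nullptr;

    if (!get_resource(candidate, &consumer)) {
        return false;  // the NIF would return a badarg term here
    }

    consumer->running = false;
    return true;
}
```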

Potential consumer queue cleanup issue?

erlkaf version: 2.0.8

Hi Silviu,

Thanks for your work.

We are currently using erlkaf in our server. However, I noticed that sometimes, when a partition rebalance is triggered, some partition consumers get stuck. This happens when a partition consumer is doing heavy work that takes a long time.

After digging deeper into erlkaf, I found that when :revoke_partition is received by the consumer group, it tries to stop all partition consumers. It does so by sending a stop message to each partition consumer and waiting 5s. When a heavy consumer is running a task longer than 5s, it is force-killed, and in that case consumer_queue_cleanup is not run. I suspect this may be related to the stuck-partition issue.

I'm not sure whether my understanding is correct.

Flush producer during shutdown

There isn't an erlkaf:flush function, so I'm wondering: do librdkafka or erlkaf internally have some logic to flush pending messages during node shutdown?

How to use EOD (transactional.id)

Hi, I've recently started testing erlkaf after a year of using brod, and I'm very impressed by how well it works. Congratulations!
Has anyone tried to implement a consumer -> producer (stream processing) job with transactional_id to ensure exactly-once delivery?
How would that work? Would we need a new type of consumer/producer group?

Question – Producing synchronously

Hi,

I found here that you compare erlkaf:produce/4 to brod:produce_sync/5. But if I understand correctly, erlkaf does not produce synchronously; rather, it returns immediately and sends a delivery report back later.

Please correct me if that is wrong. If it is correct, is there any special support for blocking until the message is delivered? Or should I just wait for the delivery report?

Thank you.
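If produce is asynchronous, blocking on delivery amounts to waiting for that message's delivery report with a deadline. The sketch below models this pattern without librdkafka. `AsyncProducer`, its `poll()` method and `produce_sync` are hypothetical names, and the simulated failure rule (empty payloads fail) exists only for illustration.

```cpp
#include <cassert>
#include <chrono>
#include <deque>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <utility>

// Outcome carried by a delivery report.
struct DeliveryReport {
    bool ok;
    std::string error;
};

// Hypothetical asynchronous producer: produce() only enqueues and returns at
// once; delivery reports reach their callbacks later, from poll().
class AsyncProducer {
public:
    using Callback = std::function<void(const DeliveryReport&)>;

    void produce(std::string payload, Callback on_delivery) {
        pending_.emplace_back(std::move(payload), std::move(on_delivery));
    }

    // Serves at most one pending delivery report, oldest first.
    void poll() {
        if (pending_.empty())
            return;
        auto [payload, callback] = std::move(pending_.front());
        pending_.pop_front();
        // Simulated broker outcome: empty payloads "fail".
        if (payload.empty())
            callback(DeliveryReport{false, "empty payload"});
        else
            callback(DeliveryReport{true, ""});
    }

private:
    std::deque<std::pair<std::string, Callback>> pending_;
};

// Blocks until this message's delivery report arrives or the deadline passes.
// The result slot is shared with the callback, so a report arriving after a
// timeout writes into live memory rather than through a dangling reference.
DeliveryReport produce_sync(AsyncProducer& producer, std::string payload,
                            std::chrono::milliseconds timeout) {
    auto report = std::make_shared<std::optional<DeliveryReport>>();
    producer.produce(std::move(payload),
                     [report](const DeliveryReport& r) { *report = r; });

    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (!report->has_value() && std::chrono::steady_clock::now() < deadline)
        producer.poll();

    if (report->has_value())
        return **report;
    return DeliveryReport{false, "timed out waiting for delivery report"};
}
```

Waiting on every report serializes producing to one in-flight message per caller. Producing a batch and then waiting for all of its reports is the usual compromise.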

Translate error code

I called the erlkaf:produce function and got {:error, 29}, which is not very helpful. The function's signature says that the reason can be anything, but there is already an internal mapping in erlkaf_private.hrl. Is there no way erlkaf could translate the error before returning it?
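A translation layer only needs a lookup table with a fallback to the raw number. In this sketch, `translate_error` is a hypothetical helper and the table is a small excerpt of the Kafka protocol error codes, in which 29 is TOPIC_AUTHORIZATION_FAILED. Whether erlkaf's 29 is that broker code is an assumption. On the C++ side, librdkafka's `rd_kafka_err2name()` already provides names for its error codes.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

// Hypothetical helper: maps a numeric Kafka error code to a readable name and
// falls back to the raw number for codes missing from the table.
// The table is a small excerpt of the Kafka protocol error codes, not a full list.
std::string translate_error(int code) {
    static const std::unordered_map<int, std::string> names = {
        {0, "NONE"},
        {3, "UNKNOWN_TOPIC_OR_PARTITION"},
        {10, "MESSAGE_TOO_LARGE"},
        {29, "TOPIC_AUTHORIZATION_FAILED"},
    };
    auto it = names.find(code);
    if (it != names.end())
        return it->second;
    return "unknown_error_" + std::to_string(code);
}
```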

Any plans to make a new release?

Do you have any plans to make a new release? Could I help by starting a PR? We're currently pinned to a SHA on master. I'd love to get to a release artifact.

Thanks!

Pause/Resume Consumer

As far as I can tell, pause/resume of consumers and producers is currently not supported by erlkaf. I would like to know whether this is possible at all. My main use case is controlling message fetching when downstream data storage is, e.g., offline. I imagine that for consumers it could be triggered by returning a {pause, #state{}} tuple or something similar.

Would it be possible to implement this in erlkaf?

erlkaf_consumer stop timeout

Hello,

After upgrading to erlkaf 2.0.0 (commit 1d706c6), we started experiencing consumer stop timeouts on consumer group rebalance.

Our consumer group has only one topic with 36 partitions, and originally we run 2 application instances consuming 18 partitions each. Under higher load we scale horizontally up to 9 instances, and the consumer group rebalances.

Sometimes, when a rebalance occurs, a consumer times out on stop (erlkaf_consumer.erl:50) and the gen_server crashes. That alone would not be a problem, but the consumer group rebalance then repeats in a loop and our app never recovers: the consumer keeps starting and crashing, and a stable rebalance never completes.

erlkaf_timeout.txt

  • shows the behaviour with the original erlkaf code: in the first rebalance, 15 of 18 consumers stop within the 5-second timeout, and then the crash occurs. Partitions are reassigned, and shortly afterwards, in a second rebalance, 14 of 18 consumers stop gracefully before the gen_server crashes again. This keeps happening in a loop, and the application never returns to a stable state.

erlkaf_timeout_extended.txt

  • shows the behaviour when the timeout at erlkaf_consumer.erl:50 is increased to 60 seconds. This lets all consumers stop and the rebalance complete, but the timestamps clearly show that some consumers stop instantly while, in the worst case, we have to wait over 30 seconds for the last one to stop.

This never happened with erlkaf 1.1.9 which we used for quite a while. What is different?

Seek to offset

I am looking at using this for something I am working on, and I was wondering whether there is a way to seek to an offset in a consumer, or whether it must start from the beginning / current time?

MacOs Build fail

I am building on macOS Catalina 10.15.1.

Logs:

make[3]: Entering directory `/Users/pankajsoni/Documents/Git/Rivers/ejabberd/master/deps/erlkaf/c_src'
 CPP    erlkaf_producer.cc
 CPP    queuemanager.cc
 CPP    topicmanager.cc
 CPP    erlkaf_logger.cc
 CPP    erlkaf_config.cc
 CPP    erlkaf_nif.cc
 CPP    erlkaf_consumer.cc
 CPP    nif_utils.cc
 LD     erlkaf_nif.so
Undefined symbols for architecture x86_64:
  "_OPENSSL_sk_pop_free", referenced from:
      _rd_kafka_transport_ssl_ctx_init in librdkafka.a(rdkafka_transport.o)
  "_OpenSSL_version", referenced from:
      _rd_kafka_transport_ssl_ctx_init in librdkafka.a(rdkafka_transport.o)
  "_OpenSSL_version_num", referenced from:
      _rd_kafka_transport_ssl_ctx_init in librdkafka.a(rdkafka_transport.o)
  "_SSL_CTX_set_options", referenced from:
      _rd_kafka_transport_ssl_ctx_init in librdkafka.a(rdkafka_transport.o)
  "_TLS_client_method", referenced from:
      _rd_kafka_transport_ssl_ctx_init in librdkafka.a(rdkafka_transport.o)
ld: symbol(s) not found for architecture x86_64
clang: error: linker command failed with exit code 1 (use -v to see invocation)
make[3]: *** [/Users/pankajsoni/Documents/Git/Rivers/ejabberd/master/deps/erlkaf/c_src/../priv/erlkaf_nif.so] Error 1
make[3]: Leaving directory `/Users/pankajsoni/Documents/Git/Rivers/ejabberd/master/deps/erlkaf/c_src'

Error compiling last release using hex

When I use Hex and add a dependency on the latest release like this:

{:erlkaf, "~> 2.1.0"}

fetching the deps results in the following error:

Error evaluating Rebar config script ./rebar.config.script:22: evaluation failed with reason error:{badmatch,{error,enoent}} and stacktrace [{erl_eval,expr,6,[{file,[101,114,108,95,101,118,97,108,46,101,114,108]},{line,496}]},{erl_eval,exprs,6,[{file,[101,114,108,95,101,118,97,108,46,101,114,108]},{line,136}]},{erl_eval,expr_list,7,[{file,[101,114,108,95,101,118,97,108,46,101,114,108]},{line,961}]},{erl_eval,expr,6,[{file,[101,114,108,95,101,118,97,108,46,101,114,108]},{line,472}]},{file,eval_stream2,6,[{file,[102,105,108,101,46,101,114,108]},{line,1504}]},{file,script,2,[{file,[102,105,108,101,46,101,114,108]},{line,1142}]},{'Elixir.File','cd!',2,[{file,[108,105,98,47,102,105,108,101,46,101,120]},{line,1607}]},{'Elixir.Mix.Rebar',eval_script,2,[{file,[108,105,98,47,109,105,120,47,114,101,98,97,114,46,101,120]},{line,191}]}]
Any dependencies defined in the script won't be available unless you add them to your Mix project

This does not happen if we add the dependency directly from GitHub like this:

{:erlkaf, github: "silviucpp/erlkaf"}

even though the rebar config on master has not changed since the last release.

Manually committing offsets

Hi! 👋

I was poking around to see whether there is a way to manually commit offsets to the Kafka broker, and found this commit function in the librdkafka docs, but as far as I can tell, it doesn't seem to be exposed by the erlkaf wrapper?

Is it possible to make this call in user-land, or does this require changes from erlkaf to support?

Cheers 🙌

Build failing on mac

I am trying to use the library on macOS. The esq dependency fails to compile with:

src/esq_queue.erl: undefined parse transform 'category'

If I download the esq repo, it compiles fine on its own. Could you please check?

Error compiling on elixir 1.13.1

I'm getting an error the first time I try to build the NIF using the latest version of Elixir.

The last log lines are as follows:

~/.cache/mix/installs/elixir-1.13.1-erts-12.2/febfcb37852cfa2dd1caf0e08cdda1de/deps/erlkaf/deps ~/.cache/mix/installs/elixir-1.13.1-erts-12.2/febfcb37852cfa2dd1caf0e08cdda1de/deps/erlkaf
~/.cache/mix/installs/elixir-1.13.1-erts-12.2/febfcb37852cfa2dd1caf0e08cdda1de/deps/erlkaf
make[1]: Entering directory '/root/.cache/mix/installs/elixir-1.13.1-erts-12.2/febfcb37852cfa2dd1caf0e08cdda1de/deps/erlkaf/c_src'
 CPP    erlkaf_consumer.cc
 CPP    queuemanager.cc
 CPP    topicmanager.cc
 CPP    erlkaf_nif.cc
 CPP    queuecallbacksdispatcher.cc
 CPP    erlkaf_logger.cc
 CPP    nif_utils.cc
 CPP    erlkaf_message.cc
 CPP    erlkaf_producer.cc
 CPP    erlkaf_config.cc
 LD     erlkaf_nif.so
make[1]: Leaving directory '/root/.cache/mix/installs/elixir-1.13.1-erts-12.2/febfcb37852cfa2dd1caf0e08cdda1de/deps/erlkaf/c_src'
===> Analyzing applications...
===> Compiling erlkaf
===> Missing artifact priv/erlkaf_nif.so
** (Mix.Error) Could not compile dependency :erlkaf, "/root/.mix/rebar3 bare compile --paths /root/.cache/mix/installs/elixir-1.13.1-erts-12.2/febfcb37852cfa2dd1caf0e08cdda1de/_build/dev/lib/*/ebin" command failed. Errors may have been logged above. You may run Mix.install/2 to try again or change the arguments to Mix.install/2 to try another version

Steps to reproduce

I created a simplified flow using docker, but it is happening on my local installation as well.

I use the following Dockerfile to get the latest Elixir version and the erlkaf build dependencies:

FROM elixir:1.13.1-slim AS app_builder

RUN apt-get update && \
    apt-get install -y --no-install-recommends git build-essential libssl-dev libcrypto++-dev zlib1g-dev libsasl2-dev \
                     liblz4-dev libzstd-dev ca-certificates

Then just build, start the container and test the installation:

docker build -t erlkaf_test .
docker run -it --rm erlkaf_test

iex(1)> Mix.install([{:erlkaf, github: "silviucpp/erlkaf"}])

# there will be a prompt to install rebar

Then the error occurs. If you try again afterwards on the same image, it installs just fine. Maybe it is related to elixir-lang/elixir#11530.

jsone v1.4.5 dependency and OTP21 - compile error

Greetings,

We have problems with erlkaf's dependency jsone v1.4.5 on OTP 21 in our Elixir project. I know that this is a pure Erlang implementation, but it worked great with our Elixir projects up to Elixir 1.7.x, which runs on OTP 21.

When we run mix compile, we get this error:

mix compile
===> Compiling jsone
===> Compiling src/jsone.erl failed
src/jsone.erl:297: erlang:get_stacktrace/0: deprecated; use the new try/catch syntax for retrieving the stack backtrace
src/jsone.erl:346: erlang:get_stacktrace/0: deprecated; use the new try/catch syntax for retrieving the stack backtrace

** (Mix) Could not compile dependency :jsone, "/Users/trajakovic/.mix/rebar3 bare compile --paths "/Users/trajakovic/IdeaProjects/scorealarm/pretty-kafka-client/_build/dev/lib/*/ebin"" command failed. You can recompile this dependency with "mix deps.compile jsone", update it with "mix deps.update jsone" or clean it with "mix deps.clean jsone"

The simplest solution is to use jsone tag 1.4.7, so we're wondering whether it's possible to update the dependency directly in the erlkaf project.

Thank you

Error multiple consumers

I get this error when I try to start multiple consumers connected to Confluent Cloud:

06:16:30.023 [error] rdkafka#consumer-1 FAIL [thrd:GroupCoordinator]: GroupCoordinator: b30-pkc-2396y.us-east-1.aws.confluent.cloud:9092: Failed to resolve 'b30-pkc-2396y.us-east-1.aws.confluent.cloud:9092': nodename nor servname provided, or not known (after 19ms in state CONNECT, 4 identical error(s) suppressed)
06:16:30.024 [error] rdkafka#consumer-1 ERROR [thrd:app]: rdkafka#consumer-1: GroupCoordinator: b30-pkc-2396y.us-east-1.aws.confluent.cloud:9092: Failed to resolve 'b30-pkc-2396y.us-east-1.aws.confluent.cloud:9092': nodename nor servname provided, or not known (after 19ms in state CONNECT, 4 identical error(s) suppressed)

Everything is working fine connecting to a local Kafka server (without sasl), and connecting one consumer to Confluent Cloud is working perfectly.

Is there any configuration or something I can do to get it working with more than one consumer?

Thanks in advance.

Support for simple ungrouped consumers

I'm looking at switching from Elsa to erlkaf. My use case is to consume entire Kafka topics to populate in-memory caches on application boot. This requires two things from my consumers:

  1. They start from the beginning of the topic every time the application starts, rather than starting from a previously-set offset
  2. They're not part of a consumer group, or at least not part of a common or previously-used one, so each node in the cluster can maintain its own independent cache

I think I could accomplish this with randomly-generated unique consumer groups, but that feels like a hack, so I'm hoping there's a better way. I didn't see one looking through the code but might've missed it.

make compile_nif error

I can't build erlkaf_nif.so.

My OS is Debian GNU/Linux 10 (buster).

Here is the log:

$ make compile_nif 
librdkafka fork already exist. delete deps/librdkafka for a fresh checkout ...
concurrentqueue fork already exist. delete deps/concurrentqueue for a fresh checkout ...
make[1]: Entering directory '/home/nava/src/tst/erlkaf/c_src'
 CPP    topicmanager.cc
 CPP    queuemanager.cc
 CPP    erlkaf_logger.cc
 CPP    nif_utils.cc
 CPP    erlkaf_config.cc
 CPP    erlkaf_nif.cc
 CPP    queuecallbacksdispatcher.cc
 CPP    erlkaf_producer.cc
 CPP    erlkaf_consumer.cc
 LD     erlkaf_nif.so
/usr/bin/ld: cannot find -lz
collect2: error: ld returned 1 exit status
make[1]: *** [nif.mk:72: /home/ali/src/tst/erlkaf/c_src/../priv/erlkaf_nif.so] Error 1
make[1]: Leaving directory '/home/ali/src/tst/erlkaf/c_src'
make: *** [Makefile:7: compile_nif] Error 2

Error bad arg calling erlkaf_nif:consumer_new

Inside erlkaf_consumer_group's init there is a call to erlkaf_nif:consumer_new(GroupId, TopicsNames, RdkClientConfig, RdkTopicConfig).

This appears to return {error, badarg}, which is propagated up when I call erlkaf:create_consumer_group.

So I printed the args being passed into consumer_new:

erlang:display(GroupId),
erlang:display(TopicsNames),
erlang:display(RdkClientConfig),
erlang:display(RdkTopicConfig),

And then from a console (I'm using elixir, so iex) I try calling consumer_new with those args (some info redacted):

:erlkaf_nif.consumer_new(
  'lifecycle-consumer-local',
  [<<"lifecycle_events">>],
  [{<<"security.protocol">>,<<"ssl">>},{<<"bootstrap.servers">>,<<"blah-blah-blah.kafka.amazonaws.com:9094">>}],
  [{<<"auto.offset.reset">>,<<"latest">>}]
)

And that call works, returning {ok, Ref} … no badarg 🙃

Any ideas why it would return a badarg once and then work when I call it myself?

Produce tombstones

Hi,

I'm unable to produce tombstones (null messages) to Kafka for a specified key in a compacted topic. I think the problem is that the producer always expects a binary value, and when the atom undefined arrives, make_badarg is invoked.

I would like to know whether this is unsupported or whether I'm missing something.

Thank you
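A Kafka tombstone is a record with a key and a null value. At the native boundary, "no value" therefore has to become a null pointer with length 0 rather than a badarg. This is a minimal sketch: `PayloadView` and `to_payload` are hypothetical, and a null `std::string*` stands in for the Erlang atom undefined.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Pointer/length pair, the form a native produce call takes for a value.
struct PayloadView {
    const char* data;
    std::size_t size;
};

// Hypothetical conversion: a null pointer plays the role of the Erlang atom
// undefined and becomes a null value (a tombstone); any binary, including an
// empty one, keeps its own pointer, so only undefined maps to nullptr.
PayloadView to_payload(const std::string* value) {
    if (value == nullptr)
        return PayloadView{nullptr, 0};
    return PayloadView{value->data(), value->size()};
}
```

Whether an empty binary should also be treated as a tombstone is a separate design decision. This sketch deliberately keeps the two cases apart.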

Release on OTP 26 "freezing" on startup

When we start our application, it gets "stuck" or "freezes" before doing anything we can observe. It doesn't crash, it doesn't time out, and it never seems to start our application code.

I'm not sure how better to describe what's happening. An application (which happens to be Elixir) that worked on OTP 24 (we haven't tried 25 yet) no longer works when we try to start the release on OTP 26.

I've built an application that only uses erlkaf and does nothing but try to load it. I've even modified the release's start_clean.script file to find out what is happening, or at least narrow it down.

Before running mix release, we can run an iex console with erlkaf operating correctly. We've compiled with the prod environment.

We're just trying to narrow it down at this point. If you have any ideas what it could be, or perhaps a simple erlkaf Erlang example we can try (since maybe it's an Elixir issue in the release?), please let us know. Perhaps you've encountered something like this before.

We'll keep digging!

headers support

It looks like headers aren't exposed but are supported by the current version of librdkafka, correct?

I can take a whack at it if it is simply a matter of adding a field to the message record, setting it on decode, and using it when calling librdkafka's produce function?

(lldb) stop reason = EXC_BAD_ACCESS

When starting erlkaf under lldb, I sometimes get random segmentation faults.

When I run the BEAM under lldb, this is where it stops over and over again:

* thread #5, name = '1_scheduler', stop reason = EXC_BAD_ACCESS (code=1, address=0x0)
    frame #0: 0x000000010033ad80 beam.debug.smp`erts_dlc_create_lock(dlc=0x000000014383e078, name=0x0000000000000000) at erl_dyn_lock_check.c:170:5
   167 	    erts_aint_t i, n = erts_atomic_read_nob(&n_lock_types);
   168 	    int name_len;
   169 	
-> 170 	    for (i = 0; name[i]; i++) {
   171 	        if (name[i] == '[')
   172 	            break;
   173 	    }
Target 0: (beam.debug.smp) stopped.

When I do a backtrace:

(lldb) bt
* thread #5, name = '1_scheduler', stop reason = EXC_BAD_ACCESS (code=1, address=0x0)
  * frame #0: 0x000000010033ad80 beam.debug.smp`erts_dlc_create_lock(dlc=0x000000014383e078, name=0x0000000000000000) at erl_dyn_lock_check.c:170:5
    frame #1: 0x0000000100384549 beam.debug.smp`erl_drv_mutex_create(name=0x0000000000000000) at erl_drv_thread.c:171:5
    frame #2: 0x00000001003a6325 beam.debug.smp`enif_mutex_create(name=0x0000000000000000) at erl_nif.c:2131:53
    frame #3: 0x00000001484615c7 erlkaf_nif.so`QueueCallbacksDispatcher::QueueCallbacksDispatcher() [inlined] CriticalSection::CriticalSection(this=0x0000000100d18880) at critical_section.h:11:34 [opt]
    frame #4: 0x00000001484615c0 erlkaf_nif.so`QueueCallbacksDispatcher::QueueCallbacksDispatcher() [inlined] CriticalSection::CriticalSection(this=0x0000000100d18880) at critical_section.h:11 [opt]
    frame #5: 0x00000001484615c0 erlkaf_nif.so`QueueCallbacksDispatcher::QueueCallbacksDispatcher(this=0x0000000100d18880) at queuecallbacksdispatcher.cc:11 [opt]
    frame #6: 0x0000000148464407 erlkaf_nif.so`on_nif_load(env=<unavailable>, priv_data=0x000000014383c4e0, load_info=<unavailable>) at erlkaf_nif.cc:56:27 [opt]
    frame #7: 0x00000001003acb6f beam.debug.smp`erts_load_nif(c_p=0x00000001438b0268, I=0x000000014981e8b0, filename=0x14364c169 "/Users/y/sportening/we-api-user-account/_build/test/lib/erlkaf/priv/erlkaf_nif", args=0xf 0) at erl_nif.c:4653:16
    frame #8: 0x0000000100032056 beam.debug.smp`beam_jit_load_nif(c_p=0x00000001438b0268, I=0x000000014981e8b0, reg=0x0000700008932dc0) at beam_jit_common.c:154:18

I do not understand how this can be correct, passing NULL to the enif_mutex_create function:

// critical_section.h
CriticalSection() { mutex_ = enif_mutex_create(NULL);}

The name argument becomes a NULL pointer (name=0x0000000000000000), so reading name[i] in the loop above dereferences NULL and crashes.

`brew install` causing undesired updates and crashing other Homebrew pkgs (macOS-only)

I recently had issues while building erlkaf on macOS due to its attempt to install its dependencies through a forced brew install.

During the process, Homebrew ran an auto-update (brew update --auto-update), which caused some unrelated packages to break. I had to spend some minutes figuring out what had happened and how a mix deps.get could potentially break my Homebrew, and then I found this snippet:

case $1 in
  $LIBRDKAFKA_DESTINATION)
    case $OS in
      Darwin)
        brew install [email protected] lz4 zstd curl
        OPENSSL_ROOT_DIR=$(brew --prefix [email protected])
        export CPPFLAGS=-I$OPENSSL_ROOT_DIR/include/
        export LDFLAGS=-L$OPENSSL_ROOT_DIR/lib
        ;;
    esac

  # (...)
esac

See: https://github.com/silviucpp/erlkaf/blob/master/build_deps.sh#L67-L72

It's pretty bad to touch system packages during the build of a library since it can potentially affect other packages like the scenario I ran into. So, I would recommend removing this snippet from the build scripts, replacing it with a clear doc section about system requirements (in macOS specifically, suggest Homebrew packages as a recommendation – though people could also use alternative methods like nixpkgs, etc.), and possibly failing the build when necessary with some informative error messages stating that system requirements were not met.

I think we could accomplish this by checking if some binaries and/or environment variables are set. I do similar things in my .zshrc to configure my shell if/when it's on a Homebrew-powered macOS (eg.: https://github.com/joeljuca/cli/blob/main/config/zsh/zshrc.sh#L70-L77).

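# command -v is the portable way to test for a binary; errors go to stderr
if ! command -v openssl >/dev/null 2>&1; then
  echo "You must have OpenSSL installed on your system." >&2
  echo "See: https://(...)" >&2
  exit 1
fi

# -z tests for an empty (or unset, via :-) variable without relying on bash's ==
if [ -z "${LDFLAGS:-}" ]; then
  echo "Required config LDFLAGS is not set" >&2
  echo "See: https://(...)" >&2
  exit 1
fi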

# (...)

# if everything looks good, proceed with the build...

PS: I'm not a great shell programmer, but I imagine a couple of if blocks could do the trick.

Kafka producer throwing warnings on default configuration

These four defaults are injected, when not specified, by the appy_kafka_default_config routine in erlkaf_config.cc:

bool appy_kafka_default_config(rd_kafka_conf_t* config)
{
    if(rd_kafka_conf_set(config, "enable.auto.commit", "true", NULL, 0) != RD_KAFKA_CONF_OK)
        return false;

    if(rd_kafka_conf_set(config, "enable.auto.offset.store", "false", NULL, 0) != RD_KAFKA_CONF_OK)
        return false;

    if(rd_kafka_conf_set(config, "enable.partition.eof", "false", NULL, 0) != RD_KAFKA_CONF_OK)
        return false;

    if(rd_kafka_conf_set(config, "allow.auto.create.topics", "true", NULL, 0) != RD_KAFKA_CONF_OK)
        return false;
...

As a result, every producer logs these warnings when it starts:

CONFWARN [thrd:app]: Configuration property enable.auto.commit is a consumer property and will be ignored by this producer instance
CONFWARN [thrd:app]: Configuration property enable.auto.offset.store is a consumer property and will be ignored by this producer instance
CONFWARN [thrd:app]: Configuration property enable.partition.eof is a consumer property and will be ignored by this producer instance
CONFWARN [thrd:app]: Configuration property allow.auto.create.topics is a consumer property and will be ignored by this producer instance
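One way to avoid the warnings is to inject these defaults only into consumer configurations, and only for keys the user did not set. The sketch uses a `std::map` as a hypothetical stand-in for `rd_kafka_conf_t`. `apply_default_config` and its `is_consumer` flag are illustrative names, not erlkaf's API.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical stand-in for rd_kafka_conf_t: a plain key/value map.
using Config = std::map<std::string, std::string>;

// Applies the four defaults only to consumer configurations and never
// overrides a value the user set explicitly.
void apply_default_config(Config& config, bool is_consumer) {
    if (!is_consumer)
        return;  // the CONFWARN lines above report all four as consumer properties

    const Config consumer_defaults = {
        {"enable.auto.commit", "true"},
        {"enable.auto.offset.store", "false"},
        {"enable.partition.eof", "false"},
        {"allow.auto.create.topics", "true"},
    };
    for (const auto& [key, value] : consumer_defaults)
        config.emplace(key, value);  // emplace leaves existing keys untouched
}
```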

Build error on Manjaro

Hello there,

I am getting this error while building the NIF on Manjaro Linux:

HEAD is now at cff1001 Fixed compile error introduced in previous commit
~/projects/antecedentes_api/deps/erlkaf/deps ~/projects/antecedentes_api/deps/erlkaf
~/projects/antecedentes_api/deps/erlkaf
make[1]: Entering directory '/home/rafa/projects/antecedentes_api/deps/erlkaf/c_src'
 CPP    erlkaf_producer.cc
 CPP    erlkaf_config.cc
 CPP    topicmanager.cc
 CPP    nif_utils.cc
 CPP    queuemanager.cc
 CPP    erlkaf_logger.cc
 CPP    erlkaf_nif.cc
 CPP    queuecallbacksdispatcher.cc
 CPP    erlkaf_consumer.cc
/home/rafa/projects/antecedentes_api/deps/erlkaf/c_src/queuecallbacksdispatcher.cc: In member function ‘void QueueCallbacksDispatcher::watch(rd_kafka_t*, bool)’:
/home/rafa/projects/antecedentes_api/deps/erlkaf/c_src/queuecallbacksdispatcher.cc:47:75: error: ‘stoull’ is not a member of ‘std’; did you mean ‘wcstoull’?
   47 |         objects_[instance] = item(is_consumer, static_cast<uint64_t>(std::stoull(buffer)/2), now_ms());
      |                                                                           ^~~~~~
      |                                                                           wcstoull
make[1]: *** [nif.mk:78: /home/rafa/projects/antecedentes_api/deps/erlkaf/c_src/queuecallbacksdispatcher.o] Error 1
make[1]: *** Waiting for unfinished jobs....
make[1]: Leaving directory '/home/rafa/projects/antecedentes_api/deps/erlkaf/c_src'
make: *** [Makefile:7: compile_nif] Error 2
===> Hook for compile failed!

After some tests I got it to compile by adding

#include <string>

to https://github.com/silviucpp/erlkaf/blob/master/c_src/queuecallbacksdispatcher.cc#L6

I'm not sending a PR yet because I don't know whether this is the correct solution. Maybe you could give me some feedback, @silviucpp?
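`std::stoull` is declared in `<string>`, so including that header explicitly is the standard-conforming fix. The error most likely appeared because this compiler's standard headers no longer pull `<string>` in transitively. Here is a sketch with a hypothetical `half_of` helper that mirrors the failing expression:

```cpp
#include <cassert>
#include <cstdint>
#include <string>  // declares std::stoull; do not rely on transitive includes

// Hypothetical helper mirroring the expression from the error above.
// std::stoull throws std::invalid_argument or std::out_of_range on bad input.
std::uint64_t half_of(const std::string& buffer) {
    return static_cast<std::uint64_t>(std::stoull(buffer) / 2);
}
```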

segfault when stopping client

I haven't been able to reproduce this outside of the test suite of an internal project, so I'm opening in hopes you might have seen this before.

In the test suite, I wanted a consumer to start and stop with the suite that needs it. A producer is configured in sys.config and started when the application boots at the start of the test suites.

The segfault happens when attempting to stop the consumer during test suite cleanup with erlkaf:stop_client(client_consumer). The logs look like this:

22:03:18.453 [info] stop consumer for: <<"events">> partition: 0
22:03:18.453 [info] wait for consumer client client_consumer to stop...
22:03:18.529 [info] client client_consumer stopped
Segmentation fault (core dumped)

I tried to reproduce it by manually starting and stopping a consumer, but that works fine.

Do you have any theories on possible reasons for a segfault that might help me make this reproducible?

Headers missing in delivery report?

I want to do some retry logic based on the message header. But I noticed that the delivery report doesn't include headers. Is it possible to add them?

Producer timestamp

Hi,

I have been asked to add a timestamp to messages when producing them. I am currently using brod but would like to migrate to erlkaf eventually.

I would like to know whether this is supported by erlkaf or whether support is planned.

Thank you!

question: Does erlkaf handle producer failures?

Because the librdkafka API is asynchronous, I understand that the only way to make the producer synchronous would be to handle the delivery reports and block the producer until the callback fires.

My question is: are there other retry mechanisms, such as some kind of automatic retry, or is the only way to handle errors for the user to implement such a callback?
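The sync-over-async idea described in the question can be sketched generically. The following C++ is not erlkaf or librdkafka code: `async_send` is a hypothetical stand-in for an asynchronous produce call whose delivery report arrives later on another thread. `send_and_wait` blocks the caller until that report arrives or a timeout expires, and returns the outcome so the caller can decide whether to retry.

```cpp
#include <chrono>
#include <functional>
#include <future>
#include <memory>
#include <string>
#include <thread>

// Outcome of a delivery report, as seen by the waiting caller.
enum class Delivery { Ok, Failed, TimedOut };

// Hypothetical asynchronous send: the delivery callback runs later on a
// separate thread. Empty payloads are reported as failures for illustration.
void async_send(const std::string& payload, std::function<void(bool)> on_report) {
    std::thread([payload, on_report] {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        on_report(!payload.empty());
    }).detach();
}

// Block until the delivery report for this message arrives or time runs out.
Delivery send_and_wait(const std::string& payload, std::chrono::milliseconds timeout) {
    // shared_ptr keeps the promise alive even if we stop waiting first.
    auto promise = std::make_shared<std::promise<bool>>();
    std::future<bool> report = promise->get_future();
    async_send(payload, [promise](bool ok) { promise->set_value(ok); });
    if (report.wait_for(timeout) != std::future_status::ready) {
        return Delivery::TimedOut;
    }
    return report.get() ? Delivery::Ok : Delivery::Failed;
}
```

A caller could loop on `send_and_wait` and retry on `Failed` or `TimedOut`. Retrying after a timeout can duplicate a message whose original delivery later succeeds, so this pattern trades throughput and possible duplicates for a synchronous answer.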

Support deleting topics

It would be useful for cleaning up broker state when using erlkaf in tests that produce to random topics we don't need to keep around.

For example:

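setup_all do
  :ok = :erlkaf.create_producer(TestProducer, [])
  topic = "random_test_topic_#{Ecto.UUID.generate()}"
  :ok = :erlkaf.create_topic(TestProducer, topic)

  on_exit(fn ->
    # :erlkaf.delete_topic/2 is the API being requested in this issue.
    :erlkaf.delete_topic(TestProducer, topic)
    :erlkaf.stop_client(TestProducer)
  end)

  # Make the topic name available to the tests in this module.
  {:ok, topic: topic}
end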

Pardon my Elixir.

Unable to compile it on a MacBook Pro M1

I'm having issues compiling erlkaf on macOS Ventura 13.6.1:

$ mix deps.compile erlkaf
=ERROR REPORT==== 28-Feb-2024::17:31:13.767352 ===
beam/beam_load.c(190): Error loading module rebar3_hex_owner:
  This BEAM file was compiled for a later version of the runtime system than the current (Erlang/OTP 25).
  To fix this, please re-compile this module with an Erlang/OTP 25 compiler.
  (Use of opcode 182; this emulator supports only up to 180.)


=ERROR REPORT==== 28-Feb-2024::17:31:13.767370 ===
Loading of $HOME/.cache/rebar3/plugins/rebar3_hex/ebin/rebar3_hex_owner.beam failed: badfile

===> Errors loading plugin {rebar_cmd,"0.2.6"}. Run rebar3 with DEBUG=1 set to see errors.
librdkafka fork already exist. delete _build/deps/librdkafka for a fresh checkout ...
concurrentqueue fork already exist. delete _build/deps/concurrentqueue for a fresh checkout ...
 LD     erlkaf_nif.so
ld: Undefined symbols:
  _SSL_get_peer_certificate, referenced from:
      _rd_kafka_transport_ssl_handshake in librdkafka.a[70](rdkafka_ssl.o)
clang: error: linker command failed with exit code 1 (use -v to see invocation)
make[1]: *** [_build/dev/lib/erlkaf/priv/erlkaf_nif.so] Error 1
make: *** [compile_nif] Error 2
===> Hook for compile failed!

** (Mix) Could not compile dependency :erlkaf, "$HOME/.asdf/installs/elixir/1.15.4-otp-25/.mix/elixir/1-15/rebar3 bare compile --paths _build/dev/lib/*/ebin" command failed. Errors may have been logged above. You can recompile this dependency with "mix deps.compile erlkaf --force", update it with "mix deps.update erlkaf" or clean it with "mix deps.clean erlkaf"

I'm compiling it through Elixir's Mix. The build fails, and I don't have much of a clue how to tackle this. Any help is highly appreciated.

Versions:

  • Erlang 25.3.2.3 (asdf)
  • Elixir 1.15.4-otp-25 (asdf)
  • erlkaf 2.1.4

Hex package 1.1.7 failure undef erl_nif:set_log_process

When using the hex package, version 1.1.7, it fails during init of erlkaf_logger because of undef erl_nif:set_log_process.

I looked at the code for the module in the package and it seems fine.

When using the master branch as the dep in rebar3 it works fine, so it seems to be either a problem with the package itself, or a new version just needs to be published from current master.
