kafka4beam / kafka_protocol
Kafka protocol erlang library
License: Apache License 2.0
Kafka has a setting that introduces rate limiting for clients. kafka_protocol could be more informative when this happens.
SSL works without certs against port 9094 (SASL_SSL), but not against port 9093 (SSL).
When testing with
kpro_connection:start("localhost", 9093, [{ssl, [{log_level, debug}, {verify, verify_none}]}]).
the error below happens:
=NOTICE REPORT==== 11-Jan-2023::11:05:31.056896 ===
TLS client: In state cipher at tls_record.erl:564 generated CLIENT ALERT: Fatal - Unexpected Message
- {unsupported_record_type,0}
{error,{{failed_to_upgrade_to_ssl,{tls_alert,{unexpected_message,"TLS client: In state cipher at tls_record.erl:564 generated CLIENT ALERT: Fatal - Unexpected Message\n {unsupported_record_type,0}"}}},
[{kpro_connection,maybe_upgrade_to_ssl,5,
[{file,"/mnt/code/src/kafka/kafka_protocol/src/kpro_connection.erl"},
{line,333}]},
{kpro_connection,init_connection,3,
[{file,"/mnt/code/src/kafka/kafka_protocol/src/kpro_connection.erl"},
{line,252}]},
{kpro_connection,init,4,
[{file,"/mnt/code/src/kafka/kafka_protocol/src/kpro_connection.erl"},
{line,195}]},
{proc_lib,init_p_do_apply,3,
[{file,"proc_lib.erl"},{line,226}]}]}}
The latest version supported in kafka_protocol is 1.1.
CONCURRENT_TRANSACTIONS, as the Kafka doc describes it:
"The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing."
This could be caused by a race condition between the commits, or by a bug in the transaction commit handling.
This error was found in CI for PR: #72
The function kpro_connection:read_sasl_file/1 is unsafe, as it may leak Kafka credentials (through badmatch errors) if the credentials file does not contain exactly two non-empty lines.
read_sasl_file(File) ->
  {ok, Bin} = file:read_file(File),
  Lines = binary:split(Bin, <<"\n">>, [global]),
  [User, Pass] = lists:filter(fun(Line) -> Line =/= <<>> end, Lines),
  {User, Pass}.
This should be replaced with something like:
read_sasl_file(File) ->
  {ok, Bin} = file:read_file(File),
  Lines = binary:split(Bin, <<"\n">>, [global]),
  case lists:filter(fun(Line) -> Line =/= <<>> end, Lines) of
    [User, Pass] -> {User, Pass};
    _ -> erlang:error({invalid_sasl_file, File})
  end.
Hello,
I am using Elixir 1.13 and OTP 24.1.7 (and the latest version of brod).
I am trying to use brod with snappyer, but it seems the instructions are to follow kafka_protocol.
I tried this:
Application.put_env(:kafka_protocol, :provide_compression, [{:snappyer, Arth0.Publishers.Kafka}])
and then started my producer in brod with the config
compression: "snappyer"
but I am getting this error:
[error] GenServer #PID<0.845.0> terminating
** (FunctionClauseError) no function clause matching in :kpro_compress.get_module/1
(kafka_protocol 4.0.1) /Users/emmanuelpinault/podium/arth0/deps/kafka_protocol/src/kpro_compress.erl:93: :kpro_compress.get_module("snappyer")
(kafka_protocol 4.0.1) /Users/emmanuelpinault/podium/arth0/deps/kafka_protocol/src/kpro_compress.erl:86: :kpro_compress.do_compress/2
(kafka_protocol 4.0.1) /Users/emmanuelpinault/podium/arth0/deps/kafka_protocol/src/kpro_batch.erl:78: :kpro_batch.encode_tx/4
(kafka_protocol 4.0.1) /Users/emmanuelpinault/podium/arth0/deps/kafka_protocol/src/kpro_req_lib.erl:250: :kpro_req_lib.produce/5
(brod 3.16.1) /Users/emmanuelpinault/podium/arth0/deps/brod/src/brod_producer.erl:419: :brod_producer.do_send_fun/4
(brod 3.16.1) /Users/emmanuelpinault/podium/arth0/deps/brod/src/brod_producer_buffer.erl:276: :brod_producer_buffer.do_send/4
(brod 3.16.1) /Users/emmanuelpinault/podium/arth0/deps/brod/src/brod_producer.erl:525: :brod_producer.maybe_produce/1
(brod 3.16.1) /Users/emmanuelpinault/podium/arth0/deps/brod/src/brod_producer.erl:477: :brod_producer.d
I can't seem to figure out what the issue is so far, looking at the code.
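For what it's worth, the FunctionClauseError shows get_module/1 being called with the string "snappyer", and kpro_compress dispatches on atoms. A sketch of what I would try instead (the exact shape of provide_compression and the brod option value are assumptions on my part, not checked against the docs):

```erlang
%% sys.config sketch. Assumptions: provide_compression maps the codec
%% atom `snappy` to a callback module, and brod takes the codec as an
%% atom, not a string.
[{kafka_protocol,
  [{provide_compression, [{snappy, snappyer}]}]}].

%% The brod producer config would then use the atom, e.g.:
%%   {compression, snappy}
```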
Should this be topic_names instead of topics? https://github.com/kafka4beam/kafka_protocol/blob/master/src/kpro_req_lib.erl#L360
As per the BNF here: https://github.com/kafka4beam/kafka_protocol/blob/master/priv/kafka.bnf#L3781
I'm trying to get tests working in brod against the released version of kafka_protocol 3.1.0, and I have two tests failing because:
{{field_missing,[{stack,[{delete_topics,0},topic_names]},
{input,#{timeout => 280000,topics => [<<"brod_SUITE">>]}}]},
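The field_missing error above shows the schema stack expecting topic_names while the input map uses topics. A sketch of how the fields would line up with the BNF (kpro:make_request/3 and the exact field keys are my assumption, not verified against 3.1.0):

```erlang
%% Sketch only: field names follow priv/kafka.bnf for DeleteTopics v0.
Fields = #{topic_names => [<<"brod_SUITE">>],  %% not `topics`
           timeout => 280000},
Req = kpro:make_request(delete_topics, 0, Fields).
```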
Why does the repo depend on a third-party crc32?
Erlang has built-in functions for CRC-32: erlang:crc32/1 and erlang:crc32/2.
The extra dependency makes the code less clean.
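Likely context (my understanding, worth verifying): Kafka's v2 record batches are checksummed with CRC-32C (Castagnoli polynomial), while erlang:crc32/1 implements the zlib/IEEE CRC-32, so the two produce different values for the same input. A slow, table-less sketch of CRC-32C in plain Erlang to show the difference; real implementations use lookup tables or hardware instructions, which is what the NIF dependency provides:

```erlang
-module(crc32c_demo).
-export([crc32c/1]).

%% Bitwise (table-less) CRC-32C using the reflected polynomial
%% 16#82F63B78. Illustrative only; far too slow for production use.
crc32c(Bin) when is_binary(Bin) ->
    crc32c(Bin, 16#FFFFFFFF) bxor 16#FFFFFFFF.

crc32c(<<>>, Crc) ->
    Crc;
crc32c(<<Byte, Rest/binary>>, Crc0) ->
    %% fold each of the 8 bits of the current byte into the register
    Crc = lists:foldl(
            fun(_, C) ->
                case C band 1 of
                    1 -> (C bsr 1) bxor 16#82F63B78;
                    0 -> C bsr 1
                end
            end, Crc0 bxor Byte, lists:seq(1, 8)),
    crc32c(Rest, Crc).
```

crc32c_demo:crc32c(<<"123456789">>) returns 16#E3069283 (the published CRC-32C check value), whereas erlang:crc32(<<"123456789">>) returns 16#CBF43926, so the built-ins cannot replace the CRC-32C dependency.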
lz4 support was added in version 3.1, but brod is still not upgraded yet.
It would be nice to have lz4 ported back to the 2.x version.
The lz4 support can be done as a static rebar3 dependency.
zstd is supported natively as of Kafka 2.1. zstd bindings are available here; I'm not sure about the license, though.
It would be great to have it supported by this library.
I'm not sure if I'm missing a dependency or what. I'll try to dig in a little.
** (UndefinedFunctionError) function :lz4b_frame.decompress/1 is undefined (module :lz4b_frame is not available)
:lz4b_frame.decompress(<<4, 34, 77, 24, 96, 64, 130, 211, 5, 0, 0, 21, 0, 1, 0, 255, 30, 10, 204, 11, 88, 71, 194, 1, 0, 0, 0, 1, 121, 216, 99, 183, 97, 0, 0, 0, 21, 123, 34, 97, 99, 99, 111, 117, 110, 116, 95, 105, 100, 34, ...>>)
(kafka_protocol 4.0.1) /Users/tpitale/Development/dm/dm-service/deps/kafka_protocol/src/kpro_compress.erl:91: :kpro_compress.do_decompress/2
(kafka_protocol 4.0.1) /Users/tpitale/Development/dm/dm-service/deps/kafka_protocol/src/kpro_batch_v01.erl:126: :kpro_batch_v01.decode/3
(kafka_protocol 4.0.1) /Users/tpitale/Development/dm/dm-service/deps/kafka_protocol/src/kpro_batch.erl:128: :kpro_batch.decode/2
(brod 3.16.1) /Users/tpitale/Development/dm/dm-service/deps/brod/src/brod_utils.erl:718: :brod_utils.parse_fetch_rsp/1
(brod 3.16.1) /Users/tpitale/Development/dm/dm-service/deps/brod/src/brod_utils.erl:558: :brod_utils.parse_rsp/1
(brod 3.16.1) /Users/tpitale/Development/dm/dm-service/deps/brod/src/brod_consumer.erl:440: :brod_consumer.handle_fetch_response/2
(stdlib 3.14) gen_server.erl:689: :gen_server.try_dispatch/4
(stdlib 3.14) gen_server.erl:765: :gen_server.handle_msg/6
(stdlib 3.14) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
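A guess at the fix (not confirmed): kafka_protocol appears to resolve the lz4 decompressor module at runtime rather than declaring it as a dependency, so the application embedding brod has to pull in the lz4 binding itself. A rebar.config sketch, assuming the hex package providing lz4b_frame is named lz4b:

```erlang
%% rebar.config sketch: add the lz4 binding as a direct dependency so
%% the lz4b_frame module is present at runtime for decompression.
{deps, [
    brod,
    lz4b    %% assumption: hex package name providing lz4b_frame
]}.
```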
Builds on OTP 23 are crashing; see, e.g., https://travis-ci.org/github/klarna/kafka_protocol/jobs/760231865
This can probably be fixed with some macro magic; see, e.g., https://github.com/epgsql/epgsql/blob/f811a09926892dbd1359afe44a9bfa8f6907b322/src/epgsql_scram.erl#L119-L130
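The epgsql workaround linked above uses conditional compilation on ?OTP_RELEASE to pick between the old and new crypto API (crypto:hmac/3 was deprecated in favour of crypto:mac/4 around OTP 23). A sketch of the same pattern, assuming that deprecation is indeed what breaks the build here:

```erlang
-module(hmac_compat).
-export([hmac_sha256/2]).

%% Pick the crypto API at compile time. ?OTP_RELEASE has been defined
%% since OTP 21; crypto:mac/4 replaces the older crypto:hmac/3.
-if(?OTP_RELEASE >= 23).
hmac_sha256(Key, Data) -> crypto:mac(hmac, sha256, Key, Data).
-else.
hmac_sha256(Key, Data) -> crypto:hmac(sha256, Key, Data).
-endif.
```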
I have a problem connecting to a Kafka 2.5.0 server.
Just to set the stage: I'm trying to connect to a server that uses OAUTHBEARER as the SASL mechanism. We are trying to use a custom handler to support this auth model, but the client fails before reaching that point, while querying API versions.
I'm trying to connect to Mercedes-Benz's Kafka (https://developer.mercedes-benz.com/products/connect_your_fleet/docs).
I'll share as much public information as I can:
# Erlang version:
1> Bootstrap = [{<<"bootstrap.msg-stream.connect-business.net">>, 443}].
2> Client = my_kafka_client.
3> Opts = [{query_api_versions, true}, {ssl, [{cacertfile, <<"kafka-cluster-external-prod-kafka.pem">>}]}].
4> brod:start_client(Bootstrap, Client, Opts).
# Elixir version:
iex(1)> bootstrap = [{"bootstrap.msg-stream.connect-business.net", 443}]
iex(2)> opts = [{:query_api_versions, true}, {:ssl, [{:cacertfile, "kafka-cluster-external-prod-kafka.pem"}]}]
iex(3)> :brod.start_client(bootstrap, :my_kafka_client, opts)
:ok
10:39:53.233 [info] :supervisor: {:local, :brod_sup}
:started: [
pid: #PID<0.208.0>,
id: :my_kafka_client,
mfargs: {:brod_client, :start_link,
[
[{"bootstrap.msg-stream.connect-business.net", 443}],
:my_kafka_client,
[query_api_versions: true, ssl: [cacertfile: "kafka-cluster-external-prod-kafka.pem"]]
]},
restart_type: {:permanent, 10},
shutdown: 5000,
child_type: :worker
]
iex(4)>
CorrId : 1716395828
ReqIoData : [<<0,18>>,
<<0,0>>,
<<102,78,31,52>>,
[<<0,15>>,<<"my_kafka_client">>],
[]]
10:39:58.589 [warn] :brod_client [#PID<0.208.0>] :my_kafka_client is terminating
reason: [
{{"bootstrap.msg-stream.connect-business.net", 443},
{{{:kpro_req, #Reference<0.602024089.3602382851.100616>, :api_versions, 0,
false, []}, :timeout},
[
{:kpro_lib, :send_and_recv_raw, 4, [file: 'src/kpro_lib.erl', line: 70]},
{:kpro_lib, :send_and_recv, 5, [file: 'src/kpro_lib.erl', line: 83]},
{:kpro_connection, :query_api_versions, 4,
[file: 'src/kpro_connection.erl', line: 246]},
{:kpro_connection, :init_connection, 2,
[file: 'src/kpro_connection.erl', line: 233]},
{:kpro_connection, :init, 4, [file: 'src/kpro_connection.erl', line: 170]},
{:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 226]}
]}}
]
FYI, I added a few io:format calls in src/kpro_lib.erl to see the data that goes over the wire:
-spec send_and_recv(kpro:req(), port(), module(),
kpro:client_id(), timeout()) -> kpro:struct().
send_and_recv(#kpro_req{api = API, vsn = Vsn} = Req,
Sock, Mod, ClientId, Timeout) ->
CorrId = make_corr_id(),
io:format("CorrId : ~p~n",[CorrId]),
ReqIoData = kpro_req_lib:encode(ClientId, CorrId, Req),
io:format("ReqIoData : ~p~n",[ReqIoData]),
try
RspBin = send_and_recv_raw(ReqIoData, Sock, Mod, Timeout),
io:format("RspBin : ~p~n",[RspBin]),
{CorrId, Body} = decode_corr_id(RspBin), %% assert match CorrId
io:format("Body : ~p~n",[Body]),
#kpro_rsp{api = API, vsn = Vsn, msg = Msg} = %% assert match API and Vsn
In the end, I should mention that I am able to connect to this server using the kafka-python library.
Hi,
I found out you already fixed crc32cer to support the Alpine image, and bumped the dependency on master here.
Can I ask if you will release 2.1.2 (and also brod 3.6.1) so it will work in Alpine-based images?
Hello,
When using brod 3.15.0 to connect to a topic, I get the following error:
** Reason for termination ==
** {{{{no_such_field,topic_metadata},
[{kpro_lib,find,2,
[{file,"kpro_lib.erl"},
{line,266}]}...
Is it because topic_metadata isn't supported in Kafka version 1.1?
Thanks for bringing kafka to the erlang ecosystem!
Oscar.
Hi! I am using kafka_protocol through elsa, and can't get a trivial producer working because of this error:
iex(1)> Elsa.produce(:driver_telemetry_connection, "driver-telemetry-events", "value")
[debug] Elixir.Elsa.Producer Sending 1 messages to driver-telemetry-events:0
[info] client :driver_telemetry_connection connected to kafka:9092
[error] GenServer #PID<0.337.0> terminating
** (stop) {:crc32cer_nif_not_loaded, '/app/_build/dev/lib/crc32cer/priv/crc32cer'}
:erlang.nif_error({:crc32cer_nif_not_loaded, '/app/_build/dev/lib/crc32cer/priv/crc32cer'})
(crc32cer) src/crc32cer.erl:23: :crc32cer.nif/2
...
I wanted to report it to crc32cer, but the project does not have issues enabled, and the author has no email on his GitHub page, so I'm asking here in case you have ever seen this before.
I am a bit lost. This is happening in a very simple Debian Stretch Docker container. Everything compiled inside just fine, and the priv directory of that lib is:
# ls _build/dev/lib/crc32cer/priv
crc32cer.so crc32cer_drv.so
Does it ring a bell, by any chance?
The documentation of brod is much nicer now that it uses ExDoc (I love it!). I think it would be great if kafka_protocol used it as well.
Aside from UI/UX, it would make links (particularly for types) from brod's documentation to kafka_protocol's work better. ExDoc uses a different anchor format than EDoc, so, for example, the link https://hexdocs.pm/kafka_protocol/kpro.html#t:connection/0 opens the page at the top and does not jump directly to the connection/0 type, so one has to find it manually.
According to the Kafka protocol doc (https://kafka.apache.org/protocol#protocol_types), "bytes" should be encoded as:
"Represents a raw sequence of bytes. First the length N is given as an INT32. Then N bytes follow."
But I can see that there is a special case in the library where it encodes 0 bytes as <<-1:32/signed-integer>> (Ref), rather than [<<0:32>>, <<>>] as the doc says.
I am seeing the Kafka broker reject a sasl_authenticate v0 request whose auth_bytes has 0 bytes of content.
Is the special 0-byte encoding behavior intended for some other case?
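For reference, the wire format distinguishes a null value (length -1, only legal for NULLABLE_BYTES) from an empty one (length 0). A minimal sketch of the two encodings; the function name is mine, not the library's:

```erlang
-module(bytes_demo).
-export([encode_bytes/1]).

%% Kafka wire format for BYTES / NULLABLE_BYTES:
%%   null    -> length -1 (INT32), no payload (NULLABLE_BYTES only)
%%   <<>>    -> length 0, empty payload
%%   <<..>>  -> length N (INT32), then N bytes
encode_bytes(null) ->
    <<-1:32/signed-integer>>;
encode_bytes(Bin) when is_binary(Bin) ->
    <<(byte_size(Bin)):32/signed-integer, Bin/binary>>.
```

encode_bytes(<<>>) yields <<0,0,0,0>>, while encode_bytes(null) yields <<255,255,255,255>>; collapsing the former into the latter is what the broker appears to reject for auth_bytes.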
Hello, I'm opening this as an issue because I suppose the dependency on the old rebar is due to internal processes at Klarna.
brod already uses only rebar3, and kafka_protocol can use rebar3 too. Is there a reason you are still using both?
I ask because rebar does not compile nicely on OTP 23, and this is causing all kinds of pain.