Code Monkey home page Code Monkey logo

kafka_protocol's People

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

kafka_protocol's Issues

TLS client: In state cipher at tls_record.erl:564 generated CLIENT ALERT: Fatal - Unexpected Message

SSL works without certs against port 9094(SASL_SSL), but not port 9093(SSL).

when test with
kpro_connection:start("localhost",9093,[{ssl, [{log_level,debug},{verify, verify_none}]}]).

below error happens.

=NOTICE REPORT==== 11-Jan-2023::11:05:31.056896 ===
TLS client: In state cipher at tls_record.erl:564 generated CLIENT ALERT: Fatal - Unexpected Message
 - {unsupported_record_type,0}
{error,{{failed_to_upgrade_to_ssl,{tls_alert,{unexpected_message,"TLS client: In state cipher at tls_record.erl:564 generated CLIENT ALERT: Fatal - Unexpected Message\n {unsupported_record_type,0}"}}},
        [{kpro_connection,maybe_upgrade_to_ssl,5,
                          [{file,"/mnt/code/src/kafka/kafka_protocol/src/kpro_connection.erl"},
                           {line,333}]},
         {kpro_connection,init_connection,3,
                          [{file,"/mnt/code/src/kafka/kafka_protocol/src/kpro_connection.erl"},
                           {line,252}]},
         {kpro_connection,init,4,
                          [{file,"/mnt/code/src/kafka/kafka_protocol/src/kpro_connection.erl"},
                           {line,195}]},
         {proc_lib,init_p_do_apply,3,
                   [{file,"proc_lib.erl"},{line,226}]}]}}

test_txn_produce_2 fail with concurrent_transactions

CONCURRENT_TRANSACTION as Kafka doc suggests:

The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing

This could be caused by race condition between the commits or a bug in transaction commit handling.
This error was found in CI for PR: #72

kpro_connection:read_sasl_file/1 will leak credentials if file does not contain exactly two lines

The function kpro_connection:read_sasl_file/1 is unsafe as it may leak Kafka credentials (through badmatch errors) if the credentials file does not contain exactly two lines.

read_sasl_file(File) ->
  {ok, Bin} = file:read_file(File),
  Lines = binary:split(Bin, <<"\n">>, [global]),
  [User, Pass] = lists:filter(fun(Line) -> Line =/= <<>> end, Lines),
  {User, Pass}.

This should be replaced with something like

read_sasl_file(File) ->
  {ok, Bin} = file:read_file(File),
  Lines = binary:split(Bin, <<"\n">>, [global]),
  case lists:filter(fun(Line) -> Line =/= <<>> end, Lines) of
    [User, Pass] -> {User, Pass};
    _ -> erlang:error({invalid_sasl_file, File})
  {User, Pass}.

Issue with Snappyer

Hello,

I am using elixir 1.13 and OTP 24.1.7 (and latest version of brod)

I am tryting to use Brod with snappyer. But seems that the instructions is to follow kafka_protocol

I tried this

Application.put_env(:kafka_protocol, :provide_compression, [{:snappyer, Arth0.Publishers.Kafka}])

and then started my producer in brod with the config

compression: "snappyer"

but I am getting this error

[error] GenServer #PID<0.845.0> terminating
** (FunctionClauseError) no function clause matching in :kpro_compress.get_module/1
    (kafka_protocol 4.0.1) /Users/emmanuelpinault/podium/arth0/deps/kafka_protocol/src/kpro_compress.erl:93: :kpro_compress.get_module("snappyer")
    (kafka_protocol 4.0.1) /Users/emmanuelpinault/podium/arth0/deps/kafka_protocol/src/kpro_compress.erl:86: :kpro_compress.do_compress/2
    (kafka_protocol 4.0.1) /Users/emmanuelpinault/podium/arth0/deps/kafka_protocol/src/kpro_batch.erl:78: :kpro_batch.encode_tx/4
    (kafka_protocol 4.0.1) /Users/emmanuelpinault/podium/arth0/deps/kafka_protocol/src/kpro_req_lib.erl:250: :kpro_req_lib.produce/5
    (brod 3.16.1) /Users/emmanuelpinault/podium/arth0/deps/brod/src/brod_producer.erl:419: :brod_producer.do_send_fun/4
    (brod 3.16.1) /Users/emmanuelpinault/podium/arth0/deps/brod/src/brod_producer_buffer.erl:276: :brod_producer_buffer.do_send/4
    (brod 3.16.1) /Users/emmanuelpinault/podium/arth0/deps/brod/src/brod_producer.erl:525: :brod_producer.maybe_produce/1
    (brod 3.16.1) /Users/emmanuelpinault/podium/arth0/deps/brod/src/brod_producer.erl:477: :brod_producer.d
    ````
    
    I can t seem to figure out what the issue so far looking at the code. 

Wrong key for delete_topics

Should this be topic_names instead of topics? https://github.com/kafka4beam/kafka_protocol/blob/master/src/kpro_req_lib.erl#L360

As per the bnf here https://github.com/kafka4beam/kafka_protocol/blob/master/priv/kafka.bnf#L3781

I'm trying to get tests working in brod against the released version of kafka_protocol 3.1.0 and I have two tests failing because:

{{field_missing,[{stack,[{delete_topics,0},topic_names]},
                 {input,#{timeout => 280000,topics => [<<"brod_SUITE">>]}}]},

Add zstd support

zstd is supported natively as of kafka 2.1. zstd bindings are available here. I'm not sure about the license though.

It would be great to have it supported by this library.

Undefined module lz4b_frame

I'm not sure if I'm missing a dependency or what. I'll try to dig in a little.

* (UndefinedFunctionError) function :lz4b_frame.decompress/1 is undefined (module :lz4b_frame is not available)
    :lz4b_frame.decompress(<<4, 34, 77, 24, 96, 64, 130, 211, 5, 0, 0, 21, 0, 1, 0, 255, 30, 10, 204, 11, 88, 71, 194, 1, 0, 0, 0, 1, 121, 216, 99, 183, 97, 0, 0, 0, 21, 123, 34, 97, 99, 99, 111, 117, 110, 116, 95, 105, 100, 34, ...>>)
    (kafka_protocol 4.0.1) /Users/tpitale/Development/dm/dm-service/deps/kafka_protocol/src/kpro_compress.erl:91: :kpro_compress.do_decompress/2
    (kafka_protocol 4.0.1) /Users/tpitale/Development/dm/dm-service/deps/kafka_protocol/src/kpro_batch_v01.erl:126: :kpro_batch_v01.decode/3
    (kafka_protocol 4.0.1) /Users/tpitale/Development/dm/dm-service/deps/kafka_protocol/src/kpro_batch.erl:128: :kpro_batch.decode/2
    (brod 3.16.1) /Users/tpitale/Development/dm/dm-service/deps/brod/src/brod_utils.erl:718: :brod_utils.parse_fetch_rsp/1
    (brod 3.16.1) /Users/tpitale/Development/dm/dm-service/deps/brod/src/brod_utils.erl:558: :brod_utils.parse_rsp/1
    (brod 3.16.1) /Users/tpitale/Development/dm/dm-service/deps/brod/src/brod_consumer.erl:440: :brod_consumer.handle_fetch_response/2
    (stdlib 3.14) gen_server.erl:689: :gen_server.try_dispatch/4
    (stdlib 3.14) gen_server.erl:765: :gen_server.handle_msg/6
    (stdlib 3.14) proc_lib.erl:226: :proc_lib.init_p_do_apply/3

crypto:hmac is deprecated

Get timeout on querying version from Kafka Server

I have a problem connecting to a Kafka 2.5.0 Server.

Just to set the stage right, I'm trying to connect to a server that has OAUTHBEARER as the SASL mechanism. We are trying to use a custom handler to support this auth model but the client fails before reaching that during querying version

I'm trying to connect to Mercedes-Benz's Kafka(https://developer.mercedes-benz.com/products/connect_your_fleet/docs)

I'll share as much of public information that I can:

  1. Host:PORT: bootstrap.msg-stream.connect-business.net:443
  2. Certfile: is downloadable from Firefox by visiting https://bootstrap.msg-stream.connect-business.net

download cert 01

download cert 02

# Erlang version:
1> Bootstrap = [{<<"bootstrap.msg-stream.connect-business.net">>, 443}].
2> Client = my_kafka_client.
3> Opts = [{query_api_versions, true}, {ssl, [{cacertfile, <<"kafka-cluster-external-prod-kafka.pem">>}]}].
4> brod:start_client(Bootstrap, Client, Opts).


# Elixir version:
iex(1)> (1bootstrap = [{"bootstrap.msg-stream.connect-business.net", 443}]
iex(2)> opts = [{:query_api_versions, true}, {:ssl, [{:cacertfile, "kafka-cluster-external-prod-kafka.pem"}]}]
iex(3)> :brod.start_client(bootstrap, :my_kafka_client, opts)
:ok

10:39:53.233 [info]      :supervisor: {:local, :brod_sup}
    :started: [
  pid: #PID<0.208.0>,
  id: :my_kafka_client,
  mfargs: {:brod_client, :start_link,
   [
     [{"bootstrap.msg-stream.connect-business.net", 443}],
     :my_kafka_client,
     [query_api_versions: true, ssl: [cacertfile: "kafka-cluster-external-prod-kafka.pem"]]
   ]},
  restart_type: {:permanent, 10},
  shutdown: 5000,
  child_type: :worker
]
iex(4)> CorrId : 1716395828
ReqIoData : [<<0,18>>,
             <<0,0>>,
             <<102,78,31,52>>,
             [<<0,15>>,<<"my_kafka_client">>],
             []]

10:39:58.589 [warn]  :brod_client [#PID<0.208.0>] :my_kafka_client is terminating
reason: [
  {{"bootstrap.msg-stream.connect-business.net", 443},
   {{{:kpro_req, #Reference<0.602024089.3602382851.100616>, :api_versions, 0,
      false, []}, :timeout},
    [
      {:kpro_lib, :send_and_recv_raw, 4, [file: 'src/kpro_lib.erl', line: 70]},
      {:kpro_lib, :send_and_recv, 5, [file: 'src/kpro_lib.erl', line: 83]},
      {:kpro_connection, :query_api_versions, 4,
       [file: 'src/kpro_connection.erl', line: 246]},
      {:kpro_connection, :init_connection, 2,
       [file: 'src/kpro_connection.erl', line: 233]},
      {:kpro_connection, :init, 4, [file: 'src/kpro_connection.erl', line: 170]},
      {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 226]}
    ]}}
]

FYI I added few io:format in src/kpro_lib.erl to see the data that goes through the wire

-spec send_and_recv(kpro:req(), port(), module(),
                    kpro:client_id(), timeout()) -> kpro:struct().
send_and_recv(#kpro_req{api = API, vsn = Vsn} = Req,
                 Sock, Mod, ClientId, Timeout) ->
  CorrId = make_corr_id(),
  io:format("CorrId : ~p~n",[CorrId]),
  ReqIoData = kpro_req_lib:encode(ClientId, CorrId, Req),
  io:format("ReqIoData : ~p~n",[ReqIoData]),
  try
    RspBin = send_and_recv_raw(ReqIoData, Sock, Mod, Timeout),
    io:format("RspBin : ~p~n",[RspBin]),
    {CorrId, Body} = decode_corr_id(RspBin), %% assert match CorrId
    io:format("Body : ~p~n",[Body]),
    #kpro_rsp{api = API, vsn = Vsn, msg = Msg} = %% assert match API and Vsn

At the end I should mentioned that I'm able to connect to this server using Kafka-python library.

Release new minor for alpine compatibility

Hi,
I found out you already fixed crc32cer to support alpine image, and bumped dependency on master here.
Can i ask if you will release 2.1.2 and also brod 3.6.1 so it will work in alpine based images?

No such field: topic_metadata

Hello,
When using brod 3.15.0 to connect to a topic, I get the following error:

** Reason for termination ==
** {{{{no_such_field,topic_metadata},
[{kpro_lib,find,2,
[{file,"kpro_lib.erl"},
{line,266}]}...

Is it because topic_metadata isn't supported in kafka version 1.1?

Thanks for bringing kafka to the erlang ecosystem!
Oscar.

crc32cer_nif_not_loaded

Hi! I am using kafka_protocol through elsa, and can't get a trivial producer working because of this error:

iex(1)> Elsa.produce(:driver_telemetry_connection, "driver-telemetry-events", "value")
[debug] Elixir.Elsa.Producer Sending 1 messages to driver-telemetry-events:0
[info] client :driver_telemetry_connection connected to kafka:9092

[error] GenServer #PID<0.337.0> terminating
** (stop) {:crc32cer_nif_not_loaded, '/app/_build/dev/lib/crc32cer/priv/crc32cer'}
    :erlang.nif_error({:crc32cer_nif_not_loaded, '/app/_build/dev/lib/crc32cer/priv/crc32cer'})
    (crc32cer) src/crc32cer.erl:23: :crc32cer.nif/2
...

I wanted to report it to crc32cer, but the project does not have issues enabled, and the author has no email in his GitHub page, so asking here in case you have ever seen this before.

I am a bit lost. This is happening in a very simple Debian Stretch Docker container. Everything compiled inside just fine, and the priv directory of that lib is

# ls _build/dev/lib/crc32cer/priv
crc32cer.so  crc32cer_drv.so

Does it ring a bell by any chance?

Use ExDoc

The documentation of brod is much nicer now when you use ExDoc (I love it!). I think it would be great if kafka_protocol used it as well.

Aside from UI/UX, it would make links (particularly for types) from brod's documentation to kafka_protocol's one to work better. ExDoc uses a different format of anchors than EDoc does, so for example the link https://hexdocs.pm/kafka_protocol/kpro.html#t:connection/0 opens the page at the top and does not jump to directly to the connection/0 type so one has to find it manually.

Encoding 0 bytes behavior

according to the kafka protocol doc: https://kafka.apache.org/protocol#protocol_types
"bytes" should be encoded as:

Represents a raw sequence of bytes. First the length N is given as an INT32. Then N bytes follow.

But I can see that there is a special case in the library where it encode 0 bytes as <<-1:32/signed-integer>> (Ref), but not [<<0:32>>, <<>>] as what the doc says.

I am seeing the kafka broker rejecting a sasl_authenticate v0 request call with auth_bytes having 0 bytes content.

Is the special 0 bytes encoding behavior intended for some other case?

Drop Rebar dependency

Hello, i open this as an issue because i suppose the dependency on the old rebar is due to internal process at Klarna.

Brod already use only rebar3 and kafka_protocol can use rebar3. Is there a reason you are still using both ?

I ask because rebar does not compile nicely on otp23 and this is causing all kind of pain.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.