kafka4beam / brod_gssapi
SASL GSSAPI auth backend for brod (https://github.com/klarna/brod)
License: Apache License 2.0
The .app.src file is missing the entries below:
{applications,[kernel,stdlib,sasl_auth]}, % otherwise release tools won't pack it (and its deps) when creating a release.
{modules, []} % some build tools require this skeleton so they can fill in the empty list at compile time.
I would also recommend a semver version number, 0.1.0, instead of 0.1.
The same issue applies to sasl_auth.
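As a rough sketch of what the requested change might look like, the fragment below combines the two missing entries with a semver version string. The description text is copied from the repository tagline, and the `registered` entry is an assumption added only to make the skeleton complete; the real file may carry other keys.

```erlang
%% src/brod_gssapi.app.src (illustrative sketch, not the actual file)
{application, brod_gssapi,
 [{description, "SASL GSSAPI auth backend for brod"},
  {vsn, "0.1.0"},                              % semver instead of "0.1"
  {registered, []},                            % assumed: no registered processes
  {applications, [kernel, stdlib, sasl_auth]}, % lets release tools pack the deps
  {modules, []}                                % filled in by the build tool
 ]}.
```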
Currently this plugin does not support handshake v0. It supports the SASL auth that predates the handshake introduced in the Kafka protocol. We currently call this module brod_gssapi_v0, which is wrong; it should perhaps be called brod_gssapi_legacy.
brod_gssapi_v1, which supports handshake version 1, should be renamed to something that doesn't paint it into a v1 corner. In other words, it should be able to work with v0 or v1 handshakes.
Reference:
https://kafka.apache.org/protocol.html#sasl_handshake
https://cwiki.apache.org/confluence/display/KAFKA/KIP-43%3A+Kafka+SASL+enhancements
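One possible shape for the renaming proposed above is a small selection function that maps the handshake version to a backend module. This is only a sketch: the module `handshake_dispatch_sketch`, the function `select_backend/1`, and the name `brod_gssapi_handshake` are hypothetical and do not exist in the repository; `brod_gssapi_legacy` is the name suggested in this issue.

```erlang
%% Hypothetical helper, not part of brod_gssapi.
-module(handshake_dispatch_sketch).
-export([select_backend/1]).

%% Pick a backend module from the SASL handshake version.
%% `undefined` means no handshake version was given (pre-handshake auth).
-spec select_backend(undefined | 0 | 1) -> module().
select_backend(undefined) ->
    brod_gssapi_legacy;      % name proposed in the issue
select_backend(Vsn) when Vsn =:= 0; Vsn =:= 1 ->
    brod_gssapi_handshake.   % hypothetical version-neutral name
```

With a single handshake-aware module, supporting a later handshake version would mean extending the guard rather than adding another versioned module.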
Check out PR #6
cd brod_gssapi/example
./up
Check that the Kafka instance is functional and that SASL GSSAPI (Kerberos) authentication works by running ./test_send_receive.sh
Run the example brod_gssapi project in example/brod_client by running docker-compose up brod_client
You will now see that the Erlang code crashes with the following error message:
brod_client | ===> Verifying dependencies...
brod_client | ===> Upgrading brod_gssapi (from {path,"../../",{mtime,<<"2022-06-03T12:23:57Z">>}})
brod_client | ===> Cleaning out example...
brod_client | ===> Verifying dependencies...
brod_client | ===> Upgrading brod_gssapi (from {path,"../../",{mtime,<<"2022-06-03T12:23:57Z">>}})
brod_client | ===> Analyzing applications...
brod_client | ===> Compiling brod_gssapi
brod_client | ===> Compiling brod
brod_client | ===> Analyzing applications...
brod_client | ===> Compiling example
brod_client | Args: []
brod_client | {going_to_connect,[{"kafka.kerberos-demo.local",9093}]}
brod_client | =ERROR REPORT==== 3-Jun-2022::13:35:33.385727 ===
brod_client | ** Generic server client1 terminating
brod_client | ** Last message in was init
brod_client | ** When Server state == {state,client1,
brod_client | [{"kafka.kerberos-demo.local",9093}],
brod_client | undefined,[],undefined,undefined,
brod_client | [{sasl,
brod_client | {callback,brod_gssapi_v1,
brod_client | {gssapi,<<"/var/lib/secret/rig.key">>,
brod_client | <<"[email protected]">>}}}],
brod_client | client1}
brod_client | ** Reason for termination ==
brod_client | ** {[{{"kafka.kerberos-demo.local",9093},
brod_client | {{{kpro_req,#Ref<0.3212240052.812122119.53473>,sasl_authenticate,1,
brod_client | false,
brod_client | [<<255,255,255,255>>]},
brod_client | closed},
brod_client | [{kpro_lib,send_and_recv_raw,4,
brod_client | [{file,"/opt/brod_gssapi/example/brod_client/_build/default/lib/kafka_protocol/src/kpro_lib.erl"},
brod_client | {line,70}]},
brod_client | {kpro_lib,send_and_recv,5,
brod_client | [{file,"/opt/brod_gssapi/example/brod_client/_build/default/lib/kafka_protocol/src/kpro_lib.erl"},
brod_client | {line,81}]},
brod_client | {brod_gssapi_v1,send_sasl_token,6,
brod_client | [{file,"/opt/brod_gssapi/example/brod_client/_build/default/lib/brod_gssapi/src/brod_gssapi_v1.erl"},
brod_client | {line,129}]},
brod_client | {brod_gssapi_v1,'-sasl_recv/7-fun-0-',7,
brod_client | [{file,"/opt/brod_gssapi/example/brod_client/_build/default/lib/brod_gssapi/src/brod_gssapi_v1.erl"},
brod_client | {line,78}]},
brod_client | {brod_gssapi_v1,do_while,1,
brod_client | [{file,"/opt/brod_gssapi/example/brod_client/_build/default/lib/brod_gssapi/src/brod_gssapi_v1.erl"},
brod_client | {line,113}]},
brod_client | {brod_gssapi_v1,sasl_recv,7,
brod_client | [{file,"/opt/brod_gssapi/example/brod_client/_build/default/lib/brod_gssapi/src/brod_gssapi_v1.erl"},
brod_client | {line,84}]},
brod_client | {kpro_sasl,auth,7,
brod_client | [{file,"/opt/brod_gssapi/example/brod_client/_build/default/lib/kafka_protocol/src/kpro_sasl.erl"},
brod_client | {line,38}]},
brod_client | {kpro_connection,init_connection,2,
brod_client | [{file,"/opt/brod_gssapi/example/brod_client/_build/default/lib/kafka_protocol/src/kpro_connection.erl"},
brod_client | {line,240}]}]}}],
brod_client | [{brod_client,ensure_metadata_connection,1,
brod_client | [{file,"/opt/brod_gssapi/example/brod_client/_build/default/lib/brod/src/brod_client.erl"},
brod_client | {line,570}]},
brod_client | {brod_client,handle_info,2,
brod_client | [{file,"/opt/brod_gssapi/example/brod_client/_build/default/lib/brod/src/brod_client.erl"},
brod_client | {line,298}]},
brod_client | {gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,695}]},
brod_client | {gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,771}]},
brod_client | {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]}
brod_client |
brod_client | {"init terminating in do_boot",{{badmatch,{error,{client_down,[{{"kafka.kerberos-demo.local",9093},{{{kpro_req,#Ref<0.3212240052.812122119.53473>,sasl_authenticate,1,false,[<<255,255,255,255>>]},closed},[{kpro_lib,send_and_recv_raw,4,[{file,"/opt/brod_gssapi/example/brod_client/_build/default/lib/kafka_protocol/src/kpro_lib.erl"},{line,70}]},{kpro_lib,send_and_recv,5,[{file,"/opt/brod_gssapi/example/brod_client/_build/default/lib/kafka_protocol/src/kpro_lib.erl"},{line,81}]},{brod_gssapi_v1,send_sasl_token,6,[{file,"/opt/brod_gssapi/example/brod_client/_build/default/lib/brod_gssapi/src/brod_gssapi_v1.erl"},{line,129}]},{brod_gssapi_v1,'-sasl_recv/7-fun-0-',7,[{file,"/opt/brod_gssapi/example/brod_client/_build/default/lib/brod_gssapi/src/brod_gssapi_v1.erl"},{line,78}]},{brod_gssapi_v1,do_while,1,[{file,"/opt/brod_gssapi/example/brod_client/_build/default/lib/brod_gssapi/src/brod_gssapi_v1.erl"},{line,113}]},{brod_gssapi_v1,sasl_recv,7,[{file,"/opt/brod_gssapi/example/brod_client/_build/default/lib/brod_gssapi/src/brod_gssapi_v1.erl"},{line,84}]},{kpro_sasl,auth,7,[{file,"/opt/brod_gssapi/example/brod_client/_build/default/lib/kafka_protocol/src/kpro_sasl.erl"},{line,38}]},{kpro_connection,init_connection,2,[{file,"/opt/brod_gssapi/example/brod_client/_build/default/lib/kafka_protocol/src/kpro_connection.erl"},{line,240}]}]}}]}}},[{example,main,1,[{file,"/opt/brod_gssapi/example/brod_client/src/example.erl"},{line,22}]},{erl_eval,do_apply,6,[{file,"erl_eval.erl"},{line,685}]},{init,start_it,1,[]},{init,start_em,1,[]},{init,do_boot,3,[]}]}}
brod_client | init terminating in do_boot ({{badmatch,{error,{client_down,[_]}}},[{example,main,1,[{_},{_}]},{erl_eval,do_apply,6,[{_},{_}]},{init,start_it,1,[]},{init,start_em,1,[]},{init,do_boot,3,[]}]})
brod_client |
brod_client | Crash dump is being written to: erl_crash.dump...done
brod_client exited with code 1
The Kafka logs with authentication debugging turned on can be seen by running docker-compose logs kafka. The relevant part can be seen here:
kafka | [2022-06-03 13:35:33,367] DEBUG Accepted connection from /192.168.160.6:55466 on /192.168.160.4:9093 and assigned it to processor 2, sendBufferSize [actual|requested]: [102400|102400] recvBufferSize [actual|requested]: [102400|102400] (kafka.network.Acceptor)
kafka | [2022-06-03 13:35:33,367] DEBUG Processor 2 listening to new connection from /192.168.160.6:55466 (kafka.network.Processor)
kafka | [2022-06-03 13:35:33,367] DEBUG connections.max.reauth.ms for mechanism=GSSAPI: 0 (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
kafka | [2022-06-03 13:35:33,370] DEBUG Set SASL server state to HANDSHAKE_OR_VERSIONS_REQUEST during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
kafka | [2022-06-03 13:35:33,370] DEBUG Handling Kafka request API_VERSIONS during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
kafka | [2022-06-03 13:35:33,371] DEBUG Set SASL server state to HANDSHAKE_REQUEST during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
kafka | [2022-06-03 13:35:33,380] DEBUG Handling Kafka request SASL_HANDSHAKE during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
kafka | [2022-06-03 13:35:33,380] DEBUG Using SASL mechanism 'GSSAPI' provided by client (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
kafka | [2022-06-03 13:35:33,380] DEBUG Creating SaslServer for kafka/[email protected] with mechanism GSSAPI (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
kafka | Found KeyTab /var/lib/secret/kafka.key for kafka/[email protected]
kafka | Found ticket for kafka/[email protected] to go to krbtgt/[email protected] expiring on Sat Jun 04 13:34:18 UTC 2022
kafka | [2022-06-03 13:35:33,381] DEBUG Set SASL server state to AUTHENTICATE during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
kafka | Entered Krb5Context.acceptSecContext with state=STATE_NEW
kafka | Looking for keys for: kafka/[email protected]
kafka | Added key: 17version: 1
kafka | Added key: 18version: 1
kafka | >>> EType: sun.security.krb5.internal.crypto.Aes256CtsHmacSha1EType
kafka | Using builtin default etypes for permitted_enctypes
kafka | default etypes for permitted_enctypes: 18 17 20 19 16 23.
kafka | >>> EType: sun.security.krb5.internal.crypto.Aes256CtsHmacSha1EType
kafka | MemoryCache: add 1654263333/378345/44366FF2746B2515F667914D7E95B8E60BE6CC8C732918E76AE309D45B3AAFB5/[email protected] to [email protected]|kafka/[email protected]
kafka | >>> KrbApReq: authenticate succeed.
kafka | Krb5Context setting peerSeqNumber to: 539147621
kafka | >>> EType: sun.security.krb5.internal.crypto.Aes256CtsHmacSha1EType
kafka | Krb5Context setting mySeqNumber to: 839975820
kafka | [2022-06-03 13:35:33,384] DEBUG Failed during authentication: Error getting request for apiKey: SASL_AUTHENTICATE, apiVersion: 1, connectionId: 192.168.160.4:9093-192.168.160.6:55466-9, listenerName: ListenerName(SASL_PLAINTEXT), principal: User:ANONYMOUS (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
kafka | [2022-06-03 13:35:33,384] WARN [SocketServer listenerType=ZK_BROKER, nodeId=0] Unexpected error from /192.168.160.6; closing connection (org.apache.kafka.common.network.Selector)
kafka | org.apache.kafka.common.errors.InvalidRequestException: Error getting request for apiKey: SASL_AUTHENTICATE, apiVersion: 1, connectionId: 192.168.160.4:9093-192.168.160.6:55466-9, listenerName: ListenerName(SASL_PLAINTEXT), principal: User:ANONYMOUS
kafka | Caused by: java.lang.RuntimeException: non-nullable field authBytes was serialized as null
kafka | at org.apache.kafka.common.message.SaslAuthenticateRequestData.read(SaslAuthenticateRequestData.java:103)
kafka | at org.apache.kafka.common.message.SaslAuthenticateRequestData.<init>(SaslAuthenticateRequestData.java:71)
kafka | at org.apache.kafka.common.requests.SaslAuthenticateRequest.parse(SaslAuthenticateRequest.java:78)
kafka | at org.apache.kafka.common.requests.AbstractRequest.doParseRequest(AbstractRequest.java:230)
kafka | at org.apache.kafka.common.requests.AbstractRequest.parseRequest(AbstractRequest.java:152)
kafka | at org.apache.kafka.common.requests.RequestContext.parseRequest(RequestContext.java:95)
kafka | at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleSaslToken(SaslServerAuthenticator.java:430)
kafka | at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:280)
kafka | at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181)
kafka | at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
kafka | at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
kafka | at kafka.network.Processor.poll(SocketServer.scala:989)
kafka | at kafka.network.Processor.run(SocketServer.scala:892)
kafka | at java.base/java.lang.Thread.run(Thread.java:829)
This might be related to problems described here: kafka4beam/brod#383
@starbelly, @vikas15bhardwaj do you have any ideas about what the problem might be and how to fix it?
Opening this issue as a way for everyone to stay on the same page.
I have published brod_gssapi v0.1.0-rc0 to hex; HCA will begin vetting this today or tomorrow. Has there been other vetting?
PR #16 has been opened to switch to using git to fetch the version when publishing tags. I'm OK with hard-coding it too (normally what I do, tbh). If we do stick with the git vsn via tags, then our tags must conform to semantic versioning (note how the current tags do not).
@kjellwinblad or @zmstone, since I do not have access to create tags (I believe), can one of you create a v0.1.0-rc0 tag?
Finally, I'll add you two and Vikas as owners on hex for this package right now.
This is a low priority, as it's simply aesthetic, but the test suite for brod_gssapi_v1 is a bit hairy.
Only new versions of kafka_protocol support the auth plugin interface where the handshake version is passed to the plugin. We should have test cases for both versions of the plugin interface. The test cases in the example directory can be extended to accomplish that.
There are three handshake versions: undefined, 0, and 1 (see #5 (comment)). @starbelly, please correct me if I did not understand this correctly. We should add Kafka test cases and GitHub Actions for all handshake versions by extending the code in the example directory.
There is currently a configuration option for setting the authentication module to use when the handshake version is undefined. This option should be documented in the README.md.
Here is where the option is used: brod_gssapi/src/brod_gssapi.erl, line 79 in d4d41f9
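For the README, the option could be illustrated with a short configuration fragment. The sketch below assumes the option in question is the `default_handshake_vsn` application environment key that a user report on this page sets with `application:set_env/3`. The value 1 is copied from that report; which other values are accepted is not confirmed here.

```erlang
%% sys.config fragment (illustrative sketch)
[
 {brod_gssapi,
  [{default_handshake_vsn, 1}]}   % assumed key; see the lead-in above
].
```

Setting the same key at runtime with `application:set_env(brod_gssapi, default_handshake_vsn, 1)` before starting the client should have the same effect, since both write to the application environment.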
Hi,
Sorry if I'm not providing the right details; I'm fairly new at this, and it's the first time I'm trying to consume from Kafka through Kerberos authentication.
I used the code in example.erl as a reference to build mine.
The realm is defined in my krb5.conf, which is located under /etc/
I have also set this environment variable: export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf"
Do I need to load this file somewhere else?
After running the last line of code, I get the error below:
{{sasl_auth_error,{sasl_fail,<<"SASL(-1): generic failure: GSSAPI Error: Unspecified GSS failure. Minor code may provide more information (Cannot contact any KDC for realm 'BELLDEV.DEV.BCE.CA')">>}},
[{kpro_sasl,auth,7,
[{file,"/home/med/bm/ossfm/server/fm/_build/default/lib/kafka_protocol/src/kpro_sasl.erl"},
{line,43}]},
{kpro_connection,init_connection,2,
[{file,"/home/med/bm/ossfm/server/fm/_build/default/lib/kafka_protocol/src/kpro_connection.erl"},
{line,240}]},
{kpro_connection,init,4,
[{file,"/home/med/bm/ossfm/server/fm/_build/default/lib/kafka_protocol/src/kpro_connection.erl"},
{line,170}]},
{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,247}]}]}}],
[{brod_client,ensure_metadata_connection,1,
[{file,"/home/med/bm/ossfm/server/fm/_build/default/lib/brod/src/brod_client.erl"},
{line,554}]},
{brod_client,handle_info,2,
[{file,"/home/med/bm/ossfm/server/fm/_build/default/lib/brod/src/brod_client.erl"},
{line,300}]},
{gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,616}]},
{gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,686}]},
{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,247}]}]}
Config =
    [
     {ssl, true},
     {sasl, {callback, brod_gssapi, {gssapi, <<"FileKeytab.keytab">>, <<"[email protected]">>}}}
    ].
Args =
    [
     {bootstrap_endpoints, [{"server_name", port}]},
     {topic, <<"topic_name">>},
     {partition, all},
     {config, Config},
     {begin_offset, latest},
     {client_id, test_kafka}
    ].
Bootstrap_endpoints = proplists:get_value(bootstrap_endpoints, Args),
Topic = proplists:get_value(topic, Args),
Config = proplists:get_value(config, Args),
Partition = proplists:get_value(partition, Args),
Begin_offset = proplists:get_value(begin_offset, Args),
Client_id = proplists:get_value(client_id, Args).
{ok, _} = application:ensure_all_started(brod).
ok = application:load(brod_gssapi).
application:set_env(brod_gssapi, default_handshake_vsn, 1).
ok = brod:start_client(Bootstrap_endpoints, Client_id, Config).
Thank you!
Currently there's no useful information for getting up and running with this plugin. In preparation for a release we should rectify this.