delta-io / kafka-delta-ingest
A highly efficient daemon for streaming data from Kafka into Delta Lake
License: Apache License 2.0
I tried to reproduce the example with Amazon MSK and my own S3 bucket, but it failed.
Debug log:
2023-08-30T04:42:37 [INFO] - tester-mode-on: Ingesting messages from deltalake-test-ingest Kafka topic to s3a://datalake-test/deltalake-test-ingest Delta table
2023-08-30T04:42:37 [INFO] - tester-mode-on: Using options: [allowed_latency=60,max_messages_per_batch=5000,min_bytes_per_file=134217728,write_checkpoints=false]
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BROKER [thrd:app]: GroupCoordinator: Added new broker with NodeId -1
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BRKMAIN [thrd:GroupCoordinator]: GroupCoordinator: Enter main broker thread
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BROKER [thrd:app]: b-1.OUR-KAFKA-HOST:9092/bootstrap: Added new broker with NodeId -1
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BRKMAIN [thrd::0/internal]: :0/internal: Enter main broker thread
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BROKER [thrd:app]: b-2.OUR-KAFKA-HOST:9092/bootstrap: Added new broker with NodeId -1
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BRKMAIN [thrd:b-1.OUR-KAFKA-HOST:9092/b]: b-1.OUR-KAFKA-HOST:9092/bootstrap: Enter main broker thread
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BROKER [thrd:app]: b-3.OUR-KAFKA-HOST:9092/bootstrap: Added new broker with NodeId -1
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BRKMAIN [thrd:b-2.OUR-KAFKA-HOST:9092/b]: b-2.OUR-KAFKA-HOST:9092/bootstrap: Enter main broker thread
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BROKER [thrd:app]: b-4.OUR-KAFKA-HOST:9092/bootstrap: Added new broker with NodeId -1
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: INIT [thrd:app]: librdkafka v2.2.0 (0x20200ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,sasl_oauthbearer, GCC GXX PKGCONFIG INSTALL GNULD LDS C11THREADS LIBDL PLUGINS ZLIB SSL HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0x282)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BRKMAIN [thrd:b-3.OUR-KAFKA-HOST:9092/b]: b-3.OUR-KAFKA-HOST:9092/bootstrap: Enter main broker thread
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BRKMAIN [thrd:b-4.OUR-KAFKA-HOST:9092/b]: b-4.OUR-KAFKA-HOST:9092/bootstrap: Enter main broker thread
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: CONNECT [thrd:main]: b-3.OUR-KAFKA-HOST:9092/bootstrap: Selected for cluster connection: coordinator query (broker has 0 connection attempt(s))
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: CONNECT [thrd:b-3.OUR-KAFKA-HOST:9092/b]: b-3.OUR-KAFKA-HOST:9092/bootstrap: Received CONNECT op
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: STATE [thrd:b-3.OUR-KAFKA-HOST:9092/b]: b-3.OUR-KAFKA-HOST:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: CONNECT [thrd:b-3.OUR-KAFKA-HOST:9092/b]: b-3.OUR-KAFKA-HOST:9092/bootstrap: broker in state TRY_CONNECT connecting
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: STATE [thrd:b-3.OUR-KAFKA-HOST:9092/b]: b-3.OUR-KAFKA-HOST:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
2023-08-30T04:42:37 [INFO] - tester-mode-on: Using Static credential provider
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: add_pem_file processed 129 valid and 0 invalid certs
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: loading checkpoint from _delta_log/_last_checkpoint
thread 'main' panicked at 'not stream', /usr/local/cargo/registry/src/index.crates.io-6f17d22bba15001f/object_store-0.5.6/src/aws/credential.rs:189:14
stack backtrace:
0: 0x5599fef1bc84 - std::backtrace_rs::backtrace::libunwind::trace::he648b5c8dd376705
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/../../backtrace/src/backtrace/libunwind.rs:93:5
1: 0x5599fef1bc84 - std::backtrace_rs::backtrace::trace_unsynchronized::h5da3e203eef39e9f
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/../../backtrace/src/backtrace/mod.rs:66:5
2: 0x5599fef1bc84 - std::sys_common::backtrace::_print_fmt::h8d28d3f20588ae4c
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/sys_common/backtrace.rs:65:5
3: 0x5599fef1bc84 - <std::sys_common::backtrace::_print::DisplayBacktrace as core::fmt::Display>::fmt::hd9a5b0c9c6b058c0
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/sys_common/backtrace.rs:44:22
4: 0x5599fe54e1bf - core::fmt::rt::Argument::fmt::h0afc04119f252b53
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/fmt/rt.rs:138:9
5: 0x5599fe54e1bf - core::fmt::write::h50b1b3e73851a6fe
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/fmt/mod.rs:1094:21
6: 0x5599feee2836 - std::io::Write::write_fmt::h184eaf275e4484f0
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/io/mod.rs:1714:15
7: 0x5599fef1d0ff - std::sys_common::backtrace::_print::hf58c3a5a25090e71
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/sys_common/backtrace.rs:47:5
8: 0x5599fef1d0ff - std::sys_common::backtrace::print::hb9cf0a7c7f077819
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/sys_common/backtrace.rs:34:9
9: 0x5599fef1ccc3 - std::panicking::default_hook::{{closure}}::h066adb2e3f3e2c07
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panicking.rs:269:22
10: 0x5599fef1dcc5 - std::panicking::default_hook::h277fa2776900ff14
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panicking.rs:288:9
11: 0x5599fef1dcc5 - std::panicking::rust_panic_with_hook::hceaf38da6d9db792
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panicking.rs:705:13
12: 0x5599fef1d7c2 - std::panicking::begin_panic_handler::{{closure}}::h2bce3ed2516af7df
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panicking.rs:597:13
13: 0x5599fef1d726 - std::sys_common::backtrace::__rust_end_short_backtrace::h090f3faf8f98a395
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/sys_common/backtrace.rs:151:18
14: 0x5599fef1d711 - rust_begin_unwind
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panicking.rs:593:5
15: 0x5599fe01fc12 - core::panicking::panic_fmt::h4ec8274704d163a3
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/panicking.rs:67:14
16: 0x5599fe020152 - core::panicking::panic_display::h5ef861b25744765d
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/panicking.rs:150:5
17: 0x5599fe020152 - core::panicking::panic_str::hb2e5b3cf68d2f306
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/panicking.rs:134:5
18: 0x5599fe020152 - core::option::expect_failed::h3de37afca26e8e59
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/option.rs:1952:5
19: 0x5599fe92fd8e - <reqwest::async_impl::request::RequestBuilder as object_store::aws::credential::CredentialExt>::with_aws_sigv4::h0f57dc36ae29eb36
20: 0x5599fe8b194b - object_store::aws::client::S3Client::get_request::{{closure}}::h40d8fd9b78abf607
21: 0x5599fe8b1186 - <object_store::aws::AmazonS3 as object_store::ObjectStore>::get::{{closure}}::ha213cbe47e3cc5b7
22: 0x5599fe636042 - <deltalake::storage::s3::S3StorageBackend as object_store::ObjectStore>::get::{{closure}}::hf8bae65134071c6f
23: 0x5599fe5e7b18 - <object_store::prefix::PrefixStore<T> as object_store::ObjectStore>::get::{{closure}}::hb27bf0a15bae6b07
24: 0x5599fe689243 - <deltalake::storage::DeltaObjectStore as object_store::ObjectStore>::get::{{closure}}::ha27fa50af36203fa
25: 0x5599fe0a0a45 - deltalake::delta::DeltaTable::get_last_checkpoint::{{closure}}::h42daa366a2a74ecf
26: 0x5599fe0a2bbd - deltalake::delta::DeltaTable::update::{{closure}}::h5df80b882c67bbe8
27: 0x5599fe0a2a38 - deltalake::delta::DeltaTable::load::{{closure}}::hd775a8a5aecc216f
28: 0x5599fe087090 - kafka_delta_ingest::delta_helpers::load_table::{{closure}}::hc2b0a1876bdedcea
29: 0x5599fe06d570 - kafka_delta_ingest::start_ingest::{{closure}}::h48d97c594f25df7c
30: 0x5599fe098a3c - tokio::runtime::task::core::Core<T,S>::poll::h97ea35cfb7167f90
31: 0x5599fe815292 - tokio::runtime::task::raw::poll::hdfd28e12a40e9032
32: 0x5599fe81cb84 - tokio::runtime::context::runtime::enter_runtime::h10ab6c4e1562baec
33: 0x5599fe8414e7 - kafka_delta_ingest::main::h43894231b37e2322
34: 0x5599fe8133e3 - std::sys_common::backtrace::__rust_begin_short_backtrace::hc2b14238c4bc1fc1
35: 0x5599fe8409db - main
36: 0x7f8fb86c4d0a - __libc_start_main
37: 0x5599fe068caa - _start
38: 0x0 - <unknown>
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: Async metrics receive loop terminated: receiving on an empty and disconnected channel
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: DESTROY [thrd:app]: Terminating instance (destroy flags none (0x0))
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: DESTROY [thrd:main]: Destroy internal
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: DESTROY [thrd:main]: Removing all topics
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: DESTROY [thrd:main]: Sending TERMINATE to b-4.OUR-KAFKA-HOST:9092/bootstrap
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: DESTROY [thrd:main]: Sending TERMINATE to b-3.OUR-KAFKA-HOST:9092/bootstrap
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: DESTROY [thrd:main]: Sending TERMINATE to b-2.OUR-KAFKA-HOST:9092/bootstrap
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: DESTROY [thrd:main]: Sending TERMINATE to b-1.OUR-KAFKA-HOST:9092/bootstrap
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: TERM [thrd:b-4.OUR-KAFKA-HOST:9092/b]: b-4.OUR-KAFKA-HOST:9092/bootstrap: Received TERMINATE op in state INIT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: DESTROY [thrd:main]: Sending TERMINATE to GroupCoordinator
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: TERM [thrd:b-2.OUR-KAFKA-HOST:9092/b]: b-2.OUR-KAFKA-HOST:9092/bootstrap: Received TERMINATE op in state INIT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: FAIL [thrd:b-4.OUR-KAFKA-HOST:9092/b]: b-4.OUR-KAFKA-HOST:9092/bootstrap: Client is terminating (after 9ms in state INIT) (_DESTROY)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: TERM [thrd:b-1.OUR-KAFKA-HOST:9092/b]: b-1.OUR-KAFKA-HOST:9092/bootstrap: Received TERMINATE op in state INIT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: FAIL [thrd:b-2.OUR-KAFKA-HOST:9092/b]: b-2.OUR-KAFKA-HOST:9092/bootstrap: Client is terminating (after 9ms in state INIT) (_DESTROY)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: TERM [thrd:GroupCoordinator]: GroupCoordinator: Received TERMINATE op in state INIT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: TERM [thrd::0/internal]: :0/internal: Received TERMINATE op in state INIT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: FAIL [thrd:b-1.OUR-KAFKA-HOST:9092/b]: b-1.OUR-KAFKA-HOST:9092/bootstrap: Client is terminating (after 9ms in state INIT) (_DESTROY)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: FAIL [thrd:GroupCoordinator]: GroupCoordinator: Client is terminating (after 9ms in state INIT) (_DESTROY)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: FAIL [thrd::0/internal]: :0/internal: Client is terminating (after 9ms in state INIT) (_DESTROY)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: STATE [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> DOWN
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: STATE [thrd:b-4.OUR-KAFKA-HOST:9092/b]: b-4.OUR-KAFKA-HOST:9092/bootstrap: Broker changed state INIT -> DOWN
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BRKTERM [thrd:GroupCoordinator]: GroupCoordinator: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: STATE [thrd:b-2.OUR-KAFKA-HOST:9092/b]: b-2.OUR-KAFKA-HOST:9092/bootstrap: Broker changed state INIT -> DOWN
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: TERMINATE [thrd:GroupCoordinator]: GroupCoordinator: Handle is terminating in state DOWN: 1 refcnts (0x559a00e875e0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BRKTERM [thrd:b-2.OUR-KAFKA-HOST:9092/b]: b-2.OUR-KAFKA-HOST:9092/bootstrap: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: FAIL [thrd:GroupCoordinator]: GroupCoordinator: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: STATE [thrd:b-1.OUR-KAFKA-HOST:9092/b]: b-1.OUR-KAFKA-HOST:9092/bootstrap: Broker changed state INIT -> DOWN
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: STATE [thrd::0/internal]: :0/internal: Broker changed state INIT -> DOWN
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BRKTERM [thrd:b-4.OUR-KAFKA-HOST:9092/b]: b-4.OUR-KAFKA-HOST:9092/bootstrap: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: TERMINATE [thrd:b-2.OUR-KAFKA-HOST:9092/b]: b-2.OUR-KAFKA-HOST:9092/bootstrap: Handle is terminating in state DOWN: 1 refcnts (0x559a00e941c0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BRKTERM [thrd:b-1.OUR-KAFKA-HOST:9092/b]: b-1.OUR-KAFKA-HOST:9092/bootstrap: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BRKTERM [thrd::0/internal]: :0/internal: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: TERMINATE [thrd:b-1.OUR-KAFKA-HOST:9092/b]: b-1.OUR-KAFKA-HOST:9092/bootstrap: Handle is terminating in state DOWN: 1 refcnts (0x559a00e93350), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: TERMINATE [thrd:b-4.OUR-KAFKA-HOST:9092/b]: b-4.OUR-KAFKA-HOST:9092/bootstrap: Handle is terminating in state DOWN: 1 refcnts (0x559a00e95ff0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: FAIL [thrd:b-1.OUR-KAFKA-HOST:9092/b]: b-1.OUR-KAFKA-HOST:9092/bootstrap: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: FAIL [thrd:b-2.OUR-KAFKA-HOST:9092/b]: b-2.OUR-KAFKA-HOST:9092/bootstrap: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: TERMINATE [thrd::0/internal]: :0/internal: Handle is terminating in state DOWN: 1 refcnts (0x559a00e926d0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: FAIL [thrd:b-4.OUR-KAFKA-HOST:9092/b]: b-4.OUR-KAFKA-HOST:9092/bootstrap: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: FAIL [thrd::0/internal]: :0/internal: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: CONNECT [thrd:b-3.OUR-KAFKA-HOST:9092/b]: b-3.OUR-KAFKA-HOST:9092/bootstrap: Connecting to ipv4#10.10.60.135:9092 (plaintext) with socket 9
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: TERMINATE [thrd:b-3.OUR-KAFKA-HOST:9092/b]: b-3.OUR-KAFKA-HOST:9092/bootstrap: Handle is terminating in state CONNECT: 2 refcnts (0x559a00e950b0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: TERM [thrd:b-3.OUR-KAFKA-HOST:9092/b]: b-3.OUR-KAFKA-HOST:9092/bootstrap: Received TERMINATE op in state CONNECT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: FAIL [thrd:b-3.OUR-KAFKA-HOST:9092/b]: b-3.OUR-KAFKA-HOST:9092/bootstrap: Client is terminating (after 51ms in state CONNECT) (_DESTROY)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: STATE [thrd:b-3.OUR-KAFKA-HOST:9092/b]: b-3.OUR-KAFKA-HOST:9092/bootstrap: Broker changed state CONNECT -> DOWN
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BRKTERM [thrd:b-3.OUR-KAFKA-HOST:9092/b]: b-3.OUR-KAFKA-HOST:9092/bootstrap: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: TERMINATE [thrd:b-3.OUR-KAFKA-HOST:9092/b]: b-3.OUR-KAFKA-HOST:9092/bootstrap: Handle is terminating in state DOWN: 1 refcnts (0x559a00e950b0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: FAIL [thrd:b-3.OUR-KAFKA-HOST:9092/b]: b-3.OUR-KAFKA-HOST:9092/bootstrap: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: JoinError::Panic(Id(1), ...)', src/main.rs:173:14
stack backtrace:
0: 0x5599fef1bc84 - std::backtrace_rs::backtrace::libunwind::trace::he648b5c8dd376705
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/../../backtrace/src/backtrace/libunwind.rs:93:5
1: 0x5599fef1bc84 - std::backtrace_rs::backtrace::trace_unsynchronized::h5da3e203eef39e9f
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/../../backtrace/src/backtrace/mod.rs:66:5
2: 0x5599fef1bc84 - std::sys_common::backtrace::_print_fmt::h8d28d3f20588ae4c
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/sys_common/backtrace.rs:65:5
3: 0x5599fef1bc84 - <std::sys_common::backtrace::_print::DisplayBacktrace as core::fmt::Display>::fmt::hd9a5b0c9c6b058c0
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/sys_common/backtrace.rs:44:22
4: 0x5599fe54e1bf - core::fmt::rt::Argument::fmt::h0afc04119f252b53
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/fmt/rt.rs:138:9
5: 0x5599fe54e1bf - core::fmt::write::h50b1b3e73851a6fe
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/fmt/mod.rs:1094:21
6: 0x5599feee2836 - std::io::Write::write_fmt::h184eaf275e4484f0
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/io/mod.rs:1714:15
7: 0x5599fef1d0ff - std::sys_common::backtrace::_print::hf58c3a5a25090e71
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/sys_common/backtrace.rs:47:5
8: 0x5599fef1d0ff - std::sys_common::backtrace::print::hb9cf0a7c7f077819
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/sys_common/backtrace.rs:34:9
9: 0x5599fef1ccc3 - std::panicking::default_hook::{{closure}}::h066adb2e3f3e2c07
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panicking.rs:269:22
10: 0x5599fef1dcc5 - std::panicking::default_hook::h277fa2776900ff14
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panicking.rs:288:9
11: 0x5599fef1dcc5 - std::panicking::rust_panic_with_hook::hceaf38da6d9db792
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panicking.rs:705:13
12: 0x5599fef1d7c2 - std::panicking::begin_panic_handler::{{closure}}::h2bce3ed2516af7df
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panicking.rs:597:13
13: 0x5599fef1d726 - std::sys_common::backtrace::__rust_end_short_backtrace::h090f3faf8f98a395
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/sys_common/backtrace.rs:151:18
14: 0x5599fef1d711 - rust_begin_unwind
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panicking.rs:593:5
15: 0x5599fe01fc12 - core::panicking::panic_fmt::h4ec8274704d163a3
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/panicking.rs:67:14
16: 0x5599fe0200e2 - core::result::unwrap_failed::h170bc2721a6c6ff2
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/result.rs:1651:5
17: 0x5599fe81e39c - tokio::runtime::context::runtime::enter_runtime::h10ab6c4e1562baec
18: 0x5599fe8414e7 - kafka_delta_ingest::main::h43894231b37e2322
19: 0x5599fe8133e3 - std::sys_common::backtrace::__rust_begin_short_backtrace::hc2b14238c4bc1fc1
20: 0x5599fe8409db - main
21: 0x7f8fb86c4d0a - __libc_start_main
22: 0x5599fe068caa - _start
23: 0x0 - <unknown>
Hi! I'm back again. Again, apologies if this ends up being my own error/misunderstanding, I'm very new here!
When I try following the "Starting Worker Processes" steps in the Using Azure Event Hubs section of the README, I get this error:
thread 'main' panicked at 'called `Option::unwrap()` on a `None` value', src\main.rs:91:18
Full input and error:
$ RUST_LOG=debug cargo run ingest \
> --allowed_latency 60 \
> --app_id web_requests \
> --transform 'date: substr(meta.producer.timestamp, `0`, `10`)' \
> --transform 'meta.kafka.offset: kafka.offset' \
> --transform 'meta.kafka.partition: kafka.partition' \
> --transform 'meta.kafka.topic: kafka.topic' \
> --auto_offset_reset earliest \
> web_requests ./tests/data/web_requests
Finished dev [unoptimized + debuginfo] target(s) in 1.04s
Running `target\debug\kafka-delta-ingest.exe ingest --allowed_latency 60 --app_id web_requests --transform 'date: substr(meta.producer.timestamp, `0`, `10`)' --transform 'meta.kafka.offset: kafka.offset' --transform 'meta.kafka.partition: kafka.partition' --transform 'meta.kafka.topic: kafka.topic' --auto_offset_reset earliest web_requests ./tests/data/web_requests`
thread 'main' panicked at 'called `Option::unwrap()` on a `None` value', src\main.rs:91:18
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
error: process didn't exit successfully: `target\debug\kafka-delta-ingest.exe ingest --allowed_latency 60 --app_id web_requests --transform 'date: substr(meta.producer.timestamp, `0`, `10`)' --transform 'meta.kafka.offset: kafka.offset' --transform 'meta.kafka.partition: kafka.partition' --transform 'meta.kafka.topic: kafka.topic' --auto_offset_reset earliest web_requests ./tests/data/web_requests` (exit code: 101)
The error seems to occur when unwrapping the seek_offsets argument ID in main.rs. None of the examples show anyone using the seek_offsets argument. Based on that and my poking around in the code, I thought that one was optional, so I'm not sure how it's ending up with a None value.
I have tried using seek_offsets, but I'm having trouble working out the correct format for it.
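For reference, here is a sketch of the invocation format I tried, assuming --seek_offsets takes a JSON map of partition number to starting offset (this is my guess from reading the argument parsing, not documented behaviour; the topic and table path mirror the README example):

```shell
# Assumed format: partition number (as a string key) -> starting offset.
cargo run ingest web_requests ./tests/data/web_requests \
  --app_id web_requests \
  --seek_offsets '{"0": 123, "1": 456}'
```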
I have also tried following the steps in the README for Using Azure Event Hubs in order to connect my existing Kafka stream (in an Event Hub instance) to a Delta table (in ADLS), but I'm getting the exact same error.
It seems like I'm the only one encountering these errors -- is there another version of KDI I should be using? Is there something I should have done besides forking and cloning KDI and then following the README instructions?
If someone is able to help I would really appreciate it! Thank you.
Hello, I would like to know whether a Kafka Connect sink connector (rather than a standalone daemon) is on the roadmap for this project.
kafka -> deltaLakeSinkConnector -> deltaLake
Thank you all
Is there support for reading images/videos from Kafka and pushing them to Delta Lake?
Error compiling the project with rustc 1.73.0
rustc 1.73.0 (cc66ad468 2023-10-03)
cargo 1.73.0 (9c4383fb5 2023-08-26)
error: use of deprecated method `chrono::DateTime::<Tz>::timestamp_nanos`: use `timestamp_nanos_opt()` instead
--> src/coercions.rs:141:52
|
141 | .map(|dt: DateTime<Utc>| Value::Number((dt.timestamp_nanos() / 1000).into()))
| ^^^^^^^^^^^^^^^
|
note: the lint level is defined here
--> src/lib.rs:7:9
|
7 | #![deny(warnings)]
| ^^^^^^^^
= note: `#[deny(deprecated)]` implied by `#[deny(warnings)]`
error: use of deprecated method `chrono::DateTime::<Tz>::timestamp_nanos`: use `timestamp_nanos_opt()` instead
--> src/dead_letters.rs:47:34
|
47 | timestamp: timestamp.timestamp_nanos() / 1000,
| ^^^^^^^^^^^^^^^
error: use of deprecated method `chrono::DateTime::<Tz>::timestamp_nanos`: use `timestamp_nanos_opt()` instead
--> src/dead_letters.rs:59:34
|
59 | timestamp: timestamp.timestamp_nanos() / 1000,
| ^^^^^^^^^^^^^^^
error: use of deprecated method `chrono::DateTime::<Tz>::timestamp_nanos`: use `timestamp_nanos_opt()` instead
--> src/dead_letters.rs:72:34
|
72 | timestamp: timestamp.timestamp_nanos() / 1000,
Would it be possible to add Python bindings to this project?
I think a lot of programmers are comfortable writing Python, but aren't comfortable with Rust. I'm assuming the Python bindings could be added like they were for the delta-rs project. Thoughts?
Apologies if this functionality already exists and I've missed it.
It would be great if the program could record metrics and then make them available to Prometheus (e.g. using the prometheus_exporter crate).
(I appreciate that I can add the functionality myself and send a PR; this task is a placeholder for discussion, in case anyone else works on it in the meantime ;))
Low volume Kafka topics could benefit from a cron-style launch option like Spark's "trigger once" feature discussed in https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html.
I'm thinking we add a command line option to indicate the app should terminate after reading up to the latest offsets (I would like to mimic kafkacat's --end option semantics): look up the latest offsets in Kafka for each partition, and run the streaming writer up to that point. We would need to do some restructuring of the latency timer. We might keep this issue in mind while working on #74.
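A sketch of the termination condition, assuming we snapshot the high watermarks at startup; the function and the offset maps are illustrative, not existing kafka-delta-ingest APIs:

```rust
use std::collections::HashMap;

// "Trigger once" sketch: stop consuming when the consumer's position has
// reached the end offsets captured at startup for every partition.
fn caught_up(end_offsets: &HashMap<i32, i64>, positions: &HashMap<i32, i64>) -> bool {
    end_offsets
        .iter()
        .all(|(partition, end)| positions.get(partition).map_or(false, |pos| pos >= end))
}

fn main() {
    let end_offsets = HashMap::from([(0, 100), (1, 50)]);
    // Partition 1 is still one message behind its captured watermark.
    let positions = HashMap::from([(0, 100), (1, 49)]);
    println!("{}", caught_up(&end_offsets, &positions));
}
```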
Hi,
I am getting the following "Transform failed" warning:
2023-09-11T10:29:18 [WARN] - web_requests: Transform failed - partition 0, offset 22
2023-09-11T10:29:18 [DEBUG] - web_requests: Could not send to statsd Connection refused (os error 111)
2023-09-11T10:29:18 [DEBUG] - web_requests: Should complete record batch - latency test: 1740 >= 5000
2023-09-11T10:29:18 [DEBUG] - web_requests: Should complete record batch - buffer length test: 0 >= 5000
2023-09-11T10:29:18 [DEBUG] - web_requests: Should complete file - latency test: 1 >= 5
2023-09-11T10:29:18 [DEBUG] - web_requests: Should complete file - num bytes test: 0 >= 134217728
2023-09-11T10:29:18 [DEBUG] - web_requests: Could not send to statsd Connection refused (os error 111)
2023-09-11T10:29:18 [DEBUG] - web_requests: Could not send to statsd Connection refused (os error 111)
Here is my docker run command:
docker run -it --network=host ^
-e RUST_LOG="debug" ^
-e SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt ^
-e AZURE_STORAGE_ACCOUNT_NAME=XXX ^
-e "AZURE_STORAGE_ACCOUNT_KEY=XXX" ^
-e RUST_BACKTRACE=full ^
kdi:0.1 ^
ingest web_requests abfss://[email protected]/web_requests ^
--allowed_latency 5 ^
--kafka XXX.servicebus.windows.net:9093 ^
--kafka_setting security.protocol=SASL_SSL ^
--kafka_setting sasl.mechanism=PLAIN ^
--kafka_setting sasl.username=$ConnectionString ^
--kafka_setting sasl.password=Endpoint=sb://XXX.servicebus.windows.net/;SharedAccessKeyName=XXX;SharedAccessKey=XXX ^
--kafka_setting socket.keepalive.enable=true ^
--kafka_setting metadata.max.age.ms=180000 ^
--kafka_setting heartbeat.interval.ms=3000 ^
--kafka_setting session.timeout.ms=30000 ^
--kafka_setting debug=broker,security,protocol ^
--app_id web_requests ^
--transform "date: substr(meta.producer.timestamp, '0', '10')" ^
--transform "meta.kafka.offset: kafka.offset" ^
--transform "meta.kafka.partition: kafka.partition" ^
--transform "meta.kafka.topic: kafka.topic" ^
--auto_offset_reset earliest
Submitted the standard payload (see below) using the Event Hubs "Generate data" feature within the Azure Portal, and this was successfully sent (confirmed by kcat).
Also successfully uploaded the following first Delta transaction containing the schema (https://github.com/delta-io/kafka-delta-ingest/blob/main/tests/data/web_requests/_delta_log/00000000000000000000.json) to the relevant directory:
{"commitInfo":{"timestamp":1564524295023,"operation":"CREATE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{}"},"isBlindAppend":true}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"22ef18ba-191c-4c36-a606-3dad5cdf3830","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"meta\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"producer\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"kafka\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"offset\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"topic\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"partition\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"method\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"session_id\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"status\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"url\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"uuid\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["date"],"configuration":{},"createdTime":1564524294376}}
To continue using Rust version 1.67, I tweaked the Cargo.lock and Cargo.toml files (see below for Cargo.toml), but this is not expected to be the cause of any errors.
Really appreciate any help you can offer.
Standard Payload:
{
"status": 200,
"session_id": "7c28bcf9-be26-4d0b-931a-3374ab4bb458",
"method": "GET",
"meta": {
"producer": {
"timestamp": "2021-03-24T15:06:17.321710+00:00"
}
},
"uuid": "831c6afa-375c-4988-b248-096f9ed101f8",
"url": "http://www.example.com"
}
Cargo.toml:
[package]
name = "kafka-delta-ingest"
version = "0.2.0"
authors = ["R. Tyler Croy [email protected]", "Christian Williams [email protected]"]
edition = "2018"
[dependencies]
anstream = "=0.3.2"
anyhow = "=1.0.26"
async-trait = "=0.1.53"
apache-avro = "=0.14.0"
base64 = "=0.13.0"
bytes = "=1.1.0"
chrono = "=0.4.24"
clap = { version = "=4.3.0", features = ["color"] }
clap_builder = "=4.3.0"
clap_lex = "=0.5.0"
dipstick = "=0.9.0"
dotenv = "=0.15"
env_logger = "=0.10.0"
futures = "=0.3.15"
half = "=2.2.1"
jmespatch = { version = "=0.3.0", features = ["sync"] }
lazy_static = "=1.4.0"
log = "=0.4.17"
maplit = "=1.0.2"
rdkafka = { version = "=0.28.0", features = ["ssl-vendored"] }
rusoto_core = { version = "=0.46.0" }
rusoto_credential = { version = "=0.46.0" }
rusoto_s3 = { version = "=0.46.0" }
schema_registry_converter = { version = "=3.1.0", features = ["easy", "json", "avro"] }
serde = { version = "=1.0.140", features = ["derive"] }
serde_json = "=1.0.82"
strum = "=0.20.0"
strum_macros = "=0.20.0"
thiserror = "=1.0.31"
tokio = { version = "=1.25.0", features = ["full"] }
tokio-stream = { version = "=0.1.8", features = ["fs"] }
tokio-util = "=0.6.3"
uuid = { version = "=0.8.2", features = ["serde", "v4"] }
deltalake = { version = "=0.11.0", features = ["s3", "azure"] }
dynamodb_lock = "=0.4.3"
sentry = { version = "=0.23.0", optional = true }
url = "2.3"
[features]
sentry-ext = ["sentry"]
dynamic-linking = [ "rdkafka/dynamic-linking" ]
[dev-dependencies]
deltalake = { version = "=0.11.0", features = ["s3", "azure", "json"] }
utime = "0.3"
serial_test = "=0.6.0"
tempfile = "=3.2.0"
azure_core = "=0.10.0"
azure_storage = "=0.10.0"
azure_storage_blobs = "=0.10.0"
time = "0.3"
[profile.release]
lto = true
README.adoc has some outdated information and could use a better description all around of how to run the tests and the web_requests example locally.
A follow-up issue ticket for the #23 (review) discussion. Behaviour differs depending on whether a new consumer is added before or after other consumers start processing messages. Both cases need to be tested, but there's no straightforward way yet to capture that. Please refer to the linked discussion for possible solutions.
kafka-delta-ingest currently requires a new message to arrive before it triggers the latency timer check for flushing buffers. It would be better to run the latency timer on a separate thread so flushes are triggered independently, especially for low-volume topics that receive periodic writes. For those topics, we pull everything in from Kafka and it just sits in the buffer until another message arrives.
Update 2021-10-22:
An alternative, and probably better, solution than running the latency timer on a separate thread is to switch from rdkafka's async StreamConsumer to the non-async BaseConsumer, which requires a poll and then a check on the returned Option. We have to process messages in order end-to-end, so we can't take much advantage of async anyway. Managing consumer.poll ourselves would give us an opportunity to check latency on an interval within the run loop whether or not a message was received.
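The run-loop idea can be sketched without rdkafka; the `poll` closure below is a stand-in for `BaseConsumer::poll`, and all names are illustrations rather than the project's actual code. The point is that the latency check runs on every iteration, whether or not a message arrived:

```rust
use std::time::{Duration, Instant};

// Sketch of a poll-driven run loop: `poll` stands in for the consumer's
// poll call, which returns promptly even when no message is available.
fn run_loop<F: FnMut() -> Option<String>>(
    mut poll: F,
    allowed_latency: Duration,
    max_iterations: usize,
) -> Vec<Vec<String>> {
    let mut buffer: Vec<String> = Vec::new();
    let mut flushed: Vec<Vec<String>> = Vec::new();
    let mut last_flush = Instant::now();

    for _ in 0..max_iterations {
        if let Some(msg) = poll() {
            buffer.push(msg);
        }
        // The latency check is evaluated every iteration, so a flush
        // happens on schedule even when no new message was received.
        if last_flush.elapsed() >= allowed_latency && !buffer.is_empty() {
            flushed.push(std::mem::take(&mut buffer));
            last_flush = Instant::now();
        }
    }
    if !buffer.is_empty() {
        flushed.push(buffer);
    }
    flushed
}
```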
Here's a test case with redpanda and vector.
git clone https://github.com/bbigras/test-kdi-gzip
docker-compose up -d
# start kdi
# add a couple of lines to the `log` file
# wait for one "Delta write for version x has completed in x millis"
# stop kdi
# uncomment "compression" in vector.toml
docker-compose up -d --force-recreate
# start kdi again
run kdi with:
target/release/kafka-delta-ingest ingest my-topic ~/checkout_folder/delta \
--checkpoints \
-l 5 \
--max_messages_per_batch 2 \
--kafka 127.0.0.1:9092 \
-K "auto.offset.reset=earliest" \
-t \
'date: substr(timestamp, `0`, `10`)' \
'message: message' \
'timestamp: timestamp' \
'meta.kafka.offset: kafka.offset' \
'meta.kafka.partition: kafka.partition' \
'meta.kafka.topic: kafka.topic'
and you'll get:
[2021-10-22T20:06:43Z ERROR kafka_delta_ingest] Error getting BorrowedMessage while processing stream KafkaError (Message consumption error: NotImplemented (Local: Not implemented))
Hello guys, I'm trying to set up kafka-delta-ingest with AWS S3, but I get an error:
Ingest service exited with error DeltaTable { source: NotATable("No snapshot or version 0 found, perhaps s3://mybucket/rust-deltalake/raw.test.mytopic is an empty dir?") }
I'm running the command in a local Docker container using a local Kafka broker.
command: ["/usr/local/bin/kafka-delta-ingest", "ingest",
"raw.test.mytopic",
"s3://mybucket/rust-deltalake/raw.test.mytopic",
"--allowed_latency", "300",
"--kafka", "host.docker.internal:9092",
"--consumer_group_id", "my_consumer_group"]
Did I miss a step or a configuration option? Should I create the empty Delta table myself?
Thanks,
T
I have looked in the repo but could not find the configuration to include the key while ingesting. Can someone help with this please?
Sample message in my topic: student_id,{"first_name":"john","last_name":"doe","courses":["math","english"]}
I want to write all these fields (including the key student_id) to the Delta table.
arrow_schema_ref becomes stale once the arrow writer is created. The Delta table's schema is expected to change over time, but those changes are not tracked.
AC: Refresh arrow_schema_ref from table.get_metadata periodically, for example after writer creation or after a new table version.
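A minimal sketch of that AC under stated assumptions: `SchemaCache` and `refresh_if_stale` are hypothetical names, and the schema is represented by a plain string stand-in rather than an Arrow schema reference. The idea is to record the table version the schema was read at and rebuild when the table moves past it:

```rust
// Hypothetical cache tracking which table version the arrow schema
// reference was built from.
struct SchemaCache {
    built_at_version: i64,
    schema: String, // stand-in for an Arc<arrow::datatypes::Schema>
}

impl SchemaCache {
    // Returns true when the caller should rebuild the arrow writer
    // because the table advanced past the cached version.
    fn refresh_if_stale(&mut self, table_version: i64, current_schema: &str) -> bool {
        if table_version > self.built_at_version {
            self.built_at_version = table_version;
            self.schema = current_schema.to_string();
            true
        } else {
            false
        }
    }
}
```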
Dear maintainer, the DESIGN.md file carries an outdated link in the list of libraries under the Key Dependencies section. The link for the Arrow Rust implementation points to an outdated address:
https://github.com/apache/arrow/tree/master/rust/arrow
Please update it to point to the new location of the same code:
https://github.com/apache/arrow-rs.git
Including a slack link!
Debug output is unreadable now, for example:
DEBUG kafka_delta_ingest::instrumentation] StatsHandler received stat MessageTransformed with value 1
DEBUG kafka_delta_ingest::instrumentation] StatsHandler awaiting channel.
and more
I cannot find any reference to the default credential chain in the code, can anyone help me?
Our stats logging code is written as self.log_message_buffered(msg, state.value_buffers.len()).await, which blocks the current task until the async stats send has completed. It would be better to handle this kind of instrumentation I/O in a separate tokio task in the background.
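The fire-and-forget pattern can be sketched with a plain std channel and thread; the real fix would presumably use tokio::sync::mpsc and tokio::spawn, but the shape is the same, and the `Stat` type here is a hypothetical stand-in. The hot path only sends and moves on, while a background task drains the channel:

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stat event; the real code sends richer types.
#[derive(Debug, PartialEq)]
enum Stat {
    MessageBuffered(usize),
}

// Spawn a background consumer of stats. The caller keeps the Sender and
// performs a cheap send instead of awaiting the stats sink inline.
fn spawn_stats_handler() -> (mpsc::Sender<Stat>, thread::JoinHandle<usize>) {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        // Drain events until all senders are dropped; return the count.
        rx.iter().count()
    });
    (tx, handle)
}
```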
@MrPowers wants the simplest and easiest installation and getting started instructions possible.
This should go into the top of the README and be as easy as possible, Kafka setup notwithstanding.
related to delta-io/delta-rs#348. blocked by delta-io/delta-rs#354.
@xianwill @mosyp it looks like we are modeling partition values as HashMap<String, String> in kdi, which means null partition values will not be written out as null values in the add action JSON and the parquet checkpoint. Is this correct?
In playing around with KDI I realize it doesn't have logic to "Create Delta Table if not exist". If you point the container to a Filesystem that has no Delta table, it complains:
Whereas if you point it to an existing Delta Table, works fine:
I think this should be trivial to add. I created a "KDI Java" just for the heck of it after chatting with @thovoll, and it's a matter of checking whether the Trx log version is !(log.snapshot().getVersion() > -1):
https://github.com/mdrakiburrahman/kafka-delta-ingest-adls/blob/2802eead5174e5fc00da047470572d5fd4c76981/1.KDI-Java/src/main/java/com/microsoft/kdi/KDI.java#L261
My use case is I'm pointing KDI at a large number of Change Data Capture Topics from Debezium - so it can start pumping out Delta Tables at scale. I can't assume the Delta Tables already exist - so this would be great to have!
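A minimal sketch of that check in Rust, with a stand-in error enum: the variant names below are assumptions for illustration, though the deltalake crate does surface the empty-directory case as the NotATable error quoted above. A Delta log with no commits reports version -1, matching the Java check:

```rust
// Stand-in for an open-table error; the variants here are illustrative.
#[derive(Debug)]
enum OpenError {
    NotATable(String),
    Other(String),
}

// Decide to create the table when opening it fails specifically because
// no Delta log exists yet (mirrors `!(log.snapshot().getVersion() > -1)`).
fn should_create_table(open_result: &Result<i64, OpenError>) -> bool {
    matches!(open_result, Err(OpenError::NotATable(_)))
}
```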
I have a column with type DECIMAL(38,0), but when I try to start ingestion I get the following error:
Ingest service exited with error DeltaWriteFailed { ending_offsets: "{\"0\":4999}", partition_counts: "{\"0\":5000}", source: Arrow { source: JsonError("Decimal(38, 0) type is not supported") } }
I believe this is a compatibility issue with www.github.com/delta-io/delta-rs; any help would be appreciated. I am ready to provide more information.
Thanks in advance !
#23 replaces the write-ahead log with a txn per partition written to the delta log. After this is merged, we must follow up with another change to add a retry loop in case two workers try to commit the same delta log version and one fails. The failing worker will need to do conflict resolution in case of overlapping partition assignment between the two commits.
Great project! I ran into an issue building the project and ended up adding builder syntax. Are the authors open to something like this instead of using clap_app?
let matches = App::new("kafka-delta-ingest-program")
.version(env!("CARGO_PKG_VERSION"))
.about("Service for ingesting messages from a Kafka topic and writing them to a Delta table")
.subcommand(App::new("ingest")
.about("Starts a stream that consumes from a Kafka topic and writes to a Delta table")
.arg(Arg::new("TOPIC").required(true).help("The Kafka topic to stream from"))
.arg(Arg::new("TABLE_LOCATION").required(true).help("The Delta table location to write to"))
.arg(Arg::new("KAFKA_BROKERS").short('k').long("kafka").takes_value(true).help("The Kafka broker connection string to use when connecting to Kafka"))
.arg(Arg::new("CONSUMER_GROUP").short('g').long("consumer_group_id").takes_value(true).help("The consumer group id to use when subscribing to Kafka."))
Hi there,
I see you are using rdkafka
for your Kafka interaction. We (= InfluxDB IOx) implemented our own pure-Rust async client called rskafka
which you might find useful. There is also an intro blog post: https://www.influxdata.com/blog/building-simple-pure-rust-async-apache-kafka-client/
I think you might be missing a feature or two, but nothing fundamental (to my knowledge).
Let me know if you have questions, or want to chat.
Cheers,
Marco
PS: Also feel free to close this ticket.
This is more of an investigation request: to figure out whether we'd benefit from writing parquet to the file system and then uploading it to S3, instead of using in-memory buffers.
What are the files that need to be changed to connect with AWS?
I am facing issues ingesting data to AWS S3.
Can anyone please help?
There can be cases where we wouldn't want the extra overhead of DynamoDB, with the understanding that this increases the risk of data duplication.
This would mean relying entirely on the Kafka consumer offsets for restarts and recoveries.
Whenever a rebalance event happens, each worker's partition assignment is revoked and a new list of partitions is assigned. As a result, a consumer might gain new partitions or lose existing ones. For the sake of simplicity and safety, in both cases the consumer state is reset and it starts consuming messages from the previous commit/offset.
However, if partitions are only added to a consumer and all of its existing partitions are kept, it makes sense not to reset the state, for performance reasons.
See https://github.com/delta-io/kafka-delta-ingest/pull/27/files/8e598e35406f5d2978306134ddde9d9d54b4dccc#diff-b1a35a68f14e696205874893c07fd24fdb88882b47c23cc0e0c80a30c7d53759R420 for inspiration
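The keep-state optimisation above boils down to a small set comparison; a sketch with hypothetical names (not the project's actual rebalance handler):

```rust
use std::collections::HashSet;

// Decide whether a rebalance requires resetting consumer state: if every
// previously-assigned partition is still assigned (partitions were only
// added), the buffered state remains valid and can be kept.
fn must_reset_state(previous: &[i32], assigned: &[i32]) -> bool {
    let assigned: HashSet<i32> = assigned.iter().copied().collect();
    !previous.iter().all(|p| assigned.contains(p))
}
```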
We don't want to waste resources on the dead_letters table and the s3/dynamodb clients. Thus, it makes more sense to create/close those resources on demand.
Apologies if this ends up being my own error/misunderstanding, I'm very new here!
I'm working on getting kafka-delta-ingest up and running locally, but when I try following the "Starting Worker Processes" steps in the Using Azure Event Hubs section of the README, I get "unexpected argument" errors.
When I try running it as it appears in the README:
$ RUST_LOG=debug cargo run ingest web_requests ./tests/data/web_requests \
> --allowed_latency 60 \
> --app_id web_requests \
> --transform 'date: substr(meta.producer.timestamp, `0`, `10`)' \
> 'meta.kafka.offset: kafka.offset' \
> 'meta.kafka.partition: kafka.partition' \
> 'meta.kafka.topic: kafka.topic' \
> --auto_offset_reset earliest
Finished dev [unoptimized + debuginfo] target(s) in 1.18s
Running `target\debug\kafka-delta-ingest.exe ingest web_requests ./tests/data/web_requests --allowed_latency 60 --app_id web_requests --transform 'date: substr(meta.producer.timestamp, `0`, `10`)' 'meta.kafka.offset: kafka.offset' 'meta.kafka.partition: kafka.partition' 'meta.kafka.topic: kafka.topic' --auto_offset_reset earliest`
error: unexpected argument '--allowed_latency' found
tip: to pass '--allowed_latency' as a value, use '-- --allowed_latency'
Usage: kafka-delta-ingest.exe ingest <topic> <table_location>
For more information, try '--help'.
error: process didn't exit successfully: `target\debug\kafka-delta-ingest.exe ingest web_requests ./tests/data/web_requests --allowed_latency 60 --app_id web_requests --transform 'date: substr(meta.producer.timestamp, `0`, `10`)' 'meta.kafka.offset: kafka.offset' 'meta.kafka.partition: kafka.partition' 'meta.kafka.topic: kafka.topic' --auto_offset_reset earliest` (exit code: 2)
When I try running it with a space between the dashes and the argument IDs, I get the same error but with this line changed slightly:
error: unexpected argument 'allowed_latency' found
When I change the order of the argument IDs, it gives me the error for whichever one is first.
When I use the short argument IDs (like "-k" or "-a"), it stops giving me unexpected argument errors, but gives me a different error instead:
$ RUST_LOG=debug cargo run ingest web_requests ./tests/data/web_requests \
> -l 60 \
> -a web_requests \
> -K "auto.offset.reset=earliest" \
> -t 'date: substr(meta.producer.timestamp, `0`, `10`)'
Finished dev [unoptimized + debuginfo] target(s) in 1.23s
Running `target/debug/kafka-delta-ingest ingest web_requests ./tests/data/web_requests -l 60 -a web_requests -K auto.offset.reset=earliest -t 'date: substr(meta.producer.timestamp, `0`, `10`)'`
thread 'main' panicked at 'Mismatch between definition and access of `APP_ID`. Unknown argument or group id. Make sure you are using the argument id and not the short or long flags
', src/main.rs:65:18
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
I have also tried following the steps in the README for Using Azure Event Hubs in order to connect my existing Kafka stream (in an Event Hub instance) to a delta table (in ADLS) but I'm getting the same exact errors.
Is anyone else experiencing this issue with the argument IDs? Is the documentation outdated and I should be trying something else? Or did I do something wrong in setup (forking and cloning the repo in VS Code then just following the steps in the readme)?
Any help would be hugely appreciated. Thank you!
Whenever the consumer state is reset, it reads the latest offsets from Delta, adds 1, and seeks to them. However, this will not work if the resulting offset is missing (offsets are not contiguous in Kafka) or not yet occupied (no new messages since). In the latter case the consumer will read messages from the beginning.
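One way to make the seek robust, sketched with hypothetical names: clamp the naive `last + 1` target into the partition's current watermark range (obtainable from a metadata query), so a missing or not-yet-produced offset never causes a silent fall-back to the beginning of the partition:

```rust
// Compute a safe seek position from the last offset committed to Delta.
// `low`/`high` are the partition's current low/high watermark offsets.
fn safe_seek_offset(last_committed: Option<i64>, low: i64, high: i64) -> i64 {
    match last_committed {
        // No recorded offset: start at the earliest available message.
        None => low,
        // Clamp so a compacted-away or not-yet-produced offset stays
        // within the valid range instead of triggering auto.offset.reset.
        Some(last) => (last + 1).clamp(low, high),
    }
}
```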
Add a CONTRIBUTING.md file, so other devs know how to contribute to this project.
2021-08-19T16:38:45 kafka_delta_ingest [INFO] - WORKER-1: pre-process message 1:10 -- id:46
2021-08-19T16:38:46 kafka_delta_ingest::instrumentation [INFO] - WORKER-1: Delta write for version 7 has completed in 414 millis
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-1: REBALANCE - Partition assignments revoked
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: REBALANCE - Received new partition assignment list [0, 1]
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-1: REBALANCE - Received new partition assignment list [2, 3]
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 1:0 -- id:4
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: Resetting state with partitions: [1, 0]
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: Seeking consumer to 1:0
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: Partition 0 has no recorded offset. Not seeking consumer.
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 1:0 -- id:4
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:0 -- id:3
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:0 -- id:3
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:1 -- id:7
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:1 -- id:7
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:2 -- id:10
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:2 -- id:10
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:3 -- id:11
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:3 -- id:11
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:4 -- id:14
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:4 -- id:14
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:5 -- id:15
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:5 -- id:15
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:6 -- id:24
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:6 -- id:24
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:7 -- id:25
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:7 -- id:25
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:8 -- id:28
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:8 -- id:28
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:9 -- id:29
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:9 -- id:29
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-1: pre-process message 2:0 -- id:0
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-1: Resetting state with partitions: [3, 2]
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-1: Seeking consumer to 3:31
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-1: Seeking consumer to 2:26
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-1: pre-process message 2:0 -- id:0
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: Conflict offset for partition 1: state=Some(0), delta=10
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: Resetting state with partitions: [1, 0]
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: Seeking consumer to 1:10
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: Partition 0 has no recorded offset. Not seeking consumer.
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:10 -- id:36
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:10 -- id:36
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:11 -- id:37
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:11 -- id:37
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:12 -- id:38
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:12 -- id:38
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:13 -- id:39
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:13 -- id:39
Worker 2 is assigned partitions 0 and 1, but worker 1 is still working in the meantime. So when worker 2 tries to write a new version, it hits a conflict on partition 1. On conflict, worker 2 gets the latest offsets from Delta and seeks to them. But there is no stored offset for partition 0, so it does not seek that partition, even though worker 2 has already consumed some messages from partition 0, which results in those messages being lost.
The body of buffer_len is self.cursor.data().len(). The .data() call copies all bytes from the cursor and returns a new vector. Since we only need the length, this is inefficient, and moreover it's called on each new message. I couldn't find an easy workaround, since we don't have access to the internal buffer of the in-memory cursor. However, introducing a new function in parquet::file::writer::InMemoryWriteableCursor should resolve that, e.g.
pub fn len(&self) -> usize {
    let inner = self.buffer.lock().unwrap();
    inner.get_ref().len()
}
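To make the cost difference concrete, here is a self-contained stand-in for the cursor (the struct below is an illustration, not the parquet crate's actual type), contrasting the copying data() accessor with the proposed in-place len():

```rust
use std::io::Cursor;
use std::sync::{Arc, Mutex};

// Minimal stand-in for parquet's InMemoryWriteableCursor, which wraps
// an in-memory cursor behind an Arc<Mutex<..>>.
struct InMemoryCursor {
    buffer: Arc<Mutex<Cursor<Vec<u8>>>>,
}

impl InMemoryCursor {
    fn new() -> Self {
        Self { buffer: Arc::new(Mutex::new(Cursor::new(Vec::new()))) }
    }

    // Copies the entire buffer just to inspect it: O(n) per call.
    fn data(&self) -> Vec<u8> {
        self.buffer.lock().unwrap().get_ref().clone()
    }

    // Reads the length in place: O(1), no allocation.
    fn len(&self) -> usize {
        self.buffer.lock().unwrap().get_ref().len()
    }
}
```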