kafka-delta-ingest's People

Contributors

bbigras, clairewood, florissmit10, gterzian, mightyshazam, mosyp, rtyler, thovoll, txdv, xianwill

kafka-delta-ingest's Issues

thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value

I tried to reproduce the example with Amazon MSK and my own S3 bucket, but it failed.
Debug log:

2023-08-30T04:42:37 [INFO] - tester-mode-on: Ingesting messages from deltalake-test-ingest Kafka topic to s3a://datalake-test/deltalake-test-ingest Delta table
2023-08-30T04:42:37 [INFO] - tester-mode-on: Using options: [allowed_latency=60,max_messages_per_batch=5000,min_bytes_per_file=134217728,write_checkpoints=false]
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BROKER [thrd:app]: GroupCoordinator: Added new broker with NodeId -1
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BRKMAIN [thrd:GroupCoordinator]: GroupCoordinator: Enter main broker thread
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BROKER [thrd:app]: b-1.OUR-KAFKA-HOST:9092/bootstrap: Added new broker with NodeId -1
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BRKMAIN [thrd::0/internal]: :0/internal: Enter main broker thread
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BROKER [thrd:app]: b-2.OUR-KAFKA-HOST:9092/bootstrap: Added new broker with NodeId -1
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BRKMAIN [thrd:b-1.OUR-KAFKA-HOST:9092/b]: b-1.OUR-KAFKA-HOST:9092/bootstrap: Enter main broker thread
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BROKER [thrd:app]: b-3.OUR-KAFKA-HOST:9092/bootstrap: Added new broker with NodeId -1
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BRKMAIN [thrd:b-2.OUR-KAFKA-HOST:9092/b]: b-2.OUR-KAFKA-HOST:9092/bootstrap: Enter main broker thread
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BROKER [thrd:app]: b-4.OUR-KAFKA-HOST:9092/bootstrap: Added new broker with NodeId -1
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: INIT [thrd:app]: librdkafka v2.2.0 (0x20200ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,sasl_oauthbearer, GCC GXX PKGCONFIG INSTALL GNULD LDS C11THREADS LIBDL PLUGINS ZLIB SSL HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0x282)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BRKMAIN [thrd:b-3.OUR-KAFKA-HOST:9092/b]: b-3.OUR-KAFKA-HOST:9092/bootstrap: Enter main broker thread
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BRKMAIN [thrd:b-4.OUR-KAFKA-HOST:9092/b]: b-4.OUR-KAFKA-HOST:9092/bootstrap: Enter main broker thread
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: CONNECT [thrd:main]: b-3.OUR-KAFKA-HOST:9092/bootstrap: Selected for cluster connection: coordinator query (broker has 0 connection attempt(s))
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: CONNECT [thrd:b-3.OUR-KAFKA-HOST:9092/b]: b-3.OUR-KAFKA-HOST:9092/bootstrap: Received CONNECT op
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: STATE [thrd:b-3.OUR-KAFKA-HOST:9092/b]: b-3.OUR-KAFKA-HOST:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: CONNECT [thrd:b-3.OUR-KAFKA-HOST:9092/b]: b-3.OUR-KAFKA-HOST:9092/bootstrap: broker in state TRY_CONNECT connecting
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: STATE [thrd:b-3.OUR-KAFKA-HOST:9092/b]: b-3.OUR-KAFKA-HOST:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
2023-08-30T04:42:37 [INFO] - tester-mode-on: Using Static credential provider
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: add_pem_file processed 129 valid and 0 invalid certs
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: loading checkpoint from _delta_log/_last_checkpoint
thread 'main' panicked at 'not stream', /usr/local/cargo/registry/src/index.crates.io-6f17d22bba15001f/object_store-0.5.6/src/aws/credential.rs:189:14
stack backtrace:
   0:     0x5599fef1bc84 - std::backtrace_rs::backtrace::libunwind::trace::he648b5c8dd376705
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/../../backtrace/src/backtrace/libunwind.rs:93:5
   1:     0x5599fef1bc84 - std::backtrace_rs::backtrace::trace_unsynchronized::h5da3e203eef39e9f
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/../../backtrace/src/backtrace/mod.rs:66:5
   2:     0x5599fef1bc84 - std::sys_common::backtrace::_print_fmt::h8d28d3f20588ae4c
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/sys_common/backtrace.rs:65:5
   3:     0x5599fef1bc84 - <std::sys_common::backtrace::_print::DisplayBacktrace as core::fmt::Display>::fmt::hd9a5b0c9c6b058c0
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/sys_common/backtrace.rs:44:22
   4:     0x5599fe54e1bf - core::fmt::rt::Argument::fmt::h0afc04119f252b53
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/fmt/rt.rs:138:9
   5:     0x5599fe54e1bf - core::fmt::write::h50b1b3e73851a6fe
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/fmt/mod.rs:1094:21
   6:     0x5599feee2836 - std::io::Write::write_fmt::h184eaf275e4484f0
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/io/mod.rs:1714:15
   7:     0x5599fef1d0ff - std::sys_common::backtrace::_print::hf58c3a5a25090e71
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/sys_common/backtrace.rs:47:5
   8:     0x5599fef1d0ff - std::sys_common::backtrace::print::hb9cf0a7c7f077819
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/sys_common/backtrace.rs:34:9
   9:     0x5599fef1ccc3 - std::panicking::default_hook::{{closure}}::h066adb2e3f3e2c07
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panicking.rs:269:22
  10:     0x5599fef1dcc5 - std::panicking::default_hook::h277fa2776900ff14
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panicking.rs:288:9
  11:     0x5599fef1dcc5 - std::panicking::rust_panic_with_hook::hceaf38da6d9db792
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panicking.rs:705:13
  12:     0x5599fef1d7c2 - std::panicking::begin_panic_handler::{{closure}}::h2bce3ed2516af7df
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panicking.rs:597:13
  13:     0x5599fef1d726 - std::sys_common::backtrace::__rust_end_short_backtrace::h090f3faf8f98a395
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/sys_common/backtrace.rs:151:18
  14:     0x5599fef1d711 - rust_begin_unwind
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panicking.rs:593:5
  15:     0x5599fe01fc12 - core::panicking::panic_fmt::h4ec8274704d163a3
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/panicking.rs:67:14
  16:     0x5599fe020152 - core::panicking::panic_display::h5ef861b25744765d
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/panicking.rs:150:5
  17:     0x5599fe020152 - core::panicking::panic_str::hb2e5b3cf68d2f306
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/panicking.rs:134:5
  18:     0x5599fe020152 - core::option::expect_failed::h3de37afca26e8e59
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/option.rs:1952:5
  19:     0x5599fe92fd8e - <reqwest::async_impl::request::RequestBuilder as object_store::aws::credential::CredentialExt>::with_aws_sigv4::h0f57dc36ae29eb36
  20:     0x5599fe8b194b - object_store::aws::client::S3Client::get_request::{{closure}}::h40d8fd9b78abf607
  21:     0x5599fe8b1186 - <object_store::aws::AmazonS3 as object_store::ObjectStore>::get::{{closure}}::ha213cbe47e3cc5b7
  22:     0x5599fe636042 - <deltalake::storage::s3::S3StorageBackend as object_store::ObjectStore>::get::{{closure}}::hf8bae65134071c6f
  23:     0x5599fe5e7b18 - <object_store::prefix::PrefixStore<T> as object_store::ObjectStore>::get::{{closure}}::hb27bf0a15bae6b07
  24:     0x5599fe689243 - <deltalake::storage::DeltaObjectStore as object_store::ObjectStore>::get::{{closure}}::ha27fa50af36203fa
  25:     0x5599fe0a0a45 - deltalake::delta::DeltaTable::get_last_checkpoint::{{closure}}::h42daa366a2a74ecf
  26:     0x5599fe0a2bbd - deltalake::delta::DeltaTable::update::{{closure}}::h5df80b882c67bbe8
  27:     0x5599fe0a2a38 - deltalake::delta::DeltaTable::load::{{closure}}::hd775a8a5aecc216f
  28:     0x5599fe087090 - kafka_delta_ingest::delta_helpers::load_table::{{closure}}::hc2b0a1876bdedcea
  29:     0x5599fe06d570 - kafka_delta_ingest::start_ingest::{{closure}}::h48d97c594f25df7c
  30:     0x5599fe098a3c - tokio::runtime::task::core::Core<T,S>::poll::h97ea35cfb7167f90
  31:     0x5599fe815292 - tokio::runtime::task::raw::poll::hdfd28e12a40e9032
  32:     0x5599fe81cb84 - tokio::runtime::context::runtime::enter_runtime::h10ab6c4e1562baec
  33:     0x5599fe8414e7 - kafka_delta_ingest::main::h43894231b37e2322
  34:     0x5599fe8133e3 - std::sys_common::backtrace::__rust_begin_short_backtrace::hc2b14238c4bc1fc1
  35:     0x5599fe8409db - main
  36:     0x7f8fb86c4d0a - __libc_start_main
  37:     0x5599fe068caa - _start
  38:                0x0 - <unknown>
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: Async metrics receive loop terminated: receiving on an empty and disconnected channel
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: DESTROY [thrd:app]: Terminating instance (destroy flags none (0x0))
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: DESTROY [thrd:main]: Destroy internal
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: DESTROY [thrd:main]: Removing all topics
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: DESTROY [thrd:main]: Sending TERMINATE to b-4.OUR-KAFKA-HOST:9092/bootstrap
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: DESTROY [thrd:main]: Sending TERMINATE to b-3.OUR-KAFKA-HOST:9092/bootstrap
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: DESTROY [thrd:main]: Sending TERMINATE to b-2.OUR-KAFKA-HOST:9092/bootstrap
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: DESTROY [thrd:main]: Sending TERMINATE to b-1.OUR-KAFKA-HOST:9092/bootstrap
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: TERM [thrd:b-4.OUR-KAFKA-HOST:9092/b]: b-4.OUR-KAFKA-HOST:9092/bootstrap: Received TERMINATE op in state INIT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: DESTROY [thrd:main]: Sending TERMINATE to GroupCoordinator
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: TERM [thrd:b-2.OUR-KAFKA-HOST:9092/b]: b-2.OUR-KAFKA-HOST:9092/bootstrap: Received TERMINATE op in state INIT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: FAIL [thrd:b-4.OUR-KAFKA-HOST:9092/b]: b-4.OUR-KAFKA-HOST:9092/bootstrap: Client is terminating (after 9ms in state INIT) (_DESTROY)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: TERM [thrd:b-1.OUR-KAFKA-HOST:9092/b]: b-1.OUR-KAFKA-HOST:9092/bootstrap: Received TERMINATE op in state INIT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: FAIL [thrd:b-2.OUR-KAFKA-HOST:9092/b]: b-2.OUR-KAFKA-HOST:9092/bootstrap: Client is terminating (after 9ms in state INIT) (_DESTROY)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: TERM [thrd:GroupCoordinator]: GroupCoordinator: Received TERMINATE op in state INIT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: TERM [thrd::0/internal]: :0/internal: Received TERMINATE op in state INIT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: FAIL [thrd:b-1.OUR-KAFKA-HOST:9092/b]: b-1.OUR-KAFKA-HOST:9092/bootstrap: Client is terminating (after 9ms in state INIT) (_DESTROY)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: FAIL [thrd:GroupCoordinator]: GroupCoordinator: Client is terminating (after 9ms in state INIT) (_DESTROY)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: FAIL [thrd::0/internal]: :0/internal: Client is terminating (after 9ms in state INIT) (_DESTROY)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: STATE [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> DOWN
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: STATE [thrd:b-4.OUR-KAFKA-HOST:9092/b]: b-4.OUR-KAFKA-HOST:9092/bootstrap: Broker changed state INIT -> DOWN
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BRKTERM [thrd:GroupCoordinator]: GroupCoordinator: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: STATE [thrd:b-2.OUR-KAFKA-HOST:9092/b]: b-2.OUR-KAFKA-HOST:9092/bootstrap: Broker changed state INIT -> DOWN
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: TERMINATE [thrd:GroupCoordinator]: GroupCoordinator: Handle is terminating in state DOWN: 1 refcnts (0x559a00e875e0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BRKTERM [thrd:b-2.OUR-KAFKA-HOST:9092/b]: b-2.OUR-KAFKA-HOST:9092/bootstrap: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: FAIL [thrd:GroupCoordinator]: GroupCoordinator: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: STATE [thrd:b-1.OUR-KAFKA-HOST:9092/b]: b-1.OUR-KAFKA-HOST:9092/bootstrap: Broker changed state INIT -> DOWN
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: STATE [thrd::0/internal]: :0/internal: Broker changed state INIT -> DOWN
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BRKTERM [thrd:b-4.OUR-KAFKA-HOST:9092/b]: b-4.OUR-KAFKA-HOST:9092/bootstrap: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: TERMINATE [thrd:b-2.OUR-KAFKA-HOST:9092/b]: b-2.OUR-KAFKA-HOST:9092/bootstrap: Handle is terminating in state DOWN: 1 refcnts (0x559a00e941c0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BRKTERM [thrd:b-1.OUR-KAFKA-HOST:9092/b]: b-1.OUR-KAFKA-HOST:9092/bootstrap: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BRKTERM [thrd::0/internal]: :0/internal: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: TERMINATE [thrd:b-1.OUR-KAFKA-HOST:9092/b]: b-1.OUR-KAFKA-HOST:9092/bootstrap: Handle is terminating in state DOWN: 1 refcnts (0x559a00e93350), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: TERMINATE [thrd:b-4.OUR-KAFKA-HOST:9092/b]: b-4.OUR-KAFKA-HOST:9092/bootstrap: Handle is terminating in state DOWN: 1 refcnts (0x559a00e95ff0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: FAIL [thrd:b-1.OUR-KAFKA-HOST:9092/b]: b-1.OUR-KAFKA-HOST:9092/bootstrap: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: FAIL [thrd:b-2.OUR-KAFKA-HOST:9092/b]: b-2.OUR-KAFKA-HOST:9092/bootstrap: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: TERMINATE [thrd::0/internal]: :0/internal: Handle is terminating in state DOWN: 1 refcnts (0x559a00e926d0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: FAIL [thrd:b-4.OUR-KAFKA-HOST:9092/b]: b-4.OUR-KAFKA-HOST:9092/bootstrap: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: FAIL [thrd::0/internal]: :0/internal: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: CONNECT [thrd:b-3.OUR-KAFKA-HOST:9092/b]: b-3.OUR-KAFKA-HOST:9092/bootstrap: Connecting to ipv4#10.10.60.135:9092 (plaintext) with socket 9
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: TERMINATE [thrd:b-3.OUR-KAFKA-HOST:9092/b]: b-3.OUR-KAFKA-HOST:9092/bootstrap: Handle is terminating in state CONNECT: 2 refcnts (0x559a00e950b0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: TERM [thrd:b-3.OUR-KAFKA-HOST:9092/b]: b-3.OUR-KAFKA-HOST:9092/bootstrap: Received TERMINATE op in state CONNECT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: FAIL [thrd:b-3.OUR-KAFKA-HOST:9092/b]: b-3.OUR-KAFKA-HOST:9092/bootstrap: Client is terminating (after 51ms in state CONNECT) (_DESTROY)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: STATE [thrd:b-3.OUR-KAFKA-HOST:9092/b]: b-3.OUR-KAFKA-HOST:9092/bootstrap: Broker changed state CONNECT -> DOWN
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: BRKTERM [thrd:b-3.OUR-KAFKA-HOST:9092/b]: b-3.OUR-KAFKA-HOST:9092/bootstrap: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: TERMINATE [thrd:b-3.OUR-KAFKA-HOST:9092/b]: b-3.OUR-KAFKA-HOST:9092/bootstrap: Handle is terminating in state DOWN: 1 refcnts (0x559a00e950b0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
2023-08-30T04:42:37 [DEBUG] - tester-mode-on: librdkafka: FAIL [thrd:b-3.OUR-KAFKA-HOST:9092/b]: b-3.OUR-KAFKA-HOST:9092/bootstrap: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: JoinError::Panic(Id(1), ...)', src/main.rs:173:14
stack backtrace:
   0:     0x5599fef1bc84 - std::backtrace_rs::backtrace::libunwind::trace::he648b5c8dd376705
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/../../backtrace/src/backtrace/libunwind.rs:93:5
   1:     0x5599fef1bc84 - std::backtrace_rs::backtrace::trace_unsynchronized::h5da3e203eef39e9f
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/../../backtrace/src/backtrace/mod.rs:66:5
   2:     0x5599fef1bc84 - std::sys_common::backtrace::_print_fmt::h8d28d3f20588ae4c
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/sys_common/backtrace.rs:65:5
   3:     0x5599fef1bc84 - <std::sys_common::backtrace::_print::DisplayBacktrace as core::fmt::Display>::fmt::hd9a5b0c9c6b058c0
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/sys_common/backtrace.rs:44:22
   4:     0x5599fe54e1bf - core::fmt::rt::Argument::fmt::h0afc04119f252b53
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/fmt/rt.rs:138:9
   5:     0x5599fe54e1bf - core::fmt::write::h50b1b3e73851a6fe
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/fmt/mod.rs:1094:21
   6:     0x5599feee2836 - std::io::Write::write_fmt::h184eaf275e4484f0
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/io/mod.rs:1714:15
   7:     0x5599fef1d0ff - std::sys_common::backtrace::_print::hf58c3a5a25090e71
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/sys_common/backtrace.rs:47:5
   8:     0x5599fef1d0ff - std::sys_common::backtrace::print::hb9cf0a7c7f077819
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/sys_common/backtrace.rs:34:9
   9:     0x5599fef1ccc3 - std::panicking::default_hook::{{closure}}::h066adb2e3f3e2c07
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panicking.rs:269:22
  10:     0x5599fef1dcc5 - std::panicking::default_hook::h277fa2776900ff14
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panicking.rs:288:9
  11:     0x5599fef1dcc5 - std::panicking::rust_panic_with_hook::hceaf38da6d9db792
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panicking.rs:705:13
  12:     0x5599fef1d7c2 - std::panicking::begin_panic_handler::{{closure}}::h2bce3ed2516af7df
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panicking.rs:597:13
  13:     0x5599fef1d726 - std::sys_common::backtrace::__rust_end_short_backtrace::h090f3faf8f98a395
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/sys_common/backtrace.rs:151:18
  14:     0x5599fef1d711 - rust_begin_unwind
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panicking.rs:593:5
  15:     0x5599fe01fc12 - core::panicking::panic_fmt::h4ec8274704d163a3
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/panicking.rs:67:14
  16:     0x5599fe0200e2 - core::result::unwrap_failed::h170bc2721a6c6ff2
                               at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/result.rs:1651:5
  17:     0x5599fe81e39c - tokio::runtime::context::runtime::enter_runtime::h10ab6c4e1562baec
  18:     0x5599fe8414e7 - kafka_delta_ingest::main::h43894231b37e2322
  19:     0x5599fe8133e3 - std::sys_common::backtrace::__rust_begin_short_backtrace::hc2b14238c4bc1fc1
  20:     0x5599fe8409db - main
  21:     0x7f8fb86c4d0a - __libc_start_main
  22:     0x5599fe068caa - _start
  23:                0x0 - <unknown>

unable to run example code: "thread 'main' panicked at 'called `Option::unwrap()` on a `None` value', src\main.rs:91:18"

Hi! I'm back again. Again, apologies if this ends up being my own error/misunderstanding, I'm very new here!

When I try following the "Starting Worker Processes" steps in the Using Azure Event Hubs section of the README, I get this error:
thread 'main' panicked at 'called `Option::unwrap()` on a `None` value', src\main.rs:91:18

Full input and error:

$ RUST_LOG=debug cargo run ingest \
>   --allowed_latency 60 \
>   --app_id web_requests \
>   --transform 'date: substr(meta.producer.timestamp, `0`, `10`)' \
>   --transform 'meta.kafka.offset: kafka.offset' \
>   --transform 'meta.kafka.partition: kafka.partition' \
>   --transform 'meta.kafka.topic: kafka.topic' \
>   --auto_offset_reset earliest \
> web_requests ./tests/data/web_requests
    Finished dev [unoptimized + debuginfo] target(s) in 1.04s
     Running `target\debug\kafka-delta-ingest.exe ingest --allowed_latency 60 --app_id web_requests --transform 'date: substr(meta.producer.timestamp, `0`, `10`)' --transform 'meta.kafka.offset: kafka.offset' --transform 'meta.kafka.partition: kafka.partition' --transform 'meta.kafka.topic: kafka.topic' --auto_offset_reset earliest web_requests ./tests/data/web_requests`
thread 'main' panicked at 'called `Option::unwrap()` on a `None` value', src\main.rs:91:18
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
error: process didn't exit successfully: `target\debug\kafka-delta-ingest.exe ingest --allowed_latency 60 --app_id web_requests --transform 'date: substr(meta.producer.timestamp, `0`, `10`)' --transform 'meta.kafka.offset: kafka.offset' --transform 'meta.kafka.partition: kafka.partition' --transform 'meta.kafka.topic: kafka.topic' --auto_offset_reset earliest web_requests ./tests/data/web_requests` (exit code: 101)      

The error seems to occur when unwrapping the seek_offsets argument ID in main.rs. None of the examples show anyone using the seek_offsets argument. Based on that and my poking around in the code, I thought that argument was optional, so I'm not sure how it's ending up with a None value.

I have tried using seek_offsets, but I'm having trouble working out the right format for it.

I have also tried following the steps in the README for Using Azure Event Hubs in order to connect my existing Kafka stream (in an Event Hubs instance) to a Delta table (in ADLS), but I'm getting the exact same error.

It seems like I'm the only one encountering these errors -- is there another version of KDI I should be using? Is there something I should have done besides forking and cloning KDI and then following the README instructions?

If someone is able to help I would really appreciate it! Thank you.

Kafka-connect sink connector

Hello, I would like to know whether a Kafka Connect sink connector (rather than a daemon) is on this project's roadmap.

kafka -> deltaLakeSinkConnector -> deltaLake

Thank you all

use of deprecated method `chrono::DateTime::<Tz>::timestamp_nanos`: use `timestamp_nanos_opt()` instead

Error compiling the project with rustc 1.73.0:

rustc 1.73.0 (cc66ad468 2023-10-03)
cargo 1.73.0 (9c4383fb5 2023-08-26)

error: use of deprecated method `chrono::DateTime::<Tz>::timestamp_nanos`: use `timestamp_nanos_opt()` instead
   --> src/coercions.rs:141:52
    |
141 |         .map(|dt: DateTime<Utc>| Value::Number((dt.timestamp_nanos() / 1000).into()))
    |                                                    ^^^^^^^^^^^^^^^
    |
note: the lint level is defined here
   --> src/lib.rs:7:9
    |
7   | #![deny(warnings)]
    |         ^^^^^^^^
    = note: `#[deny(deprecated)]` implied by `#[deny(warnings)]`

error: use of deprecated method `chrono::DateTime::<Tz>::timestamp_nanos`: use `timestamp_nanos_opt()` instead
  --> src/dead_letters.rs:47:34
   |
47 |             timestamp: timestamp.timestamp_nanos() / 1000,
   |                                  ^^^^^^^^^^^^^^^

error: use of deprecated method `chrono::DateTime::<Tz>::timestamp_nanos`: use `timestamp_nanos_opt()` instead
  --> src/dead_letters.rs:59:34
   |
59 |             timestamp: timestamp.timestamp_nanos() / 1000,
   |                                  ^^^^^^^^^^^^^^^

error: use of deprecated method `chrono::DateTime::<Tz>::timestamp_nanos`: use `timestamp_nanos_opt()` instead
  --> src/dead_letters.rs:72:34
   |
72 |             timestamp: timestamp.timestamp_nanos() / 1000,
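
The mechanical fix, sketched under the assumption of chrono 0.4.31+ (where the fallible variant exists); the helper name is made up:

use chrono::{DateTime, Utc};

// Microseconds since the epoch, as the deprecated call sites computed it.
fn micros(dt: DateTime<Utc>) -> i64 {
    // `timestamp_nanos_opt` returns None when the value overflows an i64
    // (dates outside roughly 1677-2262); the deprecated `timestamp_nanos`
    // panicked in that case.
    dt.timestamp_nanos_opt()
        .expect("timestamp outside nanosecond-representable range")
        / 1000
}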

Possibility of adding Python bindings to this project

Would it be possible to add Python bindings to this project?

I think a lot of programmers are comfortable writing Python, but aren't comfortable with Rust. I'm assuming the Python bindings could be added like they were for the delta-rs project. Thoughts?

feature request: record metrics and export to prometheus

Apologies if this functionality already exists and I've missed it.

It would be great if the program could record metrics and then make them available to Prometheus (e.g. using the prometheus_exporter crate).

(I appreciate that I can add the functionality myself and send a PR; this task is a placeholder for discussion, in case anyone else works on it in the meantime ;))
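
For discussion, a minimal sketch of what this could look like with the prometheus_exporter crate; the port and metric name are made up, and none of this exists in KDI today:

use prometheus_exporter::prometheus::register_int_counter;

fn main() {
    // Serve a /metrics endpoint for Prometheus to scrape (port is made up).
    let binding = "0.0.0.0:9184".parse().expect("valid socket address");
    prometheus_exporter::start(binding).expect("failed to start exporter");

    // Hypothetical metric; KDI would increment it in the ingest loop.
    let ingested = register_int_counter!(
        "kdi_messages_ingested_total",
        "Number of Kafka messages ingested"
    )
    .expect("metric can be registered");

    ingested.inc();
}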

Add cron-style "Trigger Once" feature

Low volume Kafka topics could benefit from a cron-style launch option like Spark's "trigger once" feature discussed in https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html.

I'm thinking we add a command line option indicating that the app should terminate after reading up to the latest offsets (I would like to mimic kafkacat's --end option semantics): look up the latest offsets in Kafka for each partition, then run the streaming writer up to that point. We would need to do some restructuring of the latency timer. We might keep this issue in mind while working on #74.
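
A rough sketch of the offset lookup, under the assumption that we stay on rdkafka; the helper only captures the per-partition high watermarks at startup, and the run loop would stop once every assigned partition is consumed up to its mark:

use std::collections::HashMap;
use std::time::Duration;
use rdkafka::consumer::{BaseConsumer, Consumer};

// Capture the current high watermark for each assigned partition; the run
// loop would then terminate once every partition reaches its watermark.
fn end_offsets(
    consumer: &BaseConsumer,
    topic: &str,
    partitions: &[i32],
) -> HashMap<i32, i64> {
    partitions
        .iter()
        .map(|&p| {
            // `high` is the offset one past the last message in the partition.
            let (_low, high) = consumer
                .fetch_watermarks(topic, p, Duration::from_secs(5))
                .expect("failed to fetch watermarks");
            (p, high)
        })
        .collect()
}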

Transform failed (using Azure Event Hubs & ADLS Gen 2 storage account)

Hi,

I am getting the following Transform failed warning:

2023-09-11T10:29:18 [WARN] - web_requests: Transform failed - partition 0, offset 22
2023-09-11T10:29:18 [DEBUG] - web_requests: Could not send to statsd Connection refused (os error 111)
2023-09-11T10:29:18 [DEBUG] - web_requests: Should complete record batch - latency test: 1740 >= 5000
2023-09-11T10:29:18 [DEBUG] - web_requests: Should complete record batch - buffer length test: 0 >= 5000
2023-09-11T10:29:18 [DEBUG] - web_requests: Should complete file - latency test: 1 >= 5
2023-09-11T10:29:18 [DEBUG] - web_requests: Should complete file - num bytes test: 0 >= 134217728
2023-09-11T10:29:18 [DEBUG] - web_requests: Could not send to statsd Connection refused (os error 111)
2023-09-11T10:29:18 [DEBUG] - web_requests: Could not send to statsd Connection refused (os error 111)

Here is my docker run command:

docker run -it --network=host ^
-e RUST_LOG="debug" ^
-e SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt ^
-e AZURE_STORAGE_ACCOUNT_NAME=XXX ^
-e "AZURE_STORAGE_ACCOUNT_KEY=XXX" ^
-e RUST_BACKTRACE=full ^
kdi:0.1 ^
ingest web_requests abfss://[email protected]/web_requests ^
--allowed_latency 5 ^
--kafka XXX.servicebus.windows.net:9093 ^
--kafka_setting security.protocol=SASL_SSL ^
--kafka_setting sasl.mechanism=PLAIN ^
--kafka_setting sasl.username=$ConnectionString ^
--kafka_setting sasl.password=Endpoint=sb://XXX.servicebus.windows.net/;SharedAccessKeyName=XXX;SharedAccessKey=XXX ^
--kafka_setting socket.keepalive.enable=true ^
--kafka_setting metadata.max.age.ms=180000 ^
--kafka_setting heartbeat.interval.ms=3000 ^
--kafka_setting session.timeout.ms=30000 ^
--kafka_setting debug=broker,security,protocol ^
--app_id web_requests ^
--transform "date: substr(meta.producer.timestamp, '0', '10')" ^
--transform "meta.kafka.offset: kafka.offset" ^
--transform "meta.kafka.partition: kafka.partition" ^
--transform "meta.kafka.topic: kafka.topic" ^
--auto_offset_reset earliest

I submitted the standard payload (see below) using the Event Hubs "Generate data" feature in the Azure Portal, and it was successfully sent (confirmed by kcat).

I also successfully uploaded the following first Delta transaction containing the schema (https://github.com/delta-io/kafka-delta-ingest/blob/main/tests/data/web_requests/_delta_log/00000000000000000000.json) to the relevant directory:

{"commitInfo":{"timestamp":1564524295023,"operation":"CREATE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{}"},"isBlindAppend":true}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"22ef18ba-191c-4c36-a606-3dad5cdf3830","format":{"provider":"parquet","options":{}},"schemaString":"{"type":"struct","fields":[{"name":"meta","type":{"type":"struct","fields":[{"name":"producer","type":{"type":"struct","fields":[{"name":"timestamp","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"kafka","type":{"type":"struct","fields":[{"name":"offset","type":"long","nullable":true,"metadata":{}},{"name":"topic","type":"string","nullable":true,"metadata":{}},{"name":"partition","type":"integer","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"method","type":"string","nullable":true,"metadata":{}},{"name":"session_id","type":"string","nullable":true,"metadata":{}},{"name":"status","type":"integer","nullable":true,"metadata":{}},{"name":"url","type":"string","nullable":true,"metadata":{}},{"name":"uuid","type":"string","nullable":true,"metadata":{}},{"name":"date","type":"string","nullable":true,"metadata":{}}]}","partitionColumns":["date"],"configuration":{},"createdTime":1564524294376}}

To continue using Rust version 1.67, I tweaked the Cargo.lock and Cargo.toml files (see below for Cargo.toml), but I don't expect that to be the cause of any errors.

Really appreciate any help you can offer.


Standard Payload:

{
  "status": 200,
  "session_id": "7c28bcf9-be26-4d0b-931a-3374ab4bb458",
  "method": "GET",
  "meta": {
    "producer": {
      "timestamp": "2021-03-24T15:06:17.321710+00:00"
    }
  },
  "uuid": "831c6afa-375c-4988-b248-096f9ed101f8",
  "url": "http://www.example.com"
}


Cargo.toml:

[package]
name = "kafka-delta-ingest"
version = "0.2.0"
authors = ["R. Tyler Croy [email protected]", "Christian Williams [email protected]"]
edition = "2018"

[dependencies]
anstream = "=0.3.2"
anyhow = "=1.0.26"
async-trait = "=0.1.53"
apache-avro = "=0.14.0"
base64 = "=0.13.0"
bytes = "=1.1.0"
chrono = "=0.4.24"
clap = { version = "=4.3.0", features = ["color"] }
clap_builder = "=4.3.0"
clap_lex = "=0.5.0"
dipstick = "=0.9.0"
dotenv = "=0.15"
env_logger = "=0.10.0"
futures = "=0.3.15"
half = "=2.2.1"
jmespatch = { version = "=0.3.0", features = ["sync"] }
lazy_static = "=1.4.0"
log = "=0.4.17"
maplit = "=1.0.2"
rdkafka = { version = "=0.28.0", features = ["ssl-vendored"] }
rusoto_core = { version = "=0.46.0" }
rusoto_credential = { version = "=0.46.0" }
rusoto_s3 = { version = "=0.46.0" }
schema_registry_converter = { version = "=3.1.0", features = ["easy", "json", "avro"] }
serde = { version = "=1.0.140", features = ["derive"] }
serde_json = "=1.0.82"
strum = "=0.20.0"
strum_macros = "=0.20.0"
thiserror = "=1.0.31"
tokio = { version = "=1.25.0", features = ["full"] }
tokio-stream = { version = "=0.1.8", features = ["fs"] }
tokio-util = "=0.6.3"
uuid = { version = "=0.8.2", features = ["serde", "v4"] }

deltalake = { version = "=0.11.0", features = ["s3", "azure"] }
dynamodb_lock = "=0.4.3"

# sentry
sentry = { version = "=0.23.0", optional = true }
url = "2.3"

[features]
sentry-ext = ["sentry"]
dynamic-linking = [ "rdkafka/dynamic-linking" ]

[dev-dependencies]
deltalake = { version = "=0.11.0", features = ["s3", "azure", "json"] }
utime = "0.3"
serial_test = "=0.6.0"
tempfile = "=3.2.0"
azure_core = "=0.10.0"
azure_storage = "=0.10.0"
azure_storage_blobs = "=0.10.0"
time = "0.3"

[profile.release]
lto = true

Track rebalance in tests

A follow-up issue for the discussion in #23 (review). The behaviour differs depending on whether a new consumer is added before or after other consumers have processed messages. Both cases need to be tested, but there's no straightforward way to capture that yet. Please refer to the linked discussion for possible solutions.

Run latency timer on an interval (instead of on message received)

Kafka delta ingest currently requires a message to arrive before the latency timer check that flushes buffers can run. It would be better to run the latency timer on a separate thread to trigger flushes - especially for low volume topics that receive periodic writes. For these low volume topics, we suck everything in from Kafka and it just sits in the buffer until we get another message.

Update 2021-10-22:

An alternative (and probably better) solution than running the latency timer on a separate thread is to switch from rdkafka's async StreamConsumer to the non-async BaseConsumer, which requires a poll and then a check on the returned Option. We have to process messages in order end-to-end, so we can't take much advantage of async anyway. Managing consumer.poll ourselves would give us an opportunity to check latency on an interval within the run loop, whether a message was received or not.
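
A sketch of what the reworked run loop could look like, assuming rdkafka's BaseConsumer; the buffering and flushing internals are elided:

use std::time::{Duration, Instant};
use rdkafka::consumer::BaseConsumer;

fn run_loop(consumer: &BaseConsumer, allowed_latency: Duration) {
    let mut last_flush = Instant::now();
    loop {
        // `poll` returns None when no message arrived within the timeout,
        // so the body runs - and latency is checked - even on idle topics.
        if let Some(result) = consumer.poll(Duration::from_millis(100)) {
            let _message = result.expect("message consumption error");
            // ...transform and buffer the message here...
        }
        // Latency check happens on an interval, not only on message receipt.
        if last_flush.elapsed() >= allowed_latency {
            // ...complete record batches / flush files here...
            last_flush = Instant::now();
        }
    }
}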

no support for gzip?

Here's a test case with Redpanda and Vector.

git clone https://github.com/bbigras/test-kdi-gzip

docker-compose up -d
# start kdi
# add a couple of lines to the `log` file
# wait for one "Delta write for version x has completed in x millis"
# stop kdi

# uncomment "compression" in vector.toml
docker-compose up -d --force-recreate

# start kdi again

run kdi with:

target/release/kafka-delta-ingest ingest my-topic ~/checkout_folder/delta \
  --checkpoints \
  -l 5 \
  --max_messages_per_batch 2 \
  --kafka 127.0.0.1:9092 \
  -K "auto.offset.reset=earliest" \
  -t 'date: substr(timestamp, `0`, `10`)' \
     'message: message' \
     'timestamp: timestamp' \
     'meta.kafka.offset: kafka.offset' \
     'meta.kafka.partition: kafka.partition' \
     'meta.kafka.topic: kafka.topic'

and you'll get:

[2021-10-22T20:06:43Z ERROR kafka_delta_ingest] Error getting BorrowedMessage while processing stream KafkaError (Message consumption error: NotImplemented (Local: Not implemented))

No snapshot or version 0 found

Hello guys, I'm trying to set up kafka-delta-ingest with AWS S3, but I get an error:

Ingest service exited with error DeltaTable { source: NotATable("No snapshot or version 0 found, perhaps s3://mybucket/rust-deltalake/raw.test.mytopic is an empty dir?") }

I'm running the command in a local Docker container using a local Kafka broker.

command: ["/usr/local/bin/kafka-delta-ingest", "ingest",
              "raw.test.mytopic",                                                   
              "s3://mybucket/rust-deltalake/raw.test.mytopic", 
              "--allowed_latency", "300",
              "--kafka", "host.docker.internal:9092",
              "--consumer_group_id", "my_consumer_group"]

Did I miss a step or a configuration option? Should I create the empty Delta table myself?

Thanks,
T

How to ingest the key?

I have looked in the repo but could not find a configuration option to include the message key while ingesting. Can someone help with this, please?

Sample message in my topic: student_id,{"first_name":"john","last_name":"doe","courses":["math","english"]}
I want to write all these fields (including the key student_id) to the Delta table.

Update arrow_schema from metadata after write

arrow_schema_ref is stale once the arrow writer is created. The Delta table's schema is expected to change over time, but those changes are not tracked.

AC: Update arrow_schema_ref from table.get_metadata once in a while - after writes or when a new version lands, for example.
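
A sketch of the acceptance criteria with placeholder types (this is not the delta-rs API as written); the point is only that the cached schema is re-derived from the table metadata after each write:

use std::sync::Arc;

struct ArrowSchema; // placeholder for arrow::datatypes::Schema
struct DeltaTable;  // placeholder for deltalake::DeltaTable

impl DeltaTable {
    // Placeholder for deriving an Arrow schema from table.get_metadata().
    fn current_arrow_schema(&self) -> Arc<ArrowSchema> {
        unimplemented!()
    }
}

struct DeltaWriter {
    table: DeltaTable,
    arrow_schema_ref: Arc<ArrowSchema>,
}

impl DeltaWriter {
    // Call after a completed write, or after observing a new table version,
    // so `arrow_schema_ref` does not go stale between versions.
    fn refresh_schema(&mut self) {
        self.arrow_schema_ref = self.table.current_arrow_schema();
    }
}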

Remove some annoying debug messages

The debug output is unreadable now, for example:

DEBUG kafka_delta_ingest::instrumentation] StatsHandler received stat MessageTransformed with value 1
DEBUG kafka_delta_ingest::instrumentation] StatsHandler awaiting channel.

and more

Send stats async in background

Our stats logging code is written as self.log_message_buffered(msg, state.value_buffers.len()).await, which blocks the current task until the async stats send has completed. It would be better to handle this kind of instrumentation IO in a separate tokio task in the background.
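
A minimal sketch of the suggested shape, assuming tokio; the `Stat` enum and its fields are made up:

use tokio::sync::mpsc;

#[derive(Debug)]
enum Stat {
    MessageTransformed { partition: i32, offset: i64 },
}

fn spawn_stats_task() -> mpsc::UnboundedSender<Stat> {
    let (tx, mut rx) = mpsc::unbounded_channel();
    tokio::spawn(async move {
        while let Some(stat) = rx.recv().await {
            // The slow IO (statsd writes, logging) happens off the hot path.
            log::debug!("stat received: {:?}", stat);
        }
    });
    tx
}

// In the ingest loop: a fire-and-forget, non-blocking send instead of `.await`:
// stats_tx.send(Stat::MessageTransformed { partition, offset }).ok();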

Add in logic for "Create Delta Table if not exist"

In playing around with KDI I realized it doesn't have logic to "Create Delta Table if not exist". If you point the container to a filesystem that has no Delta table, it complains [screenshot], whereas if you point it to an existing Delta table, it works fine [screenshot].

I think this should be trivial to add. I created a "KDI Java" just for the heck of it after chatting with @thovoll, and it's a matter of checking whether the transaction log version is !(log.snapshot().getVersion() > -1):
https://github.com/mdrakiburrahman/kafka-delta-ingest-adls/blob/2802eead5174e5fc00da047470572d5fd4c76981/1.KDI-Java/src/main/java/com/microsoft/kdi/KDI.java#L261

My use case is pointing KDI at a large number of Change Data Capture topics from Debezium so it can start pumping out Delta tables at scale. I can't assume the Delta tables already exist, so this would be great to have!

Can't write to a table with `DECIMAL(38,0)` field.

I have a column with type DECIMAL(38,0), but when I try to start ingestion I get the following error:

Ingest service exited with error DeltaWriteFailed { ending_offsets: "{\"0\":4999}", partition_counts: "{\"0\":5000}", source: Arrow { source: JsonError("Decimal(38, 0) type is not supported") } }

I believe this is a compatibility issue with www.github.com/delta-io/delta-rs; any help would be appreciated. I am ready to provide more information.

Thanks in advance !

Add commit loop with conflict resolution for txn actions

#23 replaces the write-ahead log with a txn per partition written to the delta log πŸŽ‰. After this is merged, we must follow up with another change that adds a retry loop for the case where two workers try to commit the same delta log version and one fails. The failing worker will need to do conflict resolution in case of overlapping partition assignment between the two commits.
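
A rough sketch of the loop, with hypothetical stand-ins for KDI/delta-rs internals (none of these are the real delta-rs APIs):

#[derive(Debug)]
enum CommitError {
    VersionAlreadyExists,
    Other(String),
}

struct WinningCommit; // the competing worker's txn actions for that version

fn try_commit(_version: i64) -> Result<i64, CommitError> { unimplemented!() }
fn read_commit(_version: i64) -> WinningCommit { unimplemented!() }
fn overlaps_our_partitions(_c: &WinningCommit) -> bool { unimplemented!() }
fn rebase_txn_offsets(_c: &WinningCommit) { unimplemented!() }

fn commit_with_retry(mut version: i64) -> Result<i64, CommitError> {
    loop {
        match try_commit(version) {
            Ok(v) => return Ok(v),
            Err(CommitError::VersionAlreadyExists) => {
                // Another worker won the race for this version. Inspect its
                // txn actions; if it covered any of our assigned partitions,
                // resolve the conflict before retrying at the next version.
                let winner = read_commit(version);
                if overlaps_our_partitions(&winner) {
                    rebase_txn_offsets(&winner);
                }
                version += 1;
            }
            Err(other) => return Err(other),
        }
    }
}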

Remove clap_app macro and use builder syntax

Great project! I ran into an issue building the project and ended up converting the argument parsing to builder syntax. Are the authors open to something like this instead of using clap_app?

use clap::{App, Arg};

let matches = App::new("kafka-delta-ingest-program")
    .version(env!("CARGO_PKG_VERSION"))
    .about("Service for ingesting messages from a Kafka topic and writing them to a Delta table")
    .subcommand(
        App::new("ingest")
            .about("Starts a stream that consumes from a Kafka topic and writes to a Delta table")
            .arg(Arg::new("TOPIC").required(true).help("The Kafka topic to stream from"))
            .arg(Arg::new("TABLE_LOCATION").required(true).help("The Delta table location to write to"))
            .arg(Arg::new("KAFKA_BROKERS").short('k').long("kafka").takes_value(true).help("The Kafka broker connection string to use when connecting to Kafka"))
            .arg(Arg::new("CONSUMER_GROUP").short('g').long("consumer_group_id").takes_value(true).help("The consumer group id to use when subscribing to Kafka")),
    )
    .get_matches();

Kafka client collaboration

Hi there,

I see you are using rdkafka for your Kafka interaction. We (= InfluxDB IOx) implemented our own pure-Rust async client called rskafka which you might find useful. There is also an intro blog post: https://www.influxdata.com/blog/building-simple-pure-rust-async-apache-kafka-client/

I think you might be missing a feature or two, but nothing fundamental (to my knowledge).

Let me know if you have questions, or want to chat.

Cheers,
Marco

PS: Also feel free to close this ticket.

To connect with AWS

What files need to be changed to connect with AWS?
I am facing issues ingesting data to AWS S3.
Can anyone please help?

Ability to not use write-ahead-log

There can be cases where we wouldn't want the extra overhead of DynamoDB, accepting the increased risk of data duplication.

This would mean relying entirely on the Kafka consumer offsets for restarts and recoveries.

Do not reset state when there's only addition of new partitions to the consumer

Whenever a rebalance event happens, each worker's partitions are revoked and a new list is assigned. As a result, a consumer might gain new partitions or lose existing ones. For the sake of simplicity and safety, in both cases the consumer state is reset and consumption restarts from the previous commit/offset.

However, if partitions are only added to a consumer and all of its existing partitions are kept, it makes sense not to reset the state, for performance reasons (see the sketch below the link).

See https://github.com/delta-io/kafka-delta-ingest/pull/27/files/8e598e35406f5d2978306134ddde9d9d54b4dccc#diff-b1a35a68f14e696205874893c07fd24fdb88882b47c23cc0e0c80a30c7d53759R420 for inspiration
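
A minimal sketch of the check (names hypothetical): reset only when the new assignment dropped a partition we previously owned.

use std::collections::HashSet;

// A pure superset (partitions only added) keeps the buffered state.
fn should_reset_state(previous: &HashSet<i32>, assigned: &HashSet<i32>) -> bool {
    // e.g. previous={0,1}, assigned={0,1,2} -> false (keep state);
    //      previous={0,1}, assigned={1,2}   -> true  (reset).
    !previous.is_subset(assigned)
}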

Getting "unexpected argument" error for all argument IDs when running KDI

Apologies if this ends up being my own error/misunderstanding, I'm very new here!

I'm working on getting kafka-delta-ingest up and running locally, but when I try following the "Starting Worker Processes" steps in the Using Azure Event Hubs section of the README, I get "unexpected argument" errors.

When I try running it as it appears in the README:

$ RUST_LOG=debug cargo run ingest web_requests ./tests/data/web_requests \
>   --allowed_latency 60 \
>   --app_id web_requests \
>   --transform 'date: substr(meta.producer.timestamp, `0`, `10`)' \
>               'meta.kafka.offset: kafka.offset' \
>               'meta.kafka.partition: kafka.partition' \
>               'meta.kafka.topic: kafka.topic' \
>   --auto_offset_reset earliest
    Finished dev [unoptimized + debuginfo] target(s) in 1.18s
     Running `target\debug\kafka-delta-ingest.exe ingest web_requests ./tests/data/web_requests --allowed_latency 60 --app_id web_requests --transform 'date: substr(meta.producer.timestamp, `0`, `10`)' 'meta.kafka.offset: kafka.offset' 'meta.kafka.partition: kafka.partition' 'meta.kafka.topic: kafka.topic' --auto_offset_reset earliest`
error: unexpected argument '--allowed_latency' found

  tip: to pass '--allowed_latency' as a value, use '-- --allowed_latency'

Usage: kafka-delta-ingest.exe ingest <topic> <table_location>

For more information, try '--help'.
error: process didn't exit successfully: `target\debug\kafka-delta-ingest.exe ingest web_requests ./tests/data/web_requests --allowed_latency 60 --app_id web_requests --transform 'date: substr(meta.producer.timestamp, `0`, `10`)' 'meta.kafka.offset: kafka.offset' 'meta.kafka.partition: kafka.partition' 'meta.kafka.topic: kafka.topic' --auto_offset_reset earliest` (exit code: 2)

When I try running it with a space between the dashes and the argument IDs, I get the same error but with this line changed slightly:
error: unexpected argument 'allowed_latency' found
When I change the order of the argument IDs, it gives me the error for whichever one is first.

When I use the short argument IDs (like "-k" or "-a"), it stops giving me unexpected argument errors, but gives me a different error instead:

$ RUST_LOG=debug cargo run ingest web_requests ./tests/data/web_requests \
>   -l 60 \
>   -a web_requests \
>   -K "auto.offset.reset=earliest" \
>   -t 'date: substr(meta.producer.timestamp, `0`, `10`)'
    Finished dev [unoptimized + debuginfo] target(s) in 1.23s
     Running `target/debug/kafka-delta-ingest ingest web_requests ./tests/data/web_requests -l 60 -a web_requests -K auto.offset.reset=earliest -t 'date: substr(meta.producer.timestamp, `0`, `10`)'`
thread 'main' panicked at 'Mismatch between definition and access of `APP_ID`. Unknown argument or group id.  Make sure you are using the argument id and not the short or long flags
', src/main.rs:65:18
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

I have also tried following the steps in the README for Using Azure Event Hubs in order to connect my existing Kafka stream (in an Event Hubs instance) to a Delta table (in ADLS), but I'm getting the exact same errors.

Is anyone else experiencing this issue with the argument IDs? Is the documentation outdated, and should I be trying something else? Or did I do something wrong in setup (forking and cloning the repo in VS Code and then just following the steps in the README)?

Any help would be hugely appreciated. Thank you!

Safe seek to the next messages

Whenever the consumer state is reset, it reads the latest offsets from Delta, adds 1, and seeks to them. However, this will not work if the resulting offset is missing (offsets are not contiguous in Kafka) or not yet occupied (no new messages since). In the latter case the consumer will read messages from the beginning. A safer seek is sketched below.
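
A sketch of a safer seek, assuming rdkafka and clamping the target into the partition's current watermark range; the wiring into KDI's state reset is elided:

use std::time::Duration;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaResult;
use rdkafka::Offset;

// `next_offset` is the last offset written to Delta for this partition, +1.
fn safe_seek(
    consumer: &BaseConsumer,
    topic: &str,
    partition: i32,
    next_offset: i64,
) -> KafkaResult<()> {
    // (low, high): earliest available offset, and one past the last message.
    let (low, high) = consumer.fetch_watermarks(topic, partition, Duration::from_secs(5))?;
    // Below `low` the target was deleted/compacted away; at or above `high`
    // nothing new has arrived yet. Clamping avoids both a failed seek and an
    // unintended replay from the beginning of the partition.
    let target = next_offset.clamp(low, high);
    consumer.seek(topic, partition, Offset::Offset(target), Duration::from_secs(5))
}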

Consumer cannot reseek back to offset 0 on conflict resolution

2021-08-19T16:38:45 kafka_delta_ingest [INFO] - WORKER-1: pre-process message 1:10 -- id:46
2021-08-19T16:38:46 kafka_delta_ingest::instrumentation [INFO] - WORKER-1: Delta write for version 7 has completed in 414 millis
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-1: REBALANCE - Partition assignments revoked
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: REBALANCE - Received new partition assignment list [0, 1]
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-1: REBALANCE - Received new partition assignment list [2, 3]
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 1:0 -- id:4
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: Resetting state with partitions: [1, 0]
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: Seeking consumer to 1:0
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: Partition 0 has no recorded offset. Not seeking consumer.
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 1:0 -- id:4
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:0 -- id:3
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:0 -- id:3
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:1 -- id:7
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:1 -- id:7
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:2 -- id:10
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:2 -- id:10
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:3 -- id:11
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:3 -- id:11
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:4 -- id:14
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:4 -- id:14
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:5 -- id:15
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:5 -- id:15
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:6 -- id:24
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:6 -- id:24
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:7 -- id:25
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:7 -- id:25
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:8 -- id:28
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:8 -- id:28
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:9 -- id:29
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:9 -- id:29
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-1: pre-process message 2:0 -- id:0
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-1: Resetting state with partitions: [3, 2]
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-1: Seeking consumer to 3:31
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-1: Seeking consumer to 2:26
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-1: pre-process message 2:0 -- id:0
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: Conflict offset for partition 1: state=Some(0), delta=10
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: Resetting state with partitions: [1, 0]
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: Seeking consumer to 1:10
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: Partition 0 has no recorded offset. Not seeking consumer.
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:10 -- id:36
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:10 -- id:36
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:11 -- id:37
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:11 -- id:37
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:12 -- id:38
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:12 -- id:38
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:13 -- id:39
2021-08-19T16:38:46 kafka_delta_ingest [INFO] - WORKER-2: pre-process message 0:13 -- id:39

Worker 2 is assigned partitions 0 and 1, but worker 1 is still working meanwhile. So when worker 2 tries to write a new version, it hits a conflict on partition 1. On conflict, worker 2 gets the latest offsets from Delta and seeks. But there's no stored offset for partition 0, so it does not seek that partition - even though worker 2 has already consumed some messages from partition 0, which results in their absence.

The buffer_len function in DeltaWriter is inefficient

Description

The body of buffer_len is

self.cursor.data().len()

The .data() call copies all bytes from the cursor and returns a new vector. Since we just need the len(), this is inefficient - moreover, it's called on each new message. I couldn't find an easy workaround, and we don't have access to the internal buffer of the in-memory cursor. However, introducing a new function in parquet::file::writer::InMemoryWriteableCursor should resolve it, e.g.

    pub fn len(&self) -> usize {
        let inner = self.buffer.lock().unwrap();
        inner.get_ref().len()
    }
