delta-rs's Introduction

delta-rs logo

A native Rust library for Delta Lake, with bindings to Python
Python docs · Rust docs · Report a bug · Request a feature · Roadmap

#delta-rs in the Delta Lake Slack workspace

The delta-rs project aims to unlock the power of Delta Lake for as many users and projects as possible by providing native, low-level APIs aimed at developers and integrators, as well as a high-level operations API that lets you query, inspect, and operate your Delta Lake tables with ease.

Source      Installation Command     Docs
PyPI        pip install deltalake    Python docs
Crates.io   cargo add deltalake      Rust docs

Quick Start

The deltalake library aims to adopt patterns from other libraries in data processing, so getting started should look familiar.

from deltalake import DeltaTable, write_deltalake
import pandas as pd

# write some data into a delta table
df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"]})
write_deltalake("./data/delta", df)

# Load data from the delta table
dt = DeltaTable("./data/delta")
df2 = dt.to_pandas()

assert df.equals(df2)

The same table can also be loaded using the core Rust crate:

use deltalake::{open_table, DeltaTableError};

#[tokio::main]
async fn main() -> Result<(), DeltaTableError> {
    // open the table written in python
    let table = open_table("./data/delta").await?;

    // show all active files in the table
    let files: Vec<_> = table.get_file_uris()?.collect();
    println!("{:?}", files);

    Ok(())
}

You can also try Delta Lake with Docker; images are published on Docker Hub.

Get Involved

We encourage you to reach out, and are committed to providing a welcoming community.

Integrations

Libraries and frameworks that interoperate with delta-rs - in alphabetical order.

Features

The following section outlines core features such as supported storage backends and the operations that can be performed against tables, and tracks the implementation status of features defined in the Delta protocol.

Cloud Integrations

Storage Rust Python Comment
Local done done
S3 - AWS done done requires lock for concurrent writes
S3 - MinIO done done requires lock for concurrent writes
S3 - R2 done done No lock required when using AmazonS3ConfigKey::CopyIfNotExists
Azure Blob done done
Azure ADLS Gen2 done done
Microsoft OneLake done done
Google Cloud Storage done done

Supported Operations

Operation Rust Python Description
Create done done Create a new table
Read done done Read data from a table
Vacuum done done Remove unused files and log entries
Delete - partitions done Delete a table partition
Delete - predicates done done Delete data based on a predicate
Optimize - compaction done done Harmonize the size of data files
Optimize - Z-order done done Place similar data into the same file
Merge done done Merge a target Delta table with source data
FS check done done Remove corrupted files from table

Protocol Support Level

Writer Version Requirement Status
Version 2 Append Only Tables done
Version 2 Column Invariants done
Version 3 Enforce delta.checkpoint.writeStatsAsJson open
Version 3 Enforce delta.checkpoint.writeStatsAsStruct open
Version 3 CHECK constraints semi-done
Version 4 Change Data Feed
Version 4 Generated Columns
Version 5 Column Mapping
Version 6 Identity Columns
Version 7 Table Features
Reader Version Requirement Status
Version 2 Column Mapping
Version 3 Table Features (requires reader V7)

delta-rs's People

Contributors

aersam, avriiil, blajda, cmackenzie1, dandandan, dennyglee, dependabot[bot], dimonchik-suvorov, dispanser, eeroel, emcake, fvaleye, haruband, houqp, iajoiner, ion-elgreco, marijncv, mightyshazam, mosyp, mrpowers, nevi-me, r3stl355, roeap, rtyler, smurphy000, universalmind303, viirya, wjones127, xianwill, zijie0


delta-rs's Issues

Duplicate files after update

Hi,

After playing with the update() function for #124 I noticed that the number of files in one of my test runs was off.

I was able to reproduce the behavior in my branch. The test loads version 0 of a data set, and then updates to the newest version. The expected outcome is that get_files() produces the same set of files as when directly loading the newest version. However, the file that was already in version 0 appears twice in the list.

update() is the only call site to restore_checkpoint that does not start empty, and I believe it should start from an empty state when loading a checkpoint, as checkpoints are typically self-contained. Instead, it applies the actions read from the checkpoint to the previously stored state.

I may be misunderstanding how checkpoints are supposed to work, but the provided dataset was produced by a very simple Spark function (Delta 0.8.0), and I guess that defines a compatibility baseline.
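A minimal sketch of the behavior I would expect, using a plain collection rather than the actual delta-rs types, so it is illustrative only: restoring a self-contained checkpoint should replace the in-memory file list rather than extend it.

use std::collections::HashSet;

// Illustrative only, not the delta-rs internals: a checkpoint is self-contained,
// so restoring it should start from an empty state instead of merging into the
// previously stored one.
fn restore_checkpoint(active_files: &mut HashSet<String>, checkpoint_files: Vec<String>) {
    active_files.clear(); // reset the previous state first
    active_files.extend(checkpoint_files);
}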

Trailing slashes cause S3 binding to throw NotATable

deltalake::open_table("s3://mybucket/path/") throws an error, while deltalake::open_table("s3://mybucket/path") succeeds.

IMHO delta-rs shouldn't be so picky about redundant slashes.
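As a stopgap on the caller side, a tiny helper can normalize the URI before handing it to open_table; this is a hypothetical workaround, not part of the library.

// Hypothetical workaround: strip redundant trailing slashes from the table URI
// before calling deltalake::open_table.
fn normalize_table_uri(uri: &str) -> String {
    uri.trim_end_matches('/').to_string()
}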

Cannot build on rust 1.46

This is an error I saw on a Linux/amd64 machine while building the Rust bindings. My guess is we just need to document the compiler version and toolchain we should be relying on.

➜  rust git:(40443d6) rustc --version
rustc 1.46.0 (04488afe3 2020-08-24)
➜  rust git:(40443d6) cargo test --features azure               
   Compiling deltalake v0.2.0 (/home/tyler/source/github/delta-io/delta.rs/rust)
error: no rules expected the token `,`
  --> rust/tests/read_error_test.rs:12:46
   |
12 |         deltalake::DeltaTableError::NotATable,
   |                                              ^ no rules expected this token in macro call

error: aborting due to previous error

error: could not compile `deltalake`.

To learn more, run the command again with --verbose.

Investigate possibility of WASM target

Discussion with @nevi-me brought up an interesting idea: there may be a way to compile delta.rs to a WASM (WebAssembly) target and access a Delta table directly from within the browser.

This isn't something I'm personally going to have time to tinker with, but it would be a really interesting experiment to see how far we can push this library.

Call to DeltaTable from Python

Hello guys,

I am using delta-rs from Python, and it seems that calling DeltaTable() blocks after the first call.
I tested with DeltaTable on AWS S3.

Context information:

deltalake="==0.2.0" and deltalake="==0.1.4" installed via pip 20.3.3
Python 3.7.8

Scenario:

table_list = ["s3://bucket-test/delta_table1",  "s3://bucket-test/delta_table2"]
for table in table_list:
    delta_table = DeltaTable(table)
    print(delta_table.files())

The first call to DeltaTable() followed by files() worked and the files were printed, but the second call blocks my application. I tried it without the loop, using sequential calls, and the behavior is the same.

Is this normal?

Thank you for your help :)

Small bug on Schema json method

In the Schema class, the json method returns the wrong attribute:

--- a/python/deltalake/schema.py
+++ b/python/deltalake/schema.py
class Schema:
         return self.__str__()
 
     def json(self) -> Dict[str, Any]:
-        return self.json_schema
+        return self.json_value

Maybe add a test:

def test_schema_json():
    table_path = "../rust/tests/data/simple_table"
    dt = DeltaTable(table_path)
    schema = dt.schema()
    assert schema.json() == {
        "fields": [{"metadata": {}, "name": "id", "nullable": True, "type": "long"}],
        "type": "struct",
    }

list_objs is likely going to be problematic with large buckets/directories

find_latest_check_point_for_version interacts with list_objs in a way that I think is going to be inefficient for a large storage bucket (think thousands of objects).

Right now list_objs is returning a stream of items in the bucket/directory and then find_latest_check_point_for_version will match against those results.

At least with the S3 ListObjects API, the caller can specify a prefix to limit the responses, which may be one way to prevent list_objs from returning too many results just to find the latest checkpoint.

Since I'm brainstorming right now: list_objs could take some Option<Predicate>, assuming Azure Blob storage has similar functionality; that would allow delta.rs to push the initial filtering of objects down to the cloud provider.
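To make the brainstorming a bit more concrete, a hypothetical prefix-aware variant of the listing API could look like the sketch below; the trait and method names are illustrative, not the current delta-rs signatures.

// Hypothetical signature, not the existing StorageBackend trait: an optional key prefix
// maps directly onto the S3 ListObjects `prefix` parameter (Azure Blob listing supports
// a prefix as well), so the filtering happens in the cloud provider instead of the client.
#[async_trait::async_trait]
trait PrefixListing {
    async fn list_objs_with_prefix(
        &self,
        path: &str,
        prefix: Option<&str>,
    ) -> Result<Vec<String>, std::io::Error>;
}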

File not found exception for a delta table stored on cloud storage

Passing a list of files to pyarrow.dataset.dataset treats them as local file paths; quoting the docs:

List of file paths:

    Create a FileSystemDataset from explicitly given files. The files must be located on the same filesystem given by the filesystem parameter. Note that in contrary of construction from a single file, passing URIs as paths is not allowed.

Reading tables off of cloud storage therefore fails with a file-not-found exception.
To fix this, dataset should be called with the filesystem parameter, and the file paths should be just the keys rather than the full URIs.

I'll submit a PR.

Implement optimize command

Databricks Delta Lake provides an optimize command that is incredibly useful for compacting small files (especially files written by streaming jobs, which are inevitably small). delta-rs should provide a similar optimize command.

I think a first pass could ignore the bin-packing and Z-order features provided by Databricks and simply combine small files into larger files, while also setting dataChange: false in the delta log entry. An MVP might look like:

  • Query the delta log for file stats relevant to the current optimize run (based on predicate)
  • Group files that may be compacted based on common partition membership and less than optimal size (1 GB)
  • Combine files by looping through each input file and writing its record batches into the new output file.
  • Commit a new delta log entry with a single add action and a separate remove action for each compacted file. All committed add and remove actions must set the dataChange flag as false to prevent re-processing by stream consumers.

Files no longer relevant to the log may be cleaned up later by vacuum (see #97).
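A rough sketch of the grouping step under those assumptions; the types and the 1 GB threshold are illustrative, not the eventual delta-rs API.

use std::collections::HashMap;

// Illustrative types only: group files that share partition values and are below the
// target size, and keep only groups with more than one candidate, since a lone small
// file gains nothing from being rewritten.
#[derive(Clone)]
struct AddFile {
    path: String,
    partition_values: Vec<(String, String)>,
    size: i64,
}

const TARGET_SIZE: i64 = 1024 * 1024 * 1024; // ~1 GB

fn plan_compaction(files: &[AddFile]) -> HashMap<Vec<(String, String)>, Vec<AddFile>> {
    let mut groups: HashMap<Vec<(String, String)>, Vec<AddFile>> = HashMap::new();
    for file in files.iter().filter(|f| f.size < TARGET_SIZE) {
        groups
            .entry(file.partition_values.clone())
            .or_default()
            .push(file.clone());
    }
    // A single small file per partition is not worth rewriting.
    groups.retain(|_, candidates| candidates.len() > 1);
    groups
}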

Feature gate some modules/dependencies

Specifically, the S3 (and Azure, if #30 is implemented) functionality could be feature-gated fairly easily.

This would also be required for #22, because Rusoto currently depends on hyper, which won't compile to WASM (rusoto/rusoto#1708 would be the likely eventual solution there).

to_pyarrow_table() should use full schema provided by delta table

Hi

First of all, thanks for your work on delta-rs; this package is super useful for our team, led by @fvaleye 👍

We have an issue using the function to_pyarrow_table()

DeltaTable(table_path).to_pyarrow_table() returns the dataset, but with an incomplete schema: we are missing one column that is definitely present in one of our parquet files.

Using DeltaTable(table_path).schema(), we get the complete schema, including the missing column.

Thanks in advance

Transaction log commit with filesystem storage backend is not atomic

We should write the new transaction log entry to a temporary file first, then use an atomic file rename to create the new transaction log. The current implementation writes directly to the new transaction log path, which could lead to log corruption if the process crashes before completing the write.
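A minimal sketch of that approach for the local filesystem backend (paths and naming are illustrative); note that it only addresses crash safety, not concurrent writers, which is tracked separately.

use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf};

// Sketch only: write the log entry to a temporary file, flush it, then atomically
// rename it into place so a crash mid-write cannot leave a torn commit behind.
// A real implementation also needs a rename that fails if the destination already
// exists, to guard against concurrent writers.
fn commit_log_entry(log_dir: &Path, version: u64, body: &[u8]) -> std::io::Result<PathBuf> {
    let tmp_path = log_dir.join(format!(".{:020}.json.tmp", version));
    let final_path = log_dir.join(format!("{:020}.json", version));

    let mut tmp = fs::File::create(&tmp_path)?;
    tmp.write_all(body)?;
    tmp.sync_all()?; // ensure the bytes are durable before the rename

    fs::rename(&tmp_path, &final_path)?; // atomic on POSIX within one filesystem
    Ok(final_path)
}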

Datafusion integration assumes table's data files are local

The DataFusion integration passes a list of file paths representing a table's actual data to DataFusion's ParquetExec, but if the Delta table's StorageBackend is anything other than the FileStorageBackend, this fails because the files aren't local.

I'm not sure where this should be handled, though - it feels like it should be part of DataFusion or an extension crate?

Unexpected behavior in `update()`

I believe there's a subtle bug in the update() method, specifically in how it decides between loading from a checkpoint and applying the log from JSON files.

Here are the important bits:

Ok(last_check_point) => {
	if self.last_check_point.is_none()
		|| self.last_check_point == Some(last_check_point)
	{
		self.last_check_point = Some(last_check_point);
		self.restore_checkpoint(last_check_point).await?;
		self.version = last_check_point.version + 1;
	}
}

What this does is:

  • restore from the checkpoint if there is no previous checkpoint, or if the new checkpoint is the same as the previously used one
  • if the new checkpoint is different, don't use it and instead apply the JSON deltas.

I believe what we want instead is to use the JSON deltas if the new checkpoint is the same as the previous one, and to use the checkpoint if it is newer than what we previously had loaded.

Note that the final result, an up-to-date delta table, is still achieved, but it unnecessarily loads checkpoints or JSON deltas in either scenario, so it could be more efficient.
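A standalone sketch of the decision I'd expect; the CheckPoint type is simplified (the real one also carries size and parts).

// Illustrative only: restore from the checkpoint when there was no previous checkpoint
// (bootstrapping) or when the new checkpoint is strictly newer; otherwise just replay
// the JSON commits from the current version onward.
#[derive(Clone, Copy)]
struct CheckPoint {
    version: i64,
}

fn should_restore(previous: Option<CheckPoint>, latest: CheckPoint) -> bool {
    match previous {
        None => true,
        Some(prev) => latest.version > prev.version,
    }
}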

I wasn't able to write a self-contained test inside Rust, as I'm not sure about the status of write support (and checkpointing in particular), but I validated my assumptions by running a Spark shell session to generate commits and a Rust session (with some println! sparkles) side by side, based on the following "data generator" in Scala:

def createCommits(numCommits: Int, deltaUrl: String): Unit =
  (0 until numCommits).foreach { c =>
    Seq(c).toDF("version").write.format("delta").mode("append").save(deltaUrl)
  }
  1. start: 5 commits: createCommits(5, "<some delta path>")
reading delta table: "<some delta path>"
[apply_logs_after_current_version] applied 0
[apply_logs_after_current_version] applied 1
[apply_logs_after_current_version] applied 2
[apply_logs_after_current_version] applied 3
[apply_logs_after_current_version] applied 4
[apply_logs_after_current_version] end of log : 5, rollback
initial table loaded, version 4
waiting on keypress for update, current version 4

Init: loading the table, all good.

  2. 4 more commits: no checkpoints yet
updating ...
[update] process started
[update] no previous checkpoint, trying version = 5 ff
[apply_logs_after_current_version] applied 5
[apply_logs_after_current_version] applied 6
[apply_logs_after_current_version] applied 7
[apply_logs_after_current_version] applied 8
[apply_logs_after_current_version] end of log : 9, rollback
waiting on keypress for update, current version 8

Expected behavior: start with our current version and apply the new commits in sequence.

  3. 4 more commits: new checkpoint at v10
updating ...
[update] process started
[update] retrieved checkpoint CheckPoint { version: 10, size: 13, parts: None }, previous = None
[update] determined to load a checkpoint: CheckPoint { version: 10, size: 13, parts: None }
[restore_checkpoint]: CheckPoint { version: 10, size: 13, parts: None }
[apply_logs_after_current_version] applied 11
[apply_logs_after_current_version] applied 12
[apply_logs_after_current_version] end of log : 13, rollback
waiting on keypress for update, current version 12

Expected behavior: load checkpoint, apply changes from there

  4. 4 more commits: no new checkpoint, still at v10
updating ...
[update] process started
[update] retrieved checkpoint CheckPoint { version: 10, size: 13, parts: None }, previous = Some(CheckPoint { version: 10, size: 13, parts: None })
[update] determined to load a checkpoint: CheckPoint { version: 10, size: 13, parts: None }
[restore_checkpoint]: CheckPoint { version: 10, size: 13, parts: None }
[apply_logs_after_current_version] applied 11
[apply_logs_after_current_version] applied 12
[apply_logs_after_current_version] applied 13
[apply_logs_after_current_version] applied 14
[apply_logs_after_current_version] applied 15
[apply_logs_after_current_version] applied 16
[apply_logs_after_current_version] end of log : 17, rollback
waiting on keypress for update, current version 16

Unexpected behavior: we have already read up to version 12, but we reload from the checkpoint at version 10, applying more JSON than necessary (and a checkpoint that does not help, either).

  5. no new commits, should do nothing
updating ...
[update] process started
[update] retrieved checkpoint CheckPoint { version: 10, size: 13, parts: None }, previous = Some(CheckPoint { version: 10, size: 13, parts: None })
[update] determined to load a checkpoint: CheckPoint { version: 10, size: 13, parts: None }
[restore_checkpoint]: CheckPoint { version: 10, size: 13, parts: None }
[apply_logs_after_current_version] applied 11
[apply_logs_after_current_version] applied 12
[apply_logs_after_current_version] applied 13
[apply_logs_after_current_version] applied 14
[apply_logs_after_current_version] applied 15
[apply_logs_after_current_version] applied 16
[apply_logs_after_current_version] end of log : 17, rollback
waiting on keypress for update, current version 16

Unexpected behavior: we reload starting from the previous checkpoint, even though no new commits were added to the delta log.

  6. 4 more commits: new checkpoint at v20
updating ...
[update] process started
[update] retrieved checkpoint CheckPoint { version: 20, size: 23, parts: None }, previous = Some(CheckPoint { version: 10, size: 13, parts: None })
[apply_logs_after_current_version] applied 16
[apply_logs_after_current_version] applied 17
[apply_logs_after_current_version] applied 18
[apply_logs_after_current_version] applied 19
[apply_logs_after_current_version] applied 20
[apply_logs_after_current_version] end of log : 21, rollback
waiting on keypress for update, current version 20

Unexpected behavior: despite having a checkpoint at 20, we apply JSON all the way for versions 16..20.

I believe the expected behavior can be achieved with a single-character change in the logical expression. I'd gladly provide a pull request.

Separating the log from the state

In the current architecture, the concepts of the delta table and the delta log live in a single abstraction, DeltaTable. To update to the newest state, one calls update() and the table state follows accordingly.

However, I can see several use cases, mostly around stream processing, that are not actually interested in the current table state, but instead in the stream of changes. For such a use case, subscribing to a stream of actions would probably provide the better abstraction.

Ultimately, DeltaTable is just a representation of the aggregated state of the log up to a specific commit, so it would just be another subscriber to the delta log changes.

I wonder if it would make sense to have a first-class citizen, DeltaLog, exposing commits as a stream of batches of actions.
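As a strawman, such an abstraction might look roughly like the following; the names and types are illustrative, not an existing delta-rs API.

use futures::stream::BoxStream;

// Illustrative sketch: expose the log as a stream of commits, each carrying the batch
// of actions recorded at that version; DeltaTable would then be one subscriber that
// folds these batches into table state.
struct Commit {
    version: i64,
    actions: Vec<serde_json::Value>, // simplified; real actions would be strongly typed
}

trait DeltaLog {
    /// Stream commits strictly after `from_version`, in order.
    fn commits_after(&self, from_version: i64) -> BoxStream<'_, Commit>;
}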

DeltaTable likely not handling log_path in a platform-safe manner

Lots of DeltaTable code does a primitive "join" (e.g. format!("{}/{}", log_path, other)) when creating paths. Where the result is a file system path, this isn't a safe way to build paths.

@houqp I think we're going to have to find some way to handle these path creations better than just munging strings.
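For the local filesystem backend, something along these lines would avoid hand-rolled separators (sketch only; object-store keys still want forward slashes regardless of platform).

use std::path::{Path, PathBuf};

// Sketch: std::path joins components with the platform separator, unlike
// format!("{}/{}", ...) which hard-codes '/'.
fn log_entry_path(log_dir: &Path, version: u64) -> PathBuf {
    log_dir.join(format!("{:020}.json", version))
}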

avoid loading checkpoints created for older versions

Follow up from #124.

Since transaction version commits and checkpointing are not transactional, newer transaction versions can become visible to readers before a checkpoint is created. When reloading a table from its source, we could also compare the current in-memory table version with the version of the new latest checkpoint; if the in-memory version is already ahead, we can skip the checkpoint restoration.
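In other words, something like the following check before restoring (illustrative sketch only):

// Illustrative only: skip restoring a checkpoint that the in-memory state has
// already moved past.
fn should_skip_checkpoint(current_version: i64, checkpoint_version: i64) -> bool {
    current_version >= checkpoint_version
}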

Implement rename_obj (atomic rename) API in storage backend

As discussed in #89, for multi-writer support, the S3 storage backend requires an atomic rename operation that uses a distributed lock lease to ensure that log files are not overwritten by other writers.

Ultimately, the high-level logic of a DeltaTransaction commit should abstract over storage backends and look like:

* write temp file
* optimistic commit loop:
  * atomic rename
  * if version already exists
    * do conflict resolution if update, update temp file
  * increase version

Usage of the distributed lock should follow a two-phase-commit plus compare-and-swap (i.e. conditional write) approach to guard the file write. The two-phase commit is necessary so other writers can detect a stale transaction, determine its state, and continue or repair the previous transaction. The compare-and-swap is required so that a writer holding the lock fails safely if it was paused without knowing it and exceeded the lock lease timeout.

Further discussion about the implementation (including a proposal for a DynamoDb schema that represents a lock) should continue in #89 before this issue is taken on.

Default to avoid checkpoint restore in update method

Follow up for #125 (comment).

When pulling in table updates, it would be more efficient to just load the new versions commit by commit instead of rebuilding the state from the latest checkpoint. Checkpoints are mostly useful for bootstrapping; running tables are usually not far behind the latest commits.

Implement delta log checkpointing

JSON delta log entries should be consolidated into a parquet snapshot periodically so that log readers can deserialize the log quickly. The Spark reference implementation consolidates JSON log entries into parquet checkpoint snapshots after every 10th log entry; this behavior is also described under the Checkpoints section of the delta protocol. delta-rs should offer a similar utility: an MVP could simply provide a parquet checkpoint function that callers can schedule as appropriate.
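A tiny sketch of the scheduling decision a caller might make; the 10-commit interval mirrors the reference implementation, while the function itself is illustrative.

// Illustrative only: decide whether to write a parquet checkpoint after committing
// `version`, using a fixed interval like the reference implementation's every 10 commits.
fn should_create_checkpoint(version: u64, interval: u64) -> bool {
    interval > 0 && version > 0 && version % interval == 0
}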

Handle commitInfo action

This is likely a low-priority issue, but it is mentioned in the TODO list and is an aspect of the delta log protocol not yet implemented in delta-rs. As described in the protocol doc, commit provenance information may be included in delta logs within a commitInfo action, so delta-rs should be able to read and write these.

The reference implementation creates this action and prepends it to the others before writing the log entry.

A TODO exists in DeltaTransaction where we should be able to do something similar.
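For illustration, a commitInfo action is just a free-form JSON object prepended to the other actions in the log entry; the keys below are examples, not a fixed schema.

use serde_json::json;

// Sketch only: build a commitInfo action to prepend to the log entry's other actions.
// The protocol treats the contents as free-form provenance metadata.
fn commit_info_action(operation: &str, timestamp_millis: i64) -> serde_json::Value {
    json!({
        "commitInfo": {
            "timestamp": timestamp_millis,
            "operation": operation,     // e.g. "WRITE"
            "engineInfo": "delta-rs"
        }
    })
}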

Expose schema API in python binding

Does the reader use the Parquet files' metadata to infer the schema?
If so, relying on the Delta log instead could improve performance for operations that require the schema (especially when a lot of files are involved).

Deltalake 0.4.3 (and 0.4.2) breaks our CI

Hi Again :)

We tried to upgrade our base code with deltalake 0.4.3 (also with 0.4.2) but we got an error during our CI process (when launching unit tests).

The error is
dags/utils/delta_lake_utils.py:4: in <module>
    from deltalake import DeltaTable, Schema
../../../virtualenv/python3.7.1/lib/python3.7/site-packages/deltalake/__init__.py:1: in <module>
    from .deltalake import RawDeltaTable, rust_core_version
E   ImportError: /home/travis/virtualenv/python3.7.1/lib/python3.7/site-packages/deltalake/deltalake.abi3.so: undefined symbol: renameat2

Everything works well with deltalake 0.4.1. Do you have any idea about this issue?

FileNotFoundError

Hi. I run the following code to open a delta table on Azure Data Lake Gen2:

from deltalake import DeltaTable
import os

os.environ['AZURE_STORAGE_ACCOUNT'] = 'xxxxxx'
os.environ['AZURE_STORAGE_KEY'] = 'xxxxxxxxxxxxxxxxx'

dt = DeltaTable("abfss://[email protected]/delta/silver/rawdata/holdings/taxhld/v1.0")

dt.version()
dt.files()
dt.file_paths()
This all works fine and it lists all the parquet files in the folders, but when I do

df = dt.to_pyarrow_table()

I get this error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.8/site-packages/deltalake/__init__.py", line 29, in to_pyarrow_table
    return self.to_pyarrow_dataset().to_table()
  File "/usr/local/lib/python3.8/site-packages/deltalake/__init__.py", line 26, in to_pyarrow_dataset
    return dataset(self._table.file_paths(), format="parquet")
  File "/usr/local/lib/python3.8/site-packages/pyarrow/dataset.py", line 674, in dataset
    return _filesystem_dataset(source, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/pyarrow/dataset.py", line 426, in _filesystem_dataset
    fs, paths_or_selector = _ensure_multiple_sources(source, filesystem)
  File "/usr/local/lib/python3.8/site-packages/pyarrow/dataset.py", line 312, in _ensure_multiple_sources
    raise FileNotFoundError(info.path)
FileNotFoundError: abfss://[email protected]/delta/silver/rawdata/holdings/taxhld/v1.0/company_name=xxx/source_db_name=xxx/source_fund_name=02/file_date=2020-01-01/part-00002-ab186831-cb3b-4294-8d2b-c2377e8eea52.c000.snappy.parquet
But the file does exist and is listed by dt.file_paths().

WebPKIError when running azure test code

As of my current tree (40443d6390674092b8763cbeba1b1b03144f446c), it seems that the Azure tests cannot work properly locally, but they continue to work fine (seemingly) in CI. I'm wondering if there's something magic about the tests passing in CI, since they're technically already running within Azure via GitHub Actions.

Error

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out

     Running /home/tyler/source/github/delta-io/delta.rs/target/debug/deps/azure_test-a5a0db4320167091

running 1 test
test azure::test_azure_simple ... FAILED

failures:

---- azure::test_azure_simple stdout ----
thread 'azure::test_azure_simple' panicked at 'called `Result::unwrap()` on an `Err` value: LoadCheckpoint { source: Storage { source: Azure { source: HyperError(hyper::Error(Connect, Custom { kind: Other, error: Custom { kind: InvalidData, error: WebPKIError(UnknownIssuer) } })) } } }', rust/tests/azure_test.rs:18:18
stack backtrace:
   0: rust_begin_unwind
             at /rustc/7eac88abb2e57e752f3302f02be5f3ce3d7adfb4/library/std/src/panicking.rs:483
   1: core::panicking::panic_fmt
             at /rustc/7eac88abb2e57e752f3302f02be5f3ce3d7adfb4/library/core/src/panicking.rs:85
   2: core::option::expect_none_failed
             at /rustc/7eac88abb2e57e752f3302f02be5f3ce3d7adfb4/library/core/src/option.rs:1234
   3: core::result::Result<T,E>::unwrap
             at /home/tyler/.rustup/toolchains/stable-aarch64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/result.rs:973
   4: azure_test::azure::test_azure_simple::{{closure}}
             at ./tests/azure_test.rs:16
   5: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /home/tyler/.rustup/toolchains/stable-aarch64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/future/mod.rs:80
   6: tokio::runtime::basic_scheduler::BasicScheduler<P>::block_on::{{closure}}::{{closure}}
             at /home/tyler/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.24/src/runtime/basic_scheduler.rs:131
   7: tokio::coop::with_budget::{{closure}}
             at /home/tyler/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.24/src/coop.rs:127
   8: std::thread::local::LocalKey<T>::try_with
             at /home/tyler/.rustup/toolchains/stable-aarch64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/local.rs:272
   9: std::thread::local::LocalKey<T>::with
             at /home/tyler/.rustup/toolchains/stable-aarch64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/local.rs:248
  10: tokio::coop::with_budget
             at /home/tyler/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.24/src/coop.rs:120
  11: tokio::coop::budget
             at /home/tyler/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.24/src/coop.rs:96
  12: tokio::runtime::basic_scheduler::BasicScheduler<P>::block_on::{{closure}}
             at /home/tyler/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.24/src/runtime/basic_scheduler.rs:131
  13: tokio::runtime::basic_scheduler::enter::{{closure}}
             at /home/tyler/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.24/src/runtime/basic_scheduler.rs:213
  14: tokio::macros::scoped_tls::ScopedKey<T>::set
             at /home/tyler/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.24/src/macros/scoped_tls.rs:63
  15: tokio::runtime::basic_scheduler::enter
             at /home/tyler/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.24/src/runtime/basic_scheduler.rs:213
  16: tokio::runtime::basic_scheduler::BasicScheduler<P>::block_on
             at /home/tyler/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.24/src/runtime/basic_scheduler.rs:123
  17: tokio::runtime::Runtime::block_on::{{closure}}
             at /home/tyler/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.24/src/runtime/mod.rs:444
  18: tokio::runtime::context::enter
             at /home/tyler/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.24/src/runtime/context.rs:72
  19: tokio::runtime::handle::Handle::enter
             at /home/tyler/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.24/src/runtime/handle.rs:76
  20: tokio::runtime::Runtime::block_on
             at /home/tyler/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.24/src/runtime/mod.rs:441
  21: azure_test::azure::test_azure_simple
             at ./tests/azure_test.rs:9
  22: azure_test::azure::test_azure_simple::{{closure}}
             at ./tests/azure_test.rs:9
  23: core::ops::function::FnOnce::call_once
             at /home/tyler/.rustup/toolchains/stable-aarch64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ops/function.rs:227
  24: core::ops::function::FnOnce::call_once
             at /rustc/7eac88abb2e57e752f3302f02be5f3ce3d7adfb4/library/core/src/ops/function.rs:227
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.


failures:
    azure::test_azure_simple

test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 0 filtered out

error: test failed, to rerun pass '--test azure_test'

Environment

❯ uname -a
Linux coconut 5.6.14-1-default #1 SMP Wed May 20 08:32:48 UTC 2020 (b0ab48a) aarch64 aarch64 aarch64 GNU/Linux

tyler in coconut in delta.rs on  azure-tests [?] via 🦀 v1.48.0 
❯ rustc --version
rustc 1.48.0 (7eac88abb 2020-11-16)

Add support for more authentication schemas for Azure

Currently the only way to authenticate against Azure is by providing the account name and/or a key/SAS token. For ADLS Gen2 data lakes with ACLs enabled, we also need to authenticate with a service principal (Azure AD).
A good way of working would be to set environment variables as follows:

  • AZURE_STORAGE_AUTH_TYPE
  • AZURE_STORAGE_SPN_CLIENT_ID
  • AZURE_STORAGE_SPN_TENANT_ID
  • AZURE_STORAGE_SPN_CLIENT_SECRET
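For reference, picking these up on the Rust side could be as simple as the sketch below; the variable names come from this proposal, and the actual auth flow is out of scope.

use std::env;

// Sketch only: read the proposed service-principal variables; returns None if any is unset.
fn load_spn_credentials() -> Option<(String, String, String)> {
    Some((
        env::var("AZURE_STORAGE_SPN_CLIENT_ID").ok()?,
        env::var("AZURE_STORAGE_SPN_TENANT_ID").ok()?,
        env::var("AZURE_STORAGE_SPN_CLIENT_SECRET").ok()?,
    ))
}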

race condition in s3 tests

Our S3 tests are failing randomly at the moment. Based on my quick glance, the specific error message we are getting is usually caused by access to a global hyper client from multiple threads. However, that doesn't seem to be the case for us, because we are not using a global S3 client. Perhaps rusoto is doing something funny with hyper? One thing I noticed is that if I only run one of the S3 tests, it passes reliably.

Test fails on Windows

Run cargo test

Output:

running 2 tests
test read_delta_2_0_table_without_version ... ok
test read_delta_2_0_table_with_version ... FAILED

---- read_delta_2_0_table_with_version stdout ----
thread 'read_delta_2_0_table_with_version' panicked at 'assertion failed: (left == right)
left: ["part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet", "part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet", "part-00000-cb6b150b-30b8-4662-ad28-ff32ddab96d2-c000.snappy.parquet"],
right: ["part-00000-cb6b150b-30b8-4662-ad28-ff32ddab96d2-c000.snappy.parquet", "part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet", "part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet"]', rust\tests\read_delta-0-2-0_test.rs:67:5

The arrays contain the same elements, but in a different order.
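One way to make the assertion order-insensitive is to sort both sides before comparing; this is a sketch of the idea, not the existing test code.

// Sketch only: sort both file lists before comparing, since listing order is not
// guaranteed to be stable across platforms.
fn assert_same_files(mut left: Vec<String>, mut right: Vec<String>) {
    left.sort();
    right.sort();
    assert_eq!(left, right);
}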

Expected layout and structure of Azure storage accounts should be documented

In order to use Delta Lake on Azure, the library expects the storage account to be configured with Azure Data Lake Storage Gen2 enabled, which is a non-default setting.

Some README or user-guide documentation on how Delta tables should look in Azure, including the required Azure configuration, would be beneficial to include!
