
arrow-datafusion's Introduction

Apache DataFusion

Crates.io Apache licensed Build Status Discord chat

Website | Guides | API Docs | Chat


Apache DataFusion is a very fast, extensible query engine for building high-quality data-centric systems in Rust, using the Apache Arrow in-memory format. Python bindings are also available. DataFusion offers SQL and DataFrame APIs, excellent performance, built-in support for CSV, Parquet, JSON, and Avro, extensive customization, and a great community.


What can you do with this crate?

DataFusion is great for building projects such as domain-specific query engines, new database platforms, data pipelines, query languages, and more. It lets you start quickly from a fully working engine and then customize the features specific to your use case. Click here to see a list of known users.

Contributing to DataFusion

Please see the contributor guide and communication pages for more information.

Crate features

This crate has several features which can be specified in your Cargo.toml.

Default features:

  • array_expressions: functions for working with arrays such as array_to_string
  • compression: reading files compressed with xz2, bzip2, flate2, and zstd
  • crypto_expressions: cryptographic functions such as md5 and sha256
  • datetime_expressions: date and time functions such as to_timestamp
  • encoding_expressions: encode and decode functions
  • parquet: support for reading the Apache Parquet format
  • regex_expressions: regular expression functions, such as regexp_match
  • unicode_expressions: Include unicode aware functions such as character_length
  • unparser: enables converting LogicalPlans back into SQL

Optional features:

  • avro: support for reading the Apache Avro format
  • backtrace: include backtrace information in error messages
  • pyarrow: conversions between PyArrow and DataFusion types
  • serde: enable arrow-schema's serde feature

Rust Version Compatibility Policy

DataFusion's Minimum Supported Rust Version (MSRV) policy is to support each stable Rust version for 6 months after it is released. This generally translates to supporting the most recent 3 to 4 stable Rust versions.

We enforce this policy using an MSRV check in CI.

arrow-datafusion's People

Contributors

alamb, andygrove, berkaysynnada, comphead, crepererum, dandandan, dependabot[bot], devinjdangelo, izveigor, jackwener, jayzhan211, jefffrey, jimexist, jonahgao, jorgecarleitao, kou, kszucs, liukun4515, mustafasrepo, nealrichardson, pitrou, ted-jiang, tustvold, viirya, waynexia, weijun-h, wesm, xhochy, xudong963, yjshen


arrow-datafusion's Issues

Optimize hash join inner workings

The hash join algorithm is core

Optimize some hot functions in the hash join algorithm.

  • Don't store hashes in hashtable
  • Do some more optimization for hashing primitives
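The bullets above can be sketched in std-only Rust: hash a primitive column in one pass rather than through a generic row encoding, and build a table that stores only row indices, resolving collisions later by comparing the original values. This is an illustrative sketch with hypothetical names, not DataFusion's actual hash join code:

```rust
use std::collections::HashMap;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hash an entire primitive column in one pass (column-at-a-time),
/// instead of hashing row-by-row through a generic Vec<u8> key.
fn hash_primitive_column(values: &[u64]) -> Vec<u64> {
    values
        .iter()
        .map(|v| {
            let mut h = DefaultHasher::new();
            v.hash(&mut h);
            h.finish()
        })
        .collect()
}

/// Build side of the join: map hash -> row indices. The hash is the map key
/// rather than a stored value; comparing the original column values on the
/// probe side resolves collisions.
fn build_hash_table(hashes: &[u64]) -> HashMap<u64, Vec<usize>> {
    let mut table: HashMap<u64, Vec<usize>> = HashMap::new();
    for (row, h) in hashes.iter().enumerate() {
        table.entry(*h).or_default().push(row);
    }
    table
}
```

Hashing column-at-a-time keeps the loop monomorphic per type, which is the "type-aware hashing for simple primitive values" the issue asks for.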

Support JOIN table alias

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Support self join queries with table alias:

SELECT * FROM t1 JOIN t1 as t2 ON t1.id = t2.id

Describe the solution you'd like

Datafusion should execute this query without error.

[Rust] Implement optimizer rule to remove redundant projections

Note: migrated from original JIRA: https://issues.apache.org/jira/browse/ARROW-6892

Currently we have code in the SQL query planner that wraps aggregate queries in a projection (if needed) to preserve the order of the final results. This is needed because the aggregate query execution always returns a result with grouping expressions first and then aggregate expressions.

It would be better (simpler, more readable code) to always wrap aggregates in projections and have an optimizer rule to remove redundant projections. There are likely other use cases where redundant projections might exist too.
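The rule described above can be sketched with a toy logical plan: a projection that selects exactly its input's output columns, unchanged, is an identity and can be dropped. These are hypothetical types for illustration, not DataFusion's `LogicalPlan`:

```rust
/// Minimal logical-plan sketch with just scans and projections.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { columns: Vec<String> },
    Projection { exprs: Vec<String>, input: Box<Plan> },
}

fn output_columns(plan: &Plan) -> Vec<String> {
    match plan {
        Plan::Scan { columns } => columns.clone(),
        Plan::Projection { exprs, .. } => exprs.clone(),
    }
}

/// Optimizer rule: remove projections that select exactly their input's
/// columns unchanged (an identity projection).
fn remove_redundant_projections(plan: Plan) -> Plan {
    match plan {
        Plan::Projection { exprs, input } => {
            let input = remove_redundant_projections(*input);
            if output_columns(&input) == exprs {
                input // identity projection: drop it
            } else {
                Plan::Projection { exprs, input: Box::new(input) }
            }
        }
        other => other,
    }
}
```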

Support column qualifier in queries

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

I would like to be able to execute the following SQL queries with datafusion:

SELECT t1.id, t2.user FROM t1 JOIN t2 ON t1.id = t2.id

Describe the solution you'd like
Datafusion should be able to execute this query without error.

Use released version of arrow

Currently the DataFusion crates pin a version of arrow by git revision. Unfortunately this makes it complicated to use DataFusion with other libraries that also depend on arrow, since it is necessary to update all the version pins in lock-step across potentially multiple repositories. #39 helps somewhat with this, but doesn't help with crates that depend on arrow without DataFusion depending on them, or if you want to configure different feature flags from what DataFusion sets.

Unfortunately there is an outstanding bug/feature omission in cargo that makes patch not work for overriding git version pins, and the workarounds on the ticket no longer seem to work.

If DataFusion instead depended on a released version of arrow, Cargo's semver compatibility rules would avoid a lot of this pain for most users, and users could use the patch syntax to override the arrow version for all dependencies within their workspace if they want to use an unreleased arrow version.

I'm not sure how feasible any of this is, but I thought I would raise a ticket to kickstart a discussion.

Implement scalable distributed joins

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

The main issue limiting scalability in Ballista today is that joins are implemented as hash joins where each partition of the probe side causes the entire left side to be loaded into memory.

Describe the solution you'd like

To make this scalable we need to hash partition left and right inputs so that we can join the left and right partitions in parallel.

There is already work underway in DataFusion to implement this that we can leverage.

Describe alternatives you've considered
None

Additional context
None
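The hash-partitioning idea can be sketched in std-only Rust: rows with equal join keys always land in the same partition index on both sides, so the left and right partitions can be joined independently and in parallel. Illustrative types only, not Ballista's code:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hash-partition rows by their join key (the first tuple element) so that
/// rows with equal keys from the left and right inputs always land in the
/// same partition, allowing each partition pair to be joined independently.
fn hash_partition(rows: Vec<(u64, String)>, num_partitions: usize) -> Vec<Vec<(u64, String)>> {
    let mut parts = vec![Vec::new(); num_partitions];
    for row in rows {
        let mut h = DefaultHasher::new();
        row.0.hash(&mut h); // partition on the join key only
        let p = (h.finish() as usize) % num_partitions;
        parts[p].push(row);
    }
    parts
}
```

Because both sides use the same hash function and partition count, a join worker for partition p only ever needs partition p from each side, which is what removes the "load the entire left side into memory" bottleneck.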

Create starting point for user guide in mdbook format

This issue is for creating a starting point for a user guide in mdbook format, covering both DataFusion and Ballista.

The intent is to get the basic structure in place so that anyone can start contributing content.

Add TPC-H query 19 to regression test

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Add TPC-H query 19 to regression test

Describe the solution you'd like
n/a

Describe alternatives you've considered
n/a

Additional context
n/a

Make external hostname in executor optional

Having the ability to set an external/advertised hostname is great since it provides users a lot of flexibility in network deployments. However, having it a required argument is a pain for the most common scenario, where the scheduler, client and executors talk to each other in the same network (e.g. k8s or docker-compose).

We should make the external hostname optional. If the scheduler receives executor metadata without a hostname, it should register the caller's IP address as the hostname.

This will make it easier to deploy the executors as a Kubernetes deployment, or to scale them with docker-compose scale ballista-executor= in the integration tests.
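The fallback behavior could look roughly like this (hypothetical helper, not the actual Ballista scheduler API): use the advertised hostname when provided, otherwise register the peer address the scheduler saw on the registration call.

```rust
use std::net::{IpAddr, Ipv4Addr};

/// If the executor did not advertise a hostname, fall back to the peer
/// address observed by the scheduler when the executor registered.
fn effective_host(advertised: Option<String>, peer_addr: IpAddr) -> String {
    advertised.unwrap_or_else(|| peer_addr.to_string())
}
```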

Vectorized hashing for hash aggregation code

Updating the hash aggregate implementation to use vectorized hashing should give a decent speed up to queries that are dependent on fast hash aggregate implementations.

Currently, keys of type Vec<u8> are generated and hashed row by row, which causes:

  • more memory usage
  • slow re-hashing of the backing hashmap
  • type un-aware hashing for simple primitive values

The implementation should also handle hash collisions, so the original keys must remain comparable against the stored values.

There is some WIP code here apache/arrow#9213 which can be used as a starting point / to continue from.
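The collision handling described above, where the original key is kept next to its hash so equal hashes can be disambiguated by comparing actual values, can be sketched in std-only Rust (illustrative names, not the WIP implementation linked above):

```rust
use std::collections::HashMap;

/// Group-by-key COUNT aggregation keyed on a precomputed hash, keeping the
/// original key alongside each group so collisions are resolved by comparing
/// the actual key values.
fn hash_aggregate_count(keys: &[i64], hashes: &[u64]) -> HashMap<u64, Vec<(i64, usize)>> {
    // hash -> list of (original key, count); the inner list handles collisions
    let mut groups: HashMap<u64, Vec<(i64, usize)>> = HashMap::new();
    for (key, h) in keys.iter().zip(hashes) {
        let bucket = groups.entry(*h).or_default();
        match bucket.iter().position(|(k, _)| k == key) {
            Some(i) => bucket[i].1 += 1,   // same hash AND same key: increment
            None => bucket.push((*key, 1)), // collision or new group: append
        }
    }
    groups
}
```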

Vectorize hash join collision check

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Further optimize the hash join algorithm

Describe the solution you'd like
There are a couple of optimizations we could implement:

  • Vectorize the row-equality check, which now uses the equal_rows function. We should be able to speed this up by vectorizing it, and also specialize it to handle non-null batches. We can probably use the take and eq kernels here.
  • Don't use a HashMap but a Vec (or similar) with a certain number of buckets (proportional to the number of rows or the expected number of keys on the left side). I tried this, but as it causes many more collisions than we currently have, it causes a big (3x) slowdown, so vectorizing the collision check is a prerequisite.

Additional context

https://www.cockroachlabs.com/blog/vectorized-hash-joiner/
https://dare.uva.nl/search?identifier=5ccbb60a-38b8-4eeb-858a-e7735dd37487

Remove hard-coded Ballista version from scripts

Ballista has some scripts for building Docker images and they have hard-coded versions, such as:

BALLISTA_VERSION=0.4.2-SNAPSHOT

We should change the scripts to get the version number from cargo instead. There are probably cargo utilities available to help with this.
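One way this could look, sketched in shell (the sample Cargo.toml is created here only to make the example self-contained; a real script would read the crate's own Cargo.toml, or use cargo pkgid / cargo metadata where cargo is available):

```shell
#!/usr/bin/env bash
# Sketch: derive BALLISTA_VERSION from Cargo.toml instead of hard-coding it.
set -e
dir=$(mktemp -d)
cat > "$dir/Cargo.toml" <<'EOF'
[package]
name = "ballista"
version = "0.4.2-SNAPSHOT"
EOF

# Extract the first `version = "..."` entry from the manifest.
BALLISTA_VERSION=$(sed -n 's/^version *= *"\(.*\)"/\1/p' "$dir/Cargo.toml" | head -1)
echo "$BALLISTA_VERSION"
```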

Address performance/execution plan of TPCH query 19

Describe the bug
TPC-H Query 19 consumes a lot of memory and takes a very long time on an 8-core machine at SF=1 in memory (with 100% CPU usage across cores). That is likely because the plan misses an equi-join and turns it into a cross join.

To Reproduce
Run query 19, check plan and execution time.

Expected behavior
The query is expected to finish faster (<1s) and shouldn't need a cross join.

Additional context
@alamb :

In order to get Q19 to work, the optimizer needs to be clever enough to recognize that p_partkey = l_partkey, which appears in each of the three clauses, is a join predicate. The query shouldn't be doing a CROSS JOIN; it should be doing an INNER JOIN on p_partkey = l_partkey.

Note this is Q19:

select
    sum(l_extendedprice * (1 - l_discount)) as revenue
from
    lineitem,
    part
where
    (
        p_partkey = l_partkey
        and p_brand = '[BRAND1]'
        and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
        and l_quantity >= [QUANTITY1] and l_quantity <= [QUANTITY1] + 10
        and p_size between 1 and 5
        and l_shipmode in ('AIR', 'AIR REG')
        and l_shipinstruct = 'DELIVER IN PERSON'
    )
    or
    (
        p_partkey = l_partkey
        and p_brand = '[BRAND2]'
        and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')
        and l_quantity >= [QUANTITY2] and l_quantity <= [QUANTITY2] + 10
        and p_size between 1 and 10
        and l_shipmode in ('AIR', 'AIR REG')
        and l_shipinstruct = 'DELIVER IN PERSON'
    )
    or
    (
        p_partkey = l_partkey
        and p_brand = '[BRAND3]'
        and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
        and l_quantity >= [QUANTITY3] and l_quantity <= [QUANTITY3] + 10
        and p_size between 1 and 15
        and l_shipmode in ('AIR', 'AIR REG')
        and l_shipinstruct = 'DELIVER IN PERSON'
    );
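The factoring the optimizer needs, pulling a conjunct shared by every OR branch out to the top level as a join predicate, can be sketched with predicates modeled as plain strings (illustrative only, not DataFusion's optimizer code). The rewrite is (a AND x) OR (a AND y) ==> a AND (x OR y), which for Q19 surfaces p_partkey = l_partkey as an equi-join condition:

```rust
/// Split an OR-of-ANDs predicate into conjuncts common to every branch
/// (candidate join predicates) and the per-branch residual conjuncts.
fn factor_common_conjuncts(branches: &[Vec<String>]) -> (Vec<String>, Vec<Vec<String>>) {
    // A conjunct is common if it appears in every OR branch.
    let common: Vec<String> = branches[0]
        .iter()
        .filter(|p| branches.iter().all(|b| b.contains(*p)))
        .cloned()
        .collect();
    // What remains in each branch after the common conjuncts are factored out.
    let residual: Vec<Vec<String>> = branches
        .iter()
        .map(|b| b.iter().filter(|p| !common.contains(*p)).cloned().collect())
        .collect();
    (common, residual)
}
```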

Deduplicate `README.md` files in root and datafusion directory

Describe the bug
The README.md files in the root and in the datafusion directory are (mostly) copies of each other.
There are small differences between the two, e.g. added functionality that is not included.

Expected outcome
Differences between the two are removed. Unneeded details in the root README should be avoided.

Integrate Ballista scheduler with DataFusion

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

The Ballista scheduler breaks a query down into stages based on changes in partitioning in the plan, where each stage is broken down into tasks that can be executed concurrently.

Rather than trying to run all the partitions at once, Ballista executors process n concurrent tasks at a time and then request new tasks from the scheduler.

This approach would help DataFusion scale better and it would be ideal to use the same scheduler to scale across cores in DataFusion and across nodes in Ballista.

Describe the solution you'd like

Implement an extensible scheduler in DataFusion and have Ballista extend it to provide distributed execution.

Describe alternatives you've considered
None

Additional context
None

DataFusion logo needs a white background

Describe the bug
The logo looks bad for users using GitHub in dark mode:

(screenshot: DataFusion logo rendered on a dark background)

To Reproduce
Use dark mode.

Expected behavior
Logo should look good. Perhaps a white background?

Additional context
None

Add Dockerfile for Ballista scheduler UI

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Provide a convenient way for developers to build and run the UI during development without installing additional software locally.

Describe the solution you'd like
Dockerfile.

Describe alternatives you've considered
None

Additional context
None

Use arrow eq kernels in CaseWhen expression evaluation

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Arrow kernels are implemented and tested and should be used when available.

Describe the solution you'd like
Use more of arrow's kernels to benefit from arrow's speed and potential improvements.

Describe alternatives you've considered
n/a

Additional context
n/a

tpch-gen is broken

❯ ./tpch-gen.sh
./tpch-gen.sh: line 21: ./dev/build-set-env.sh: No such file or directory

[Ballista] Fix integration test script

Now that Ballista has dependencies on DataFusion in the same repo, the integration test scripts and Dockerfile need updating to add DataFusion source into the docker image.

We may want to add Ballista to the default workspace as part of this fix.

DataFrame.collect() should return async stream rather than a Vec<RecordBatch>

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Currently, any DataFrame implementation must load the entire result set into RAM in the collect() method because it has to return a Vec<RecordBatch>.

Describe the solution you'd like
Change the signature to return an async stream of batches.

Describe alternatives you've considered
None

Additional context
None

Support hash repartition elimination

When data is partitioned by an expression from a table source or an intermediate hash repartition (from joins), and the expression is the same, we should remove the repartition.

This can be added as an optimization rule on the physical plan, as the partitioning information is not available on the logical plan.
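Such a physical-plan rule can be sketched with toy types (hypothetical, not DataFusion's ExecutionPlan API): drop a hash repartition whose expression matches the partitioning its input already produces.

```rust
/// Minimal physical-plan sketch: sources may already be partitioned by an
/// expression; hash repartitions impose a new partitioning.
#[derive(Debug, Clone, PartialEq)]
enum PhysicalPlan {
    Source { partitioned_by: Option<String> },
    HashRepartition { expr: String, input: Box<PhysicalPlan> },
}

fn output_partitioning(plan: &PhysicalPlan) -> Option<String> {
    match plan {
        PhysicalPlan::Source { partitioned_by } => partitioned_by.clone(),
        PhysicalPlan::HashRepartition { expr, .. } => Some(expr.clone()),
    }
}

/// Physical optimizer rule: remove a repartition whose expression equals the
/// partitioning its input already has.
fn eliminate_redundant_repartition(plan: PhysicalPlan) -> PhysicalPlan {
    match plan {
        PhysicalPlan::HashRepartition { expr, input } => {
            let input = eliminate_redundant_repartition(*input);
            if output_partitioning(&input).as_deref() == Some(expr.as_str()) {
                input // already partitioned by the same expression
            } else {
                PhysicalPlan::HashRepartition { expr, input: Box::new(input) }
            }
        }
        other => other,
    }
}
```

Running on the physical plan is what makes this possible: as the issue notes, the partitioning information is simply not present on the logical plan.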

Consolidate TPC-H benchmarks

There are now two copies of the benchmark crate, one for DataFusion and one for Ballista, and we should be able to combine them now.

Ballista should not have separate DataFrame implementation

When building the Ballista POC it was necessary to implement a new DataFrame API that wrapped the DataFusion API.

One issue is that it wasn't possible to override the behavior of the collect method to make it use the Ballista context rather than the DataFusion context.

Now that the projects are in the same repo it should be easier to fix this and have users always use the DataFusion DataFrame API.

Add example of distributed query execution with DataFusion/Ballista

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
There are no examples for Ballista, other than the benchmark suite.

Describe the solution you'd like
A simple example for Ballista.

Describe alternatives you've considered
N/A

Additional context
N/A

Consider using GitHub pages for DataFusion/Ballista documentation

ASF supports enabling GitHub pages via asf.yaml [1] and we could consider using this to publish end user documentation for DataFusion and Ballista.

Ballista already has a minimal user guide in mdbook format that is part of this repo [2] and is currently published to https://ballistacompute.org/docs/, but it may be better to publish it here as part of the DataFusion user guide.

[1] https://cwiki.apache.org/confluence/display/INFRA/git+-+.asf.yaml+features
[2] https://github.com/apache/arrow-datafusion/tree/master/ballista/docs/user-guide

Define data store path for `standalone` mode

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
StandaloneClient is currently only created with a temporary file for storage in the executor, even though it has an option to define a specific path.

Describe the solution you'd like
Add a configuration option to choose between a temporary file and a specific path for storage in the executor.

DataFrame.collect() should be extensible

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Ballista provides its own execution context but uses the DataFusion DataFrame. Calling collect on the DataFrame will run the query in-memory rather than distributed and Ballista users must instead extract the logical plan from the DataFrame and call BallistaContext.collect instead. This is not good UX.

Describe the solution you'd like
As a user, I would just like to call DataFrame.collect() and have it run either in-memory or distributed depending on how I created the context.

I think the way to do this is by making it possible to customize ExecutionContext and override the behavior when a DataFrame is collected.

Describe alternatives you've considered
None

Additional context
None

Implement hash-partitioned hash aggregate

This is faster (up to 4x in my earlier tests) than the current implementation, which collects all partial aggregates into one "full" aggregate, for cases with very high cardinality in the aggregate keys (think deduplication workloads). However, not hash-partitioning is faster for very "simple" aggregates, as less work needs to be done.

We probably need some fast way to have a rough estimate on the number of distinct values in the aggregate keys, maybe dynamically based on the first batch(es).

Also, this work creates a building block for Ballista to distribute data across workers, parallelizing the aggregation, avoiding collecting all data on one worker, and making it scale to bigger datasets.
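The "rough estimate from the first batch" heuristic suggested above might look like the following sketch (the threshold, names, and strategy labels are assumptions for illustration, not DataFusion's actual heuristic):

```rust
use std::collections::HashSet;

/// Choose an aggregation strategy from the distinct-key ratio of the first
/// batch: high cardinality favors hash-partitioned (parallel) aggregation,
/// low cardinality favors collecting into a single hash table.
fn choose_strategy(first_batch_keys: &[u64], threshold: f64) -> &'static str {
    let distinct: HashSet<_> = first_batch_keys.iter().collect();
    let ratio = distinct.len() as f64 / first_batch_keys.len() as f64;
    if ratio > threshold { "hash-partitioned" } else { "single-table" }
}
```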

Address performance/execution plan of TPCH query 9

Describe the bug
TPC-H Query 9 consumes a lot of memory and takes ~8s on an 8-core machine at SF=1 in memory (with 100% CPU usage across cores). That is likely because the plan misses an equi-join and turns it into a cross join.

To Reproduce
Run query 9, check plan and execution time.

Expected behavior
The query is expected to finish faster (<1s) and shouldn't need a cross join.

Additional context
N/A
