apache / arrow-datafusion
Apache DataFusion SQL Query Engine
Home Page: https://datafusion.apache.org/
License: Apache License 2.0
When the left side of the join is very small (compared to the right side), it is better to load the left side once (and broadcast it in Ballista).
In DataFusion this avoids hash partitioning; in Ballista it also avoids data shuffling.
Describe the bug
TPC-H Query 19 consumes a lot of memory and takes a very long time to execute in memory on an 8-core machine at SF=1 (with 100% CPU usage across cores). That is likely because the plan misses an equi-join and turns it into a cross join.
To Reproduce
Run query 19, check plan and execution time.
Expected behavior
The query is expected to finish faster (<1s) and shouldn't need a cross join.
Additional context
@alamb :
In order to get Q19 to work, the optimizer needs to be clever enough to recognize that p_partkey = l_partkey which appears in each of the three clauses is a join predicate. The query shouldn't be doing a CROSS JOIN, it should be doing an INNER JOIN on p_partkey = l_partkey
Note this is Q19:
select
sum(l_extendedprice * (1 - l_discount)) as revenue
from
lineitem,
part
where
(
p_partkey = l_partkey
and p_brand = '[BRAND1]'
and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
and l_quantity >= [QUANTITY1] and l_quantity <= [QUANTITY1] + 10
and p_size between 1 and 5
and l_shipmode in ('AIR', 'AIR REG')
and l_shipinstruct = 'DELIVER IN PERSON'
)
or
(
p_partkey = l_partkey
and p_brand = '[BRAND2]'
and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')
and l_quantity >= [QUANTITY2] and l_quantity <= [QUANTITY2] + 10
and p_size between 1 and 10
and l_shipmode in ('AIR', 'AIR REG')
and l_shipinstruct = 'DELIVER IN PERSON'
)
or
(
p_partkey = l_partkey
and p_brand = '[BRAND3]'
and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
and l_quantity >= [QUANTITY3] and l_quantity <= [QUANTITY3] + 10
and p_size between 1 and 15
and l_shipmode in ('AIR', 'AIR REG')
and l_shipinstruct = 'DELIVER IN PERSON'
);
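The rewrite described above can be sketched mechanically: treat the WHERE clause as a disjunction of conjunction sets, intersect the conjuncts across all OR branches, and hoist any predicate common to every branch (here the equi-join key) out of the disjunction so it can drive an INNER JOIN. This is an illustrative sketch, not DataFusion's actual optimizer code; the brand values are example substitutions.

```python
# Hypothetical sketch of common-conjunct extraction across OR branches.
def extract_common_predicates(or_branches):
    """or_branches: list of sets of predicate strings, one per OR arm.
    Returns (common predicates, per-branch remainders)."""
    common = set.intersection(*[set(b) for b in or_branches])
    remaining = [set(b) - common for b in or_branches]
    return common, remaining

# Q19 shape: every branch contains the same equi-join predicate.
branches = [
    {"p_partkey = l_partkey", "p_brand = 'Brand#12'"},
    {"p_partkey = l_partkey", "p_brand = 'Brand#23'"},
    {"p_partkey = l_partkey", "p_brand = 'Brand#34'"},
]
common, rest = extract_common_predicates(branches)
```

The hoisted `common` set becomes the join condition; the remainders stay as a post-join filter, preserving the original semantics because `A and B) or (A and C) == A and (B or C)`.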
ASF supports enabling GitHub pages via asf.yaml [1] and we could consider using this to publish end user documentation for DataFusion and Ballista.
Ballista already has a minimal user guide in mdbook format that is part of this repo [2] and is currently published to https://ballistacompute.org/docs/, but it would be better to publish it here as part of the DataFusion user guide.
[1] https://cwiki.apache.org/confluence/display/INFRA/git+-+.asf.yaml+features
[2] https://github.com/apache/arrow-datafusion/tree/master/ballista/docs/user-guide
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Ballista provides its own execution context but uses the DataFusion DataFrame. Calling collect
on the DataFrame will run the query in-memory rather than distributed and Ballista users must instead extract the logical plan from the DataFrame and call BallistaContext.collect
instead. This is not good UX.
Describe the solution you'd like
As a user, I would just like to call DataFrame.collect()
and have it run either in-memory or distributed depending on how I created the context.
I think the way to do this is by making it possible to customize ExecutionContext
and override the behavior when a DataFrame is collected.
Describe alternatives you've considered
None
Additional context
None
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Add TPC-H query 19 to regression test
Describe the solution you'd like
n/a
Describe alternatives you've considered
n/a
Additional context
n/a
As suggested by @andygrove and @returnString on #33 (comment)
Make it easy for crates that depend on DataFusion to get the correct version of the arrow-rs crate without having to manually keep versions in sync
DataFusion should publicly re-export the Arrow crate so that dependent crates automatically get the same Arrow version
Updating the hash aggregate implementation to use vectorized hashing should give a decent speed-up to queries that depend on fast hash aggregate implementations.
Currently, keys of type Vec<u8>
are generated and hashed row by row, which causes significant per-row overhead.
The implementation should also handle hash collisions, so the original values must still be comparable against the probe values.
There is some WIP code here apache/arrow#9213 which can be used as a starting point / to continue from.
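To illustrate the difference, here is a minimal sketch (function names are invented for the example, not DataFusion APIs) contrasting per-row key hashing with column-at-a-time hashing that combines per-column hashes into a running hash per row:

```python
def hash_rows_naive(columns):
    # Build a Vec<u8>-style composite key per row, then hash it:
    # one key materialization and one hash call per row.
    n = len(columns[0])
    return [hash(tuple(col[i] for col in columns)) for i in range(n)]

def hash_rows_vectorized(columns):
    # Hash one column at a time and fold into a running hash per row,
    # avoiding per-row key materialization entirely.
    n = len(columns[0])
    hashes = [0] * n
    for col in columns:
        for i, v in enumerate(col):
            hashes[i] = hashes[i] * 31 + hash(v)  # simple combine step
    return hashes
```

In a real Arrow implementation the inner loop runs over a whole column buffer at once, which is what makes the approach cache-friendly and amenable to SIMD.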
Describe the bug
The README.md
files in the root and in the datafusion
directory are (mostly) copies of each other.
There are small differences between the two, e.g. functionality added to one that is not included in the other.
Expected outcome
Differences between the two are removed. Unneeded details in the root readme should be avoided.
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Further optimize the hash join algorithm
Describe the solution you'd like
There are a couple of optimizations we could implement:
- Vectorize the equal_rows function. We should be able to speed this up by vectorizing it, and also specialize it for handling non-null batches. We can probably utilize the take and eq kernels here.
- Use not a Hashmap but a Vec (or similar) with a certain number of buckets (proportional to the number of rows or the expected number of keys on the left side). I tried this, but as it causes many more collisions than we have currently, it causes a big (3x) slowdown, so vectorizing the collision check is a prerequisite.

Additional context
https://www.cockroachlabs.com/blog/vectorized-hash-joiner/
https://dare.uva.nl/search?identifier=5ccbb60a-38b8-4eeb-858a-e7735dd37487
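The bucket-array idea above can be sketched as follows; this is an illustrative model, not the actual DataFusion implementation, and the class/method names are invented. Collisions are resolved by comparing the stored key against the probe key, which is exactly the check that would need to be vectorized:

```python
class BucketTable:
    """Build-side table as a plain bucket array instead of a HashMap."""

    def __init__(self, num_buckets):
        self.buckets = [[] for _ in range(num_buckets)]  # Vec of (key, row) lists

    def insert(self, key, row_idx):
        self.buckets[hash(key) % len(self.buckets)].append((key, row_idx))

    def probe(self, key):
        # Collision check: only entries whose stored key actually equals
        # the probe key are real matches.
        return [idx for k, idx in self.buckets[hash(key) % len(self.buckets)]
                if k == key]
```

With few buckets, many distinct keys share a bucket, so the `k == key` comparison runs often; that is why the 3x slowdown appears until the comparison itself is vectorized.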
Describe the bug
TPC-H Query 9 consumes a lot of memory and takes ~8s to execute in memory on an 8-core machine at SF=1 (with 100% CPU usage across cores). That is likely because the plan misses an equi-join and turns it into a cross join.
To Reproduce
Run query 9, check plan and execution time.
Expected behavior
The query is expected to finish faster (<1s) and shouldn't need a cross join.
Additional context
N/A
When data is partitioned by an expression from a table source or an intermediate hash repartition (from joins), and the expression is the same, we should remove the repartition.
This can be added as an optimization rule on the physical plan, as the partitioning information is not available on the logical plan.
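A minimal sketch of such a physical-plan rule (plan representation and names are invented for the example): if a repartition node's input is already partitioned on the same expression, the node is dropped.

```python
def remove_redundant_repartition(node):
    """node: nested dicts like
    {"op": ..., "partitioning": ..., "input": child_or_None}."""
    # Rewrite children bottom-up first.
    if node.get("input"):
        node = dict(node, input=remove_redundant_repartition(node["input"]))
    # Input already partitioned the same way -> the repartition is a no-op.
    if (node["op"] == "repartition"
            and node["input"].get("partitioning") == node["partitioning"]):
        return node["input"]
    return node
```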
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
The main issue limiting scalability in Ballista today is that joins are implemented as hash joins where each partition of the probe side causes the entire left side to be loaded into memory.
Describe the solution you'd like
To make this scalable we need to hash partition left and right inputs so that we can join the left and right partitions in parallel.
There is already work underway in DataFusion to implement this that we can leverage.
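The partitioned join idea can be sketched like this (illustrative only, not Ballista code): hash-partition both inputs on the join key so that partition i of the left only ever needs to join partition i of the right, and each pair can run in parallel with only one left partition in memory at a time.

```python
def hash_partition(rows, key_fn, n):
    parts = [[] for _ in range(n)]
    for row in rows:
        parts[hash(key_fn(row)) % n].append(row)
    return parts

def partitioned_join(left, right, key_fn, n=4):
    out = []
    for lp, rp in zip(hash_partition(left, key_fn, n),
                      hash_partition(right, key_fn, n)):
        table = {}
        for row in lp:                      # build side: one partition only
            table.setdefault(key_fn(row), []).append(row)
        for row in rp:                      # probe side
            for match in table.get(key_fn(row), []):
                out.append((match, row))
    return out
```

Because the same hash function routes equal keys to the same partition index on both sides, no cross-partition matches are possible and the per-pair joins are independent.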
Describe alternatives you've considered
None
Additional context
None
This is faster (up to 4x in my earlier tests) than the current implementation, which collects all parts into one "full" batch, for cases with very high cardinality in the aggregate keys (think deduplication code). However, not hash partitioning is faster for very "simple" aggregates, as less work needs to be done.
We probably need some fast way to have a rough estimate on the number of distinct values in the aggregate keys, maybe dynamically based on the first batch(es).
Also this work creates a building block for Ballista to distribute data across workers, parallelizing it, avoiding collecting it to one worker, and making it scale to bigger datasets.
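The dynamic choice suggested above could look like this sketch (the threshold and names are made up for illustration): estimate distinct group keys from the first batch, then pick a strategy.

```python
def choose_aggregate_strategy(first_batch_keys, high_cardinality_ratio=0.5):
    """Rough per-query heuristic from a sample of group-by key values."""
    distinct = len(set(first_batch_keys))
    ratio = distinct / max(len(first_batch_keys), 1)
    # Many distinct keys -> hash-partitioned aggregation pays off;
    # few keys -> a single-stream aggregate does less total work.
    return "partitioned" if ratio >= high_cardinality_ratio else "single"
```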
Note: migrated from original JIRA: https://issues.apache.org/jira/browse/ARROW-8910
Add support for explicit casts between signed and unsigned ints.
Note that the type coercion optimizer rule should never implicitly perform casts between types when data would be lost, e.g. from a negative value to an unsigned type.
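The distinction can be sketched as follows (function names are illustrative, not DataFusion APIs): an explicit cast is allowed to reinterpret the value, while an implicit coercion must refuse any value-losing conversion such as a negative value to an unsigned type.

```python
def explicit_cast_to_u32(v):
    # Explicit cast: wrap like a two's-complement hardware cast would.
    return v & 0xFFFFFFFF

def implicit_coerce_to_u32(v):
    # Implicit coercion: reject instead of silently losing the sign.
    if v < 0 or v > 0xFFFFFFFF:
        raise ValueError("implicit cast would lose data")
    return v
```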
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
StandaloneClient
is currently only created using a temporary file for storage in the executor, even though it has an option to define a specific path.
Describe the solution you'd like
Add an option value to decide between using a temporary file or a specific file for storage in the executor.
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Currently, any DataFrame implementation must load the entire result set into RAM in the collect()
method because it has to return a Vec<RecordBatch>
.
Describe the solution you'd like
Change the signature to return an async stream of batches.
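The proposed change can be sketched with an async generator (names are illustrative, not the actual DataFusion API): instead of returning all batches at once, collect() yields them one at a time so the consumer holds only one batch in memory.

```python
import asyncio

async def execute_stream(batches):
    """Stand-in for plan execution producing batches incrementally."""
    for batch in batches:
        await asyncio.sleep(0)     # yield control between batches
        yield batch

async def consume(batches):
    total = 0
    async for batch in execute_stream(batches):
        total += len(batch)        # process incrementally, O(one batch) memory
    return total
```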
Describe alternatives you've considered
None
Additional context
None
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Arrow kernels are implemented and tested and should be used when available.
Describe the solution you'd like
Use more of Arrow's kernels to benefit from Arrow's speed and potential improvements.
Describe alternatives you've considered
n/a
Additional context
n/a
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Provide a convenient way for developers to build and run the UI during development without installing additional software locally.
Describe the solution you'd like
Provide a Dockerfile.
Describe alternatives you've considered
None
Additional context
None
There is a good chance that DataFusion can support the wasm target, if configured correctly.
Related issue: https://issues.apache.org/jira/browse/ARROW-11615
Polars proof of concept (shows that arrow and most of the query plan magic work already): https://github.com/ritchie46/polars/blob/master/js-polars/app.js
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
The Ballista scheduler breaks a query down into stages based on changes in partitioning in the plan, where each stage is broken down into tasks that can be executed concurrently.
Rather than trying to run all the partitions at once, Ballista executors process n concurrent tasks at a time and then request new tasks from the scheduler.
This approach would help DataFusion scale better and it would be ideal to use the same scheduler to scale across cores in DataFusion and across nodes in Ballista.
Describe the solution you'd like
Implement an extensible scheduler in DataFusion and have Ballista extend it to provide distributed execution.
Describe alternatives you've considered
None
Additional context
None
As suggested by @returnString on #33 (comment)
Keep dependencies up to date, with as little manual intervention as possible
Add some sort of automation / bot that automatically updates dependencies. @returnString suggests that Dependabot is a good potential choice
❯ ./tpch-gen.sh
./tpch-gen.sh: line 21: ./dev/build-set-env.sh: No such file or directory
[description coming]
Ballista is currently excluded from the Cargo workspace and we should now include it.
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
There are no examples for Ballista, other than the benchmark suite.
Describe the solution you'd like
A simple example for Ballista.
Describe alternatives you've considered
N/A
Additional context
N/A
Note: migrated from original JIRA: https://issues.apache.org/jira/browse/ARROW-10597
The idea is to get the code to the point where all clippy lints can be enabled.
Here is a list of clippy lints that was disabled:
The goal of this ticket is to enable them (maybe it would be better to break it into multiple PRs/subtasks)
Currently the DataFusion crates pin a version of arrow by git revision. Unfortunately this makes it complicated to use DataFusion with other libraries that also depend on arrow, making it necessary to update all the version pins in lock-step across potentially multiple repositories. #39 helps somewhat with this, but doesn't help with crates that DataFusion isn't dependent on that themselves depend on arrow, or if you want to configure different feature flags from what DataFusion sets.
Unfortunately there is an outstanding bug / feature omission in cargo that makes patch not work for overriding git version pins, and the workarounds on the ticket no longer seem to work.
If DataFusion instead depended on a released version of arrow, Cargo's semver compatibility rules would avoid a lot of this pain for most users, and would allow users to use the patch syntax to override the arrow version in use for all their dependencies within their workspace, if they want to use an unreleased arrow version.
I'm not sure how feasible any of this is, but I thought I would raise a ticket to maybe kickstart a discussion on this.
Ballista has some scripts for building Docker images and they have hard-coded versions, such as:
BALLISTA_VERSION=0.4.2-SNAPSHOT
We should change the scripts to get the version number from cargo instead. There are probably cargo utilities available to help with this.
This issue is for creating a starting point for a user guide in mdbook format, covering both DataFusion and Ballista.
The intent is to get the basic structure in place so that anyone can start contributing content.
Having the ability to set an external/advertised hostname is great since it provides users a lot of flexibility in network deployments. However, making it a required argument is a pain for the most common scenario, where the scheduler, client and executors talk to each other in the same network (e.g. k8s or docker-compose).
We should make the external hostname optional. If the scheduler receives executor metadata without a hostname, it should register the caller's IP address as the hostname.
This will make it easier to deploy the executors as a Kubernetes deployment, or to use docker-compose scale ballista-executor= in the integration tests.
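The fallback is a one-liner in spirit; here is a sketch with invented names, not scheduler code: use the advertised hostname when the executor supplies one, otherwise register the caller's IP as seen by the scheduler.

```python
def resolve_executor_host(advertised_host, peer_ip):
    """Prefer the executor's advertised hostname; fall back to its peer IP."""
    return advertised_host if advertised_host else peer_ip
```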
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
There is no need to support multiple executor clusters from a scheduler, so the namespace of an executor is implicitly defined by the scheduler it connects to. See https://the-asf.slack.com/archives/C01QUFS30TD/p1618679585211100 for more context
Describe the solution you'd like
See apache/arrow#10084
There are now two copies of the benchmark crate, one for DataFusion and one for Ballista, and we should be able to combine them.
See PR in #35
When building the Ballista POC it was necessary to implement a new DataFrame API that wrapped the DataFusion API.
One issue is that it wasn't possible to override the behavior of the collect method to make it use the Ballista context rather than the DataFusion context.
Now that the projects are in the same repo it should be easier to fix this and have users always use the DataFusion DataFrame API.
The Cargo.toml files need updating to point to this new repo instead of the original Arrow monorepo.
The hash join algorithm is core to query performance.
Optimize some hot functions in the hash join algorithm.
Note: migrated from original JIRA: https://issues.apache.org/jira/browse/ARROW-11697
select database();
We want to get the database name from the function database(), so add a provider for user defined functions.
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Support self join queries with table alias:
SELECT * FROM t1 JOIN t1 as t2 ON t1.id = t2.id
Describe the solution you'd like
DataFusion should execute this query without error.
Note: migrated from original JIRA: https://issues.apache.org/jira/browse/ARROW-8260
This is an edge case but the query "SELECT 1 FROM t" causes an error in the Parquet reader because we are not reading any columns. We should have the query planner recognize this and fail the query as invalid.
Now that Ballista has dependencies on DataFusion in the same repo, the integration test scripts and Dockerfile need updating to add DataFusion source into the docker image.
We may want to add Ballista to the default workspace as part of this fix.
Note: migrated from original JIRA: https://issues.apache.org/jira/browse/ARROW-8464
Use cases: Efficiently process large columns of low cardinality Strings
Describe the bug
In the announcement blog post there is a link to DataFusion that goes to the monorepo; we should have it point to this repo instead.
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
I would like to be able to execute the following SQL queries with datafusion:
SELECT t1.id, t2.user FROM t1 JOIN t2 ON t1.id = t2.id
Describe the solution you'd like
DataFusion should be able to execute this query without error.
Note: migrated from original JIRA: https://issues.apache.org/jira/browse/ARROW-5170
See dockerfile: https://github.com/apache/arrow/pull/4147/files#diff-2233b99f64d6acbef4e9d964e29fa76bR18
Note: migrated from original JIRA: https://issues.apache.org/jira/browse/ARROW-4863
Once ARROW-4466 is merged, I would like to add support for reading parquet files that contain LIST and STRUCT.
Note: migrated from original JIRA: https://issues.apache.org/jira/browse/ARROW-6892
Currently we have code in the SQL query planner that wraps aggregate queries in a projection (if needed) to preserve the order of the final results. This is needed because the aggregate query execution always returns a result with grouping expressions first and then aggregate expressions.
It would be better (simpler, more readable code) to always wrap aggregates in projections and have an optimizer rule to remove redundant projections. There are likely other use cases where redundant projections might exist too.
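The suggested rule can be sketched like this (the plan representation is invented for the example): a projection that selects exactly its input's columns, in order, is an identity projection and can be removed.

```python
def remove_redundant_projections(plan):
    """plan: nested dicts like
    {"op": "projection", "exprs": [...], "schema": [...], "input": child}."""
    # Rewrite children bottom-up first.
    if plan.get("input") is not None:
        plan = dict(plan, input=remove_redundant_projections(plan["input"]))
    # Identity projection: output columns == input schema, same order.
    if (plan["op"] == "projection"
            and plan["exprs"] == plan["input"]["schema"]):
        return plan["input"]
    return plan
```

With this rule in place, the SQL planner can always wrap aggregates in a projection and rely on the optimizer to strip the ones that turn out to be redundant.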
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Add https://github.com/DSLAM-UMD/Squirtle to README where we list projects that depend on DataFusion