Comments (10)
I think combining consecutive

RepartitionExec(hash)
--RepartitionExec(round robin)

into

RepartitionExec(hash) (where inputs are hashed in parallel)

produces much more readable plans, and I presume it would also be better in terms of execution speed. Hence we should have this support. However, combining CoalesceBatchesExec into RepartitionExec may introduce complications, as suggested by @ozankabak. I think one of the causes of this problem is that the CoalesceBatches rule is too naive in its current state. By refactoring the CoalesceBatches rule, we can produce better plans as is. As an example, the CoalesceBatchesExec on the build side of the HashJoinExec is unnecessary (given that HashJoinExec already buffers up all data at its input) in the following plan:
ProjectionExec: expr=[name@1 as schoolname, name@3 as teachername]
  CoalesceBatchesExec: target_batch_size=8192
    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(id@0, class_id@0)]
      CoalesceBatchesExec: target_batch_size=8192
        RepartitionExec: partitioning=Hash([id@0], 8), input_partitions=8
          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
            VirtualExecutionPlan
      CoalesceBatchesExec: target_batch_size=8192
        RepartitionExec: partitioning=Hash([class_id@0], 8), input_partitions=8
          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
            ProjectionExec: expr=[class_id@1 as class_id, name@2 as name]
              VirtualExecutionPlan
However, this kind of analysis may require new APIs. We could also benefit from the estimated selectivity in the Statistics when deciding where to insert a CoalesceBatchesExec.
Hence, I propose that as a first step we do the 2nd step in @alamb's suggestion. In the meantime, I will explore how we can refactor the CoalesceBatches rule for better plans. I presume these 2 steps together would be sufficient for most use cases.
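To make the selectivity idea concrete, here is a minimal sketch of what a statistics-aware insertion decision could look like. The function name, threshold, and inputs are all hypothetical, not DataFusion APIs: the point is only that coalescing pays off when an operator's estimated selectivity would shrink batches well below the target size.

```rust
/// Hypothetical heuristic: insert a CoalesceBatchesExec above an operator
/// only when its estimated selectivity would shrink batches well below
/// the target batch size. All names and thresholds here are illustrative.
fn needs_coalesce(
    estimated_selectivity: f64,
    input_batch_size: usize,
    target_batch_size: usize,
) -> bool {
    // Expected rows per output batch after filtering.
    let expected_rows = (input_batch_size as f64 * estimated_selectivity) as usize;
    // Coalescing only pays off when output batches are much smaller than
    // the target (here, arbitrarily: less than half).
    expected_rows < target_batch_size / 2
}

fn main() {
    // A highly selective filter produces tiny batches: coalesce.
    assert!(needs_coalesce(0.01, 8192, 8192));
    // A pass-through-like operator keeps batches large: skip the coalescer.
    assert!(!needs_coalesce(0.9, 8192, 8192));
    println!("ok");
}
```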
from arrow-datafusion.
We should definitely be careful when doing this; doing it gung-ho style has the potential to break many things. @mustafasrepo, please watch any work on this carefully, discuss possible implications of proposals, and guide any implementation efforts.
As a first step, can you please share your thoughts on @alamb's suggestion above? What are the possible implications of rolling batch coalescing into within-operator computation? One thing I can think of is that it turns any operator into a possibly delay-introducing operator (where it wasn't otherwise), which could make DataFusion plans very hard to reason about in certain contexts. So maybe it is not a good idea after all -- but then maybe it is. We should be very careful in any case.
from arrow-datafusion.
@mustafasrepo also had some good thoughts from discord which I am copying here https://discord.com/channels/885562378132000778/1216380737410826270/1216684364805570560
"
I think the best approach would be refactoring RepartitionExec such that it can parallelize hashing when its input partition number is 1.
The point of

RepartitionExec(hash n_in=8, n_out=8)
--RepartitionExec(round robin n_in=1, n_out=8)

is to parallelize hashing. Otherwise it is functionally equivalent to

RepartitionExec(hash n_in=1, n_out=8)

For this reason, if we can parallelize the hashing algorithm when the input partition number is 1, that would enable us to use the plan above without worry.
I think we primarily need to change the implementation of RepartitionExec. Once RepartitionExec has this capability, we need to update the EnforceDistribution rule to generate the plan Repartition(hash n_in=1, n_out=8) instead of the Repartition(hash n_in=8, n_out=8), Repartition(round robin n_in=1, n_out=8) stack. However, this second stage is quite trivial; I am familiar with that part of the code.
"
from arrow-datafusion.
Hello @alamb, I'd love to work on this
from arrow-datafusion.
> Hello @alamb, I'd love to work on this
Thanks @edmondop -- that would be great. I think this could get quite tricky if we are not careful so I would suggest taking it in phases.
Perhaps you can first try to remove CoalesceBatchesExec by refactoring its code into a struct like:
struct BatchCoalscer {
    batches: Vec<RecordBatch>,
    target_batch_size: usize,
}

impl BatchCoalscer {
    /// Buffers the specified record batch. If more than `target_batch_size` rows are buffered,
    /// clears the buffer and emits a RecordBatch with `target_batch_size` rows
    fn push(&mut self, batch: RecordBatch) -> Option<RecordBatch> { .. }

    /// Completes this coalescer and emits any buffered rows
    fn finish(mut self) -> Option<RecordBatch> { ... }
}
And then using that struct directly in RepartitionExec and any other places that require CoalesceBatchesExec
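A runnable toy version of the sketch above, with Vec<i32> standing in for arrow's RecordBatch, may help pin down the buffering contract. Slicing and concatenating real RecordBatches is more involved; this only demonstrates the push/finish behavior, and is not the actual DataFusion implementation.

```rust
/// Toy coalescer: buffers incoming "batches" (Vec<i32> rows here) and
/// emits a batch of exactly `target_batch_size` rows once enough are
/// buffered. A stand-in for the BatchCoalscer sketch, not real code.
struct BatchCoalscer {
    rows: Vec<i32>,
    target_batch_size: usize,
}

impl BatchCoalscer {
    fn new(target_batch_size: usize) -> Self {
        Self { rows: Vec::new(), target_batch_size }
    }

    /// Buffers the batch; once at least `target_batch_size` rows are
    /// buffered, drains and returns exactly `target_batch_size` of them.
    fn push(&mut self, batch: Vec<i32>) -> Option<Vec<i32>> {
        self.rows.extend(batch);
        if self.rows.len() >= self.target_batch_size {
            Some(self.rows.drain(..self.target_batch_size).collect())
        } else {
            None
        }
    }

    /// Completes the coalescer, emitting any remaining buffered rows.
    fn finish(self) -> Option<Vec<i32>> {
        if self.rows.is_empty() { None } else { Some(self.rows) }
    }
}

fn main() {
    let mut c = BatchCoalscer::new(4);
    assert_eq!(c.push(vec![1, 2]), None); // only 2 rows buffered
    assert_eq!(c.push(vec![3, 4, 5]), Some(vec![1, 2, 3, 4])); // emits 4
    assert_eq!(c.finish(), Some(vec![5])); // remainder flushed at the end
    println!("ok");
}
```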
from arrow-datafusion.
> One thing I can think of is that it turns any operator into a possibly delay-introducing operator (where it wasn't otherwise)

This is a good point. So if we introduced coalescing within the operator, it may result in buffering that is not obvious from the plan. For streaming use cases this could be a substantial problem, so perhaps we cannot remove CoalesceBatchesExec.
I think it would still be ok to extract BatchCoalscer as suggested, by refactoring (not yet removing) the code in CoalesceBatchesStream, but maybe that wouldn't really improve things:
arrow-datafusion/datafusion/physical-plan/src/coalesce_batches.rs, lines 176 to 280 in e622409
from arrow-datafusion.
@mustafasrepo just to confirm, you think the first step would be to have a single RepartitionExec in the section of the query plan where we have two, is that right?
RepartitionExec: partitioning=Hash([class_id@0], 8), input_partitions=8
  RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
from arrow-datafusion.
Yes, we can have three modes of partitioning: Hash, RoundRobin, and a new RoundRobinHash which does both jointly. The RR + Hash cascade does hashing in parallel, so you would need to be careful with the implementation to make sure you don't lose parallelism, but other than this it should be relatively straightforward.
from arrow-datafusion.
> @mustafasrepo just to confirm, you think the first step would be to have a single RepartitionExec in the section of the query plan where we have two, is that right?
> RepartitionExec: partitioning=Hash([class_id@0], 8), input_partitions=8
> RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
Exactly.
from arrow-datafusion.
@edmondop asked me in Slack, but I am posting here to make sure we are all on the same page. I believe the suggestion is:
- Change the execute method so it also supports the third method described by @ozankabak in #9370 (comment)
- Change the planner so that instead of making two RepartitionExecs it makes a single one (that uses the third method) as explained in #9370 (comment)
from arrow-datafusion.