
Comments (10)

mustafasrepo commented on May 24, 2024

I think combining consecutive


RepartitionExec(hash) 
--RepartitionExec(round robin) 

into


RepartitionExec(hash) (where inputs hashed in parallel)

produces much more readable plans, and I presume it would be better in terms of execution speed, hence we should support this. However, combining CoalesceBatchesExec into RepartitionExec may introduce complications, as suggested by @ozankabak. I think one of the causes of this problem is that the CoalesceBatches rule is too naive in its current state.

By refactoring the CoalesceBatches rule, we can produce better plans as-is.
As an example, the CoalesceBatchesExec on the build side of the HashJoinExec is unnecessary (given that HashJoinExec already buffers up all data at its input) in the following plan.

ProjectionExec: expr=[name@1 as schoolname, name@3 as teachername]
  CoalesceBatchesExec: target_batch_size=8192
    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(id@0, class_id@0)]
      CoalesceBatchesExec: target_batch_size=8192
        RepartitionExec: partitioning=Hash([id@0], 8), input_partitions=8
          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
            VirtualExecutionPlan
      CoalesceBatchesExec: target_batch_size=8192
        RepartitionExec: partitioning=Hash([class_id@0], 8), input_partitions=8
          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
            ProjectionExec: expr=[class_id@1 as class_id, name@2 as name]
              VirtualExecutionPlan

However, this kind of analysis may require new APIs. We could also use the estimated selectivity in the Statistics when deciding where to insert CoalesceBatchesExec.
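
As a sketch, a new API for this kind of analysis might look like the following (purely hypothetical; DataFusion has no such API today):

// Purely hypothetical API sketch; not part of DataFusion today
trait BufferingInfo {
    /// Returns true if this operator buffers all input from child `child_idx`
    /// before producing output (e.g. the build side of HashJoinExec), in which
    /// case a CoalesceBatchesExec below that child adds no value.
    fn buffers_all_input(&self, child_idx: usize) -> bool;
}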

Hence, I propose that as a first step we do the 2nd step in @alamb's suggestion. In the meantime, I will explore how we can refactor the CoalesceBatches rule to produce better plans. I presume these two steps together would be sufficient for most use cases.


ozankabak commented on May 24, 2024

We should definitely be careful when doing this; doing it gung-ho style has the potential to break many things. @mustafasrepo, please watch any work on this carefully, discuss the possible implications of proposals, and guide any implementation efforts.

As a first step, can you please share your thoughts on @alamb's suggestion above? What are the possible implications of rolling batch coalescing into a within-operator computation? One thing I can think of is that it turns any operator into a possibly delay-introducing operator (where it wasn't otherwise), which could make DataFusion plans very hard to reason about in certain contexts. So maybe it is not a good idea after all -- but then maybe it is. We should be very careful in any case.


alamb commented on May 24, 2024

@mustafasrepo also had some good thoughts on discord, which I am copying here: https://discord.com/channels/885562378132000778/1216380737410826270/1216684364805570560

"

I think the best approach would be refactoring RepartitionExec so that it can parallelize hashing when its input partition count is 1.

The point of

RepartitionExec(hash n_in=8, n_out=8)
--RepartitionExec(round robin n_in=1, n_out=8)

is to parallelize hashing. Otherwise it is functionally equivalent to

RepartitionExec(hash n_in=1, n_out=8)

For this reason, if we can parallelize the hashing algorithm when the input partition count is 1, that would enable us to use the plan above without worry.
I think we primarily need to change the implementation of RepartitionExec. Once RepartitionExec has this capability, we need to update the EnforceDistribution rule to generate the plan Repartition(Hash n_in=1, n_out=8) instead of the Repartition(hash n_in=8, n_out=8), Repartition(round robin n_in=1, n_out=8) stack. However, this second stage is quite trivial; I am familiar with that part of the code.

"


edmondop commented on May 24, 2024

Hello @alamb, I'd love to work on this


alamb commented on May 24, 2024

> Hello @alamb, I'd love to work on this

Thanks @edmondop -- that would be great. I think this could get quite tricky if we are not careful, so I would suggest taking it in phases.

Perhaps you can first try to remove CoalesceBatchesExec by refactoring its code into a struct like

struct BatchCoalescer {
  batches: Vec<RecordBatch>,
  target_batch_size: usize,
}

impl BatchCoalescer {
  /// Buffers the specified record batch. If more than `target_batch_size` rows are buffered,
  /// clears the buffer and emits a RecordBatch with `target_batch_size` rows
  fn push(&mut self, batch: RecordBatch) -> Option<RecordBatch> { ... }

  /// Completes this coalescer and emits any buffered rows
  fn finish(mut self) -> Option<RecordBatch> { ... }
}

And then using that struct directly in RepartitionExec and any other places that require CoalesceBatchesExec.
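
A rough usage sketch, assuming the push/finish semantics above (hypothetical, since the struct is only sketched):

// Hypothetical driver loop for the sketched BatchCoalescer
let mut coalescer = BatchCoalescer {
    batches: Vec::new(),
    target_batch_size: 8192,
};
for batch in input_batches {
    // push returns a completed batch once target_batch_size rows accumulate
    if let Some(completed) = coalescer.push(batch) {
        output.push(completed);
    }
}
// emit any rows still buffered when the input is exhausted
if let Some(remainder) = coalescer.finish() {
    output.push(remainder);
}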


alamb commented on May 24, 2024

> One thing I can think of is that it turns any operator into a possibly delay-introducing operator (where it wasn't otherwise),

This is a good point. So if we introduced coalescing within the operator, it may result in buffering that is not obvious from the plan. For streaming use cases this could be a substantial problem, so perhaps we cannot remove CoalesceBatchesExec.

I think it would still be ok to extract BatchCoalescer as suggested, by refactoring (not yet removing) the code in CoalesceBatchesStream, but maybe that wouldn't really improve things:

struct CoalesceBatchesStream {
    /// The input plan
    input: SendableRecordBatchStream,
    /// The input schema
    schema: SchemaRef,
    /// Minimum number of rows for coalesced batches
    target_batch_size: usize,
    /// Buffered batches
    buffer: Vec<RecordBatch>,
    /// Buffered row count
    buffered_rows: usize,
    /// Whether the stream has finished returning all of its data or not
    is_closed: bool,
    /// Execution metrics
    baseline_metrics: BaselineMetrics,
}

impl Stream for CoalesceBatchesStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let poll = self.poll_next_inner(cx);
        self.baseline_metrics.record_poll(poll)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // we can't predict the size of incoming batches so re-use the size hint from the input
        self.input.size_hint()
    }
}

impl CoalesceBatchesStream {
    fn poll_next_inner(
        self: &mut Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        // Get a clone (uses same underlying atomic) as self gets borrowed below
        let cloned_time = self.baseline_metrics.elapsed_compute().clone();

        if self.is_closed {
            return Poll::Ready(None);
        }

        loop {
            let input_batch = self.input.poll_next_unpin(cx);
            // records time on drop
            let _timer = cloned_time.timer();
            match input_batch {
                Poll::Ready(x) => match x {
                    Some(Ok(batch)) => {
                        if batch.num_rows() >= self.target_batch_size
                            && self.buffer.is_empty()
                        {
                            return Poll::Ready(Some(Ok(batch)));
                        } else if batch.num_rows() == 0 {
                            // discard empty batches
                        } else {
                            // add to the buffered batches
                            self.buffered_rows += batch.num_rows();
                            self.buffer.push(batch);
                            // check to see if we have enough batches yet
                            if self.buffered_rows >= self.target_batch_size {
                                // combine the batches and return
                                let batch = concat_batches(
                                    &self.schema,
                                    &self.buffer,
                                    self.buffered_rows,
                                )?;
                                // reset buffer state
                                self.buffer.clear();
                                self.buffered_rows = 0;
                                // return batch
                                return Poll::Ready(Some(Ok(batch)));
                            }
                        }
                    }
                    None => {
                        self.is_closed = true;
                        // we have reached the end of the input stream but there could still
                        // be buffered batches
                        if self.buffer.is_empty() {
                            return Poll::Ready(None);
                        } else {
                            // combine the batches and return
                            let batch = concat_batches(
                                &self.schema,
                                &self.buffer,
                                self.buffered_rows,
                            )?;
                            // reset buffer state
                            self.buffer.clear();
                            self.buffered_rows = 0;
                            // return batch
                            return Poll::Ready(Some(Ok(batch)));
                        }
                    }
                    other => return Poll::Ready(other),
                },
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}


edmondop commented on May 24, 2024

@mustafasrepo just to confirm, you think the first step would be to have a single RepartitionExec in the section of the query plan where we have two, is that right?

        RepartitionExec: partitioning=Hash([class_id@0], 8), input_partitions=8
          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1


ozankabak commented on May 24, 2024

Yes, we can have three modes of partitioning: Hash, RoundRobin and a new RoundRobinHash which does both jointly. The RR + Hash cascade does hashing in parallel, so you would need to be careful with the implementation to make sure you don't lose parallelism, but other than this it should be relatively straightforward.
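
A sketch of how this might look on the Partitioning enum (the RoundRobinHash variant is hypothetical, and the existing variants are abbreviated):

// Hypothetical extension of DataFusion's Partitioning enum (sketch, not actual API)
pub enum Partitioning {
    /// Existing: distribute whole batches round-robin across n partitions
    RoundRobinBatch(usize),
    /// Existing: route rows to one of n partitions by hashing the expressions
    Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
    /// Proposed: fan a single input out to n workers round-robin, each of
    /// which hash-partitions its share, so hashing itself runs in parallel
    RoundRobinHash(Vec<Arc<dyn PhysicalExpr>>, usize),
}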


mustafasrepo commented on May 24, 2024

> @mustafasrepo just to confirm, you think the first step would be to have a single RepartitionExec in the section of the query plan where we have two, is that right?
>
>         RepartitionExec: partitioning=Hash([class_id@0], 8), input_partitions=8
>           RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1

Exactly.


alamb commented on May 24, 2024

@edmondop asked me on slack, but I am posting here to make sure we are all on the same page. I believe the suggestion is:

  1. Change the execute method so it also supports the third method described by @ozankabak in #9370 (comment)
  2. Change the planner so that instead of making two RepartitionExecs it makes a single one (that uses the third method), as explained in #9370 (comment)
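
A rough sketch of what step 2 might look like, assuming a RoundRobinHash variant like the one sketched above (the variant is hypothetical; RepartitionExec::try_new is the existing constructor):

// In EnforceDistribution (sketch): instead of emitting the two-operator stack
//   RepartitionExec(Hash(exprs, n))
//     RepartitionExec(RoundRobinBatch(n))
// emit a single repartition that parallelizes hashing internally:
let plan = Arc::new(RepartitionExec::try_new(
    single_partition_input,
    Partitioning::RoundRobinHash(hash_exprs, target_partitions), // hypothetical variant
)?);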

