
Comments (10)

Jefffrey commented on May 28, 2024

It seems it is because of this in your code:

fn supports_filters_pushdown(
    &self,
    _filters: &[&datafusion::prelude::Expr],
) -> datafusion::error::Result<Vec<datafusion::logical_expr::TableProviderFilterPushDown>> {
    Ok(vec![TableProviderFilterPushDown::Unsupported])
}

It is actually important to consider the _filters argument: the function should return, for each filter, whether it supports being pushed down or not. See the default implementation:

/// Tests whether the table provider can make use of any or all filter expressions
/// to optimise data retrieval.
#[allow(deprecated)]
fn supports_filters_pushdown(
    &self,
    filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
    filters
        .iter()
        .map(|f| self.supports_filter_pushdown(f))
        .collect()
}

Because you returned a Vec with only a single element, later, when the push_down_filter optimizer checks each filter, it seems it won't push down the first filter (id > 1) but will push down the second (id < 3):

let results = scan
    .source
    .supports_filters_pushdown(filter_predicates.as_slice())?;
let zip = filter_predicates.iter().zip(results);
let new_scan_filters = zip
    .clone()
    .filter(|(_, res)| res != &TableProviderFilterPushDown::Unsupported)
    .map(|(pred, _)| *pred);

  • Since zip will only be as long as the shortest of its two iterators, the second filter is never paired with a result at all, and so it ends up neither in the new scan filters nor in the predicates that are kept.
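
A tiny std-only illustration of that truncation, with strings standing in for the real Expr and TableProviderFilterPushDown values:

fn main() {
    let filter_predicates = ["id > 1", "id < 3"];
    // the buggy provider returns a single element regardless of filters.len()
    let results = ["Unsupported"];
    // zip stops once the shorter iterator is exhausted,
    // so "id < 3" is never inspected at all
    let pairs: Vec<_> = filter_predicates.iter().zip(results.iter()).collect();
    assert_eq!(pairs.len(), 1);
}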

Pushing down the second kinda pushes it into the void, hence only id > 1 took effect.
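
The immediate fix on the provider side is to return one entry per filter, even when nothing can be pushed down; a minimal sketch:

fn supports_filters_pushdown(
    &self,
    filters: &[&datafusion::prelude::Expr],
) -> datafusion::error::Result<Vec<datafusion::logical_expr::TableProviderFilterPushDown>> {
    // answer Unsupported once per incoming filter instead of once in total
    Ok(vec![TableProviderFilterPushDown::Unsupported; filters.len()])
}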

I think we need to improve the documentation of supports_filters_pushdown(...) to clarify the expected behaviour and prevent footguns like this in the future. Not sure if we can do any checks on the push_down_filter optimizer side, but worth investigating? 🤔


Denys-Bushulyak commented on May 28, 2024

I will try to create a simple self-contained example, but I am not sure how fast it will be; maybe today.


SteveLauC commented on May 28, 2024

It seems it is because of this in your code:

Can confirm that commenting out this function makes the test pass.


However, I am still surprised by DataFusion's behavior:

The following is the generated physical plan:

CoalesceBatchesExec {
    input: FilterExec {
        predicate: BinaryExpr {
            left: Column {
                name: "id",
                index: 0,
            },
            op: Gt,
            right: Literal {
                value: Int32(1),
            },
        },
        input: RepartitionExec {
            input: TestExecutionPlan {
                schema: Schema {
                    fields: [
                        Field {
                            name: "id",
                            data_type: Int32,
                            nullable: false,
                            dict_id: 0,
                            dict_is_ordered: false,
                            metadata: {},
                        },
                        Field {
                            name: "title",
                            data_type: Utf8,
                            nullable: false,
                            dict_id: 0,
                            dict_is_ordered: false,
                            metadata: {},
                        },
                    ],
                    metadata: {},
                },
            },
            partitioning: RoundRobinBatch(
                32,
            ),
            state: Mutex {
                data: RepartitionExecState {
                    channels: {},
                    abort_helper: AbortOnDropMany(
                        [],
                    ),
                },
            },
            metrics: ExecutionPlanMetricsSet {
                inner: Mutex {
                    data: MetricsSet {
                        metrics: [],
                    },
                },
            },
            preserve_order: false,
        },
        metrics: ExecutionPlanMetricsSet {
            inner: Mutex {
                data: MetricsSet {
                    metrics: [],
                },
            },
        },
        default_selectivity: 20,
    },
    target_batch_size: 8192,
    metrics: ExecutionPlanMetricsSet {
        inner: Mutex {
            data: MetricsSet {
                metrics: [],
            },
        },
    },
}

We can see that the first filter cannot be pushed down, so there is a FilterExec for it. But as for the second one, DataFusion didn't put it in the FilterExec, and didn't push it down either:

TestTableProvider::scan() filters: []

That incorrect implementation of TableProvider::supports_filters_pushdown() seems to be an edge case that hasn't been covered by the code :D


Jefffrey commented on May 28, 2024

Glad it solved the issue for you. 👍

I'm gonna reopen this because I think we should at the very least enhance the documentation for supports_filters_pushdown, and maybe even introduce a runtime check somewhere to catch this behaviour (e.g. in push_down_filter optimizer)

Edit: marking as good first issue as it should be relatively straightforward from the discussion here what needs to be done and where


colommar commented on May 28, 2024

Hi! I am a newbie at datafusion and I want to give it a try! :)


alamb commented on May 28, 2024

This certainly sounds incorrect. Thank you for filing the issue @Denys-Bushulyak

Is there any chance you can provide a self-contained reproducer?


Denys-Bushulyak commented on May 28, 2024

tests/select_and_test.rs

use std::{any::Any, fmt, sync::Arc};
use axum::async_trait;
use datafusion::{
    arrow::{
        array::{ArrayRef, Int32Array, RecordBatch, StringArray},
        datatypes::{DataType, Field, Schema, SchemaRef},
        util::pretty::pretty_format_batches,
    },
    catalog::{schema::SchemaProvider, CatalogProvider},
    datasource::{TableProvider, TableType},
    execution::context::{SessionContext, SessionState},
    logical_expr::{Expr, TableProviderFilterPushDown},
    physical_plan::{
        stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, ExecutionPlan,
    },
};

#[tokio::test]
async fn test_selection_with_and() {
    let ctx = SessionContext::new();
    ctx.register_catalog("test", Arc::new(TestCatalogProvider()));
    let results = ctx
        .sql("SELECT * FROM test.default.test_table WHERE id > 1 AND id < 3")
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();

    assert_eq!(
        &pretty_format_batches(&results).unwrap().to_string(),
        "+----+-------+\n| id | title |\n+----+-------+\n| 2  | b     |\n+----+-------+\n"
    );
}

struct TestCatalogProvider();
impl CatalogProvider for TestCatalogProvider {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
    fn schema_names(&self) -> Vec<String> {
        vec!["default".to_string()]
    }
    fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
        Some(Arc::new(TestSchemaProvider))
    }
}

struct TestSchemaProvider;

#[async_trait]
impl SchemaProvider for TestSchemaProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn table_names(&self) -> Vec<String> {
        vec!["test_table".to_string()]
    }
    async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
        if name == "test_table" {
            Some(Arc::new(TestTableProvider {
                schema: Arc::new(Schema::new(vec![
                    Field::new("id", DataType::Int32, false),
                    Field::new("title", DataType::Utf8, false),
                ])),
            }))
        } else {
            None
        }
    }
    fn table_exist(&self, name: &str) -> bool {
        name == "test_table"
    }
}

struct TestTableProvider {
    schema: SchemaRef,
}

#[async_trait]
impl TableProvider for TestTableProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
    fn table_type(&self) -> TableType {
        TableType::View
    }
    async fn scan(
        &self,
        _state: &SessionState,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
        // schema for execution plan depends on the projection,
        // that's why we need to create a new schema
        let schema = {
            let fields = match projection {
                Some(projection) => projection
                    .iter()
                    .map(|index| self.schema.field(*index).clone())
                    .collect(),
                None => self.schema.fields().clone(),
            };
            Arc::new(Schema::new(fields))
        };

        Ok(Arc::new(TestExecutionPlan { schema }))
    }

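    // NOTE: this is the bug discussed in this thread: only a single element is
    // returned, no matter how many filters are passed in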
    fn supports_filters_pushdown(
        &self,
        _filters: &[&datafusion::prelude::Expr],
    ) -> datafusion::error::Result<Vec<datafusion::logical_expr::TableProviderFilterPushDown>> {
        Ok(vec![TableProviderFilterPushDown::Unsupported])
    }
}

#[derive(Debug)]
struct TestExecutionPlan {
    schema: SchemaRef,
}

impl DisplayAs for TestExecutionPlan {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "TestExecutionPlan")
    }
}

impl ExecutionPlan for TestExecutionPlan {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
    fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
        datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
    }
    fn output_ordering(&self) -> Option<&[datafusion::physical_expr::PhysicalSortExpr]> {
        None
    }
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![]
    }
    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }
    fn execute(
        &self,
        _partition: usize,
        _context: Arc<datafusion::execution::context::TaskContext>,
    ) -> datafusion::error::Result<datafusion::physical_plan::SendableRecordBatchStream> {
        let batch = RecordBatch::try_new(
            self.schema.clone(),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec![
                    "a".to_string(),
                    "b".to_string(),
                    "c".to_string(),
                ])) as ArrayRef,
            ],
        );
        let fut = futures::future::ready(batch.map_err(|e| e.into()));

        let stream = futures::stream::once(fut);

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            self.schema.clone(),
            stream,
        )))
    }
}


Denys-Bushulyak commented on May 28, 2024

I have updated the title so that it mentions "AND", since my test is about "AND" as well.


Denys-Bushulyak commented on May 28, 2024

@Jefffrey and @SteveLauC, thank you both a lot for your help!


alamb commented on May 28, 2024

I'm gonna reopen this because I think we should at the very least enhance the documentation for supports_filters_pushdown, and maybe even introduce a runtime check somewhere to catch this behaviour (e.g. in push_down_filter optimizer)

Yes, I agree. Thank you @Jefffrey for your help debugging this issue, as well as for the suggestion on good first issues.

To make this easier for others to help with, here are some specific suggestions:

  1. Update the comment on TableProvider::supports_filters_pushdown() to say that the returned vector must have the same size as the filters argument (one possible wording is sketched below).

    /// Tests whether the table provider can make use of any or all filter expressions
    /// to optimise data retrieval.
    #[allow(deprecated)]
    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> Result<Vec<TableProviderFilterPushDown>> {
        filters
            .iter()
            .map(|f| self.supports_filter_pushdown(f))
            .collect()
    }
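
    One possible wording for the strengthened comment (a sketch, exact phrasing open for discussion):

    /// Tests whether the table provider can make use of any or all filter expressions
    /// to optimise data retrieval.
    ///
    /// Note: the returned vector must contain one element for each expression in
    /// `filters`; returning a vector of any other length is an error.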

  2. Add a check at the callsite that the vec returned from supports_filters_pushdown is the same size as the filters passed in, as sketched after the snippet:

    let filter_predicates = split_conjunction(&filter.predicate);
    let results = scan
        .source
        .supports_filters_pushdown(filter_predicates.as_slice())?;
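
    A sketch of what that check could look like, placed right after the call above (the exact error type and wording are just an illustration):

    if results.len() != filter_predicates.len() {
        return Err(DataFusionError::Internal(format!(
            "supports_filters_pushdown returned {} results for {} filters",
            results.len(),
            filter_predicates.len()
        )));
    }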

