Comments (10)
It seems it is because of this in your code:
fn supports_filters_pushdown(
    &self,
    _filters: &[&datafusion::prelude::Expr],
) -> datafusion::error::Result<Vec<datafusion::logical_expr::TableProviderFilterPushDown>> {
    Ok(vec![TableProviderFilterPushDown::Unsupported])
}
It is actually important to consider the _filters argument, because the function should return, for each filter, whether it supports being pushed down or not. See the default implementation:
arrow-datafusion/datafusion/expr/src/table_source.rs
Lines 89 to 100 in 2a490e4
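As a rough illustration of the contract described above, here is a minimal sketch that returns one decision per filter. It does not use the datafusion crate: `PushDown` is a local stand-in for `TableProviderFilterPushDown`, filters are plain strings instead of `Expr`, and the free function is a hypothetical simplification of the trait method.

```rust
// Local stand-in for datafusion's `TableProviderFilterPushDown`, defined here
// only so the sketch compiles without the datafusion crate.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PushDown {
    Unsupported,
    Inexact,
    Exact,
}

/// Simplified, hypothetical version of `supports_filters_pushdown`:
/// filters are plain strings rather than `Expr` values.
/// The result has exactly one entry per input filter, in the same order.
fn supports_filters_pushdown(filters: &[&str]) -> Vec<PushDown> {
    filters.iter().map(|_filter| PushDown::Unsupported).collect()
}
```

With two filters such as "id > 1" and "id < 3", this returns two `Unsupported` entries rather than one, so the caller has a decision for every filter.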
Because you returned a Vec with only a single element, later in the push_down_filter optimizer, when it is checking each filter, it seems it won't push down the first filter (id > 1) but pushes down the second (id < 3):
arrow-datafusion/datafusion/optimizer/src/push_down_filter.rs
Lines 857 to 865 in 2a490e4
Since zip is only as long as its shortest input, the second filter is never paired with a pushdown result. Pushing down the second filter therefore kind of pushes it into the void, so only id > 1 took effect.
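The truncation can be seen in isolation with the standard library alone. This is only a sketch of the `Iterator::zip` behaviour mentioned above, not the optimizer's actual code; `pair_filters` and the use of `bool` for the pushdown decision are hypothetical simplifications.

```rust
/// Pairs each filter with a pushdown decision using `Iterator::zip`.
/// `zip` stops as soon as the shorter input is exhausted, so any filter
/// without a matching decision is silently left out of the result.
fn pair_filters<'a>(filters: &[&'a str], supported: &[bool]) -> Vec<(&'a str, bool)> {
    filters
        .iter()
        .copied()
        .zip(supported.iter().copied())
        .collect()
}
```

Calling `pair_filters(&["id > 1", "id < 3"], &[false])` yields a single pair for "id > 1"; "id < 3" is dropped without any error.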
I think we need to improve the documentation of supports_filters_pushdown(...) to clarify the expected behaviour and prevent footguns like this in the future. I'm not sure whether we can add any checks on the push_down_filter optimizer side, but it's worth investigating? 🤔
from arrow-datafusion.
I will try to create a simple container, but I'm not sure how quickly I can do it; maybe today.
from arrow-datafusion.
It seems it is because of this in your code:
I can confirm that commenting out this function makes the test pass.
However, I am still surprised by DataFusion's behavior.
The following is the generated physical plan:
CoalesceBatchesExec {
    input: FilterExec {
        predicate: BinaryExpr {
            left: Column {
                name: "id",
                index: 0,
            },
            op: Gt,
            right: Literal {
                value: Int32(1),
            },
        },
        input: RepartitionExec {
            input: TestExecutionPlan {
                schema: Schema {
                    fields: [
                        Field {
                            name: "id",
                            data_type: Int32,
                            nullable: false,
                            dict_id: 0,
                            dict_is_ordered: false,
                            metadata: {},
                        },
                        Field {
                            name: "title",
                            data_type: Utf8,
                            nullable: false,
                            dict_id: 0,
                            dict_is_ordered: false,
                            metadata: {},
                        },
                    ],
                    metadata: {},
                },
            },
            partitioning: RoundRobinBatch(
                32,
            ),
            state: Mutex {
                data: RepartitionExecState {
                    channels: {},
                    abort_helper: AbortOnDropMany(
                        [],
                    ),
                },
            },
            metrics: ExecutionPlanMetricsSet {
                inner: Mutex {
                    data: MetricsSet {
                        metrics: [],
                    },
                },
            },
            preserve_order: false,
        },
        metrics: ExecutionPlanMetricsSet {
            inner: Mutex {
                data: MetricsSet {
                    metrics: [],
                },
            },
        },
        default_selectivity: 20,
    },
    target_batch_size: 8192,
    metrics: ExecutionPlanMetricsSet {
        inner: Mutex {
            data: MetricsSet {
                metrics: [],
            },
        },
    },
}
We can see that the first filter cannot be pushed down, so there is a FilterExec for it. The second one, however, was neither put in the FilterExec nor pushed down:
TestTableProvider::scan() filters: []
That incorrect implementation of TableProvider::supports_filters_pushdown() seems to be an edge case that hasn't been covered by the code :D
from arrow-datafusion.
Glad it solved the issue for you. 👍
I'm gonna reopen this because I think we should at the very least enhance the documentation for supports_filters_pushdown, and maybe even introduce a runtime check somewhere to catch this behaviour (e.g. in the push_down_filter optimizer).
Edit: marking as good first issue, as it should be relatively straightforward from the discussion here what needs to be done and where.
from arrow-datafusion.
Hi! I am a newbie to DataFusion and would like to give this a try! : )
from arrow-datafusion.
This certainly sounds incorrect. Thank you for filing the issue, @Denys-Bushulyak.
Is there any chance you can provide a self-contained reproducer?
from arrow-datafusion.
tests/select_and_test.rs
use std::{any::Any, fmt, sync::Arc};

use axum::async_trait;
use datafusion::{
    arrow::{
        array::{ArrayRef, Int32Array, RecordBatch, StringArray},
        datatypes::{DataType, Field, Schema, SchemaRef},
        util::pretty::pretty_format_batches,
    },
    catalog::{schema::SchemaProvider, CatalogProvider},
    datasource::{TableProvider, TableType},
    execution::context::{SessionContext, SessionState},
    logical_expr::{Expr, TableProviderFilterPushDown},
    physical_plan::{
        stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, ExecutionPlan,
    },
};

#[tokio::test]
async fn test_selection_with_and() {
    let ctx = SessionContext::new();
    ctx.register_catalog("test", Arc::new(TestCatalogProvider()));
    let results = ctx
        .sql("SELECT * FROM test.default.test_table WHERE id > 1 AND id < 3")
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();
    assert_eq!(
        &pretty_format_batches(&results).unwrap().to_string(),
        "+----+-------+\n| id | title |\n+----+-------+\n| 2 | b |\n+----+-------+\n"
    );
}

struct TestCatalogProvider();

impl CatalogProvider for TestCatalogProvider {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
    fn schema_names(&self) -> Vec<String> {
        vec!["default".to_string()]
    }
    fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
        Some(Arc::new(TestSchemaProvider))
    }
}

struct TestSchemaProvider;

#[async_trait]
impl SchemaProvider for TestSchemaProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn table_names(&self) -> Vec<String> {
        vec!["test_table".to_string()]
    }
    async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
        if name == "test_table" {
            Some(Arc::new(TestTableProvider {
                schema: Arc::new(Schema::new(vec![
                    Field::new("id", DataType::Int32, false),
                    Field::new("title", DataType::Utf8, false),
                ])),
            }))
        } else {
            None
        }
    }
    fn table_exist(&self, name: &str) -> bool {
        name == "test_table"
    }
}

struct TestTableProvider {
    schema: SchemaRef,
}

#[async_trait]
impl TableProvider for TestTableProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
    fn table_type(&self) -> TableType {
        TableType::View
    }
    async fn scan(
        &self,
        _state: &SessionState,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
        // schema for execution plan depends on the projection,
        // that's why we need to create a new schema
        let schema = {
            let fields = match projection {
                Some(projection) => projection
                    .iter()
                    .map(|index| self.schema.field(*index).clone())
                    .collect(),
                None => self.schema.fields().clone(),
            };
            Arc::new(Schema::new(fields))
        };
        Ok(Arc::new(TestExecutionPlan { schema }))
    }
    fn supports_filters_pushdown(
        &self,
        _filters: &[&datafusion::prelude::Expr],
    ) -> datafusion::error::Result<Vec<datafusion::logical_expr::TableProviderFilterPushDown>> {
        Ok(vec![TableProviderFilterPushDown::Unsupported])
    }
}

#[derive(Debug)]
struct TestExecutionPlan {
    schema: SchemaRef,
}

impl DisplayAs for TestExecutionPlan {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "TestExecutionPlan")
    }
}

impl ExecutionPlan for TestExecutionPlan {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
    fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
        datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
    }
    fn output_ordering(&self) -> Option<&[datafusion::physical_expr::PhysicalSortExpr]> {
        None
    }
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![]
    }
    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }
    fn execute(
        &self,
        _partition: usize,
        _context: Arc<datafusion::execution::context::TaskContext>,
    ) -> datafusion::error::Result<datafusion::physical_plan::SendableRecordBatchStream> {
        let batch = RecordBatch::try_new(
            self.schema.clone(),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec![
                    "a".to_string(),
                    "b".to_string(),
                    "c".to_string(),
                ])) as ArrayRef,
            ],
        );
        let fut = futures::future::ready(batch.map_err(|e| e.into()));
        let stream = futures::stream::once(fut);
        Ok(Box::pin(RecordBatchStreamAdapter::new(
            self.schema.clone(),
            stream,
        )))
    }
}
from arrow-datafusion.
I have updated the title because the issue is about "AND". I am also running tests with "AND".
from arrow-datafusion.
@Jefffrey and @SteveLauC, thank you both very much for the help!
from arrow-datafusion.
I'm gonna reopen this because I think we should at the very least enhance the documentation for supports_filters_pushdown, and maybe even introduce a runtime check somewhere to catch this behaviour (e.g. in push_down_filter optimizer)
Yes, I agree. Thank you @Jefffrey for your help debugging this issue, as well as for the suggestion to mark it as a good first issue.
To make this easier for others to help with, here are some specific suggestions:
- Update the comment on TableProvider::supports_filters_pushdown() to say that the returned vector must have the same size as the filters argument.
arrow-datafusion/datafusion/core/src/datasource/provider.rs
Lines 167 to 178 in a1ae158
- Add a check at the callsite that the vec returned from supports_filters_pushdown is the same size as the filters passed:
arrow-datafusion/datafusion/optimizer/src/push_down_filter.rs
Lines 856 to 859 in a1ae158
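The second suggestion could look roughly like the sketch below. It is only an assumption about the shape of such a check, not the code that exists in the optimizer: `check_pushdown_len` is a hypothetical helper, and a `String` error stands in for DataFusion's own error type.

```rust
/// Hypothetical callsite check: the number of pushdown results must equal
/// the number of filters that were passed to `supports_filters_pushdown`.
/// A `String` error is used here in place of DataFusion's error type.
fn check_pushdown_len(num_filters: usize, num_results: usize) -> Result<(), String> {
    if num_filters != num_results {
        return Err(format!(
            "supports_filters_pushdown returned {num_results} results for {num_filters} filters"
        ));
    }
    Ok(())
}
```

Running this before pairing filters with results would turn the silent truncation into an explicit error; for example, `check_pushdown_len(2, 1)` returns an `Err`.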
from arrow-datafusion.