Comments (6)

adriangb commented on July 3, 2024

I do think that example would be nice, it's basically what I was trying to build 😄

My approach was going to be something like:

async fn scan(
    &self,
    state: &SessionState,
    projection: Option<&Vec<usize>>,
    filters: &[Expr],
    limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let object_store_url = ObjectStoreUrl::parse("file://")?;
    let mut file_scan_config = FileScanConfig::new(object_store_url, self.schema())
        .with_projection(projection.cloned())
        .with_limit(limit);

    // Use the index to get row groups to be scanned
    // Index does best effort to parse filters and push them down into the metadata store
    let partitioned_files_with_row_group_selection = self.index.get_files(filters).await?;

    for file in partitioned_files_with_row_group_selection {
        file_scan_config = file_scan_config.with_file(PartitionedFile::new(
            file.canonical_path.display().to_string(),
            file.file_size,
        ).with_extensions(Arc::new(file.access_plan())));
    }

    let df_schema = DFSchema::try_from(self.schema())?;
    // convert filters like [`a = 1`, `b = 2`] to a single filter like `a = 1 AND b = 2`
    let predicate = conjunction(filters.to_vec());
    let predicate = predicate
        .map(|predicate| state.create_physical_expr(predicate, &df_schema))
        .transpose()?
        .unwrap_or_else(|| datafusion_physical_expr::expressions::lit(true));

    let exec = ParquetExec::builder(file_scan_config)
        .with_predicate(predicate)
        .build_arc();

    Ok(exec)
}

(several functions and types made up)
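
For the row group selection part, I was picturing the made-up `file.access_plan()` returning one of the new `ParquetAccessPlan`s to stash in the `PartitionedFile` extensions (if I'm reading that API right), roughly like:

use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;

// Hypothetical helper: the external index tells us how many row groups the file
// has and which ones it ruled out; everything else gets scanned.
fn access_plan(row_group_count: usize, row_groups_to_skip: &[usize]) -> ParquetAccessPlan {
    let mut plan = ParquetAccessPlan::new_all(row_group_count);
    for rg in row_groups_to_skip {
        plan.skip(*rg);
    }
    plan
}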

Does this sound about in line with what you would think of as an example? I think implementing the async store as a familiar RDBMS (SQLite via SQLx?) would be a good example.
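
Concretely, an SQLite-backed version of the (made-up) `self.index.get_files` could look something like this. The `SqliteIndex` name, the `file_stats` table layout, and the restriction to a single `a = <integer literal>` filter are all invented for the sketch; anything the index can't interpret would just be ignored and left to ParquetExec's own pruning:

use datafusion::common::ScalarValue;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::{Expr, Operator};
use sqlx::SqlitePool;

// Hypothetical index whose metadata lives in SQLite (queried asynchronously via sqlx)
struct SqliteIndex {
    pool: SqlitePool,
}

impl SqliteIndex {
    // Return (file_name, file_size) for files whose stored min/max stats admit the filters
    async fn get_files(&self, filters: &[Expr]) -> Result<Vec<(String, u64)>> {
        // Best effort: look for an `a = <integer literal>` filter and turn it into
        // a range check against the stored min/max for column `a`
        let mut wanted: Option<i64> = None;
        for filter in filters {
            if let Expr::BinaryExpr(b) = filter {
                if b.op == Operator::Eq {
                    if let (Expr::Column(col), Expr::Literal(ScalarValue::Int64(Some(v)))) =
                        (b.left.as_ref(), b.right.as_ref())
                    {
                        if col.name == "a" {
                            wanted = Some(*v);
                        }
                    }
                }
            }
        }

        let rows: Vec<(String, i64)> = match wanted {
            // Only keep files whose min/max range for `a` can contain the value
            Some(v) => sqlx::query_as(
                "SELECT file_name, file_size FROM file_stats WHERE min_a <= ? AND max_a >= ?",
            )
            .bind(v)
            .bind(v)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| DataFusionError::External(Box::new(e)))?,
            // No usable filter: every file is a candidate
            None => sqlx::query_as("SELECT file_name, file_size FROM file_stats")
                .fetch_all(&self.pool)
                .await
                .map_err(|e| DataFusionError::External(Box::new(e)))?,
        };

        Ok(rows.into_iter().map(|(name, size)| (name, size as u64)).collect())
    }
}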

alamb commented on July 3, 2024

Update here is that I have a basic example, #10549, ready for review / merge

adriangb commented on July 3, 2024

Sorry for jumping in here, maybe this isn't the best issue but it's hard to keep up with all of the amazing work you're doing @alamb!

I wanted to pitch a use case I've been thinking about: storing a secondary index in a searchable, async location. Think a relational database with ACID guarantees. In particular, the key would be that the hooks for doing selection / pruning are async and that they receive the filters: I'd push the filters down into the metadata store and run an actual query there that returns the files / row groups to scan. This is in contrast to #10549, for example, where the index is in memory and fully materialized. I realize that TableProvider.scan already serves this purpose, but it'd be nice to integrate with these new APIs instead of having to implement more things oneself because you're hooking in at a higher (lower?) level.
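
Concretely, the hook I'm imagining is something along these lines (all of the names are made up, just to show the shape of it):

use async_trait::async_trait;
use datafusion::error::Result;
use datafusion::logical_expr::Expr;
use std::path::PathBuf;

// What the index returns for each candidate file (hypothetical)
pub struct IndexedFile {
    pub canonical_path: PathBuf,
    pub file_size: u64,
    // Row groups the index thinks are worth scanning (None = scan them all)
    pub row_groups: Option<Vec<usize>>,
}

// The important part: the pruning hook is async and receives the filters, so an
// implementation can run a real query against a remote metadata store instead of
// consulting a fully materialized in-memory index
#[async_trait]
pub trait AsyncFileIndex: Send + Sync {
    async fn get_files(&self, filters: &[Expr]) -> Result<Vec<IndexedFile>>;
}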

alamb commented on July 3, 2024

> Sorry for jumping in here, maybe this isn't the best issue but it's hard to keep up with all of the amazing work you're doing @alamb!

Thanks @adriangb ❤️

> I wanted to pitch a use case I've been thinking about: storing a secondary index in a searchable, async location. Think a relational database with ACID guarantees. In particular, the key would be that the hooks for doing selection / pruning are async and that they receive the filters: I'd push the filters down into the metadata store and run an actual query there that returns the files / row groups to scan. This is in contrast to #10549, for example, where the index is in memory and fully materialized.

Yes, I agree this is a very common use case in modern database / data systems and one I hope will be easier to implement with some of these APIs (btw see #10813 for an even lower level API which I think brings this idea to its lowest level).

> I realize that TableProvider.scan already serves this purpose, but it'd be nice to integrate with these new APIs instead of having to implement more things oneself because you're hooking in at a higher (lower?) level.

I agree that you could do an async call as part of TableProvider::scan to fetch the relevant information from the remote store. Specifically, here:

async fn scan(
    &self,
    state: &SessionState,
    projection: Option<&Vec<usize>>,
    filters: &[Expr],
    limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let df_schema = DFSchema::try_from(self.schema())?;
    // convert filters like [`a = 1`, `b = 2`] to a single filter like `a = 1 AND b = 2`
    let predicate = conjunction(filters.to_vec());
    let predicate = predicate
        .map(|predicate| state.create_physical_expr(predicate, &df_schema))
        .transpose()?
        // if there are no filters, use a literal true to have a predicate
        // that always evaluates to true we can pass to the index
        .unwrap_or_else(|| datafusion_physical_expr::expressions::lit(true));

    // Use the index to find the files that might have data that matches the
    // predicate. Any file that can not have data that matches the predicate
    // will not be returned.
    let files = self.index.get_files(predicate.clone())?;

    let object_store_url = ObjectStoreUrl::parse("file://")?;
    let mut file_scan_config = FileScanConfig::new(object_store_url, self.schema())
        .with_projection(projection.cloned())
        .with_limit(limit);

    // Transform to the format needed to pass to ParquetExec
    // Create one file group per file (default to scanning them all in parallel)
    for (file_name, file_size) in files {
        let path = self.dir.join(file_name);
        let canonical_path = fs::canonicalize(path)?;
        file_scan_config = file_scan_config.with_file(PartitionedFile::new(
            canonical_path.display().to_string(),
            file_size,
        ));
    }

    let exec = ParquetExec::builder(file_scan_config)
        .with_predicate(predicate)
        .build_arc();

    Ok(exec)
}
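
Since `scan` is already `async`, I think the main change for a remote index would be that the lookup becomes an await point instead of a synchronous call into the in-memory index, e.g. (hypothetical async `index`):

    // fetch candidate files from the remote metadata store
    let files = self.index.get_files(predicate.clone()).await?;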

One thing that is still unclear in my mind is what other APIs we could offer to make it easier to implement an external index. Most of the code in parquet_index.rs is to create the in-memory index. Maybe we could create an example that shows how to implement a remote index 🤔
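
Whatever form the index takes, the end-to-end usage of such a provider would look roughly like this (the provider and the `value` column are placeholders; the point is that the `WHERE` clause arrives in `scan` as `filters` for the index to use):

use std::sync::Arc;

use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn run(provider: Arc<dyn TableProvider>) -> Result<()> {
    let ctx = SessionContext::new();
    // Register the index-backed provider like any other table
    ctx.register_table("indexed_table", provider)?;
    // `value = 5` reaches `scan` via `filters`, where the (async / remote) index
    // can use it to prune files and row groups before ParquetExec runs
    let df = ctx.sql("SELECT * FROM indexed_table WHERE value = 5").await?;
    df.show().await?;
    Ok(())
}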

alamb commented on July 3, 2024

> Does this sound about in line with what you would think of as an example? I think implementing the async store as a familiar RDBMS (SQLite via SQLx?) would be a good example.

Yes that is very much in line.

Using SQLite via sqlx would be cool, though I don't think we would want to add new dependencies into the core datafusion crates themselves.

I made a new repo in datafusion-contrib here: https://github.com/datafusion-contrib/datafusion-async-parquet-index and invited you to be an admin, in case you want to do things there.

adriangb commented on July 3, 2024

datafusion-contrib/datafusion-async-parquet-index#1 😃
