lightcopy / parquet-index
Spark SQL index for Parquet tables
License: Apache License 2.0
This issue is for updating the index catalog and source strategy tests to make it easy to verify the effect of a filter on partition selection, e.g. testing that a GreaterThan filter selects only the matching partitions and does not touch the ones that have no matching statistics.
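As a toy illustration of the property these tests should verify (Partition, Stats, and prunePartitions are made-up names for this sketch, not the project's API):

```scala
// A GreaterThan(v) filter may only keep partitions whose max statistic exceeds v;
// partitions outside the range must never be touched.
case class Stats(min: Int, max: Int)
case class Partition(name: String, stats: Stats)

def prunePartitions(parts: Seq[Partition], greaterThan: Int): Seq[Partition] =
  parts.filter(_.stats.max > greaterThan)

val parts = Seq(Partition("p1", Stats(0, 5)), Partition("p2", Stats(10, 20)))
assert(prunePartitions(parts, 7) == Seq(parts(1))) // p1 is pruned, p2 selected
```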
String and binary predicates for Parquet are disabled in Spark; we should take those observations into account when we build the index, and we need to modify statistics to report correct values for non-ASCII strings.
I mean the SQL language in Spark SQL, rather than the Spark SQL API.
For example:
Dataset<Row> sqlDF =
  spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
This is the inverse of #26. Since the index check is based on verifying that the directory exists, it returns true if the directory was already created, but loading the index then fails to look up _table_metadata for the Parquet index. It would be better to create a _SUCCESS file in the root index directory to know whether or not a folder is an index directory.
Will try adding it next week.
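A minimal sketch of the marker-file approach using the Hadoop FileSystem API (the helper names here are hypothetical, not the project's code):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()

// Written once, after index creation completes successfully.
def markIndexCreated(indexDir: Path): Unit = {
  val fs: FileSystem = indexDir.getFileSystem(conf)
  fs.create(new Path(indexDir, "_SUCCESS")).close()
}

// A directory counts as an index only if the marker exists, instead of
// merely checking that the directory itself exists.
def isIndexDirectory(indexDir: Path): Boolean = {
  val fs: FileSystem = indexDir.getFileSystem(conf)
  fs.exists(new Path(indexDir, "_SUCCESS"))
}
```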
Currently we have a flag to enable bloom filters; we should update it to enable filter statistics in general and to choose the type of those filters, e.g. bloom, dict, etc. I am proposing these options (a usage sketch follows the list):
spark.sql.index.parquet.filter.enabled - whether or not filter statistics are enabled, default is true
spark.sql.index.parquet.filter.type - filter statistics type, e.g. bloom, dict, and others, default is bloom
spark.sql.index.parquet.filter.eagerLoading - whether to load filters eagerly when the index catalog is read (everything kept in memory), or lazily, loading only the filters required to evaluate a predicate (same as what we currently do)
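A hedged usage sketch; the option names come from the proposal above and are not an existing API, and the values shown are the proposed defaults:

```scala
import org.apache.spark.sql.SparkSession

// Proposed options set at session creation (all names are the proposal above).
val spark = SparkSession.builder()
  .master("local[1]")
  .config("spark.sql.index.parquet.filter.enabled", "true")
  .config("spark.sql.index.parquet.filter.type", "bloom")
  .config("spark.sql.index.parquet.filter.eagerLoading", "false")
  .getOrCreate()
```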
Indexing a table with an all-nulls column fails with a NullPointerException when converting statistics.
It can be very slow to load filters the first time. We could load them in parallel when incremental (lazy) loading is selected; this does not apply to eager loading.
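A minimal sketch of the idea with plain Scala futures; loadFilter stands in for the project's actual filter loading:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Kick off all filter loads at once on first access instead of one by one.
def loadFiltersInParallel[T](paths: Seq[String])(loadFilter: String => T): Seq[T] =
  Await.result(Future.sequence(paths.map(p => Future(loadFilter(p)))), Duration.Inf)
```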
Add a flag spark.sql.index.createIfNotExists, or something similar, to support creating the index when the first index scan is made on a table. Subsequent scans will use the created index.
Currently bucketing is not supported, meaning that the package does not take advantage of bucketing; a bucketed table is processed and indexed like a standard partitioned table. We should investigate whether we can leverage that information in the index.
We also need to add a cache, since it is an in-memory index.
Currently ColumnFilterStatistics does not enforce types; we should change it to unify types for ColumnStatistics and ColumnFilterStatistics.
This one can be tricky, since we need to hack the SQL parser in Spark to support this feature. It would enable usage of JDBC with Spark.
Should add ParquetSource, ParquetIndex, and FileIndexSource for a generic file system implementation.
When indexing a table with large partitions, bloom filters might cause OOM when the data is >= 1,000,000 records. We should restrict filters to store up to a certain threshold, and accept a higher false-positive rate instead.
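A hedged sketch of the trade-off, using the standard bloom filter false-positive estimate p = (1 - e^(-kn/m))^k: fix the filter size, and partitions larger than the sizing threshold simply get a higher false-positive rate instead of a bigger filter:

```scala
// Estimated false-positive probability for m bits, n inserted items, k hashes.
def expectedFpp(bits: Long, numInserted: Long, numHashes: Int = 5): Double =
  math.pow(1.0 - math.exp(-numHashes.toDouble * numInserted / bits), numHashes)

val bits = 8L * 1024 * 1024 * 8       // cap the filter at 8 MB regardless of data size
println(expectedFpp(bits, 1000000L))  // sized-for case: FPP ~ 1.9e-6
println(expectedFpp(bits, 10000000L)) // 10x more records: FPP ~ 0.04, same memory
```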
When running in YARN mode I get an NPE; the same code works in local mode:
```
context.index.create.indexBy('driverId', 'timestamp').parquet('s3://archival-store/test/bench1day7/gps/packet_date=2018-03-25/')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/anshuls/parquet-index/python/src/lightcopy/index.py", line 122, in parquet
    self._createIndex(path)
  File "/home/anshuls/parquet-index/python/src/lightcopy/index.py", line 103, in _createIndex
    jcreate.createIndex(path)
  File "/usr/local/spark/latest/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/local/spark/latest/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/local/spark/latest/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o62.createIndex.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 36 in stage 1.0 failed 4 times, most recent failure: Lost task 36.3 in stage 1.0 (TID 116, ip-172-18-54-234.us-west-2.compute.internal, executor 1): java.lang.NullPointerException
at org.apache.hadoop.fs.FileSystem.getDefaultUri(FileSystem.java:232)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:223)
at org.apache.spark.sql.execution.datasources.parquet.ParquetStatisticsRDD.compute(ParquetStatisticsRDD.scala:122)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
at org.apache.spark.sql.execution.datasources.parquet.ParquetMetastoreSupport.createIndex(ParquetMetastoreSupport.scala:140)
at org.apache.spark.sql.execution.datasources.IndexedDataSource$$anonfun$createIndex$2.apply(IndexedDataSource.scala:108)
at org.apache.spark.sql.execution.datasources.IndexedDataSource$$anonfun$createIndex$2.apply(IndexedDataSource.scala:107)
at org.apache.spark.sql.execution.datasources.Metastore.create(Metastore.scala:162)
at org.apache.spark.sql.execution.datasources.IndexedDataSource.createIndex(IndexedDataSource.scala:107)
at org.apache.spark.sql.CreateIndexCommand.createIndex(DataFrameIndexManager.scala:225)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at org.apache.hadoop.fs.FileSystem.getDefaultUri(FileSystem.java:232)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:223)
at org.apache.spark.sql.execution.datasources.parquet.ParquetStatisticsRDD.compute(ParquetStatisticsRDD.scala:122)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
```
It looks like the hadoopConf is somehow null, although I am able to access it in my shell just fine.
Currently, with the statistics patch, we will have only column metadata; block metadata will be one per file, regardless of how large the file is. We should investigate whether it is worth maintaining separate statistics for each block (e.g. every 1,000,000 records).
Currently, when building dictionary filters, we have to keep them in memory. They should spill to disk after a certain threshold.
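A minimal, hypothetical sketch of the spilling idea (the class name and threshold handling are made up, not the project's design):

```scala
import java.io.{File, FileOutputStream, ObjectOutputStream}
import scala.collection.mutable

// Accumulate dictionary values in memory and spill the current batch to a
// temp file once a size threshold is reached; lookups would need to consult
// the spilled batches as well (omitted here).
class SpillableDictionary(threshold: Int) {
  private val inMemory = mutable.HashSet.empty[String]
  private val spills = mutable.ArrayBuffer.empty[File]

  def add(value: String): Unit = {
    inMemory += value
    if (inMemory.size >= threshold) spill()
  }

  private def spill(): Unit = {
    val file = File.createTempFile("dict-spill", ".bin")
    val out = new ObjectOutputStream(new FileOutputStream(file))
    try out.writeObject(inMemory.toSet) finally out.close()
    spills += file
    inMemory.clear()
  }
}
```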
Improve column filter statistics for dictionary type.
Index creation fails due to #9. Exception stack trace:
java.lang.IllegalArgumentException: requirement failed: Min é is greater than max a
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.sql.execution.datasources.parquet.ParquetStringStatistics.<init>(statistics.scala:171)
at org.apache.spark.sql.execution.datasources.parquet.ParquetStatisticsRDD$.convertStatistics(ParquetStatisticsRDD.scala:238)
at org.apache.spark.sql.execution.datasources.parquet.ParquetStatisticsRDD$$anonfun$5.apply(ParquetStatisticsRDD.scala:219)
at org.apache.spark.sql.execution.datasources.parquet.ParquetStatisticsRDD$$anonfun$5.apply(ParquetStatisticsRDD.scala:212)
Seq("a", "é").toDF("name").coalesce(1).write.parquet("temp/bad.parquet")
spark.read.parquet("temp/bad.parquet").where("name > 'a'").show
When running the same query through the index, the exception above is thrown. A related example returns wrong results instead of throwing:
Seq("aa", "bé", "bb").toDF("name").coalesce(1).write.parquet("temp/bad2.parquet")
spark.read.parquet("temp/bad2.parquet").where("name > 'bb'").show
returns this result:
+----+
|name|
+----+
| bé|
+----+
spark.index.create.indexByAll.parquet("temp/bad2.parquet")
spark.index.parquet("temp/bad2.parquet").where("name > 'bb'").show
returns a wrong (empty) result:
+----+
|name|
+----+
+----+
Expected to return a boolean; instead it throws an exception when looking up a non-existent directory:
scala> spark.index.exists.parquet("temp/abc")
17/01/02 19:49:01 INFO Metastore: Resolved metastore directory to file:/Users/sadikovi/Developer/parquet-index/index_metastore
17/01/02 19:49:01 INFO Metastore: Registered file system org.apache.hadoop.fs.LocalFileSystem@529b0328
17/01/02 19:49:01 INFO Metastore: Registered cache com.google.common.cache.LocalCache$LocalManualCache@5cf2ed6f
java.io.FileNotFoundException: File temp/abc does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:537)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:750)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:527)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
at org.apache.spark.sql.execution.datasources.IndexedDataSource$.resolveTablePath(IndexedDataSource.scala:150)
at org.apache.spark.sql.execution.datasources.IndexedDataSource.tablePath$lzycompute(IndexedDataSource.scala:43)
at org.apache.spark.sql.execution.datasources.IndexedDataSource.tablePath(IndexedDataSource.scala:41)
at org.apache.spark.sql.execution.datasources.IndexedDataSource$$anonfun$existsIndex$1.apply(IndexedDataSource.scala:102)
at org.apache.spark.sql.execution.datasources.IndexedDataSource$$anonfun$existsIndex$1.apply(IndexedDataSource.scala:102)
at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
at org.apache.spark.sql.execution.datasources.IndexedDataSource.logInfo(IndexedDataSource.scala:32)
at org.apache.spark.sql.execution.datasources.IndexedDataSource.existsIndex(IndexedDataSource.scala:102)
at org.apache.spark.sql.ExistsIndexCommand.table(DataFrameIndexManager.scala:221)
at org.apache.spark.sql.ExistsIndexCommand.parquet(DataFrameIndexManager.scala:227)
... 50 elided
Currently we use the Spark Parquet reader. This issue is about investigating whether we can extract data pages and index those, including per-page statistics. During a scan we would then select only the pages that match the predicate and read data from them.
When filtering by two columns where only one of them is indexed, e.g.
col("col1") === new java.sql.Date(300000000L) || col("col2") === new java.sql.Timestamp(330000000000L)
you would see a log similar to this:
scala> df.show
17/02/05 20:36:01 INFO IndexSourceStrategy: Pruning directories with:
17/02/05 20:36:01 INFO IndexSourceStrategy: Index columns List(col1#10)
17/02/05 20:36:01 INFO IndexSourceStrategy: Index set {col1#10}
17/02/05 20:36:01 INFO IndexSourceStrategy: Normalized filters List(((col1#10 = 3) || (col2#11 = 330000000000000)))
17/02/05 20:36:01 INFO IndexSourceStrategy: Applying index filters:
17/02/05 20:36:01 INFO IndexSourceStrategy: Post-Scan filters: ((col1#10 = 3) || (col2#11 = 330000000000000))
17/02/05 20:36:01 INFO IndexSourceStrategy: Output data schema: struct<col1: date, col2: timestamp, col3: string, col4: boolean ... 2 more fields>
17/02/05 20:36:01 INFO IndexSourceStrategy: Pushed filters: Or(EqualTo(col1,1970-01-04),EqualTo(col2,1980-06-16 22:40:00.0))
Note that no index filters are applied on the line "17/02/05 20:36:01 INFO IndexSourceStrategy: Applying index filters:". This should be documented in the code comments. It happens because there is no way to split the OR predicate for index filtering: we cannot tell from the index alone whether a matching col2 value exists, so the entire table has to be scanned and the index is disabled in this case.
Convert MessageType into StructType for supported field types.
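A hedged sketch of such a conversion for a few primitive types; logical-type handling (e.g. the UTF8 annotation for BINARY) is glossed over and would need real checks:

```scala
import scala.collection.JavaConverters._
import org.apache.parquet.schema.{MessageType, PrimitiveType, Type}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
import org.apache.spark.sql.types._

// Hypothetical converter, not the project's code: map a subset of Parquet
// primitive types to Spark SQL types.
def toStructType(schema: MessageType): StructType = {
  val fields = schema.getFields.asScala.map {
    case p: PrimitiveType =>
      val dataType = p.getPrimitiveTypeName match {
        case INT32   => IntegerType
        case INT64   => LongType
        case FLOAT   => FloatType
        case DOUBLE  => DoubleType
        case BOOLEAN => BooleanType
        case BINARY  => StringType // assumes UTF8 annotation; real code must verify
        case other   => throw new UnsupportedOperationException(s"Unsupported type: $other")
      }
      StructField(p.getName, dataType, nullable = p.isRepetition(Type.Repetition.OPTIONAL))
    case other =>
      throw new UnsupportedOperationException(s"Complex type not supported: $other")
  }
  StructType(fields)
}
```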
Parquet-index should load table metadata from the metastore/table, and generally work with the saveAsTable and table APIs. This means that Source should become part of the internal catalog.
Also these methods should be revisited in API:
def fs: FileSystem
def metastorePath: String
def getFreshIndexDirectory(): Path
Querying a table fails if one of the Parquet files is empty, because of the condition requiring non-empty blocks.
17/01/08 17:41:43 INFO IndexSourceStrategy: Applying index filters: IsNotNull(col1),EqualTo(col1,2)
java.lang.IllegalArgumentException: requirement failed: Parquet file status has empty blocks, required at least one block metadata
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.sql.execution.datasources.parquet.ParquetIndexCatalog.resolveSupported(ParquetIndexCatalog.scala:131)
at org.apache.spark.sql.execution.datasources.parquet.ParquetIndexCatalog$$anonfun$pruneIndexedPartitions$2$$anonfun$8.apply(ParquetIndexCatalog.scala:144)
at org.apache.spark.sql.execution.datasources.parquet.ParquetIndexCatalog$$anonfun$pruneIndexedPartitions$2$$anonfun$8.apply(ParquetIndexCatalog.scala:143)
We should replace dir with a Map[String, String] of options that has path for the file system, see https://github.com/lightcopy/parquet-index/blob/master/src/main/scala/com/github/lightcopy/index/interfaces.scala#L119
Also replace the getRoot method with getIndexIdentifier, which for a file-system based index returns the fully-qualified path, see https://github.com/lightcopy/parquet-index/blob/master/src/main/scala/com/github/lightcopy/index/interfaces.scala#L43
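A hedged sketch of the proposed interface shape; the exact signatures are a guess, not the project's code:

```scala
trait Source {
  // was: def dir: String — generic options instead, with "path" for file systems
  def options: Map[String, String]
  // was: def getRoot — for a file-system based index this returns the
  // fully-qualified path, e.g. "hdfs://namenode:8020/index/table1" (made-up example)
  def getIndexIdentifier: String
}
```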
I have tested parquet-index with spark-2.0.1 in local mode for a long time:
driver: --master local[1]
spark.driver.memory 1g
The total data count is 464946.
Test data: wget http://tsdata.bts.gov/PREZIP/On_Time_On_Time_Performance_2015_09.zip
I converted the CSV to Parquet, partitioned by year and month.
First, a query against the plain Parquet files, and its time cost:
/Library/Java/JavaVirtualMachines/jdk8/bin/java -cp /Users/nathan/Tool/spark/spark2/conf/:/Users/nathan/Tool/spark/spark2/jars/*:/Users/nathan/Tool/hadoop/hadoop2/etc/hadoop/:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/common/lib/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/common/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/hdfs/:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/hdfs/lib/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/hdfs/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/yarn/lib/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/yarn/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/mapreduce/lib/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/mapreduce/*:/Users/nathan/Tool/hadoop/hadoop2/contrib/capacity-scheduler/*.jar -Xmx1g -server -Xms256m -XX:+UseG1GC -XX:+UseCompressedOops -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark org.apache.spark.deploy.SparkSubmit --master local[1] --class com.alipay.spark.App /Users/nathan/Tool/spark/spark2/spark-demo.jar sql
------------->>>>>/Users/nathan/Tool/spark/spark2/conf/config.properties
{csv=file:/Users/nathan/data/ontime/csv/*.csv, index=false, parquet=file:/Users/nathan/data/ontime/parq/}
sql>select count(1) from ontime where Quarter=3;
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
+--------+
|count(1)|
+--------+
| 464946|
+--------+
time spend: 1927ms
total 1 records
Then I restarted the Spark shell and ran the same first query with parquet-index (index key: Quarter); the time cost is impressive:
nathan@/Users/nathan/Tool/spark/spark2➜ ./run.local.sh
/Library/Java/JavaVirtualMachines/jdk8/bin/java -cp /Users/nathan/Tool/spark/spark2/conf/:/Users/nathan/Tool/spark/spark2/jars/*:/Users/nathan/Tool/hadoop/hadoop2/etc/hadoop/:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/common/lib/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/common/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/hdfs/:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/hdfs/lib/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/hdfs/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/yarn/lib/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/yarn/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/mapreduce/lib/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/mapreduce/*:/Users/nathan/Tool/hadoop/hadoop2/contrib/capacity-scheduler/*.jar -Xmx1g -server -Xms256m -XX:+UseG1GC -XX:+UseCompressedOops -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark org.apache.spark.deploy.SparkSubmit --master local[1] --class com.alipay.spark.App /Users/nathan/Tool/spark/spark2/spark-demo.jar sql
------------->>>>>/Users/nathan/Tool/spark/spark2/conf/config.properties
{csv=file:/Users/nathan/data/ontime/csv/*.csv, index=true, parquet=file:/Users/nathan/data/ontime/parq/}
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
sql>select count(1) from ontime where Quarter=3;
+--------+
|count(1)|
+--------+
| 464946|
+--------+
time spend: 931ms
total 1 records
Currently all options are static; we need to make some options dynamic, which would allow testing in spark-shell.
It throws "SparkException: Failed to merge incompatible data types LongType and StringType" while I create the index.
How does it come? And how to solve it?
Mainly we need to remove Parquet logging when reading a file. It would be good to log only relevant classes in org.apache.spark.sql.
Currently filters are loaded lazily, which will still be the case after the refactoring work for statistics. I think we should also consider eager filter loading, where everything is loaded at the time the catalog is loaded and cached. A possible issue is dictionary filters, which can take a fair amount of space; it might be better to load those on demand.
From time to time, loading an index fails with a permissions error:
java.lang.IllegalStateException: Expected directory with rwxrw-rw-, found file:/Users/sadikovi/Developer/parquet-index/index_metastore(rwxr-xr-x)
at org.apache.spark.sql.execution.datasources.Metastore.validateMetastoreStatus(Metastore.scala:112)
at org.apache.spark.sql.execution.datasources.Metastore.resolveMetastore(Metastore.scala:93)
Currently we cache filter statistics and table metadata for each queried table. This issue is about caching the query plan as well, so that when we hit the same plan we can yield the result immediately, without resolving filters at all.
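A hedged sketch of the idea using a Guava cache (the project already registers a Guava cache for the metastore); the String key format and the Seq[String] value type are assumptions for illustration:

```scala
import java.util.concurrent.Callable
import com.google.common.cache.{Cache, CacheBuilder}

// Memoize pruned partition lists per canonicalized filter expression, so a
// repeated plan skips filter resolution entirely.
val planCache: Cache[String, Seq[String]] =
  CacheBuilder.newBuilder().maximumSize(100).build[String, Seq[String]]()

def prunedPartitions(filterKey: String)(compute: => Seq[String]): Seq[String] =
  planCache.get(filterKey, new Callable[Seq[String]] {
    override def call(): Seq[String] = compute
  })
```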
Currently we only support the equality predicate; we should also add range checks and isNull.
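A hedged sketch of evaluating these predicates against block statistics; ColStats is made up and only Long columns are shown:

```scala
import org.apache.spark.sql.sources._

// Decide whether a block with the given min/max/null statistics can possibly
// contain matching rows; returning true means the block cannot be pruned.
case class ColStats(min: Long, max: Long, numNulls: Long)

def mightMatch(stats: ColStats, filter: Filter): Boolean = filter match {
  case EqualTo(_, v: Long)            => v >= stats.min && v <= stats.max
  case GreaterThan(_, v: Long)        => stats.max > v
  case GreaterThanOrEqual(_, v: Long) => stats.max >= v
  case LessThan(_, v: Long)           => stats.min < v
  case LessThanOrEqual(_, v: Long)    => stats.min <= v
  case IsNull(_)                      => stats.numNulls > 0
  case _                              => true // unknown predicate: cannot prune
}
```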
Add performance tests that compare against the plain Parquet implementation and across releases. These should run as part of CI to detect performance regressions.