qbeast-spark's People

Contributors

adricu8, alexeiakimov, cdelfosse, cugni, eavilaes, fpj, jiaweihu08, joancarles-qbeast, osopardo1, victorcuevasv

qbeast-spark's Issues

NULL values on indexed columns support

What went wrong?

We tried to index a dataset from TPC-DS, and two of the columns chosen for indexing contained null values. We should add support for null values.

21/09/28 09:17:18 ERROR Executor: Exception in task 10.0 in stage 4.0 (TID 28)
org.apache.spark.sql.AnalysisException: Column to index contains null values. Please initialize them before indexing

How to reproduce?

  1. Code that triggered the bug, or steps to reproduce:
    Indexing notebook s3 public dataset by "ss_customer_sk", "ss_item_sk", "ss_sold_date_sk"
    parquet_df.write.mode("overwrite").format("qbeast").option("columnsToIndex", "ss_customer_sk,ss_item_sk,ss_sold_date_sk").save(qbeast_table_path)

  2. Branch and commit id:
    Main on 15667c2

  3. Spark version:
    3.1.1

  4. Hadoop version:
    3.2.2

  5. Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests on a local computer?
    I am running Spark shell on a local computer.

  6. Stack trace:

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.sql.AnalysisException: Column to index contains null values. Please initialize them before indexing
  at org.apache.spark.sql.AnalysisExceptionFactory$.create(AnalysisExceptionFactory.scala:36)
  at io.qbeast.spark.index.OTreeAlgorithmImpl.$anonfun$rowValuesToPoint$1(OTreeAlgorithm.scala:322)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at io.qbeast.spark.index.OTreeAlgorithmImpl.rowValuesToPoint(OTreeAlgorithm.scala:317)
  at io.qbeast.spark.index.OTreeAlgorithmImpl.$anonfun$index$3(OTreeAlgorithm.scala:233)
  at scala.collection.Iterator.foreach(Iterator.scala:941)
  at scala.collection.Iterator.foreach$(Iterator.scala:941)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
  at io.qbeast.spark.index.OTreeAlgorithmImpl.$anonfun$index$2(OTreeAlgorithm.scala:231)
  at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:195)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
  at org.apache.spark.scheduler.Task.run(Task.scala:131)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Add support for Column-Level statistics

Delta v1.2.0 adds support for data skipping using column statistics: statistical information about the columns is gathered so that a finer-grained data skipping technique can be applied.

Aside from #98, this is another major improvement that is relevant to the qbeast-spark project. In this case, we should:

  1. Upgrade to the newest version of Delta.
  2. Solve compatibility problems.
  3. Understand which statistics are gathered and which are missing.
  4. Understand how those statistics are used for file/data skipping.
  5. Possibility: use generated columns to address the weight min/max. Would it make sense? What are the limitations?
  6. Implement new functionalities if needed.
  7. Add tests

Collect in OTreeAlgorithm takes a lot of memory and time

What went wrong?
It takes around 7 minutes to index 12.4GB of parquet with Qbeast using 2 numerical columns. The cluster has 8 workers with 4 executors each. One of the culprits is a collect() in OTreeAlgorithm:

[Image: slowCollect]

Hypothesis and known issue

Our current implementation for writing data is split into two stages:

  1. Calculation of an estimated CubeWeight map.
    It is done per partition and requires a collect() so that the map can be broadcast and each row can be placed in its corresponding cube. In addition, a min/max aggregation over the indexed columns is required to know the space we are indexing.

  2. Reorganization of the data and writing of all the rows of the same cube to a single parquet file.

    1. Repartition the data by cube and state. The penalty is minor and comparable to the Delta writing time.
    2. File I/O writing. All the records of a cube are written to a single parquet file, which prevents the write operation from being parallelized.

As documented by @alexeiakimov :

An attempt to index 1GB of data on a single node with qbeast-spark-nodep_2.12-nightly-d44ce4c.jar has shown that the following three jobs are the main contributors to the time:

  1. execute and collect() at QbeastWriter- 2.4 min. Method collect is called on the dataset after all the records are written to destination media

  2. min/max aggregation. Model - 1.8 min. Method agg is called on the dataset to compute min and max values for each dimension
  3. collect() on OTreeAlgorithm - 1.7 min. Method collect is called on the dataset to compute the cube weights.

The hypothesis is that in all three cases traversing the full dataset is responsible for the high latency. In the ColumnInfo case our code clearly has minimal impact, because it simply computes the min and max of two columns; nevertheless, its latency is of the same magnitude as that of QbeastWriter and OTreeAlgorithm, where we do a lot of computation.

How to reproduce?

  1. Code that triggered the bug, or steps to reproduce:
  • Relevant config for the SparkSession in the Azure Cluster:
            --packages org.apache.hadoop:hadoop-azure:2.7.0,com.azure:azure-storage-blob:12.8.0,com.azure:azure-storage-common:12.8.0,com.microsoft.azure:azure-storage:2.0.0,io.delta:delta-core_2.12:0.8.0 \
            --conf spark.executor.cores=3 \
            --conf spark.driver.memory=12g \
            --conf spark.executor.memory=12g \
            --conf spark.eventLog.enabled=true \
            --conf spark.sql.extensions=io.qbeast.spark.sql.QbeastSparkSessionExtension \
            --conf spark.driver.extraJavaOptions="-Dqbeast.index.size=2000000"

  • We load the table store_sales from the TPC-DS 100Gb dataset, which has a size of 12.4GB in delta format. We index it by its primary key attributes and write it in qbeast format:
val store_sales = spark.read.format("delta").load("wasb://[email protected]/data-1gb/store_sales")
val tmpDir = "wasb://[email protected]/qb_indexed_1gb/store_sales"
store_sales.write.mode("overwrite").format("qbeast").option("columnsToIndex", "ss_item_sk,ss_ticket_number").save(tmpDir)
  2. Branch and commit id:
    Main on 15667c2

  3. Spark version:
    3.1.1

  4. Hadoop version:
    2.7

  5. Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests in a local computer?
    Running Spark Shell from the Jupyter Notebook in the Kubernetes deployment on Azure

CI failing on repository Forks, due to GitHub Package Registry key missing on them

What went wrong?
When a fork of this project is synchronized, the CI workflows always fail. The reason is that the CI tries to publish the artifacts, but the GitHub Package Registry (GHPR) key is only available in this repository.
You can see an example of a failing workflow in my fork here: https://github.com/eavilaes/qbeast-spark/runs/4468737606?check_suite_focus=true
This can be annoying for all users who fork the project.

I suggest fixing this CI step, and I have thought about a few alternatives:

  1. Add a check to the publishing step that verifies that the current repo is Qbeast-io/qbeast-spark. I think this can be done with ${{ github.repository }}.
  2. Make this step workflow_dispatch-triggered, which lets the workflow be run manually.
  3. Publish the artifact manually, outside the CI.

How to reproduce?

  1. Code that triggered the bug, or steps to reproduce:
    N/A

  2. Branch and commit id:
    main, d9bd04a

  3. Spark version:
    N/A

  4. Hadoop version:
    N/A

  5. Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests in a local computer?
    N/A

  6. Stack trace:
    N/A

CubeWeights not properly used

What went wrong?
The update method of CubeWeights duplicates the content of resultBuffer each time the queue reaches bufferSize.

Basically the behavior is:

def update(...){
    ...
    if (queue.size >= bufferSize) {
        // resultBuffer ++= resultInternal() ++ resultBuffer
        resultBuffer ++= result()
        ...
    }
}

def result(): Seq[CubeNormalizedWeight] = {
    resultInternal() ++ resultBuffer
}

So, aside from adding the elements from resultInternal(), each time the queue is full the resultBuffer also duplicates its own content.

This is obviously going to affect the performance of the OTreeDataAnalyzer, which will require much more memory to index than it should. For example, when indexing a store_sales table of 1.8GB, the current implementation cannot get past the OTreeDataAnalyzer phase.
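To make the growth concrete, here is a small Python model of the behavior described above. It is only a sketch: the lists stand in for resultBuffer and resultInternal(), and the "fixed" variant (appending only the new batch) is an assumption about one possible fix, not necessarily the change the project adopted.

```python
def buggy_buffer_size(num_flushes: int, buffer_size: int) -> int:
    """Model of `resultBuffer ++= result()`: every flush appends the new
    batch plus a full copy of what the buffer already holds."""
    result_buffer = []
    for flush in range(num_flushes):
        # Stand-in for resultInternal(): one batch of buffer_size elements.
        batch = [(flush, i) for i in range(buffer_size)]
        result_buffer += batch + result_buffer
    return len(result_buffer)


def fixed_buffer_size(num_flushes: int, buffer_size: int) -> int:
    """Hypothetical fix: append only the new batch on each flush."""
    result_buffer = []
    for flush in range(num_flushes):
        batch = [(flush, i) for i in range(buffer_size)]
        result_buffer += batch
    return len(result_buffer)


if __name__ == "__main__":
    for flushes in (1, 5, 10):
        print(flushes, buggy_buffer_size(flushes, 4), fixed_buffer_size(flushes, 4))
```

In this model the buggy buffer holds (2^n - 1) * bufferSize elements after n flushes, while the fixed one holds n * bufferSize, which is consistent with the memory blow-up reported here.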

How to reproduce?

  1. Code that triggered the bug, or steps to reproduce:

  2. Branch and commit id:
    main, 16735d6

  3. Spark version:
    3.1.1

  4. Hadoop version:
    3.2

  5. Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests in a local computer?
    Local mode

  6. Stack trace:

Indexing categorical types

One important missing feature is the ability to index strings with discrete values (e.g. cities, products_id or name). If the data has a relatively high cardinality but is not ordinal (e.g. Torino > Milano feels right but is not meaningful), we can simply use a hash function. The problems are:

  • Define the formats that we support, not only by type but also by any other characteristics of the data that we should preserve and consider (like cardinality).
  • How do we define the columns that are cardinal/nominal?
  • Does this change require modifying the CubeID/CubeIterator? Right now, when we have a number, we map it to (0,1), then multiply it by a factor and round it (more or less). However, with a hashed value, this is no longer necessary (right, @alexeiakimov?).
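As an illustration of the hashing idea, the following Python sketch maps a nominal string to a coordinate in [0, 1). The function name and the choice of SHA-256 are assumptions made for this example; the issue does not prescribe a particular hash function.

```python
import hashlib


def hash_to_unit_interval(value: str) -> float:
    """Map a nominal value to a deterministic coordinate in [0, 1).

    The order of the resulting coordinates carries no meaning, which is
    acceptable for nominal data such as city names.
    """
    digest = hashlib.sha256(value.encode("utf-8")).digest()
    # Keep 53 bits so the integer is exactly representable as a float
    # and the quotient is always strictly below 1.0.
    as_int = int.from_bytes(digest[:8], "big") >> 11
    return as_int / 2**53


if __name__ == "__main__":
    for city in ("Torino", "Milano", "Barcelona"):
        print(city, hash_to_unit_interval(city))
```

Because the output is already in the unit interval, a min/max pass over the column would not be needed for hashed columns under this approach.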

On the first problem, I can propose two alternatives: either we define the types (nominal/categorical, ordinal, ratio/percentage, etc.) and then define how to manage each of them, or we allow the user to define how to transform the value into an indexable column using a specific transformation.

The first case would look like:

parquetDf.write
    .mode("overwrite")
    .format("qbeast")   
    .option("columnsToIndex", "ss_cdemo_sk:ordinal,ss_cdemo_sk:ordinal,city:nominal")     
    .save(qbeastTablePath)

Alternatively, we can allow the user to specify which implementation of the Transformation trait to use for every specific class. (By default, it should be LinearTransformation)

parquetDf.write
    .mode("overwrite")
    .format("qbeast")   
    .option("columnsToIndex", "ss_cdemo_sk:Linear,ss_cdemo_sk:Gaussian,city:Hash")     
    .save(qbeastTablePath)

It would also be possible to define something in the middle, using different symbols (e.g. city@Hash or city/nominal), but it might get too complex to be practical.
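To show how the proposed column:Transformation syntax could be interpreted, here is a minimal Python sketch of a parser. The function name and the "Linear" default label are assumptions for illustration; the actual option handling in qbeast-spark may differ.

```python
from typing import List, Tuple


def parse_columns_to_index(option: str, default: str = "Linear") -> List[Tuple[str, str]]:
    """Split a columnsToIndex value into (column, transformation) pairs.

    Columns without an explicit ":Transformation" suffix fall back to
    the default transformation.
    """
    specs = []
    for item in option.split(","):
        name, _, transformation = item.strip().partition(":")
        specs.append((name, transformation or default))
    return specs


if __name__ == "__main__":
    print(parse_columns_to_index("ss_cdemo_sk:Linear,city:Hash,price"))
```

Keeping the plain column name valid means existing writes that pass only column names would continue to work unchanged.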

Implement data format versioning

What went wrong?
During the development and cleanup of the software, we did not keep track of the version of each data format implementation.
Without this tracking, the main branch has two different ways of writing the metadata, with no hint to differentiate between the releases.

For example, a dataset written before 7eb77dd would contain this information:

{
  "add" : {
    "..." : {},
    "tags" : {
      "cube" : "A",
      "indexedColumns" : "ss_sales_price,ss_ticket_number",
      "maxWeight" : "462168771",
      "minWeight" : "-2147483648",
      "rowCount" : "508765",
      "space" : "{\"timestamp\":1631692406506,\"transformations\":[{\"min\":-99.76,\"max\":299.28000000000003,\"scale\":0.0025060144346431435},{\"min\":-119998.5,\"max\":359999.5,\"scale\":2.083342013925058E-6}]}",
      "state" : "FLOODED"
    }
  }
}
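As a side note on the old format, the scale values in the space tag above are numerically consistent with 1 / (max - min) for each transformation. The Python sketch below checks that arithmetic; the to_unit_interval helper is a hypothetical illustration of how such a min/scale pair could map a value into [0, 1], not confirmed project code.

```python
import math

# Values copied from the "space" tag of the old metadata format.
transformations = [
    {"min": -99.76, "max": 299.28000000000003, "scale": 0.0025060144346431435},
    {"min": -119998.5, "max": 359999.5, "scale": 2.083342013925058e-06},
]


def implied_scale(t: dict) -> float:
    """Scale implied by the recorded min and max."""
    return 1.0 / (t["max"] - t["min"])


def to_unit_interval(value: float, t: dict) -> float:
    """Hypothetical linear mapping of a value using min and scale."""
    return (value - t["min"]) * t["scale"]


if __name__ == "__main__":
    for t in transformations:
        print(t["scale"], implied_scale(t), math.isclose(t["scale"], implied_scale(t)))
```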

While the new ones have a different structure:

{
  "add": {
    "path": "4b36340e-0bf7-44ec-97da-f7a03bf06ea3.parquet",
    ...
    "tags": {
      "state": "FLOODED",
      "rowCount": "3",
      "cube": "gA",
      "revision": "1634196697656",
      "minWeight": "-2147483648",
      "maxWeight": "-857060062"
    }
  }
}

We should: [EDITED]

  • Add the file format version to the Delta Log. The version should be an increasing number starting from 1.
  • Check that we are reading the right version when loading a new DataFrame.
  • Update the docs, listing the versions and tagging the latest version of the code that can read each one.
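The read-time check from the list above could look roughly like the following Python sketch. The tag key "formatVersion", the exception name, and the set of supported versions are all hypothetical; the issue only states that a version number starting from 1 should be stored and validated.

```python
# Hypothetical: versions this build of the reader understands.
SUPPORTED_FORMAT_VERSIONS = {1}


class UnsupportedFormatVersion(Exception):
    """Raised when metadata carries no version or an unknown one."""


def check_format_version(tags: dict) -> int:
    """Return the format version found in the tags, or raise.

    Tag values are strings in the metadata shown above, so the version
    is parsed from a string as well.
    """
    raw = tags.get("formatVersion")  # hypothetical tag name
    if raw is None:
        raise UnsupportedFormatVersion("no format version recorded in metadata")
    version = int(raw)
    if version not in SUPPORTED_FORMAT_VERSIONS:
        raise UnsupportedFormatVersion(f"format version {version} is not supported")
    return version


if __name__ == "__main__":
    print(check_format_version({"formatVersion": "1", "cube": "gA"}))
```

Treating a missing version as an error makes datasets written before versioning fail loudly instead of being misread; whether that or a fallback is preferable is a design decision for the project.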

'Table implementation does not support writes' while calling 'saveAsTable' from DataFrameWriter using qbeast

What went wrong?
Saving a DataFrame to the metastore_db with the saveAsTable method fails with "Table implementation does not support writes".

How to reproduce?

val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("QbeastDataSource")
      .withExtensions(new QbeastSparkSessionExtension())
      .getOrCreate()

val parquet_path = "src/test/resources/ecommerce100K_2019_Oct.csv"

val df = spark.read
      .format("csv")
      .option("header", true)
      .option("inferSchema", true)
      .load(parquet_path)

df.write
      .format("qbeast")
      .option("columnsToIndex", "product_id,category_id,price,user_id")
      .saveAsTable("default.qbeast_table")
  1. Branch and commit id:
    Fork of qbeast-spark from @osopardo1, branch 28-dimension-filtering, commit id 4d642af

  2. Spark version:
    Spark 3.1.1

  3. Hadoop version:
    Hadoop 3.2.0

  4. Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests in a local computer?
    Standalone mode

  5. Stack trace:

21/11/16 10:50:18 ERROR Utils: Aborting task
org.apache.spark.SparkException: Table implementation does not support writes: default.qbeast_table
	at org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper.$anonfun$writeToTable$1(WriteToDataSourceV2Exec.scala:489)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
	at org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper.writeToTable(WriteToDataSourceV2Exec.scala:465)
	at org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper.writeToTable$(WriteToDataSourceV2Exec.scala:460)
	at org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec.writeToTable(WriteToDataSourceV2Exec.scala:62)
	at org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec.run(WriteToDataSourceV2Exec.scala:84)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:55)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:686)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:623)
	at io.qbeast.spark.sql.sources.saveAsTableTest.$anonfun$new$1(saveAsTableTest.scala:49)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.flatspec.AnyFlatSpecLike$$anon$5.apply(AnyFlatSpecLike.scala:1684)
	at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
	at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
	at org.scalatest.flatspec.AnyFlatSpec.withFixture(AnyFlatSpec.scala:1685)
	at org.scalatest.flatspec.AnyFlatSpecLike.invokeWithFixture$1(AnyFlatSpecLike.scala:1682)
	at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$runTest$1(AnyFlatSpecLike.scala:1694)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
	at org.scalatest.flatspec.AnyFlatSpecLike.runTest(AnyFlatSpecLike.scala:1694)
	at org.scalatest.flatspec.AnyFlatSpecLike.runTest$(AnyFlatSpecLike.scala:1676)
	at org.scalatest.flatspec.AnyFlatSpec.runTest(AnyFlatSpec.scala:1685)
	at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$runTests$1(AnyFlatSpecLike.scala:1752)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
	at org.scalatest.flatspec.AnyFlatSpecLike.runTests(AnyFlatSpecLike.scala:1752)
	at org.scalatest.flatspec.AnyFlatSpecLike.runTests$(AnyFlatSpecLike.scala:1751)
	at org.scalatest.flatspec.AnyFlatSpec.runTests(AnyFlatSpec.scala:1685)
	at org.scalatest.Suite.run(Suite.scala:1112)
	at org.scalatest.Suite.run$(Suite.scala:1094)
	at org.scalatest.flatspec.AnyFlatSpec.org$scalatest$flatspec$AnyFlatSpecLike$$super$run(AnyFlatSpec.scala:1685)
	at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$run$1(AnyFlatSpecLike.scala:1797)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
	at org.scalatest.flatspec.AnyFlatSpecLike.run(AnyFlatSpecLike.scala:1797)
	at org.scalatest.flatspec.AnyFlatSpecLike.run$(AnyFlatSpecLike.scala:1795)
	at org.scalatest.flatspec.AnyFlatSpec.run(AnyFlatSpec.scala:1685)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1320)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1314)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1314)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:993)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:971)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971)
	at org.scalatest.tools.Runner$.run(Runner.scala:798)
	at org.scalatest.tools.Runner.run(Runner.scala)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:38)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:25)



Cast from List to Seq in QbeastDataSource.scala throws Exception

What went wrong?

When reading the GitHub Archive dataset with PySpark, a ClassCastException is thrown. The strange thing is that doing exactly the same thing from the Spark shell works. The code also works if we run it with local[*] (everything happens in the driver).

java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.sql.catalyst.expressions.objects.NewInstance.arguments of type scala.collection.Seq in instance of org.apache.spark.sql.catalyst.expressions.objects.NewInstance

How to reproduce?

  1. Code that triggered the bug, or steps to reproduce:
#  Build the Qbeast-spark
git checkout 26f49abaed89e964f8e598efbeda581ded03912c
sbt clean assembly -Dspark.version=3.1.1 -Dhadoop.version=3.2.0

# Download Spark and launch a Master and a Worker :
wget https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
tar -xf spark-3.1.1-bin-hadoop3.2.tgz
cd spark-3.1.1-bin-hadoop3.2
sbin/start-all.sh
# Run using PySpark
from pyspark.sql import SparkSession 
deps = 'io.delta:delta-core_2.12:1.0.0,org.apache.hadoop:hadoop-aws:3.2.0'

spark = (SparkSession.builder
         .master("spark://localhost:7077")
         .config("spark.jars", "./target/scala-2.12/qbeast-spark-assembly-0.1.0.jar")
         .config("spark.sql.extensions", "io.qbeast.spark.sql.QbeastSparkSessionExtension")
         .config("spark.jars.packages", deps)
         .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
         .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
         .getOrCreate())

gh_df = spark.read.format('qbeast').load("s3a://qbeast-public-datasets-eu/2days/qbeast-indexed/events")
  2. Branch and commit id:
    Main on 26f49ab

  3. Spark version:
    3.1.1

  4. Hadoop version:
    3.2.0

  5. Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests in a local computer?
    Happens both in K8s and standalone with 1 worker and 1 master.

  6. Stack trace:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-1-894fb624f13c> in <module>
     11          .getOrCreate())
     12 
---> 13 gh_df = spark.read.format('qbeast').load("s3a://qbeast-public-datasets-eu/2days/qbeast-indexed/events")

~/projects/spark-3.1.1-bin-hadoop3.2/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
    202         self.options(**options)
    203         if isinstance(path, str):
--> 204             return self._df(self._jreader.load(path))
    205         elif path is not None:
    206             if type(path) != list:

~/.local/lib/python3.9/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

~/projects/spark-3.1.1-bin-hadoop3.2/python/pyspark/sql/utils.py in deco(*a, **kw)
    109     def deco(*a, **kw):
    110         try:
--> 111             return f(*a, **kw)
    112         except py4j.protocol.Py4JJavaError as e:
    113             converted = convert_exception(e.java_exception)

~/.local/lib/python3.9/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o49.load.
: java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (192.168.1.50 executor 0): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.sql.catalyst.expressions.objects.NewInstance.arguments of type scala.collection.Seq in instance of org.apache.spark.sql.catalyst.expressions.objects.NewInstance
	at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2205)
	at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2168)
	at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1422)
	at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2480)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2387)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
	at java.base/java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:2419)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2368)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
	at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2102)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
	at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2102)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:493)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:451)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
	at jdk.internal.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1175)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2325)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:493)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:451)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
	at jdk.internal.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1175)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2325)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:493)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:451)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
	at jdk.internal.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1175)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2325)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:493)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:451)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:85)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
	at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
	at com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
	at com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
	at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
	at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
	at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:464)
	at org.apache.spark.sql.delta.DeltaLog$.forTable(DeltaLog.scala:391)
	at io.qbeast.spark.table.IndexedTableImpl.deltaLog(IndexedTable.scala:157)
	at io.qbeast.spark.table.IndexedTableImpl.snapshot(IndexedTable.scala:150)
	at io.qbeast.spark.table.IndexedTableImpl.exists(IndexedTable.scala:127)
	at io.qbeast.spark.sql.sources.QbeastDataSource.createRelation(QbeastDataSource.scala:94)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:354)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:326)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:306)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:266)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:240)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)

Execution time overhead when reading `qbeast` indexed data (not using sampling)

What went wrong?

I tested three queries from TPC-DS v2.4 (queries 3, 7 and 15) on a 100 GB dataset (the size on disk is considerably smaller because of Parquet compression). Execution time is higher when the data is written in qbeast format than when it is written in delta format, as shown below.

The queries I have used are:

Query 3
 SELECT dt.d_year, item.i_brand_id brand_id, item.i_brand brand,SUM(ss_ext_sales_price) sum_agg
 FROM  date_dim dt, store_sales, item
 WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
   AND store_sales.ss_item_sk = item.i_item_sk
   AND item.i_manufact_id = 128
   AND dt.d_moy=11
 GROUP BY dt.d_year, item.i_brand, item.i_brand_id
 ORDER BY dt.d_year, sum_agg desc, brand_id
 LIMIT 100
Query 7
 SELECT i_item_id,
        avg(ss_quantity) agg1,
        avg(ss_list_price) agg2,
        avg(ss_coupon_amt) agg3,
        avg(ss_sales_price) agg4
 FROM store_sales, customer_demographics, date_dim, item, promotion
 WHERE ss_sold_date_sk = d_date_sk AND
       ss_item_sk = i_item_sk AND
       ss_cdemo_sk = cd_demo_sk AND
       ss_promo_sk = p_promo_sk AND
       cd_gender = 'M' AND
       cd_marital_status = 'S' AND
       cd_education_status = 'College' AND
       (p_channel_email = 'N' or p_channel_event = 'N') AND
       d_year = 2000
 GROUP BY i_item_id
 ORDER BY i_item_id LIMIT 100
Query 15
 select ca_zip, SUM(cs_sales_price) sum_agg
 from catalog_sales, customer, customer_address, date_dim
 where cs_bill_customer_sk = c_customer_sk
 	and c_current_addr_sk = ca_address_sk
 	and ( substr(ca_zip,1,5) in ('85669', '86197','88274','83405','86475',
                                   '85392', '85460', '80348', '81792')
 	      or ca_state in ('CA','WA','GA')
 	      or cs_sales_price > 500)
 	and cs_sold_date_sk = d_date_sk
 	and d_qoy = 2 and d_year = 2001
 group by ca_zip
 order by ca_zip
 limit 100

I performed three different executions on the data, one per write/read format combination. Each one iterated every query 10 times to calculate an average time per query.
As the following table shows, execution time increases when the data is written in qbeast format. The percentage in parentheses is the average time relative to the delta/delta baseline (more details follow the table):

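| Query | delta format, read in delta | qbeast format, read in delta | qbeast format, read in qbeast |
|-------|-----------------------------|------------------------------|-------------------------------|
| 3     | 7.822s.                     | 13.533s. (173.01%)           | 20.635s. (263.80%)            |
| 7     | 18.839s.                    | 24.319s. (129.09%)           | 35.546s. (188.68%)            |
| 15    | 7.201s.                     | 16.385s. (227.53%)           | 24.620s. (341.89%)            |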
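
The percentages can be recomputed from the average times with a few lines of Python. This is only a sketch: the dictionary keys and the `relative_time` helper are names chosen here for illustration, and the times are copied from the averages reported in this issue. The last digit may differ slightly from the reported values depending on whether the result is rounded or truncated.

```python
# Average execution times in seconds, copied from the issue.
# Keys are "<write format>/<read format>" (naming chosen for this sketch).
avg_times = {
    "q3": {"delta/delta": 7.822, "qbeast/delta": 13.533, "qbeast/qbeast": 20.635},
    "q7": {"delta/delta": 18.839, "qbeast/delta": 24.319, "qbeast/qbeast": 35.546},
    "q15": {"delta/delta": 7.201, "qbeast/delta": 16.385, "qbeast/qbeast": 24.620},
}


def relative_time(baseline, measured):
    """Return the measured time as a percentage of the baseline time."""
    return measured / baseline * 100.0


for query, times in avg_times.items():
    base = times["delta/delta"]
    for layout in ("qbeast/delta", "qbeast/qbeast"):
        pct = relative_time(base, times[layout])
        # pct - 100 is the extra time spent compared with the baseline.
        print(f"{query} {layout}: {pct:.2f}% of baseline, {pct - 100.0:.2f}% extra")
```

Reading the figures this way, 173% means the query took about 1.73 times as long as the baseline, that is, roughly 73% extra time.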

For more detailed values, I included maximum and minimum values for each execution:

Detailed values (AVG, MAX and MIN) for each execution

Data written in delta format, read in delta format

Query AVG MAX MIN
q3 7.822s. 10.579s. 7.203s.
q7 18.839s. 21.281s. 17.253s.
q15 7.201s. 9.448s. 6.344s.

Data written in qbeast format (index using PK), read in delta format

Query AVG MAX MIN
q3 13.533s. 17.612s. 12.467s.
q7 24.319s. 29.949s. 22.478s.
q15 16.385s. 21.824s. 14.882s.

Data written in qbeast format (index using PK), read in qbeast format

Query AVG MAX MIN
q3 20.635s. 31.006s. 18.756s.
q7 35.546s. 43.923s. 32.965s.
q15 24.620s. 28.866s. 22.850s.

How to reproduce?

  1. Code that triggered the bug, or steps to reproduce:
    I ran the mentioned queries using databricks/spark-sql-perf. The times provided in the tables correspond to the output of the mentioned application.

  2. Branch and commit id:
    main, on commit 15667c2

  3. Spark version:
    3.1.1

  4. Hadoop version:
    2.7.4

  5. Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests on a local computer?
    I'm running Spark in a remote K8s cluster with 9 nodes and 8 spark-workers. Each node has 4 cores (3 for the executors) and 16 GB of memory (12 GB for the executors).

  6. Stack trace:
    N/A

Weight Contributor Columns inconsistency when using non nullable columns

What went wrong?
Indexing a dataframe with both nullable and non-nullable columns in the columnsToIndex option leads to inconsistency:

  • We index it using only the non-nullable columns.
  • Parquet changes all the columns' schema to nullable=true (*).
  • We read the data using all columnsToIndex, as all the columns are now nullable.

This produces an inconsistency, as the hash of the rows is computed with different columns on write and on read.
The easiest solution I see is to change getWeightContributorColumns() to compute the hash with all the columnsToIndex.

(*): From Spark documentation:

When reading Parquet files, all columns are automatically converted to be nullable for compatibility reasons.
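
The mismatch can be illustrated without Spark. The sketch below is not the qbeast-spark implementation: `row_hash` is a hypothetical stand-in for the weight hash, and `weight_columns` assumes the selection rule described in this issue (use the non-nullable indexed columns, and fall back to all of them when none is non-nullable). The column names are borrowed from the reproduction schema in this issue.

```python
import hashlib


def row_hash(row, columns):
    """Hash a row using only the given columns (hypothetical stand-in for the weight hash)."""
    payload = "|".join(f"{c}={row[c]!r}" for c in columns)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def weight_columns(nullable, columns_to_index):
    """Assumed selection rule: non-nullable indexed columns, or all of them if none qualifies."""
    not_nulls = [c for c in columns_to_index if not nullable[c]]
    return not_nulls or list(columns_to_index)


columns_to_index = ["forEnc", "colEnc", "_id"]

# Nullability flags as seen when writing the original dataframe.
write_nullable = {"_id": True, "forEnc": False, "colEnc": False}
# After a round trip through Parquet, every column is reported as nullable.
read_nullable = {name: True for name in write_nullable}

row = {"_id": 1, "forEnc": 0.5, "colEnc": 2.0}

write_cols = weight_columns(write_nullable, columns_to_index)
read_cols = weight_columns(read_nullable, columns_to_index)

# Different column sets give different hashes for the same row.
mismatch = row_hash(row, write_cols) != row_hash(row, read_cols)

# Proposed fix: always hash with all the columnsToIndex, on write and on read.
fixed_write = row_hash(row, columns_to_index)
fixed_read = row_hash(row, columns_to_index)

print(write_cols, read_cols, mismatch, fixed_write == fixed_read)
```

With the proposed change, the set of contributing columns no longer depends on schema nullability, so the write-side and read-side hashes agree.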

How to reproduce?

  1. Code that triggered the bug, or steps to reproduce:
    Suppose a dataframe with the following schema (notice forEnc and colEnc columns with nullable=false):
root
 |-- _id: long (nullable = true)
 |-- released: string (nullable = true)
 |-- title: string (nullable = true)
 |-- for: string (nullable = true)
 |-- col: string (nullable = true)
 |-- forEnc: double (nullable = false)
 |-- colEnc: double (nullable = false)

If we index this dataframe using columnsToIndex=(forEnc, colEnc, _id) the hash is computed with the non-nullable columns, i.e. (forEnc, colEnc), as you can see in the OTreeAlgorithm:

val notNulls =
  fields.filter(p => !p.nullable)

When saved, parquet will change all the columns to nullable=true. So when we load the indexed dataframe we'll be reading the following schema (notice the change in the last two columns):

root
 |-- _id: long (nullable = true)
 |-- released: string (nullable = true)
 |-- title: string (nullable = true)
 |-- for: string (nullable = true)
 |-- col: string (nullable = true)
 |-- forEnc: double (nullable = true)
 |-- colEnc: double (nullable = true)

Then we are computing the hash on read with (forEnc, colEnc, _id), while the hash on write was computed with (forEnc, colEnc), as they were non-nullable when the hash was computed.

  2. Branch and commit id:
    main, 49eaad5

  3. Spark version:
    3.1.1

  4. Hadoop version:
    3.2.0

  5. Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests on a local computer?
    Remote K8s cluster, working in standalone mode.

  6. Stack trace:
    N/A

Writing to S3 using the Quickstart setup fails with NoSuchMethodError

What went wrong?

After following the Quickstart, I tried writing a dataset to S3 and the write failed. I tracked the issue down to the Hadoop version.

Spark 3.1.x ships with Hadoop 3.2.0 (see here), while the qbeast-spark Quickstart recommends Hadoop 3.2.2 dependencies.
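
A small check can flag this kind of mismatch before a job is submitted. The sketch below is plain Python with hypothetical helper names (`hadoop_aws_package`, `mismatched_hadoop_packages`); it only compares version strings. The runtime version is passed in by hand here; in a live session it could presumably be read through org.apache.hadoop.util.VersionInfo.getVersion().

```python
def hadoop_aws_package(hadoop_version):
    """Build the Maven coordinate of hadoop-aws for a given Hadoop version."""
    return f"org.apache.hadoop:hadoop-aws:{hadoop_version}"


def mismatched_hadoop_packages(runtime_hadoop, packages):
    """Return the org.apache.hadoop packages whose version differs from the runtime's.

    `packages` is a comma-separated list of Maven coordinates, in the same
    shape as the value passed to spark.jars.packages.
    """
    mismatched = []
    for coord in packages.split(","):
        parts = coord.strip().split(":")
        group, version = parts[0], parts[-1]
        if group == "org.apache.hadoop" and version != runtime_hadoop:
            mismatched.append(coord.strip())
    return mismatched


# Hadoop version bundled with the Spark distribution (value from this issue).
runtime_hadoop = "3.2.0"

# Dependencies pinned to 3.2.2, the version named in the Quickstart.
quickstart_deps = "io.delta:delta-core_2.12:1.0.0,org.apache.hadoop:hadoop-aws:3.2.2"
print(mismatched_hadoop_packages(runtime_hadoop, quickstart_deps))

# Dependencies aligned with the runtime.
aligned_deps = "io.delta:delta-core_2.12:1.0.0," + hadoop_aws_package(runtime_hadoop)
print(mismatched_hadoop_packages(runtime_hadoop, aligned_deps))
```

An empty result means every org.apache.hadoop dependency matches the Hadoop version bundled with Spark.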

How to reproduce?

  1. Code that triggered the bug, or steps to reproduce:
val df = spark.read.format("csv").option("header","true").option("inferSchema", "true")
.load("./src/test/resources/ecommerce300k_2019_Nov.csv")

df.write.format("qbeast").option("columnsToIndex","product_id,price").save("s3a://my_bucket/qbeast-ecommerce")
  2. Branch and commit id:
    Commit 4f37b58 on branch Main

  3. Spark version:
    3.1.2

  4. Hadoop version:
    3.2.0

  5. Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests on a local computer?
    Local computer writing to remote S3

  6. Stack trace:

java.lang.NoSuchMethodError: 'void org.apache.hadoop.util.SemaphoredDelegatingExecutor.<init>(java.util.concurrent.ExecutorService, int, boolean)'
	at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:824)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1118)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1098)
	at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:36)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:149)
	at io.qbeast.spark.sql.qbeast.BlockWriter.buildWriter(BlockWriter.scala:135)
	at io.qbeast.spark.sql.qbeast.BlockWriter.$anonfun$writeRow$3(BlockWriter.scala:73)
	at scala.collection.immutable.Map$EmptyMap$.getOrElse(Map.scala:104)
	at io.qbeast.spark.sql.qbeast.BlockWriter.$anonfun$writeRow$1(BlockWriter.scala:73)
	at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:162)
	at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:162)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
	at io.qbeast.spark.sql.qbeast.BlockWriter.writeRow(BlockWriter.scala:64)
	at io.qbeast.spark.sql.qbeast.QbeastWriter.$anonfun$writeFiles$1(QbeastWriter.scala:172)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Support different data types to index

Currently, we only support indexing Numeric types, such as Double, Int, Float, Decimal... We should find a way to allow indexing anything that is serializable (and makes sense).

For that matter, this enhancement can be split into two tasks:

  • Define the formats that we support. Not only by type but also any other characteristics of the data that we should preserve and consider (like cardinality).
  • Change the Qbeast contract for processing the different types.

Optimize() on azure blob storage writing error

What went wrong?
The optimize operation does two different writes:

  1. replicates the data from the analyzed cubes into new Parquet files under the table directory
  2. saves a Dataset[(Long, Array[Byte])] as Parquet files under the _qbeast directory.

While trying to optimize a qbeast table saved in Azure Blob Storage, the following exception is thrown:

21/09/21 08:00:15 WARN TaskSetManager: Lost task 0.0 in stage 67.0 (TID 2739) (10.244.0.3 executor 5): org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:296)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.fs.azure.AzureException: com.microsoft.azure.storage.StorageException: One of the request inputs is not valid.

How to reproduce?

  1. Code that triggered the bug, or steps to reproduce:
val pathW = "wasb://[email protected]/"
val dataDir = pathW + "data-10gb/store_sales"
val qbeastDir = pathW + "paola/data-10gb-indexed/store_sales_i_optimize"

val df = spark.read.format("delta").load(dataDir)
val columnsToIndex = Seq("ss_ticket_number", "ss_item_sk")

df.write.format("qbeast").option("columnsToIndex", columnsToIndex.mkString(",")).mode("overwrite").save(qbeastDir)

import io.qbeast.spark.table.QbeastTable
val qbeastTable = QbeastTable.forPath(spark, qbeastDir)

qbeastTable.analyze()
qbeastTable.optimize()
  2. Branch and commit id:

Main on 15667c2

  3. Spark version:
    On the spark shell run spark.version.

3.1.1

  4. Hadoop version:
    On the spark shell run org.apache.hadoop.util.VersionInfo.getVersion().

2.7.4

  5. Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests on a local computer?

I'm running Spark on a remote Azure Kubernetes cluster of 9 nodes: 8 workers (3 cores each) and 1 master. The desiredCubeSize configuration for the 10 GB TPC-DS dataset is set to 500000.

  6. Stack trace:
21/09/21 08:00:15 WARN TaskSetManager: Lost task 0.0 in stage 67.0 (TID 2739) (10.244.0.3 executor 5): org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:296)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.fs.azure.AzureException: com.microsoft.azure.storage.StorageException: One of the request inputs is not valid.
        at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2482)
        at org.apache.hadoop.fs.azure.NativeAzureFileSystem$FolderRenamePending.execute(NativeAzureFileSystem.java:424)
        at org.apache.hadoop.fs.azure.NativeAzureFileSystem.rename(NativeAzureFileSystem.java:1997)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:531)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:502)
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:77)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:260)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:79)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:280)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:286)
        ... 9 more
Caused by: com.microsoft.azure.storage.StorageException: One of the request inputs is not valid.
        at com.microsoft.azure.storage.StorageException.translateException(StorageException.java:162)
        at com.microsoft.azure.storage.core.StorageRequest.materializeException(StorageRequest.java:307)
        at com.microsoft.azure.storage.core.ExecutionEngine.executeWithRetry(ExecutionEngine.java:177)
        at com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob(CloudBlob.java:764)
        at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:399)
        at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2449)
        ... 20 more

Implement the table tolerance function

What is Tolerance?

Tolerance is the maximum relative error a user accepts in an aggregation at a given confidence level. For example, with a confidence level of 95%, about 95 out of 100 runs of the same query should return an answer whose relative error lies within [0.0, tolerance].

How do we calculate Tolerance?

The idea is with a user-provided tolerance value, we can estimate the required sample size to satisfy the query that computes the mean with a predefined level of certainty.

Given a confidence level of say 95%, we want to determine a confidence interval for which 95% of all our mean estimations will fall within the target range. That is, given a sample of size n drawn from a population (with and as population mean and variance respectively), determine the confidence interval of the sample mean so it has a 95% chance of containing

In other words;

Here, the Central Limit Theorem is taken into account:

Regardless of the distribution of the population(as long a and are finite), the distribution of the sample means is normal.

As well as the notion of Standard Error of the Mean:

Given a single sample of size n, how can we determine how far its mean $\bar{x}$ is from the population mean $\mu$? The answer, the standard error $SE$, reflects the standard deviation of the sample means and can be estimated as $s/\sqrt{n}$, with s being the standard deviation of the sample.

Tolerance is the Relative Standard Error (RSE) of the distribution of the sample means. The formula of the RSE can be expressed in terms of the Standard Error ($SE$) and the Estimated Mean ($\bar{x}$).

Consequently, the RSE can be estimated from the Standard Error ($SE$) of the Sample Mean and the Estimated Mean ($\bar{x}$) with the formula $RSE = SE / \bar{x}$.

Another way to put it is: "we want the error of the mean to be less than the tolerance applied to the estimated mean ($\bar{x}$)", that is, $SE \le \text{tolerance} \cdot \bar{x}$.

Both ways lead to the same equation which allows determining the sample size as follows;
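A reconstruction of that equation from the definitions given in this issue, assuming that the tolerance bounds the RSE directly, that $SE$ is estimated as $s/\sqrt{n}$, and that $\bar{x} > 0$ (for a negative mean, $|\bar{x}|$ would take its place):

```latex
\begin{align}
RSE = \frac{SE}{\bar{x}} &\le \text{tolerance} \\
\frac{s}{\sqrt{n}} &\le \text{tolerance} \cdot \bar{x} \\
n &\ge \left( \frac{s}{\text{tolerance} \cdot \bar{x}} \right)^{2}
\end{align}
```

Read this way, halving the tolerance multiplies the required sample size by four.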

Standard Error of the Mean, $SE$, can be estimated as $s/\sqrt{n}$, with s being the standard deviation of the sample. This is possible because of the assumption of normality.

The deviation of the sample mean from the population mean is the SEM, and we want the percentage of error with respect to the mean, which should have the tolerance as its upper bound (the ratio of the SEM to the estimated mean $\bar{x}$). This gives us $\frac{s/\sqrt{n}}{\bar{x}} \le \text{tolerance}$, that is, $n \ge \left( \frac{s}{\text{tolerance} \cdot \bar{x}} \right)^2$.
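As a rough illustration of the estimate described above, here is a minimal sketch in plain Scala (no Spark). The object and method names are hypothetical and not part of qbeast-spark. It assumes the sample standard deviation uses the n - 1 denominator and that the tolerance bounds the RSE directly.

```scala
object ToleranceSketch {

  /** Sample mean and sample standard deviation (n - 1 denominator). */
  def meanAndStd(xs: Seq[Double]): (Double, Double) = {
    require(xs.size > 1, "need at least two values")
    val mean = xs.sum / xs.size
    val variance = xs.map(x => math.pow(x - mean, 2)).sum / (xs.size - 1)
    (mean, math.sqrt(variance))
  }

  /** Smallest n such that (std / sqrt(n)) / |mean| <= tolerance. */
  def requiredSampleSize(mean: Double, std: Double, tolerance: Double): Long = {
    require(tolerance > 0.0, "tolerance must be positive")
    require(mean != 0.0, "relative error is undefined for a zero mean")
    val ratio = std / (tolerance * math.abs(mean))
    math.max(1L, math.ceil(ratio * ratio).toLong)
  }
}
```

A caller would first compute the mean and standard deviation on a small pilot sample, then use `requiredSampleSize` to decide how much more data to read. The final statistics can differ from the pilot, so the bound may need to be rechecked.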

The scope of this issue is to collect all the information about table tolerance and to guide future development.
Missing steps:

  • Formally define the algorithm for the table tolerance.
  • Implement the algorithm inside the qbeast-spark library.
  • Finish the .tolerance shortcut in io.qbeast.spark.implicits.
  • Set up comprehensive confidence testing.

Support schema evolution

Schema evolution is a feature of Delta Lake that allows users to easily change a table's current schema to accommodate data that is changing over time. Most commonly, it's used when performing an append or overwrite operation, to automatically adapt the schema to include one or more new columns.

Currently we don't support this type of change in Qbeast format, or at least we don't let the user specify any new columns to index with Qbeast. We should investigate this topic, especially its possible side effects. The proposal is to use SpaceRevision or Revision to store the new schema information and to treat each new revision as a new index to query.

More information about the delta feature can be found in: https://databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html

In the meantime, the Qbeast commit log structure should support the upcoming development of this feature, meaning that information such as the number of dimensions or the indexed columns should be saved in the commit log.

Include INSERT INTO in Qbeast save contract

What went wrong?
Right now, on master, we cannot add new rows to a qbeast table through the INSERT INTO SQL command.

How to reproduce?

  1. Code that triggered the bug, or steps to reproduce:

On a previously indexed dataframe named df:

df.createOrReplaceTempView("t")
spark.sql("insert into table t (value) values (4)")

And the output is:

org.apache.spark.sql.AnalysisException: QbeastBaseRelation(parquet) does not allow insertion.;
'InsertIntoStatement Relation[value#1297] QbeastBaseRelation(parquet), false, false
+- LocalRelation [col1#1543]

  at org.apache.spark.sql.execution.datasources.PreWriteCheck$.failAnalysis(rules.scala:512)

  2. Branch and commit id:

master ef6b9f45ac54ab156e5b3474e3014b639b2ac827

  3. Spark version:
    3.x

  4. Hadoop version:

2.4.7

  5. Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests in a local computer?
    Local

  6. Stack trace:

Filter Pushdown on indexed columns

Right now we are using the whole space for querying.

val originalTo = Point(Vector.fill(qbeastSnapshot.dimensionCount)(Int.MaxValue.doubleValue()))

val querySpace = QuerySpaceFromTo(originalFrom, originalTo, spaceRevision)

This means that every search starts at the top of the tree and goes down until it finds all cubes that match the From / To precision. But if the user is interested in only part of the space, the data is filtered in memory instead of the irrelevant files being skipped beforehand.

For example, imagine a table indexed with columns col1 and col2. A query like:

SELECT * FROM qbeast_table WHERE col1 > 2 and col1 < 7 and col2 > 3 and col2 < 10

will read all the data and then filter by the column values. However, the index should allow us to read only the cubes/blocks/files that hold data in the (2, 7), (3, 10) range.

We should add two main functionalities:

  • Push down filters col1 > 2 and col1 < 7 and col2 > 3 and col2 < 10
  • Processing ranges of query space
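The two functionalities above can be sketched in plain Scala as a range check per file. This is only an illustration: `ColumnRange`, `FileBounds`, `mayMatch` and `prune` are hypothetical names, and the per-file min/max bounds are assumed to be available from the index metadata.

```scala
object QueryRangeSketch {

  /** Open interval (from, to) requested on one indexed column. */
  final case class ColumnRange(from: Double, to: Double)

  /** Closed bounds [min, max] of the values stored in one file, per indexed column. */
  final case class FileBounds(path: String, bounds: Map[String, (Double, Double)])

  /** A file may hold matching rows only if every queried column overlaps its bounds. */
  def mayMatch(file: FileBounds, query: Map[String, ColumnRange]): Boolean =
    query.forall { case (column, ColumnRange(from, to)) =>
      file.bounds.get(column) match {
        case Some((min, max)) => max > from && min < to
        case None => true // no bounds known for this column: the file cannot be skipped
      }
    }

  /** Keeps only the files that may contain rows inside the query ranges. */
  def prune(files: Seq[FileBounds], query: Map[String, ColumnRange]): Seq[FileBounds] =
    files.filter(mayMatch(_, query))
}
```

For the query shown earlier, the pushed-down filters would translate to `Map("col1" -> ColumnRange(2, 7), "col2" -> ColumnRange(3, 10))`. The rows of the surviving files still need the original predicate applied, since a file can overlap the range without every row matching.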

Provide unified structure for Revision

Different spaces coexisting in the same OTree are handled through what we call SpaceRevision.
This class holds the min and max values of each indexed column for one or multiple write operations. This is its structure in JSON format:

{"timestamp":1634196697656,
  "transformations":[
    {"min":1.16396325E8,"max":7.13218881E8,"scale":1.675539890285246E-9},
    {"min":-2.87485335E7,"max":9.02495125E7,"scale":8.403499331409189E-9}]
}

But we want SpaceRevision to hold more configurable attributes than just the coordinates. Values such as the indexed columns or the cube size could change from one revision to another in the future. That's why we should start handling them all in the same unified structure.

(This relates to #3)
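One possible shape for such a unified structure, as a sketch only: the names `LinearTransformation` and `Revision` and the choice of fields are assumptions made for illustration. The `scale` values in the JSON above are consistent with 1 / (max - min), which is what the sketch computes.

```scala
object RevisionSketch {

  /** Linear mapping of one indexed column into [0, 1]. */
  final case class LinearTransformation(min: Double, max: Double) {
    require(max > min, "max must be greater than min")
    val scale: Double = 1.0 / (max - min)
    def transform(value: Double): Double = (value - min) * scale
  }

  /** Hypothetical unified revision: coordinates plus the other per-revision settings. */
  final case class Revision(
      timestamp: Long,
      indexedColumns: Seq[String],
      desiredCubeSize: Int,
      transformations: Seq[LinearTransformation]) {
    require(
      indexedColumns.size == transformations.size,
      "one transformation per indexed column")

    def dimensionCount: Int = indexedColumns.size
  }
}
```

Keeping the indexed columns and the cube size next to the transformations means that a reader only needs the revision to interpret the files written under it.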

Read protocol should work when a cube is not present

Related to the error found in #47.
Due to the greedy estimation of the cube max weight in the first step of the writing protocol, we end up with missing parents in the index structure. All the data is written, but the read operation does not account for a state in which cube "AAAA" is present but its parent "AAA" is not.

The things we need to cover are:

  • Does it make sense to prepare the read (a.k.a findSampleFiles) to detect if a cube is missing?
  • How to implement it in an efficient way
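To make the first question concrete, here is a minimal sketch of detecting missing parents. It assumes that cube identifiers are strings in which the parent of a cube is its identifier without the last character (as in the "AAAA" / "AAA" example) and that the root is the empty string. The object and method names are hypothetical.

```scala
object MissingParentsSketch {

  /** Parent id of a cube: the id without its last character; the root ("") has none. */
  def parentOf(cube: String): Option[String] =
    if (cube.isEmpty) None else Some(cube.dropRight(1))

  /** All ancestors of a cube, nearest first, ending with the root "". */
  def ancestorsOf(cube: String): List[String] =
    parentOf(cube) match {
      case Some(parent) => parent :: ancestorsOf(parent)
      case None => Nil
    }

  /** Ancestors that some cube depends on but that are absent from the index. */
  def missingAncestors(cubes: Set[String]): Set[String] =
    cubes.flatMap(ancestorsOf).diff(cubes)
}
```

This check touches every cube once per level of depth, so it is cheap compared with reading data. Whether it belongs in the read path or in a validation step after writing remains an open design question.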

Drop here any opinions on this topic, please @alexeiakimov @cugni

Easier information API

In the current implementation, configuration values such as columnsToIndex and desiredCubeSize, and other index-related characteristics such as the available revisions, are not easily visible to the user/developer.

We should provide an API through the QbeastTable interface to allow anyone to consult those values.

Example of the functionality:

import io.qbeast.spark.table._

val dir = "path/to/mytable"
val qbeastTable = QbeastTable.forPath(spark, dir)

qbeastTable.getRevisions()
qbeastTable.getIndexedColumns()
qbeastTable.getDesiredCubeSize()
...

Delta log should use relative paths instead of absolute

What went wrong?
The Delta log uses absolute paths for the data files when the DataFrame is saved on the local file system. This prevents sharing the dataset via Delta Sharing, which does not support absolute file paths.

How to reproduce?

  1. Code that triggered the bug, or steps to reproduce:
    Run Spark shell with delta and qbeast-spark like the following command:

./spark-shell --jars /home/alexey/src/qbeast-spark/target/scala-2.12/qbeast-spark-assembly-0.1.0.jar,/home/alexey/tmp/delta-core_2.12-0.8.0.jar --conf spark.sql.extensions=io.qbeast.spark.sql.QbeastSparkSessionExtension --conf spark.driver.memory=768m --conf spark.executor.memory=768m --conf spark.driver.extraJavaOptions="-Dqbeast.index.size=3000000"

Save a simple DataFrame on the file system in delta and qbeast formats

import io.qbeast.spark.implicits._
val df = Seq(1, 2, 3).toDF()
df.write.format("delta").mode("overwrite").save("/home/alexey/tmp/paths")
df.write.format("qbeast").mode("overwrite").save("/home/alexey/tmp/paths2")

Check the Delta log files in both cases

{"add":{"path":"part-00000-194d9f52-8170-44cb-b0bd-9d8923251975-c000.snappy.parquet","partitionValues":{},"size":436,"modificationTime":1633339621652,"dataChange":true}}
{"add":{"path":"file:/home/alexey/tmp/paths2/66ec10d2-3fb0-426d-8f60-4bc1ef3f7f3a.parquet","partitionValues":{},"size":438,"modificationTime":1633339760900,"dataChange":true,"stats":"","tags":{"state":"FLOODED","rowCount":"3","cube":"","space":"{\"timestamp\":1633339759124,\"transformations\":[{\"min\":0.0,\"max\":4.0,\"scale\":0.25}]}","minWeight":"-2147483648","maxWeight":"2147483647","indexedColumns":"value"}}}

  2. Branch and commit id:
    main at 4f37b58

  3. Spark version:
    3.1.1

  4. Hadoop version:
    2.7.4

  5. Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests in a local computer?
    No

  6. Stack trace:
    N/A
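One way to turn the absolute path from the qbeast log entry above into a relative one is `java.net.URI.relativize`. The sketch below is plain Scala with a hypothetical helper name, and it is not the fix adopted in the project. It assumes that the table root and the file path use the same scheme.

```scala
import java.net.URI

object RelativePathSketch {

  /**
   * Path of a data file relative to the table root.
   * If the file is not under the root, the original path is returned unchanged.
   */
  def relativize(tableRoot: String, filePath: String): String = {
    val root = if (tableRoot.endsWith("/")) tableRoot else tableRoot + "/"
    URI.create(root).relativize(URI.create(filePath)).toString
  }
}
```

With the paths from this report, `relativize("file:/home/alexey/tmp/paths2", "file:/home/alexey/tmp/paths2/66ec10d2-3fb0-426d-8f60-4bc1ef3f7f3a.parquet")` yields the bare file name, which is the form the delta entry already uses.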

Investigate a way of saving columnsToIndex in schema or table metadata

What went wrong?
As Eric pointed out, there's no easy way to retrieve the columns indexed on a dataframe. Even if you know the internals, you still have to look through the commit log to find out which columns were selected.
Whether they are chosen manually (with the columnsToIndex option) or automatically (in the future), users and developers should be able to look up this information in a simpler way.

Wrong SparkSession initialization in Testing method 'withQbeastContextSparkAndTmpDir'

What went wrong?
Running a single test that uses withQbeastContextSparkAndTmpDir fails because there is no active SparkSession. For example, in this test:

"the Qbeast data source" should
"expose the original number of columns and rows" in withQbeastContextSparkAndTmpDir {
(spark, tmpDir) =>

The method withQbeastContextSparkAndTmpDir is called and fails because QbeastContext needs an active SparkSession, and none is active at this point yet:

def withQbeastContextSparkAndTmpDir[T](testCode: (SparkSession, String) => T): T =
withQbeastContext()(withTmpDir(tmpDir => withSpark(spark => testCode(spark, tmpDir))))

As you can see, withQbeastContext tries to get the SparkSession:

def withQbeastContext[T](
keeper: Keeper = LocalKeeper,
config: SparkConf = SparkSession.active.sparkContext.getConf)(testCode: => T): T = {

How to reproduce?

  1. Code that triggered the bug, or steps to reproduce:
    Run the test shown above, from QbeastDataSourceIntegrationTest.scala.

  2. Branch and commit id:
    main, 35c9f56

  3. Spark version:
    N/A

  4. Hadoop version:
    N/A

  5. Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests in a local computer?
    I'm running the tests in my local computer.

  6. Stack trace:

No active or default Spark session found
java.lang.IllegalStateException: No active or default Spark session found
	at org.apache.spark.sql.SparkSession$.$anonfun$active$2(SparkSession.scala:1065)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.SparkSession$.$anonfun$active$1(SparkSession.scala:1065)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.SparkSession$.active(SparkSession.scala:1064)
	at io.qbeast.spark.QbeastIntegrationTestSpec.withQbeastContext$default$2(QbeastIntegrationTestSpec.scala:128)
	at io.qbeast.spark.QbeastIntegrationTestSpec.withQbeastContext$default$2$(QbeastIntegrationTestSpec.scala:128)
	at io.qbeast.spark.utils.QbeastDataSourceIntegrationTest.withQbeastContext$default$2(QbeastDataSourceIntegrationTest.scala:11)
	at io.qbeast.spark.QbeastIntegrationTestSpec.withQbeastContextSparkAndTmpDir(QbeastIntegrationTestSpec.scala:145)
	at io.qbeast.spark.QbeastIntegrationTestSpec.withQbeastContextSparkAndTmpDir$(QbeastIntegrationTestSpec.scala:144)
	at io.qbeast.spark.utils.QbeastDataSourceIntegrationTest.withQbeastContextSparkAndTmpDir(QbeastDataSourceIntegrationTest.scala:11)
	at io.qbeast.spark.utils.QbeastDataSourceIntegrationTest.$anonfun$new$1(QbeastDataSourceIntegrationTest.scala:15)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
...

Refine load/save contract for qbeast-spark

Problem

It is necessary to make the qbeast-spark contract more precise in the part responsible for loading and saving indexed data.

Currently, qbeast-spark implements CreatableRelationProvider and RelationProvider traits on the top of Delta Lake. The createRelation methods are generic in terms of supported parameters, namely the parameters of the operation are specified as Map[String, String]. Our implementation passes the received parameters directly to the corresponding Delta Lake API. It means that the customer can use Delta Lake specific parameters to invoke some Delta Lake features like partitioning or user metadata. Our implementation tries to handle those parameters where it is possible.

The problem is that transparently exposing the Delta Lake behind Qbeast can lead the customer to expect our system to implement a scenario that Delta Lake supports but that is not compatible with our indexing system. For example, at the moment we cannot support partitioning exactly as specified by the customer, because we have to extend the partitioning with the cube and state fields.

The goal of this task is

  1. To specify the features and the corresponding options supported by Qbeast save/load operations
  2. To isolate the Delta Lake based implementation from the options which are not supported by Qbeast.
  3. If it is necessary to support some Delta Lake features then to define the corresponding Qbeast options and to implement their translation to the original Delta Lake options as a part of the implementation.

In brief, the proposed approach is to hide the Delta Lake instance in use from direct access by customer applications. Of course, we cannot hide it completely, because it is on the classpath; however, it should be hidden at the contract level.

On the load/save contract

At the moment DataFrameReader and DataFrameWriter are the only API supported by Qbeast for loading and saving data. To be precise, only a subset of the available methods of DataFrameReader and DataFrameWriter is supported by Qbeast. They are described in more detail below.

The load contract

  1. Client obtains a DataFrameReader instance from the SparkSession.
  2. Client uses "qbeast" as format.
  3. Client can specify standard option "timeZone"
  4. Client uses one of the "load" methods to load the DataFrame.
  5. Alternatively a client can use "qbeast" method with the table path as a shortcut for "format(...).load(...)".
  6. Alternatively a client can specify the table path as the "path" option. This scenario is supported by Spark, though it is not recommended.

The save contract

  1. Client obtains a DataFrameWriter instance from the DataFrame "write" method.
  2. Client uses the "mode" method to specify the save mode.
  3. Client uses "qbeast" as the format.
  4. Client uses "columnsToIndex" option to specify a comma-separated list of columns to be used by the index.
  5. Client can specify standard option "timeZone"
  6. Client can specify the partition columns using the "partitionBy" method.
  7. Client uses the "save" method with the table path to write the data.
  8. Alternatively the client can use the "qbeast" method with the table path as a shortcut for "format(...).save(...)".
  9. Alternatively the client can specify the table path as the "path" option. This scenario is supported, though it is not recommended.
  10. The methods "bucketBy", "sortBy", "insertInto" and "saveAsTable" are not supported at the moment, because they require a certain type of the underlying table.
  11. The methods "csv", "json", "jdbc", "text", "orc", and alike are not supported, because they require a different data format.

On implementation

QbeastDataSource should remove all unsupported options from the parameters passed to the implementations of TableProvider, CreatableRelationProvider and RelationProvider before those parameters could be passed further to the persistence layer implemented on the top of Delta Lake.
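The filtering step could look like the following sketch, written in plain Scala over a `Map[String, String]`. The set of supported keys is an assumption taken from the contract described above ("columnsToIndex", "timeZone", "path"), and the object and method names are hypothetical. Matching is exact here, whereas Spark treats data source option keys as case-insensitive, so a real implementation would need to normalize the keys first.

```scala
object OptionsFilterSketch {

  // Assumed from the load/save contract in this issue; other options may need to be added.
  val supportedOptions: Set[String] = Set("columnsToIndex", "timeZone", "path")

  /** Parameters that may be passed on to the persistence layer. */
  def supported(parameters: Map[String, String]): Map[String, String] =
    parameters.filter { case (key, _) => supportedOptions.contains(key) }

  /** Keys that were dropped, e.g. to log a warning or to fail fast. */
  def unsupported(parameters: Map[String, String]): Set[String] =
    parameters.keySet.diff(supportedOptions)
}
```

Reporting the dropped keys rather than silently discarding them would tell the customer that a Delta Lake option they passed has no effect.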

Overhead of qbeast_hash filtering when doing a Sample

What went wrong?
When a sample() is performed against a qbeast dataset, the qbeast sql extension changes the Sample operation into a Filter to:

  • Pushdown the filter to the data source
  • Filter the cubes that satisfy the [lowerBound, upperBound]
  • In the end, read less data

But in this process, we compute an in-memory hash of the indexed columns to make sure that the records fall within the Sample range.
This hash has an extra cost when the query does not otherwise need those columns. For example, if we have a dataset indexed by (user_id, product_id) and we want to know the average price, the hash prevents the user_id and product_id columns from being pruned.

We need to study how to overcome this problem.
One solution is to avoid the in-memory filtering for those files that have a maxWeight < upperBound, because we know for sure that all the records in that file match the predicate.
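The proposed solution can be sketched as a split of the candidate files into two groups. The names are hypothetical, the weights are assumed to be the Int values stored in the minWeight/maxWeight tags, and the sample range is assumed to be [lowerBound, upperBound). The lower-bound check is an addition for completeness; the text above only mentions the maxWeight < upperBound condition.

```scala
object SampleFilterSketch {

  /** Weight range of the rows stored in one file. */
  final case class FileWeights(path: String, minWeight: Int, maxWeight: Int)

  /**
   * Returns (files fully inside the sample range, files that still need the row-level filter).
   * Files whose weight range does not overlap [lowerBound, upperBound) are dropped.
   */
  def split(
      files: Seq[FileWeights],
      lowerBound: Int,
      upperBound: Int): (Seq[FileWeights], Seq[FileWeights]) =
    files
      .filter(f => f.minWeight < upperBound && f.maxWeight >= lowerBound)
      .partition(f => f.minWeight >= lowerBound && f.maxWeight < upperBound)
}
```

Only the second group would need the hash of the indexed columns, so column pruning could apply to every file in the first group.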

Tolerance performs 5 times slower

What went wrong?
The tolerance operation on an average aggregation takes about 5 times longer than the normal aggregation.

How to reproduce?

Once you write and read a dataset with qbeast, on spark-shell you can perform:

scala> spark.time(qbeast.agg(avg("user_id")).tolerance(0.1).show)
+-------------------+
|    avg(user_id)|
+-------------------+
|5.367291603961085E8|
+-------------------+
Time taken: 4109 ms
scala> spark.time(qbeast.agg(avg("user_id")).show)
+--------------------+
|    avg(user_id)|
+--------------------+
|5.3647643790496415E8|
+--------------------+
Time taken: 886 ms

  1. Branch and commit id:
    18-tableTolerance 8443eff25b42d709dd1c5d0aeaa8240c8117901f

  2. Spark version:
    3.x

  3. Hadoop version:
    2.7.4

  4. Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests in a local computer?
    Local Computer

  5. Stack trace:

Make replicated cube files invisible to other formats

The problem with the current implementation is that the files from the ReplicatedSet are still visible in the Delta log. So, if someone wants to read the table as delta (either for testing or to process the data in other ways), they will find duplicated records that should not be present.

Possible solutions are the following:

  • To save those files in a different folder inside the root of the table and to process them independently.

  • To create a different class that is unrecognizable by delta (so it does not read those files) but that we can process internally. (This one would not work if we read them as parquet.)

For example:

//DRAFT
case class ReplicatedFile(
    path: String,
    @JsonDeserialize(contentAs = classOf[java.lang.Long])
    replicationTimestamp: Option[Long],
    dataChange: Boolean = true,
    extendedFileMetadata: Boolean = false,
    partitionValues: Map[String, String] = null,
    size: Long = 0,
    tags: Map[String, String] = null) extends FileAction {
  override def wrap: SingleAction = SingleAction(remove = this)

  @JsonIgnore
  val repTimestamp: Long = replicationTimestamp.getOrElse(0L)
}

Unexpected exception when reading non-qbeast-formatted data

What went wrong?
The following exception should be thrown when you load data that is not in qbeast format or when the path does not exist. It works well when the path does not exist; however, a different exception is thrown when the path exists and it is non-qbeast-formatted data:

if (table.exists) {
  table.load()
} else {
  throw AnalysisExceptionFactory.create(
    s"'$tableID' is not a Qbeast formatted data directory.")
}

My conclusion is that the format of the table is not checked, as this happens as well when trying to load a table indexed with an old version of the qbeast-spark format.

How to reproduce?

  1. Code that triggered the bug, or steps to reproduce:
  • Load a non-existing path:
val df = spark.read.format("qbeast").load("nonExistingPath")
org.apache.spark.sql.AnalysisException: 'nonExistingPath' is not a Qbeast formatted data directory.
  • But try to load a delta-formatted table or a qbeast table written with the old version of the format, and the exception will refer to the revision:
val df = spark.read.format("qbeast").load("deltaTablePath")
org.apache.spark.sql.AnalysisException: No space revision available with -1

  2. Branch and commit id:
    main, d9bd04a

  3. Spark version:
    3.1.1

  4. Hadoop version:
    3.2.0

  5. Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests in a local computer?
    N/A

  6. Stack trace:

val df = spark.read.format("qbeast").load("deltaTablePath")
org.apache.spark.sql.AnalysisException: No space revision available with -1
  at org.apache.spark.sql.AnalysisExceptionFactory$.create(AnalysisExceptionFactory.scala:36)
  at io.qbeast.spark.delta.DeltaQbeastSnapshot.$anonfun$getRevision$1(DeltaQbeastSnapshot.scala:81)
  at scala.collection.immutable.Map$EmptyMap$.getOrElse(Map.scala:104)
  at io.qbeast.spark.delta.DeltaQbeastSnapshot.getRevision(DeltaQbeastSnapshot.scala:81)
  at io.qbeast.spark.delta.DeltaQbeastSnapshot.loadLatestRevision(DeltaQbeastSnapshot.scala:140)
  at io.qbeast.spark.internal.sources.QbeastBaseRelation$.forDeltaTable(QbeastBaseRelation.scala:43)
  at io.qbeast.spark.table.IndexedTableImpl.createQbeastBaseRelation(IndexedTable.scala:194)
  at io.qbeast.spark.table.IndexedTableImpl.load(IndexedTable.scala:171)
  at io.qbeast.spark.internal.sources.QbeastDataSource.createRelation(QbeastDataSource.scala:90)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:354)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:326)
  at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:306)
  at scala.Option.map(Option.scala:230)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:266)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:240)
  ... 47 elided

The change on cubeSize (probably) should not change revision

What went wrong?
Right now, if the user changes the cubeSize for the same table, a new Revision is issued.

if (checkRevisionParameters(QbeastOptions(parameters), latestIndexStatus.revision)) {

private def checkRevisionParameters(

latestRevision.desiredCubeSize == qbeastOptions.cubeSize

Having different desired cube sizes across writes does not affect the reading protocol, whereas having multiple index structures for the same dataset could hurt performance.

To understand the behavior and explore the new strategy, we should:

  • Understand how it affects the IndexStatusBuilder
  • Understand how it affects the writing protocol
  • Set it as a mutable parameter in each Revision
  • How do we keep track of the changes?

Losing records when cubeSize is too small

What went wrong?
In PR #39 we were testing different indexing columns. Running an indexing test with a DataFrame of size 1000 and a cubeSize of 10 in an 8-core Spark standalone process, we found that not all the data was read back.

Why?

The estimation computed in CubeWeights can be suboptimal for small cube sizes. For example:

We set an overall cubeSize of 100. We have 5 partitions with the following number of elements on each one:

[92, 52, 22, 50, 10]

The max weight of the root cube should be: 100/(92 + 52 + 22 + 50 + 10) = 0.442

Now, a test with random distributions:
If we follow the current implementation, the desiredSize for each partition is going to be 100 / 5 = 20.

With a Python program we generated a list of random numbers for each partition, of the corresponding size, and picked the element at the 20th position (if it existed). The results were:

[0.133, 0.4516, 0.87, 0.39, 2]

When we estimate the max weight, the result is: 0.0713 * 5 (num of partitions) = 0.3565

But if we do the same operation for a cubeSize of 1000, with a desiredSize per partition of 1000/5 = 200, the result is 0.089 * 5 = 0.44, which is closer to the real value of 0.442.

This leads to cubes having a very small estimated max weight in the first step of the algorithm. Then, when trying to put the actual records into the cubes, those with lower precision are skipped, and everything is stored in the children. The result is that all the data is written correctly, but some cubes are missing, and the reading protocol does not expect this in the structure.

Possible solution

To solve this we must set a minimum default cube size and let it be configurable on a SparkSession level (not as an option to a DataFrame).

How to reproduce?

  1. Code that triggered the bug, or steps to reproduce:

This is the code to reproduce the error:

case class T1(a: Int, b: String, c: Double)

val source = 0
  .to(1000)
  .map(i => T1(i, s"$i", i.toDouble))
  .toDF()
  .as[T1]

source.write
  .format("qbeast")
  .option("columnsToIndex", "a,c")
  .option("cubeSize", 10)
  .save(tmpDir)

val indexed = spark.read
  .format("qbeast")
  .load(tmpDir)
  .as[T1]

source.count() shouldBe indexed.count()

  2. Branch and commit id:

divide-et-impera at e059768

  3. Spark version:
    On the spark shell run spark.version.

3.1.1

  4. Hadoop version:
    On the spark shell run org.apache.hadoop.util.VersionInfo.getVersion().

3.2.0

  5. Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests in a local computer?

Local

  6. Stack trace:

Slow Collect and Execute on QbeastWriter during indexing

What went wrong?

During the indexing of a 12.4GB dataset using 2 numerical columns, one of the most time-consuming jobs was the execute and collect in QbeastWriter, on the following line:

[Image: slowCollext Execute]

Hypothesis and known issue

Our current implementation for writing data is split into two stages:

  1. Calculation of an estimated CubeWeight map.
    It is done per partition and requires a collect() in order to broadcast the map and assign each row to its corresponding cube. In addition, a min/max aggregation of the indexed columns is required to know the space we are indexing.

  2. Reorganization of the data and writing of multiple rows of the same cube to a single parquet file

    1. Repartition the data by cube and state. The penalty is minor, comparable to the Delta writing time.
    2. File I/O writing. All the records from a cube are written to a single parquet file, which prevents parallelizing the write operation.

As documented by @alexeiakimov :

An attempt to index 1GB of data on a single node with qbeast-spark-nodep_2.12-nightly-d44ce4c.jar has shown that the following three jobs are the main contributors to the time:

  1. execute and collect() at QbeastWriter - 2.4 min. Method collect is called on the dataset after all the records are written to the destination media

  2. min/max aggregation. Model - 1.8 min. Method agg is called on the dataset to compute min and max values for each dimension
  3. collect() on OTreeAlgorithm - 1.7 min. Method collect is called on the dataset to compute the cube weights.

The hypothesis is that in all three cases the traversing of the full dataset is responsible for the big latency. It is clear that in the ColumnInfo case our code has minimal impact, because it simply computes min and max for two columns, however, it has a latency of the same magnitude as QbeastWriter and OTreeAlgorithm, where we do a lot of computations.

How to reproduce?

  1. Code that triggered the bug, or steps to reproduce:
  • Relevant config for the SparkSession in the Azure Cluster:
            --packages org.apache.hadoop:hadoop-azure:2.7.0,com.azure:azure-storage-blob:12.8.0,com.azure:azure-storage-common:12.8.0,com.microsoft.azure:azure-storage:2.0.0,io.delta:delta-core_2.12:0.8.0 \
            --conf spark.executor.cores=3 \
            --conf spark.driver.memory=12g \
            --conf spark.executor.memory=12g \
            --conf spark.eventLog.enabled=true \
            --conf spark.sql.extensions=io.qbeast.spark.sql.QbeastSparkSessionExtension \
            --conf spark.driver.extraJavaOptions="-Dqbeast.index.size=2000000"

  • We load the table store_sales from the TPC-DS 100GB dataset, which has a size of 12.4GB in delta format. We index it by the primary key attributes and write it in qbeast format:
val store_sales = spark.read.format("delta").load("wasb://[email protected]/data-1gb/store_sales")
val tmpDir = "wasb://[email protected]/qb_indexed_1gb/store_sales"
store_sales.write.mode("overwrite").format("qbeast").option("columnsToIndex", "ss_item_sk,ss_ticket_number").save(tmpDir)

  2. Branch and commit id:
    Main on 15667c2

  3. Spark version:
    3.1.1

  4. Hadoop version:
    2.7

  5. Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests in a local computer?
    Running Spark Shell from the Jupyter Notebook in the Kubernetes deployment on Azure

Reading all data gets delayed on collect()

What went wrong?
An aggregation over ss_sales_price on all the data (12.4GB) takes over 2 seconds longer with the qbeast format than with delta, because of the collect() calls it performs.

Qbeast execution:

[Image: qbeastAgg]

Delta execution:

[Image: deltaAgg]

How to reproduce?

  1. Code that triggered the bug, or steps to reproduce:

On TPC-DS 100gb generated dataset run:

val qbeastDf = spark.read.format("qbeast").load(qbeastDir)
qbeastDf.agg(avg("ss_sales_price")).show()

  2. Branch and commit id:

Main on 15667c2

  3. Spark version:
    On the spark shell run spark.version.

3.1.1

  4. Hadoop version:
    On the spark shell run org.apache.hadoop.util.VersionInfo.getVersion().

2.4.7

  5. Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests in a local computer?

This is running on Azure Kubernetes cluster, with 8 worker nodes + 1 master node of 16GB memory.

  6. Stack trace:

No stack trace.

Throw new exception when loading non qbeast-formatted data

What went wrong?
Trying to load data not stored in qbeast format currently throws a java.lang.NullPointerException. We could add a new Exception to be thrown in that case, which tells the user something like "This is not qbeast-formatted data".

How to reproduce?

  1. Code that triggered the bug, or steps to reproduce:
val df = spark.read.format("qbeast").load(nonQbeastFormatPath)

  2. Branch and commit id:
    main branch, 15667c2

  3. Spark version:
    3.1.1

  4. Hadoop version:
    2.7.4

  5. Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests in a local computer?
    N/A

  6. Stack trace:

java.lang.NullPointerException
  at io.qbeast.spark.sql.qbeast.QbeastSnapshot.<init>(QbeastSnapshot.scala:35)
  at io.qbeast.spark.table.IndexedTableImpl.snapshot(IndexedTable.scala:151)
  at io.qbeast.spark.table.IndexedTableImpl.load(IndexedTable.scala:146)
  at io.qbeast.spark.sql.sources.QbeastDataSource.createRelation(QbeastDataSource.scala:94)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:354)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:326)
  at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:306)
  at scala.Option.map(Option.scala:230)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:266)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:240)
  ... 47 elided

Update the target version of Spark/Hadoop

What went wrong?
Currently the project uses Spark 3.1.1 and Hadoop 2.7 as implementation dependencies. Unfortunately, Hadoop 2.7 does not work well with S3, which makes it impossible for a user to write the indexed data directly to S3. At the same time, there is a pre-built distribution of Spark 3.1.2 that bundles Hadoop 3.2, a version which supports S3 well.

How to reproduce?

  1. Code that triggered the bug, or steps to reproduce: N/A

  2. Branch and commit id: main at 7edbfc6

  3. Spark version: 3.1.1

  4. Hadoop version: 2.7

  5. Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests in a local computer?

  6. Stack trace: N/A

QueryRange initialization leads to information loss

What went wrong?
On executing:

val df = spark.read.format("qbeast").load("/tmp/qbeast")
df.count()

on a dataset of 2320724 rows, the output is only 9973. If the data is read with "delta" instead of "qbeast", the count() is correct.

After some digging into the reading algorithm, we found that the QuerySpace initialization hardcodes the min and max values as Int.MinValue and Int.MaxValue.

val originalFrom = Point(Vector.fill(dimensionCount)(Int.MinValue.doubleValue()))

When filtering the cubes to retrieve, the reader checks whether each cube intersects the QuerySpace. If the indexed values fall outside [Int.MinValue, Int.MaxValue], the condition evaluates to false and only a subset of the cubes is read (in this case, just the first one).
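
The effect can be illustrated with a simplified one-dimensional model. This is only a sketch: `Interval` and the unbounded default are assumptions made for illustration, not the project's actual QuerySpace code:

```scala
object QueryRangeBounds {

  /** A closed one-dimensional interval. */
  final case class Interval(from: Double, to: Double) {
    def intersects(other: Interval): Boolean = from <= other.to && other.from <= to
  }

  // Default range limited to the Int domain, as in the hardcoded initialization.
  val intBounded: Interval = Interval(Int.MinValue.toDouble, Int.MaxValue.toDouble)

  // Assumed alternative: an unbounded default that no indexed value can exceed.
  val unbounded: Interval = Interval(Double.NegativeInfinity, Double.PositiveInfinity)
}
```

A cube covering Long identifiers above Int.MaxValue does not intersect `intBounded`, so it would be dropped from the scan, while it always intersects `unbounded`.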

How to reproduce?

  1. Code that triggered the bug, or steps to reproduce:

Specification of the Dataset:

Github dataset indexed by columns: event_id,type_id,repo_id.

  • event_id : identifier of the event. Long, nullable
  • type_id : type of the event. Integer, nullable
  • repo_id : repository identifier. Long, nullable

Schema:


 |    |-- size: long (nullable = true)
 |-- public: boolean (nullable = true)
 |-- repo: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- type: string (nullable = true)
 |-- type_id: integer (nullable = true)
 |-- event_id: long (nullable = true)
 |-- repo_id: long (nullable = true)
  1. Branch and commit id:

main at b721684

  1. Spark version:
    On the spark shell run spark.version.

3.1.1

  1. Hadoop version:
    On the spark shell run org.apache.hadoop.util.VersionInfo.getVersion().

3.2.0

  1. Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests in a local computer?

Local Computer

  1. Stack trace:

Match Error on Filtering indexed String columns

What went wrong?
When doing a query on a string indexed column, the Spark type UTF8String is not recognized by the Transformation method and it throws a Match error.

This is the result of filtering the e-commerce dataset indexed with qbeast by "brand == 'versace'"

versace (of class org.apache.spark.unsafe.types.UTF8String)
scala.MatchError: versace (of class org.apache.spark.unsafe.types.UTF8String)
	at io.qbeast.core.transform.HashTransformation.transform(HashTransformation.scala:11)
	at io.qbeast.core.model.QuerySpaceFromTo$.$anonfun$apply$1(QuerySpace.scala:68)
	at scala.collection.immutable.List.map(List.scala:293)
	at io.qbeast.core.model.QuerySpaceFromTo$.apply(QuerySpace.scala:67)
	at io.qbeast.spark.index.query.QuerySpecBuilder.extractQuerySpace(QuerySpecBuilder.scala:107)
	at io.qbeast.spark.index.query.QuerySpecBuilder.build(QuerySpecBuilder.scala:144)
	at io.qbeast.spark.index.query.QueryExecutor.$anonfun$execute$1(QueryExecutor.scala:22)

The solution is to detect the Spark type before calling core functions and convert it to its String representation.
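
A rough sketch of that normalization step follows. To keep it runnable without Spark on the classpath, the hypothetical `Utf8Like` class stands in for org.apache.spark.unsafe.types.UTF8String, and `hashTransform` is a simplified stand-in for the core transformation rather than its real implementation:

```scala
object FilterValueNormalizer {

  // Stand-in for Spark's UTF8String, so the sketch runs without Spark.
  final class Utf8Like(bytes: Array[Byte]) {
    override def toString: String =
      new String(bytes, java.nio.charset.StandardCharsets.UTF_8)
  }

  /** Converts engine-specific values to plain JVM types before they reach core code. */
  def normalize(value: Any): Any = value match {
    case s: Utf8Like => s.toString
    case other => other
  }

  // Simplified core transformation that only matches plain JVM types,
  // so any other type ends in a scala.MatchError.
  def hashTransform(value: Any): Int = value match {
    case s: String => s.hashCode
    case n: Number => n.hashCode
  }
}
```

Calling `hashTransform` on the raw value fails with a MatchError, while `hashTransform(normalize(value))` succeeds, which keeps the core module free of Spark-specific types.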

How to reproduce?

  1. Code that triggered the bug, or steps to reproduce:
   val tmpDir = "/tmp/qbeast"

    val data = spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("src/test/resources/ecommerce100K_2019_Oct.csv")
    .distinct().na.drop()

   data.write
      .mode("overwrite")
      .format("qbeast")
      .options(
        Map("columnsToIndex" -> "brand,product_id", "cubeSize" -> "10000"))
      .save(tmpDir)

  val indexed = spark.read.format("qbeast").load(tmpDir)
  indexed.filter("brand == 'versace'").show()
  1. Branch and commit id:

main on c182980

  1. Spark version:
    On the spark shell run spark.version.

3.1.2

  1. Hadoop version:
    On the spark shell run org.apache.hadoop.util.VersionInfo.getVersion().

3.2.0

  1. Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests in a local computer?

On local computer

  1. Stack trace:

Why do we read the delta log twice when doing a query?

In the current implementation, when we do a query, we read the delta log at least twice:

  1. First, in OTreeIndex:33, when we call

val tahoeMatchingFiles = index.matchingFiles(partitionFilters, dataFilters)

on the TahoeLogFileIndex.

  2. Then, in QueryExecutor:28, when we execute

val indexStatus = qbeastSnapshot.loadIndexStatus(revision.revisionID)

we read the full deltaLog once for each revision.

Also, we intersect the files in tahoeMatchingFiles with the files found by the QueryExecutor twice: once explicitly in OTreeIndex:41

 tahoeMatchingFiles.filter { a => qbeastMatchingFiles.exists(_.path == a.path) }

and also when we do the flatMap in the QueryExecutor:63

outputFiles ++= files.flatMap(fileMap.get)

The questions are:

  1. Do we also need to filter by the tahoeIndexFiles? Which data predicates does it filter that we don't?
  2. If we need to use the files matched by the TahoeLogFileIndex, why don't we use these files to build the IndexStatus, so that we don't have to scan the deltaLog multiple times or intersect the lists of files?

Cube size should be estimated

Right now, the (desired) size of the cubes is set to 10000 by default. This is not a good number for big datasets, since we end up with too many small files.
Although it is a configurable parameter, it is not intuitive to change, so we need another approach.

My suggestion of tasks is:

  • Add a configuration as an option on the DataFrame API, like columnsToIndex
  • Define a better default size. It can be related to the default parquet size, since each cube would be written in one single file. Or just change it to something bigger in the meantime.
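
One possible way to relate the cube size to a target file size is sketched below. The function name and both parameters are hypothetical, and the average row size would have to be measured or sampled from the data:

```scala
object CubeSizeEstimator {

  /**
   * Estimates how many rows fit in a file of roughly `targetFileBytes`,
   * given the average serialized size of one row.
   */
  def estimateCubeSize(targetFileBytes: Long, avgRowBytes: Long): Long = {
    require(targetFileBytes > 0, "targetFileBytes must be positive")
    require(avgRowBytes > 0, "avgRowBytes must be positive")
    math.max(1L, targetFileBytes / avgRowBytes)
  }
}
```

For instance, with a 128 MiB target and rows of about 1 KiB, `estimateCubeSize(128L * 1024 * 1024, 1024L)` suggests a cube size of 131072 rows, an order of magnitude above the current default.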

Add support for compacting files

What went wrong?

Recently, Delta contributors added the functionality to optimize tables through SQL in the open source version. 🙌

You can read everything in the related issue: delta-io/delta@e366ccd, but to summarize:

Having many small files can turn into a performance issue. To address this, the Optimize operation compacts them into a single bigger file.

OPTIMIZE ('/path/to/dir' | delta.table) [WHERE part = 25];

This new feature will be part of the next Delta Lake release, 1.2.0. Right now, we are only compatible with 1.0.0, and Qbeast does not handle this type of compaction.

In order to be compatible with future Delta versions, we should:

  1. Upgrade to the newest version of Delta
  2. Solve compatibility problems that could arise (since it also changes Spark version to 3.2.0)
  3. Think about how to address compaction of small files at cube level
  4. Implement new functionality
  5. Add tests
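
As a starting point for step 3, the selection of compaction candidates at cube level could look roughly like the sketch below. `BlockFile`, `select` and the `smallFileBytes` threshold are hypothetical names, and the actual rewrite of the files is left out:

```scala
object CompactionCandidates {

  /** Minimal view of a block file: its path, the cube it belongs to and its size in bytes. */
  final case class BlockFile(path: String, cube: String, size: Long)

  /**
   * Groups files smaller than `smallFileBytes` by cube and keeps only the cubes
   * with at least two such files, since a single file has nothing to merge with.
   */
  def select(files: Seq[BlockFile], smallFileBytes: Long): Map[String, Seq[BlockFile]] =
    files
      .filter(_.size < smallFileBytes)
      .groupBy(_.cube)
      .filter { case (_, group) => group.size >= 2 }
}
```

Grouping by cube keeps every compacted file inside a single cube, so the index structure would not need to change when files are merged.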

ColumnsToIndex and/or CubeSize should be optional on Append

When writing data in qbeast format, the user needs to specify columnsToIndex or cubeSize every time. This is fine when they want to change them, but it shouldn't always have to be explicit.

For example, if the user wants to append data to an existing table and keep the same configuration, they should be able to write:

df.write.format("qbeast").mode("append").save("existing-path")

instead of

df.write.format("qbeast").mode("append").option("columnsToIndex", "x,y,z").option("cubeSize", "10000").save("existing-path")
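
A sketch of how the options could be resolved on write, assuming the configuration of the existing table can be read back. `IndexConfig` and `resolve` are hypothetical names; the fallback cube size of 10000 is the current default mentioned in the cube size issue:

```scala
object AppendOptions {

  /** Options stored with an existing table; field names follow the write options. */
  final case class IndexConfig(columnsToIndex: Seq[String], cubeSize: Int)

  /**
   * Resolves the effective configuration: explicit options win, otherwise the
   * configuration of the existing table is reused.
   */
  def resolve(options: Map[String, String], existing: Option[IndexConfig]): IndexConfig = {
    val columns = options
      .get("columnsToIndex")
      .map(_.split(",").map(_.trim).toSeq)
      .orElse(existing.map(_.columnsToIndex))
      .getOrElse(throw new IllegalArgumentException("columnsToIndex is required for a new table"))
    val cubeSize = options
      .get("cubeSize")
      .map(_.toInt)
      .orElse(existing.map(_.cubeSize))
      .getOrElse(10000) // current default cube size
    IndexConfig(columns, cubeSize)
  }
}
```

With this approach the options stay mandatory only for the first write, when there is no existing table to fall back on.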

Move advanced config parameters to the SparkConf

Right now we load the advanced configuration parameters through Typesafe Config. This forces the user to pass --driver-java-options to change any of the values.

--driver-java-options="-Dqbeast.index.defaultCubeSize=100000"
--driver-java-options="-Dqbeast.index.minPartitionCubeSize=100000"

And the upcoming one from PR #51:
--driver-java-options="-Dqbeast.index.maxGroupSize=100000"

We should move this configuration to SparkConf, following the style of other similar systems and formats. That way, anyone configuring the environment of their Spark applications can use the --conf flag.

For example:

 $SPARK_HOME/bin/spark-shell --master "local[4]" \
--conf spark.driver.memory=1g --jars ./target/scala-2.12/qbeast-spark-assembly-0.2.0.jar \
--conf io.qbeast.spark.defaultCubeSize=10000 \
--conf io.qbeast.spark.minPartitionCubeSize=1000 \
--conf io.qbeast.spark.maxGroupSize=100000 \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider \
--packages io.delta:delta-core_2.12:1.0.0

Also, the documentation should be updated accordingly, maybe adding a new section about the different configurations the user can manage.
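
On the reading side, the lookup could be as simple as the sketch below. The configuration is modelled as a plain Map so the example runs without Spark; with a real SparkConf, `getInt(key, defaultValue)` plays the same role. The helper name `intSetting` is hypothetical:

```scala
object QbeastConf {

  /** Reads an integer setting from key/value configuration, falling back to a default. */
  def intSetting(conf: Map[String, String], key: String, default: Int): Int =
    conf.get(key).map(_.trim.toInt).getOrElse(default)
}
```

One detail worth verifying before settling on key names: to my knowledge spark-submit and spark-shell ignore --conf properties whose keys do not start with "spark.", so the final keys may need that prefix.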

Set transaction protocol for optimizing the index

When writing data, we use the Delta Commit Protocol to ensure the atomicity of the operation.
When optimizing the index, we should wrap the write of new files (the ones belonging to the replicated cubes) in a Keeper transaction.

The pseudocode for resolving conflicting files should look like this:

  beginKeeperOperation()
  var tries = 2
  var succeeded: Option[Boolean] = None
  try {
    // Here new data is written
    val filesUpdates = writeData()
    while (tries > 0) {
      deltaLog.withNewTransaction { txn =>
        // startingTxn: number of the transaction at the beginning of the operation
        try {
          txn.commit(filesUpdates)
          tries = 0
          succeeded = Some(true)
        } catch {
          // Give up on a real conflict or when this was the last attempt
          case cme: ConcurrentModificationException if isConflicted || tries == 1 =>
            succeeded = Some(false)
            throw cme
          // Otherwise retry the commit
          case _: ConcurrentModificationException => tries -= 1
        }
      }
    }
  } finally {
    endKeeperOperation()
  }

The block file tag minWeight should be equal to the minimum weight of the block elements

What went wrong?
The indexed data is written in blocks; each block has a separate file represented by an AddFile instance. For each block file we define several custom tags, including minWeight and maxWeight, which estimate the minimum and maximum weight of the block elements. Unfortunately, minWeight is always equal to Weight.MinValue, which has a negative impact on sampling, because a block can never be skipped based on its minimum weight. For example, if the user specifies a precision in the range [0.001, 0.01], all the block files with a minimum weight greater than Weight(0.01) should be skipped.
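
The skipping rule can be sketched as follows. The linear mapping from a fraction to the Int weight domain in `weightOf` is an assumption made for illustration, and `Block` is a hypothetical summary of the tags rather than a project class:

```scala
object WeightSkipping {

  /** Block file summary; both weights use an Int encoding like the minWeight/maxWeight tags. */
  final case class Block(path: String, minWeight: Int, maxWeight: Int)

  // Assumed mapping from a fraction in [0, 1] to the Int weight domain.
  def weightOf(fraction: Double): Int = {
    require(fraction >= 0.0 && fraction <= 1.0, "fraction must be in [0, 1]")
    (Int.MinValue + fraction * (Int.MaxValue.toDouble - Int.MinValue.toDouble)).toInt
  }

  /** Keeps the blocks whose [minWeight, maxWeight] overlaps the requested range. */
  def select(blocks: Seq[Block], from: Double, to: Double): Seq[Block] = {
    val (lo, hi) = (weightOf(from), weightOf(to))
    blocks.filter(b => b.minWeight <= hi && b.maxWeight >= lo)
  }
}
```

A block tagged with its real minimum weight is dropped as soon as that minimum exceeds the upper bound of the range, whereas a block tagged with Int.MinValue always passes the `minWeight <= hi` test and has to be read.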

How to reproduce?

  1. Code that triggered the bug, or steps to reproduce:
    Save a simple non-empty data frame in qbeast format and observe the Delta log files (like _delta_log/0000000000.json). Every AddFile has a separate entry like the following

{"add":{"path":"379f8503-c01c-4a99-9179-5064b9e81d79.parquet","partitionValues":{},"size":447665,"modificationTime":1633432831837,"dataChange":true,"stats":"","tags":{"state":"FLOODED","rowCount":"6203","cube":"gA","space":"{\"timestamp\":1633432303175,\"transformations\":[{\"min\":-8998.5,\"max\":26999.5,\"scale\":2.7779321073392966E-5}]}","minWeight":"-2147483648","maxWeight":"-2129049308","indexedColumns":"ss_item_sk"}}}
The custom tag "minWeight":"-2147483648" means that the minimum weight is Weight.MinValue.

  1. Branch and commit id: main at 7edbfc6

  2. Spark version: 3.1.1

  3. Hadoop version: 2.7

  4. Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests in a local computer?
    It is easily reproducible on a local computer.

  5. Stack trace: N/A

Out of Memory on Executors due to Qbeast SparkDataWriter

What went wrong?
Indexing a dataset of 1M records /1 GB (from Delta to Qbeast Format) consumes too much memory on executors. Using EMR 6.4.0 with 8 nodes m5.xlarge (4 vCore, 16 GiB). I tried changing the cubeSize and the number of columns to index without success.

How to reproduce?

  1. Code that triggered the bug, or steps to reproduce:

Open spark-shell on EMR with the configuration:

/usr/bin/spark-shell  --jars /home/hadoop/qbeast-spark-assembly-*.jar \
   --conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
   --packages io.delta:delta-core_2.12:1.0.0 

# Read the data and run
scala> df.write.format("qbeast").option("cubeSize","300000").option("columnsToIndex","columnA,columnB").save("s3a://somewhere")
  1. Branch and commit id:
    PR #69 on commit 04cfc46

  2. Spark version:
    EMR 6.4.0 - Spark 3.1.2

  3. Hadoop version:
    3.2.1

  4. Stack trace:

On a stage created by collect at SparkDataWriter.scala:56

22/02/02 16:33:50 WARN TaskSetManager: Lost task 7.0 in stage 19.0 (TID 252) (ip-172-31-15-68.eu-west-1.compute.internal executor 2): java.lang.OutOfMemoryError
	at sun.misc.Unsafe.allocateMemory(Native Method)
	at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
	at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
	at org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
	at org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
	at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
	at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
	at org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
	at org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
	at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
	at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
	at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
	at org.apache.parquet.column.impl.ColumnWriterV1.accountForValueWritten(ColumnWriterV1.java:106)
	at org.apache.parquet.column.impl.ColumnWriterV1.write(ColumnWriterV1.java:200)
	at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.addBinary(MessageColumnIO.java:469)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeWriter$9(ParquetWriteSupport.scala:202)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeWriter$9$adapted(ParquetWriteSupport.scala:200)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$writeFields$1(ParquetWriteSupport.scala:158)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeField(ParquetWriteSupport.scala:476)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.writeFields(ParquetWriteSupport.scala:158)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$write$1(ParquetWriteSupport.scala:148)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeMessage(ParquetWriteSupport.scala:464)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:148)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:54)
	at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
	at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
	at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:39)
	at io.qbeast.spark.index.writer.BlockWriter.$anonfun$writeRow$1(BlockWriter.scala:77)
	at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:162)
	at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:162)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
	at io.qbeast.spark.index.writer.BlockWriter.writeRow(BlockWriter.scala:54)
	at io.qbeast.spark.index.writer.SparkDataWriter$.$anonfun$write$1(SparkDataWriter.scala:56)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

[Delta] Add min-max column information

Right now we add block information on different metrics like cube, weight and state onto the delta commit log.

val tags = Map(
            cubeTag -> cube,
            weightMinTag -> minWeight.toString,
            weightMaxTag -> maxWeight.toString,
            stateTag -> state,
            spaceTag -> JsonUtils.toJson(cubeTransformation.transformations),
            indexedColsTag -> columnsToIndex.mkString(","),
            elementCountTag -> rowCount.toString)

For data skipping to be optimal, we may need to collect information on other columns of interest (for example, the indexed columns). The Qbeast reading protocol can benefit from these stats to skip blocks that are not needed for the query.
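
A minimal sketch of how per-block min-max information could drive skipping, assuming numeric columns and closed range predicates. `ColumnRange` and `canSkip` are hypothetical names:

```scala
object MinMaxSkipping {

  /** Minimum and maximum of one column within a block file. */
  final case class ColumnRange(min: Double, max: Double)

  /** True when no value in the block can satisfy `from <= column <= to`. */
  def canSkip(stats: ColumnRange, from: Double, to: Double): Boolean =
    stats.max < from || stats.min > to
}
```

For example, a block whose column spans [10, 20] can be skipped for a filter on the range [30, 40], but not for one on [15, 25].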

NullPointerException when indexing Strings in non-null columns.

What went wrong?
Indexing the table call_center from TPC-DS using non-null columns throws a NullPointerException. call_center is a really small table with 24 rows (in the 10Gb dataset), with no nulls in the indexed columns.
The column causing problems is cc_call_center_id, which is a String. In the following example, I tried to index using two columns, but I also did the test using only this column and it also fails (Indexing the other column, cc_call_center_sk, works well).

df.printSchema
root
 |-- cc_call_center_sk: integer (nullable = true)
 |-- cc_call_center_id: string (nullable = true)
  ...

There are only 24 rows, with no nulls in them:

scala> df.select("cc_call_center_sk", "cc_call_center_id").where("cc_call_center_id is null or cc_call_center_sk is null").show()
+-----------------+-----------------+
|cc_call_center_sk|cc_call_center_id|
+-----------------+-----------------+
+-----------------+-----------------+

scala> df.select("cc_call_center_sk", "cc_call_center_id").show(30)
+-----------------+-----------------+
|cc_call_center_sk|cc_call_center_id|
+-----------------+-----------------+
|                1| AAAAAAAABAAAAAAA|
|                2| AAAAAAAACAAAAAAA|
|                3| AAAAAAAACAAAAAAA|
|                4| AAAAAAAAEAAAAAAA|
|                5| AAAAAAAAEAAAAAAA|
|                6| AAAAAAAAEAAAAAAA|
|                7| AAAAAAAAHAAAAAAA|
|                8| AAAAAAAAIAAAAAAA|
|                9| AAAAAAAAIAAAAAAA|
|               10| AAAAAAAAKAAAAAAA|
|               11| AAAAAAAAKAAAAAAA|
|               12| AAAAAAAAKAAAAAAA|
|               13| AAAAAAAANAAAAAAA|
|               14| AAAAAAAAOAAAAAAA|
|               15| AAAAAAAAOAAAAAAA|
|               16| AAAAAAAAABAAAAAA|
|               17| AAAAAAAAABAAAAAA|
|               18| AAAAAAAAABAAAAAA|
|               19| AAAAAAAADBAAAAAA|
|               20| AAAAAAAAEBAAAAAA|
|               21| AAAAAAAAEBAAAAAA|
|               22| AAAAAAAAGBAAAAAA|
|               23| AAAAAAAAGBAAAAAA|
|               24| AAAAAAAAGBAAAAAA|
+-----------------+-----------------+

How to reproduce?

  1. Code that triggered the bug, or steps to reproduce:
val df = spark.read.format("delta").load("s3://qbeast-benchmarking-us-east-1/datasets/10gb/delta/original-10gb-delta/call_center")

val columnsToIndex="cc_call_center_sk,cc_call_center_id"
df.write.format("qbeast").option("columnsToIndex",columnsToIndex).save("s3://qbeast-benchmarking-us-east-1/datasets/10gb/qbeast/pk-10gb-qbeast/call_center")
  1. Branch and commit id:
    Main, bcea74f

  2. Spark version:
    3.1.2-amzn-0

  3. Hadoop version:
    3.2.1-amzn-4

  4. Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests in a local computer?
    Running in an AWS EMR (emr-6.4.0) cluster with 2 nodes.

  5. Stack trace:

java.lang.NullPointerException                                                  
  at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1838)
  at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
  at java.lang.Double.parseDouble(Double.java:538)
  at scala.collection.immutable.StringLike.toDouble(StringLike.scala:321)
  at scala.collection.immutable.StringLike.toDouble$(StringLike.scala:321)
  at scala.collection.immutable.StringOps.toDouble(StringOps.scala:33)
  at io.qbeast.core.transform.ColumnStats$.apply(Transformer.scala:150)
  at io.qbeast.spark.index.DoublePassOTreeDataAnalyzer$.$anonfun$getColumnStats$2(OTreeDataAnalyzer.scala:161)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.Iterator.foreach(Iterator.scala:941)
  at scala.collection.Iterator.foreach$(Iterator.scala:941)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.AbstractTraversable.map(Traversable.scala:108)
  at io.qbeast.spark.index.DoublePassOTreeDataAnalyzer$.getColumnStats(OTreeDataAnalyzer.scala:155)
  at io.qbeast.spark.index.DoublePassOTreeDataAnalyzer$.analyze(OTreeDataAnalyzer.scala:171)
  at io.qbeast.spark.index.SparkOTreeManager$.index(SparkOTreeManager.scala:67)
  at io.qbeast.spark.index.SparkOTreeManager$.index(SparkOTreeManager.scala:22)
  at io.qbeast.spark.index.SparkOTreeManager$.index(SparkOTreeManager.scala:13)
  at io.qbeast.spark.table.IndexedTableImpl.$anonfun$doWrite$1(IndexedTable.scala:218)
  at io.qbeast.spark.delta.DeltaMetadataWriter.$anonfun$writeWithTransaction$1(DeltaMetadataWriter.scala:49)
  at io.qbeast.spark.delta.DeltaMetadataWriter.$anonfun$writeWithTransaction$1$adapted(DeltaMetadataWriter.scala:48)
  at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:187)
  at io.qbeast.spark.delta.DeltaMetadataWriter.writeWithTransaction(DeltaMetadataWriter.scala:48)
  at io.qbeast.spark.delta.SparkDeltaMetadataManager$.updateWithTransaction(SparkDeltaMetadataManager.scala:26)
  at io.qbeast.spark.delta.SparkDeltaMetadataManager$.updateWithTransaction(SparkDeltaMetadataManager.scala:16)
  at io.qbeast.spark.table.IndexedTableImpl.doWrite(IndexedTable.scala:216)
  at io.qbeast.spark.table.IndexedTableImpl.write(IndexedTable.scala:207)
  at io.qbeast.spark.table.IndexedTableImpl.save(IndexedTable.scala:165)
  at io.qbeast.spark.internal.sources.QbeastDataSource.createRelation(QbeastDataSource.scala:78)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
  at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
  at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:409)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
  ... 47 elided
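
The trace ends in StringOps.toDouble called from ColumnStats.apply, which is consistent with a null or non-numeric statistic being parsed as a Double for the String column. That reading is an inference from the trace, not a confirmed diagnosis. Under that assumption, a defensive parse could look like this sketch, where `parseStat` is a hypothetical helper:

```scala
import scala.util.Try

object SafeStats {

  /** Parses a statistic that may be null or non-numeric, e.g. one computed on a string column. */
  def parseStat(raw: String): Option[Double] =
    Option(raw).flatMap(s => Try(s.trim.toDouble).toOption)
}
```

Returning an Option makes the caller decide explicitly what to do when a column has no numeric min or max, instead of failing with a NullPointerException.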
