
spark-acid's Introduction

Hive ACID Data Source for Apache Spark

A Datasource on top of Spark Datasource V1 APIs, that provides Spark support for Hive ACID transactions.

This datasource provides the capability to work with Hive ACID V2 tables, both Full ACID tables as well as Insert-Only tables.

Functionality Availability Matrix

Functionality             Full ACID table   Insert Only Table
READ                      >= v0.4.0         >= v0.4.0
INSERT INTO / OVERWRITE   >= v0.4.3         >= v0.4.4
CTAS                      >= v0.4.3         >= v0.4.4
UPDATE                    >= v0.5.0         Not Supported
DELETE                    >= v0.5.0         Not Supported
MERGE                     >  v0.5.0         Not Supported
STREAMING INSERT          >= v0.5.0         >= v0.5.0

Note: For Insert-Only tables, the compatibility check needs to be disabled to support write operations.

Quick Start


Prerequisite

These are the prerequisites for using this library:

  1. You have Hive Metastore DB with version 3.1.2 or higher. Please refer to Hive Metastore for details.
  2. You have a Hive Metastore Server running with version 3.1.1 or higher, as Hive ACID needs a standalone Hive Metastore Server to operate. Please refer to Hive Configuration for configuration options.

Config

Change the configuration in $SPARK_HOME/conf/hive-site.xml to point to the already configured HMS server endpoint. If you meet the above prerequisites, this is probably already configured.

<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <!-- hostname must point to the Hive metastore URI in your cluster -->
    <value>thrift://hostname:10000</value>
    <description>URI for spark to contact the hive metastore server</description>
  </property>
</configuration>
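Depending on your deployment, the same endpoint can often be passed on the command line instead of editing hive-site.xml, for example via Spark's spark.hadoop.* prefix. This is only a sketch of that approach, assuming your setup propagates Hadoop/Hive configuration this way:

  spark-shell --packages qubole:spark-acid:0.6.0-s_2.11 \
    --conf spark.hadoop.hive.metastore.uris=thrift://hostname:10000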

Run

There are a few ways to use the library while running spark-shell:

  1. Use the published package:

     spark-shell --packages qubole:spark-acid:0.6.0-s_2.11

  2. If you built the jar yourself, copy spark-acid-assembly-0.6.0.jar into $SPARK_HOME/assembly/target/scala-2.11/jars and run:

     spark-shell
    

DataFrame API

To operate on a Hive ACID table from Scala / pySpark, the table can be accessed directly using this datasource. Note that the short name of this datasource is HiveAcid. Hive ACID tables are tables in the Hive Metastore, so any read and/or write operation needs format("HiveAcid").option("table", "<table name>"). Reading and writing the underlying files directly is not supported.

scala> val df = spark.read.format("HiveAcid").options(Map("table" -> "default.acidtbl")).load()
scala> df.collect()

SQL

To read an existing Hive acid table through pure SQL, there are two ways:

  1. Use SparkSession extensions framework to add a new Analyzer rule (HiveAcidAutoConvert) to Spark Analyser. This analyzer rule automatically converts an HiveTableRelation representing acid table to LogicalRelation backed by HiveAcidRelation.

    To use this, initialize SparkSession with the extension builder as mentioned below:

         val spark = SparkSession.builder()
           .appName("Hive-acid-test")
           .config("spark.sql.extensions", "com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension")
           .enableHiveSupport()
           .<OTHER OPTIONS>
           .getOrCreate()
    
         spark.sql("select * from default.acidtbl")
    
  2. Create a dummy table that acts as a symlink to the original acid table. This symlink is required to instruct Spark to use this datasource against an existing table.

    To create the symlink table:

     spark.sql("create table symlinkacidtable using HiveAcid options ('table' 'default.acidtbl')")
    
     spark.sql("select * from symlinkacidtable")
    

    Note: This will produce a warning indicating that Hive does not understand this format

    WARN hive.HiveExternalCatalog: Couldn’t find corresponding Hive SerDe for data source provider com.qubole.spark.hiveacid.datasource.HiveAcidDataSource. Persisting data source table default.sparkacidtbl into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.

    Please ignore it; this is a symlink table for Spark to operate on, with no underlying storage of its own.

Usage

This section covers the major functionality provided by the data source, with example code snippets.

Create/Drop ACID Table

SQL Syntax

Same as CREATE and DROP supported by Spark SQL.

Examples

Drop Existing table

spark.sql("DROP TABLE if exists acid.acidtbl")

Create table

spark.sql("CREATE TABLE acid.acidtbl (status BOOLEAN, tweet ARRAY<STRING>, rank DOUBLE, username STRING) STORED AS ORC TBLPROPERTIES('TRANSACTIONAL' = 'true')")

Note: Table property 'TRANSACTIONAL' = 'true' is required to create ACID table

Check if it is transactional

spark.sql("DESCRIBE extended acid.acidtbl").show()
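Partitioned ACID tables can be created in the same way; a sketch assuming standard Hive partitioning syntax (the table name acid.acidtbl_part and its columns are hypothetical):

spark.sql("CREATE TABLE acid.acidtbl_part (status BOOLEAN, rank DOUBLE, username STRING) PARTITIONED BY (city STRING) STORED AS ORC TBLPROPERTIES('TRANSACTIONAL' = 'true')")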

Read ACID Table

DataFrame API

Read acid table

val df = spark.read.format("HiveAcid").options(Map("table" -> "acid.acidtbl")).load()
df.select("status", "rank").filter($"rank" > "20").show()

Read acid via implicit API

import com.qubole.spark.hiveacid._

val df = spark.read.hiveacid("acid.acidtbl")
df.select("status", "rank").filter($"rank" > "20").show()

SQL SYNTAX

Same as SELECT supported by Spark SQL.

Example
spark.sql("SELECT status, rank from acid.acidtbl where rank > 20")

Note: com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension has to be added to spark.sql.extensions for above.
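For example, when launching spark-shell, the extension can be enabled on the command line together with the package from the Run section:

spark-shell --packages qubole:spark-acid:0.6.0-s_2.11 --conf spark.sql.extensions=com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension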

Batch Write into ACID Table

DataFrame API

Insert into

val df = spark.read.parquet("tbldata.parquet")
df.write.format("HiveAcid").option("table", "acid.acidtbl").mode("append").save()

Insert overwrite

val df = spark.read.parquet("tbldata.parquet")
df.write.format("HiveAcid").option("table", "acid.acidtbl").mode("overwrite").save()

Insert into using implicit API

import com.qubole.spark.hiveacid._

val df = spark.read.parquet("tbldata.parquet")
df.write.hiveacid("acid.acidtbl", "append")

SQL Syntax

Same as INSERT supported by Spark SQL

Example

Insert into the table select as

spark.sql("INSERT INTO acid.acidtbl select * from sample_data")

Insert overwrite the table select as

spark.sql("INSERT OVERWRITE TABLE acid.acidtbl select * from sample_data")

Insert into

spark.sql("INSERT INTO acid.acidtbl VALUES(false, array('test'), 11.2, 'qubole')")

Note: com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension has to be added to spark.sql.extensions for above SQL statements.

Stream Write into ACID Table

ACID tables support streaming writes and can be used as a Streaming Sink. Streaming writes happen under transactional guarantees, which allows other concurrent writes to the same table, whether streaming or batch. For exactly-once semantics, spark.acid.streaming.log.metadataDir is specified to store the latest batchId processed. Note that concurrent streaming writes to the same table should have different metadataDir values specified.

import org.apache.spark.sql.streaming.OutputMode

// newDf is a streaming DataFrame whose schema matches the target table
val query = newDf
  .writeStream
  .format("HiveAcid")
  .options(Map(
    "table" ->"acid.acidtbl",
    "spark.acid.streaming.log.metadataDir"->"/tmp/metadataDir"))
  .outputMode(OutputMode.Append)
  .option("checkpointLocation", "/tmp/checkpointDir")
  .start()
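For reference, here is one way the streaming DataFrame newDf above could be produced. A minimal sketch, assuming Spark's built-in rate source and the acid.acidtbl schema from the earlier CREATE TABLE example:

// Map the rate source's (timestamp, value) columns onto acid.acidtbl's schema.
val newDf = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()
  .selectExpr(
    "value % 2 = 0 AS status",
    "array(cast(value AS string)) AS tweet",
    "cast(value AS double) AS rank",
    "'streaming-user' AS username")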

Updates

SQL Syntax

UPDATE tablename SET column = updateExp [, column = updateExp ...] [WHERE expression]
  • column must be a column of the table being updated.
  • updateExp is an expression that Spark supports in the SELECT clause. Subqueries are not supported.
  • WHERE clause specifies the row to be updated.
  • Partitioning columns cannot be updated.
  • Bucketed tables are not supported currently.

Example

spark.sql("UPDATE acid.acidtbl set rank = rank - 1, status = true where rank > 20 and rank < 25 and status = false")

Note: com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension has to be added to spark.sql.extensions for above.

Deletes

SQL syntax

DELETE FROM tablename [WHERE expression]
  • WHERE clause specifies rows to be deleted from tablename.
  • Bucketed tables are not supported currently.
Example
DELETE from acid.acidtbl where rank = 1000

Note: com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension has to be added to spark.sql.extensions for above.

Merge

SQL syntax

MERGE INTO <target table> [AS T] USING <source table> [AS S]
ON <boolean merge expression>
WHEN MATCHED [AND <boolean expression1>] THEN <match_clause>
WHEN MATCHED [AND <boolean expression2>] THEN <match_clause>
WHEN NOT MATCHED [AND <boolean expression3>] THEN INSERT VALUES ( <insert value list> )

<match_clause> ::
                    UPDATE SET <set clause list>
                    DELETE

<insert value list> :: 
                    value 1 [, value 2, value 3, ...]
                    [value 1, value 2, ...] * [, value n, value n+1, ...]

<set clause list> ::
                    target_col1 = value 1 [, target_col2 = value 2 ...]
  • <target table> needs to be a Full ACID table. T is an optional alias for the target.
  • <source table> needs to be a defined table. You can use functions like createOrReplaceTempView to register source DataFrames as tables. S is an optional alias for the source.
  • <boolean merge expression> is the join expression used as the merge condition.
  • Match clauses (UPDATE and DELETE)
    • At most 2 match clauses are allowed i.e., minimum 0 and maximum 2.
    • Only UPDATE and DELETE operations are supported in match clause.
    • <boolean expression1> and <boolean expression2> are optional match conditions.
    • If 2 match clauses are specified:
      • Both should be different operations.
      • First match clause should have a match condition.
      • If a target row qualifies for both match clauses as their match conditions overlap, then only the first clause will be executed on them.
  • INSERT clause
    • is only supported for non-matched clause.
    • supports * to be used anywhere in value list and it resolves into source table columns.
    • values to be inserted should exactly match the number of target columns after * resolution and also match corresponding data type.
  • Cardinality Check: SQL standard enforces that one row of target doesn't match multiple rows of source. This check is enforced and runtime exception is thrown if it is violated.

Example

MERGE INTO target as t USING source as s
ON t.id = s.id
WHEN MATCHED AND t.city = 'Bangalore' THEN UPDATE SET t.city = s.city
WHEN MATCHED AND t.dept = 'closed' THEN DELETE
WHEN NOT MATCHED AND s.city IN ('Bangalore', 'San Jose') THEN INSERT VALUES (*, '07', '2020')
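Since the source can be any table known to Spark, a DataFrame can be used as the MERGE source by first registering it as a temporary view. A minimal sketch, assuming the acid.acidtbl table from earlier, a hypothetical updates.parquet file and view name, and the HiveAcidAutoConvertExtension configured as noted elsewhere:

val updatesDf = spark.read.parquet("updates.parquet")
updatesDf.createOrReplaceTempView("source_updates")

spark.sql("""MERGE INTO acid.acidtbl AS t USING source_updates AS s
             ON t.username = s.username
             WHEN MATCHED THEN UPDATE SET t.rank = s.rank
             WHEN NOT MATCHED THEN INSERT VALUES (s.status, s.tweet, s.rank, s.username)""")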

Performance consideration

  • MERGE operation is a pretty loaded statement and can be expensive in nature.
  • MERGE operation will perform Right Outer Join between target and source. This can lead to full table scan of the target table. Please consider partitioning the target table and only mentioning required partitions in merge condition for MERGE operations.
  • When only INSERT clause is present, Left Anti Join between source and target will be performed. It will be cheaper than Right Outer Join between target and source.
  • Cardinality check (as described above) also requires Join. We reuse the same Right Outer Join done for MERGE operation to avoid extra Join for this check. When only INSERT clause is present this check is not done as it is not required.

Version Compatibility

Compatibility with Apache Spark Versions

ACID datasource has been tested to work with Apache Spark 2.4.3, but it should work with older versions as well. However, because of a Hive dependency, this datasource needs Hadoop version 2.8.2 or higher due to HADOOP-14683

NB: Hive ACID V2 is supported from Hive 3.0.0 onwards, and for that the Hive Metastore DB needs to be upgraded to 3.0.0 or above.

Data Storage Compatibility

  1. ACID datasource does not control data storage format and layout, which is managed by Hive. It works with data written by Hive version 3.0.0 and above. Please see Hive ACID storage layout.

  2. ACID datasource works with data stored on local files, HDFS as well as cloud blobstores (AWS S3, Azure Blob Storage etc).

Developer resources

Build

  1. First, build the dependencies and publish them locally. The shaded-dependencies sub-project is an sbt project that creates the shaded hive metastore and hive exec jars combined into a fat jar, spark-acid-shaded-dependencies. This is required because of our dependency on Hive 3 for Hive ACID, while Spark currently only supports Hive 1.2. To compile and publish the shaded dependencies jar:

    cd shaded-dependencies
    sbt clean publishLocal

  2. Next, build the main project:

    sbt assembly

This will create spark-acid-assembly-0.6.0.jar, which can now be used in your application.
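The assembly jar can then either be copied into the Spark assembly directory as described under Run, or passed directly on the command line; for example:

spark-shell --jars spark-acid-assembly-0.6.0.jar --conf spark.sql.extensions=com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension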

Test

Tests are run against a standalone docker setup. Please refer to the Docker setup (docker/README.md) to build and start a container.

NB: The container runs an HMS server, an HS2 server and HDFS, listening on ports 10000, 10001 and 9000 respectively. Stop any HMS or HDFS service using these ports on the host machine.

To run the full integration test:

sbt test

Release

To release a new version use

sbt release

To publish a new version use

sbt spPublish

Read more about sbt release

Design Constraints

  1. When this datasource needs to read data, it talks to the HiveMetaStore Server to get the list of transactions that have been committed and, using that, the list of files it should read from the filesystem (via listing, e.g. S3 listing). Because the snapshot of the file list is created by listing, a consistency layer such as S3Guard should be used on cloud object stores like S3 to avoid reading an inconsistent copy of the data.

  2. The snapshot of the list of files is created at the RDD level. Because these snapshots are per RDD, even the same table used twice in a single SQL statement may be operating on two different snapshots, for example:

    spark.sql("select * from a join a")

  3. The files in the snapshot need to be protected while the RDD is in use. By design, concurrent reads and writes on Hive ACID work with the help of locks: every client (across multiple engines) operating on ACID tables is expected to acquire locks for the duration of its reads and writes. Since the lifetime of an RDD can be very long, this datasource DOES NOT acquire locks (to avoid blocking other operations such as inserts) but uses an alternative mechanism to protect reads: making sure the files in the snapshot are not deleted while in use. For the current datasource, Automatic Compaction should be disabled on any table that Spark is operating on; this makes sure the cleaner does not remove any files. To disable automatic compaction on a table:

      ALTER TABLE <> SET TBLPROPERTIES ("NO_AUTO_COMPACTION"="true")
    

    When the table is not in use, the cleaner can be enabled again and all the files that need cleaning will get queued up for the cleaner (see the sketch after this list). Disabling compaction does have performance implications for reads/writes, as a lot of delta files may need to be merged when performing a read.

  4. Note that even though reads are protected, admin operations like TRUNCATE, ALTER TABLE DROP COLUMN and DROP have no protection, as they clean files without intervention from the cleaner. These operations should be performed only when Spark is not using the table.
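As referenced in item 3 above, once Spark is no longer using the table, automatic compaction can be re-enabled (or a one-off compaction triggered) from Hive. A sketch using standard Hive DDL:

      ALTER TABLE <> SET TBLPROPERTIES ("NO_AUTO_COMPACTION"="false")
      ALTER TABLE <> COMPACT 'major'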

Contributing

  1. You can join the group for queries and discussions by sending an email to: [email protected]. On subscribing, you will be sent an email to confirm joining the group.

  2. We use Github Issues to track issues. Please feel free to open an issue for any queries, bugs and feature requests.

  3. Pull Requests can be raised against any open issue and are most welcome. Processes and guidelines for this are currently informal.

Reporting bugs or feature requests

Please use the github issues for the acid-ds project to report issues or raise feature requests. You can also join this group to discuss them: [email protected]

Known Issues

  1. Insert into static partitions is not supported via spark-acid. For example, a query like "insert into tbl partition (p1=1) ...." will not work. This is because Spark currently does not support partitioned datasources; it only supports partitions in a Hive table relation or a file-based relation, and the spark-acid relation is neither of them.
  2. Because of an open source issue HIVE-21052, users started hitting the issue described by @amoghmargoor in this comment for partitioned tables. The workaround for HIVE-21052 is that we don't set the dynamic partition flag when making lock requests in spark-acid. This will not have any impact on functionality, which is expected to continue to work well; however, users will have to do some occasional manual cleanups. They are not serious in nature and can be performed once in a while. These are:
  • If transaction is successful

    • For every transaction on a partitioned table, a null entry now gets created in the COMPLETED_TXN_COMPONENTS table when a transaction is moved from TXN_COMPONENTS. This entry does not get removed after compaction. Note that there will be only one such null entry in COMPLETED_TXN_COMPONENTS per transaction; for example, if your transaction touched 100 partitions, only one null-partition entry gets created, so the null entries should not overwhelm the table.

    • To delete this null entry from the COMPLETED_TXN_COMPONENTS table, manually run the following query once in a while on the metastore DB. Note that this is only applicable to partitioned tables:

      DELETE FROM completed_txn_components WHERE ctc_partition IS NULL AND ctc_writeid IS NULL AND ctc_table = <TABLE_NAME>

  • If transaction is aborted

    • Transaction now remains in TXN_COMPONENTS and TXNS tables. Any future reads are not impacted by this aborted transaction though.
    • The cleanup involves 2 simple steps to be followed for partitioned tables in the following order. These are:
      1. Delete the files in the object store which were written by the aborted transaction. To find out the write ID, the SQL query is:

        SELECT t.TXN_ID, T2W_WRITEID as WRITE_ID from TXNS as t JOIN TXN_COMPONENTS as tc ON tc.TC_TXNID = t.TXN_ID JOIN TXN_TO_WRITE_ID as tw on t.TXN_ID = tw.T2W_TXNID and t.TXN_STATE = 'a' and tc.TC_PARTITION is NULL

        For example, if your write ID is 4, then you will need to clean up all delta/delete_delta/base directories named: delta_0000004_0000004 / delete_delta_0000004_0000004 / base_0000004

      2. Delete the entry from the TXN_COMPONENTS table for the aborted transaction. The complete SQL query looks like:

        WITH aborted_transactions AS ( SELECT t.txn_id, tw.t2w_writeid AS write_id FROM txns AS t JOIN txn_components AS tc ON t.txn_id = tc.tc_txnid JOIN txn_to_write_id AS tw ON t.txn_id = tw.t2w_txnid AND t.txn_state = 'a' AND tc.tc_partition IS NULL ) DELETE FROM txn_components WHERE tc_txnid IN (SELECT txn_id FROM aborted_transactions)

spark-acid's People

Contributors

abhishekd0907, amoghmargoor, citrusraj, indit-qubole, maheshk114, prakharjain09, shuwnyuantee, sourabh912


spark-acid's Issues

Add SQL support for Update/Delete

PR #18 is adding SQL support for Update and Delete. It currently copies the entire grammar and its associated visitors etc.; can we find an alternative approach where we extend the Spark parser instead of copying code?

Setting NO_AUTO_COMPACTION for Hive Table

Referring to your doc: https://github.com/qubole/spark-acid#design-constraints => item (3)

It mentions disabling automatic compaction on a table with:

ALTER TABLE <> SET TBLPROPERTIES ("NO_AUTO_COMPACTION"="true")

We have a Hive table which is continuously written into via Spark Structured Streaming with:

val output_stream_df = data.writeStream.format("HiveAcid")....

Is it compulsory to set "NO_AUTO_COMPACTION"="true" on this table?

Does it mean we need to handle compaction manually? What is the suggested approach to handle this?

What came to my mind is to schedule a cron job to do the table compaction manually, something like this:

  • stop the Spark app (app1) that streams (writes) into the Hive table (myTable)
  • do table compaction: ALTER TABLE myTable COMPACT 'minor' / 'major'
  • start Spark app1

Does this make sense to you? If not, is there any example / link on how to handle this?

From your source code, I found this:
src/main/scala/com/qubole/spark/hiveacid/streaming/HiveAcidSinkOptions.scala: val COMPACT_INTERVAL_KEY = "spark.acid.streaming.log.compactInterval"

Is this related to the table compaction I mentioned above? By setting spark.acid.streaming.log.compactInterval, will Spark ACID automatically handle the table compaction for us?

Thanks!

Can I build it for Spark 2.3.2 and Hive 3.1.0

Hi,

I would like to use this project for Spark 2.3.2 and Hive 3.1.0,

  1. Updated the hive version inside shaded-dependencies -> build.sbt:
     val hive_version = sys.props.getOrElse("hive.version", "3.1.0")
  2. Updated the spark version in build.sbt:
     sparkVersion := sys.props.getOrElse("spark.version", "2.3.2")

But my build is failing with the error below:

[info] Compiling 43 Scala sources and 6 Java sources to /Users/snchitg/Projects/spark-acid/target/scala-2.11/classes...
[error] /Users/snchitg/Projects/spark-acid/src/main/scala/com/qubole/spark/datasources/hiveacid/sql/catalyst/parser/AstBuilder.scala:327: too many arguments for method apply: (left: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan, right: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)org.apache.spark.sql.catalyst.plans.logical.Intersect in object Intersect
[error] Intersect(left, right, isAll = true)
[error] ^
[error] /Users/snchitg/Projects/spark-acid/src/main/scala/com/qubole/spark/datasources/hiveacid/sql/catalyst/parser/AstBuilder.scala:329: too many arguments for method apply: (left: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan, right: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)org.apache.spark.sql.catalyst.plans.logical.Intersect in object Intersect
[error] Intersect(left, right, isAll = false)
[error] ^
[error] /Users/snchitg/Projects/spark-acid/src/main/scala/com/qubole/spark/datasources/hiveacid/sql/catalyst/parser/AstBuilder.scala:331: too many arguments for method apply: (left: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan, right: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)org.apache.spark.sql.catalyst.plans.logical.Except in object Except
[error] Except(left, right, isAll = true)
[error] ^
[error] /Users/snchitg/Projects/spark-acid/src/main/scala/com/qubole/spark/datasources/hiveacid/sql/catalyst/parser/AstBuilder.scala:333: too many arguments for method apply: (left: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan, right: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)org.apache.spark.sql.catalyst.plans.logical.Except in object Except
[error] Except(left, right, isAll = false)
[error] ^
[error] /Users/snchitg/Projects/spark-acid/src/main/scala/com/qubole/spark/datasources/hiveacid/sql/catalyst/parser/AstBuilder.scala:335: too many arguments for method apply: (left: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan, right: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)org.apache.spark.sql.catalyst.plans.logical.Except in object Except
[error] Except(left, right, isAll = true)
[error] ^
[error] /Users/snchitg/Projects/spark-acid/src/main/scala/com/qubole/spark/datasources/hiveacid/sql/catalyst/parser/AstBuilder.scala:337: too many arguments for method apply: (left: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan, right: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)org.apache.spark.sql.catalyst.plans.logical.Except in object Except
[error] Except(left, right, isAll = false)
[error] ^
[error] /Users/snchitg/Projects/spark-acid/src/main/scala/com/qubole/spark/datasources/hiveacid/sql/catalyst/parser/AstBuilder.scala:432: type mismatch;
[error] found : None.type
[error] required: Seq[org.apache.spark.sql.catalyst.expressions.NamedExpression]
[error] Pivot(None, pivotColumn, pivotValues, aggregates, query)
[error] ^
[error] /Users/snchitg/Projects/spark-acid/src/main/scala/com/qubole/spark/datasources/hiveacid/sql/catalyst/parser/AstBuilder.scala:432: type mismatch;
[error] found : scala.collection.mutable.Buffer[org.apache.spark.sql.catalyst.expressions.Expression]
[error] required: Seq[org.apache.spark.sql.catalyst.expressions.Literal]
[error] Pivot(None, pivotColumn, pivotValues, aggregates, query)
[error] ^
[error] /Users/snchitg/Projects/spark-acid/src/main/scala/com/qubole/spark/datasources/hiveacid/sql/catalyst/parser/AstBuilder.scala:903: not found: value InSubquery
[error] invertIfNotDefined(InSubquery(getValueExpressions(e), ListQuery(plan(ctx.query))))
[error] ^
[error] /Users/snchitg/Projects/spark-acid/src/main/scala/com/qubole/spark/datasources/hiveacid/sql/catalyst/parser/AstBuilder.scala:1110: not found: value LambdaFunction
[error] LambdaFunction(expression(ctx.expression), arguments)
[error] ^
[error] /Users/snchitg/Projects/spark-acid/src/main/scala/com/qubole/spark/datasources/hiveacid/sql/execution/SparkAcidSqlParser.scala:72: value setOpsPrecedenceEnforced is not a member of org.apache.spark.sql.internal.SQLConf
[error] lexer.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced
[error] ^
[error] /Users/snchitg/Projects/spark-acid/src/main/scala/com/qubole/spark/datasources/hiveacid/sql/execution/SparkAcidSqlParser.scala:79: value setOpsPrecedenceEnforced is not a member of org.apache.spark.sql.internal.SQLConf
[error] parser.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced
[error] ^
[error] /Users/snchitg/Projects/spark-acid/src/main/scala/com/qubole/spark/hiveacid/HiveAcidAutoConvert.scala:55: value resolveOperators is not a member of org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
[error] plan resolveOperators {
[error] ^
[error] /Users/snchitg/Projects/spark-acid/src/main/scala/com/qubole/spark/hiveacid/HiveAcidTable.scala:225: type mismatch;
[error] found : Seq[String]
[error] required: Option[String]
[error] x.withQualifier(hiveAcidMetadata.fullyQualifiedName.split('.').toSeq))
[error] ^
[error] /Users/snchitg/Projects/spark-acid/src/main/scala/com/qubole/spark/hiveacid/HiveAcidTable.scala:169: non-variable type argument org.apache.spark.sql.Row in type pattern org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] (the underlying of org.apache.spark.sql.DataFrame) is unchecked since it is eliminated by erasure
[error] val (df: DataFrame, qualifiedPlan: LogicalPlan, resolvedDf: DataFrame) = getResolvedReadDF
[error] ^
[error] /Users/snchitg/Projects/spark-acid/src/main/scala/com/qubole/spark/hiveacid/HiveAcidTable.scala:169: non-variable type argument org.apache.spark.sql.Row in type pattern org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] (the underlying of org.apache.spark.sql.DataFrame) is unchecked since it is eliminated by erasure
[error] val (df: DataFrame, qualifiedPlan: LogicalPlan, resolvedDf: DataFrame) = getResolvedReadDF
[error] ^
[error] /Users/snchitg/Projects/spark-acid/src/main/scala/com/qubole/spark/hiveacid/HiveAcidTable.scala:324: non-variable type argument org.apache.spark.sql.Row in type pattern org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] (the underlying of org.apache.spark.sql.DataFrame) is unchecked since it is eliminated by erasure
[error] val (_, qualifiedPlan: LogicalPlan, resolvedDf: DataFrame) = getResolvedReadDF
[error] ^
[error] /Users/snchitg/Projects/spark-acid/src/main/scala/com/qubole/spark/hiveacid/rdd/HiveAcidRDD.scala:286: overloaded method value addTaskCompletionListener with alternatives:
[error] (f: org.apache.spark.TaskContext => Unit)org.apache.spark.TaskContext
[error] (listener: org.apache.spark.util.TaskCompletionListener)org.apache.spark.TaskContext
[error] does not take type parameters
[error] context.addTaskCompletionListener[Unit] { _ =>
[error] ^
[error] /Users/snchitg/Projects/spark-acid/src/main/scala/com/qubole/spark/hiveacid/writer/hive/HiveAcidWriter.scala:64: type mismatch;
[error] found : Boolean(true)
[error] required: org.apache.spark.sql.types.DataType
[error] Seq(true, true))
[error] ^
[error] /Users/snchitg/Projects/spark-acid/src/main/scala/com/qubole/spark/hiveacid/writer/hive/HiveAcidWriter.scala:64: type mismatch;
[error] found : Boolean(true)
[error] required: org.apache.spark.sql.types.DataType
[error] Seq(true, true))
[error] ^
[error] 20 errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 19 s, completed 20 Apr 2020, 20:59:33

Don't set dynamic partition flag when taking lock on a partitioned tables with dynamic partitions

As part of issue-44, we made a change in which we set dynamic partition flag to true in the lock request if table is partitioned and partitions being touched are dynamic. But because of an open source issue HIVE-21052, users started hitting the issue described by @amoghmargoor in this comment.

The workaround of the issue HIVE-21052 is that we don't set dynamic partition flag when making lock request. With this workaround, we have observed the following:

  • If transaction is successful

    • It is moved to COMPLETED_TXN_COMPONENTS table and has all the partitions information which were touched in the transaction. Compaction (whenever gets kicked in) happens correctly.
    • The entry in COMPLETED_TXN_COMPONENTS gets cleaned up after the compaction runs successfully.
  • If transaction is aborted

    • Transaction now remains in TXN_COMPONENTS and TXN tables. Any future reads don't read the data written by this transaction.
    • Initiator (which is a part of compactor) does not remove this transaction from TXN_COMPONENTS resulting into a leak. For now this needs to be handled manually (i.e remove the files written by this transaction and then delete db entry) if required.

Difference between qubole/spark-acid and Hive-warehouse-connector

Hello everyone! Thanks for the great library. I'm currently researching how to achieve ACID on Hive tables using Spark. All I have found so far are qubole/spark-acid (your library) and the hive-warehouse-connector (https://docs.hortonworks.com/HDPDocuments/HDP3/HDP-3.1.0/integrating-hive/content/hive_hivewarehouseconnector_for_handling_apache_spark_data.html). So I'm confused about what to use and how to decide what I need.

Could you please describe what your library can or cannot do in comparison to the hive-warehouse-connector? What's the difference at all? Thanks!

Unlock the transaction which could not acquire lock as it may be in waiting state.

So this can cause the bug as follows:

mysql> select * from HIVE_LOCKS;
+----------------+----------------+----------+----------------+----------------+--------------+---------------+--------------+-------------------+----------------+---------+------------+--------------------+---------------+---------------------+---------------------+
| HL_LOCK_EXT_ID | HL_LOCK_INT_ID | HL_TXNID | HL_DB | HL_TABLE | HL_PARTITION | HL_LOCK_STATE | HL_LOCK_TYPE | HL_LAST_HEARTBEAT | HL_ACQUIRED_AT | HL_USER | HL_HOST | HL_HEARTBEAT_COUNT | HL_AGENT_INFO | HL_BLOCKEDBY_EXT_ID | HL_BLOCKEDBY_INT_ID |
+----------------+----------------+----------+----------------+----------------+--------------+---------------+--------------+-------------------+----------------+---------+------------+--------------------+---------------+---------------------+---------------------+
| 314 | 1 | 384 | hivetestlockdb | nonpartitioned | NULL | a | e | 0 | 1591911214000 | amoghm | amoghm-mbp | NULL | HiveAcid | NULL | NULL |
| 315 | 1 | 385 | hivetestlockdb | nonpartitioned | NULL | w | w | 0 | NULL | amoghm | amoghm-mbp | NULL | HiveAcid | 314 | 1 |
| 316 | 1 | 385 | hivetestlockdb | nonpartitioned | NULL | a | w | 0 | 1591911260000 | amoghm | amoghm-mbp | NULL | HiveAcid | NULL | NULL |
+----------------+----------------+----------+----------------+----------------+--------------+---------------+--------------+-------------------+----------------+---------+------------+--------------------+---------------+---------------------+---------------------+
So there is transaction 314 which took an exclusive lock.
Afterwards, transaction 315 wanted to take a Write lock and was put in a wait state, which is expected.
But when transaction 315 again tried to take the Write lock in a new request, the lock was acquired, as shown in the snapshot above.
So at one point 314 holds an exclusive lock and 315 holds a Write lock on the same table (nonpartitioned).
This seems like a bug to me.

sparkSQL Error : instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder

When I use the assembly-0.5.0.jar to access an ACID table, the DataFrame API seems to work well for reading and writing tables, whether partitioned or not.

But I cannot use spark.sql() to access the ACID table. I initialize the SparkSession following the instructions:

val spark = SparkSession.builder()
  .appName("Hive-acid-test")
  .config("spark.sql.extensions", "com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension")
  .enableHiveSupport()
  .<OTHER OPTIONS>
  .getOrCreate()

This does not work as shown above. But if I add the config after the SparkSession is built,
spark.conf.set("spark.sql.extensions", "com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension")
it seems to use hiveacid.sql.execution.SparkAcidSqlParser to parse the SQL,

but then an exception is thrown:
20/06/23 22:12:18 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder': java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder': at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1107) at org.apache.spark.sql.SparkSession$$anonfun$sessionState$2.apply(SparkSession.scala:145) at org.apache.spark.sql.SparkSession$$anonfun$sessionState$2.apply(SparkSession.scala:144) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:144) at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:141) at org.apache.spark.sql.SparkSession.conf$lzycompute(SparkSession.scala:170) at org.apache.spark.sql.SparkSession.conf(SparkSession.scala:170) at com.ccb.camp.load.SparkAcidTestSQL$.testReadSQL(SparkAcidTestSQL.scala:41) at com.ccb.camp.load.SparkAcidTestSQL$.main(SparkAcidTestSQL.scala:25) at com.ccb.camp.load.SparkAcidTestSQL.main(SparkAcidTestSQL.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684) Caused by: java.lang.IllegalArgumentException: Can not set final org.apache.spark.sql.internal.VariableSubstitution field org.apache.spark.sql.execution.SparkSqlParser.substitutor to com.qubole.spark.datasources.hiveacid.sql.execution.SparkAcidSqlParser at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:167) at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:171) at sun.reflect.UnsafeFieldAccessorImpl.ensureObj(UnsafeFieldAccessorImpl.java:58) at sun.reflect.UnsafeQualifiedObjectFieldAccessorImpl.get(UnsafeQualifiedObjectFieldAccessorImpl.java:38) at java.lang.reflect.Field.get(Field.java:393) at com.qubole.spark.datasources.hiveacid.sql.execution.SparkAcidSqlParser.<init>(SparkAcidSqlParser.scala:35) at com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension$$anonfun$apply$3.apply(HiveAcidAutoConvert.scala:73) at com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension$$anonfun$apply$3.apply(HiveAcidAutoConvert.scala:72) at org.apache.spark.sql.SparkSessionExtensions$$anonfun$buildParser$1.apply(SparkSessionExtensions.scala:161) at org.apache.spark.sql.SparkSessionExtensions$$anonfun$buildParser$1.apply(SparkSessionExtensions.scala:160) at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57) at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66) at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:48) at org.apache.spark.sql.SparkSessionExtensions.buildParser(SparkSessionExtensions.scala:160) at org.apache.spark.sql.internal.BaseSessionStateBuilder.sqlParser$lzycompute(BaseSessionStateBuilder.scala:117) at org.apache.spark.sql.internal.BaseSessionStateBuilder.sqlParser(BaseSessionStateBuilder.scala:116) at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:292) at 
org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1104) ... 15 more 20/06/23 22:12:18 INFO SparkContext: Invoking stop() from shutdown hook

Table not found exception

Hi,

I was trying to use the spark-acid library from pyspark 2.3.1 and got the InvalidTable exception.
Specifically, I ran the following commands:

$ pyspark --jars spark-acid-0.4.0.jar
>>> spark.read.format("HiveAcid").option("table", "default.test_sa").load()
19/08/21 13:42:38 ERROR Hive: Table default.test_sa not found: spark.default.test_sa table not found
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/readwriter.py", line 172, in load
    return self._df(self._jreader.load())
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o75.load.
: com.qubole.shaded.hadoop.hive.ql.metadata.InvalidTableException: Table not found test_sa
        at com.qubole.shaded.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:1133)
        at com.qubole.shaded.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:1103)
        at com.qubole.spark.datasources.hiveacid.HiveAcidRelation.<init>(HiveAcidRelation.scala:67)
        at com.qubole.spark.datasources.hiveacid.HiveAcidDataSource.createRelation(HiveAcidDataSource.scala:34)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:340)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:745)

I'm running the HDP 3.0.1 Hadoop distribution with Hive 3.1. The Hive table was created using the following statements:

CREATE TABLE default.test_sa (a string, b string) STORED AS ORC;
INSERT INTO default.test_sa VALUES ('a', 'a'), ('b', 'b');

Am I using an unsupported stack? Can you help me debug this issue?

Thank you

Support for Complex Data Types.

This is a placeholder issue for adding support for complex data types. This includes fully qualified names in:

  1. Project list in Select
  2. Filter clause for update / delete / select
  3. Set clause in update.

Handle task retries gracefully in Spark ACID OSS

Tasks writing data into Hive ACID, if they fail, are retried like normal Spark tasks. But if the failed task had already created a bucket file under the delta directory, then the new task would fail as described in issue #41.

In Qubole's Hive fork we overwrite the file instead of throwing an error, so things work there. We need to solve this for open source.

Performance improvements in acid writer

AcidWriter processes every InternalRow one by one in its process() method. We identified a few places in the code flow where redundant objects were being created, which were slowing down write performance.

In our internal environments, we measured the performance of insert/insertOverwrite queries and have seen a 25-30% improvement. The patch should also improve the performance of update/delete operations, since some of the code flow is common to all write operations.

illegal state exception in read code flow.

[info] org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 16.0 failed 1 times, most recent failure: Lost task 0.0 in stage 16.0 (TID 112, localhost, executor driver): java.lang.IllegalStateException: Current state = FLUSHED, new state = CODING_END[info] at java.nio.charset.CharsetDecoder.throwIllegalStateException(CharsetDecoder.java:992)[info] at java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:572)[info] at java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:802)[info] at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.parseKeyIndex(OrcRecordUpdater.java:628)[info] at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRawRecordMerger.discoverKeyBounds(OrcRawRecordMerger.java:744)[info] at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRawRecordMerger.<init>(OrcRawRecordMerger.java:1007)[info] at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcInputFormat.getReader(OrcInputFormat.java:2091)[info] at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcInputFormat.getRecordReader(OrcInputFormat.java:1989)[info] at com.qubole.spark.datasources.hiveacid.reader.Hive3RDD$$anon$1.liftedTree1$1(Hive3Rdd.scala:287)[info] at com.qubole.spark.datasources.hiveacid.reader.Hive3RDD$$anon$1.<init>(Hive3Rdd.scala:286)[info] at com.qubole.spark.datasources.hiveacid.reader.Hive3RDD.compute(Hive3Rdd.scala:253)[info] at com.qubole.spark.datasources.hiveacid.reader.Hive3RDD.compute(Hive3Rdd.scala:86)[info] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)[info] at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)[info] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)[info] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)[info] at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)[info] at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)[info] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)[info] at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)[info] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)[info] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)[info] at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)[info] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)[info] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)[info] at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)[info] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)[info] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)[info] at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)[info] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)[info] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)[info] at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)[info] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)[info] at org.apache.spark.scheduler.Task.run(Task.scala:121)[info] at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)[info] at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)[info] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)[info] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)[info] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)[info] at java.lang.Thread.run(Thread.java:748)[info][info] Driver stacktrace:[info] at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)[info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)[info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)[info] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)[info] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)[info] at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)[info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)[info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)[info] at scala.Option.foreach(Option.scala:257)[info] at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)[info] ...[info] Cause: java.lang.IllegalStateException: Current state = FLUSHED, new state = CODING_END[info] at java.nio.charset.CharsetDecoder.throwIllegalStateException(CharsetDecoder.java:992)[info] at java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:572)[info] at java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:802)[info] at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.parseKeyIndex(OrcRecordUpdater.java:628)[info] at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRawRecordMerger.discoverKeyBounds(OrcRawRecordMerger.java:744)[info] at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRawRecordMerger.<init>(OrcRawRecordMerger.java:1007)[info] at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcInputFormat.getReader(OrcInputFormat.java:2091)[info] at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcInputFormat.getRecordReader(OrcInputFormat.java:1989)[info] at com.qubole.spark.datasources.hiveacid.reader.Hive3RDD$$anon$1.liftedTree1$1(Hive3Rdd.scala:287)[info] at com.qubole.spark.datasources.hiveacid.reader.Hive3RDD$$anon$1.<init>(Hive3Rdd.scala:286)

Fix the reading of table with option to include rowIds during read

The parameter to include rowIds in the read can be specified like below:
val df1 = spark.read.format("HiveAcid").options(Map("includeRowIds" -> "true", "table" -> "tSparkUpdate")).load()
These reads were failing with ClassCastExceptions due to the wrong column order in the InternalRecords being generated.
This commit fixes the order based on the requiredColumns passed to HiveAcidRelation.buildScan.

Fix Docker Tests

Commit to fix the long-running Docker tests.
We will temporarily ignore the Read and Write suites, as they take many hours to run.
The other tests will be fixed.

Issue with Multi Statement Delta files

This was found in open source issue: #70

Consider the following sequence:

  1. Consider an ACID table test with just one column i of type integer. A DF containing (1, 2, 3) is used to insert into it using the HiveAcidTable API insertInto(df: DataFrame, statementId: Option[Int] = None): Unit, with statement ID 1.

  2. Again do an insert into with a DF having (4, 5, 6), but this time give no statement ID.
    So there would be 2 directory structures created:

first transaction:
hdfs://0.0.0.0:9000/tmp/hive/test/delta_0000001_0000001_0001/bucket_0000
second transaction:
hdfs://0.0.0.0:9000/tmp/hive/test/delta_0000003_0000003/bucket_0000

When you update or delete i = 1 and i = 4, which are in different files but with the same bucket id 0, they need to be written to the same bucket file in the new transaction's delete_delta and delta directories.
But the thing to note here is that even though they both belong to bucket 0, rowId.bucketId will be different because they have different statement IDs. rowId.bucketId is encoded this way:

Represents the format of the "bucket" property in Hive 3.0:

  • top 3 bits - version code
  • next 1 bit - reserved for future use
  • next 12 bits - the bucket ID
  • next 4 bits - reserved for future use
  • remaining 12 bits - the statement ID (0-based numbering of all statements within a transaction; each leg of a multi-insert statement gets a separate statement ID)

The reserved bits align it so that it is easier to interpret in hex.

Since we are repartitioning based on rowId.bucketId, rows 1 and 4 will be processed by different tasks and they end up conflicting, as they need to write to the same file. Hence we will have to repartition on just the bucket ID encoded within rowId.bucketId.
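For illustration, a minimal Scala sketch of decoding the encoded value per the layout above; the object name and the use of plain bit operations are assumptions for this example, not the datasource's actual implementation:

object BucketCodecSketch {
  // Layout (32-bit int): 3 bits version | 1 bit reserved | 12 bits bucket ID |
  //                      4 bits reserved | 12 bits statement ID
  def bucketId(encoded: Int): Int    = (encoded >>> 16) & 0xFFF
  def statementId(encoded: Int): Int = encoded & 0xFFF
}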

Merge SQL failing with ParseException

Hi Team,
I am trying to perform MERGE on Hive ACID tables using Qubole spark-acid, but I am facing the errors below:

I created an assembly jar from the latest code on master and tried to execute a MERGE statement using spark.sql from spark-shell:

/spark-2.4.3-bin-hadoop2.7/bin/spark-shell --jars /sandbox/spark-acid-assembly-0.5.0.jar --conf spark.sql.extensions=com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension --conf "spark.hadoop.yarn.timeline-service.enabled=false"
scala> spark.sql("MERGE INTO TARGET_TABLE AS T  USING SOURCE_TABLE AS S ON t.key_col= s.key_col WHEN MATCHED THEN UPDATE SET                    col_1 = s.col_1, col_2 = s.col_2 WHEN NOT MATCHED THEN INSERT VALUES ( s.key_col, s.col_1, s.col_2)")

org.apache.spark.sql.catalyst.parser.ParseException: mismatched input 'MERGE' expecting {'(', 'SELECT', 'FROM', 'ADD', 'DESC', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'INSERT', 'DELETE', 'DESCRIBE', 'EXPLAIN', 'SHOW', 'USE', 'DROP', 'ALTER', 'MAP', 'SET', 'RESET', 'START', 'COMMIT', 'ROLLBACK', 'REDUCE', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'DFS', 'TRUNCATE', 'ANALYZE', 'LIST', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'EXPORT', 'IMPORT', 'LOAD'}(line 1, pos 0)

== SQL ==
MERGE INTO TARGET_TABLE AS T  USING SOURCE_TABLE AS S ON t.key_col= s.key_col WHEN MATCHED THEN UPDATE SET                    col_1 = s.col_1, col_2 = s.col_2 WHEN NOT MATCHED THEN INSERT VALUES ( s.key_col, s.col_1, s.col_2)
^^^

  at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:241)
  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:117)
  at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:48)
  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:69)
  at com.qubole.spark.datasources.hiveacid.sql.execution.SparkAcidSqlParser.parsePlan(SparkAcidSqlParser.scala:56)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
  ... 49 elided

Merge condition in merge sql command does not support table names

Merge statement like this
merge into mtbl1 using mtbl2 on mtbl1.id=mtbl2.id when not matched then insert values (*) when matched and mtbl1.name != mtbl2.name then update set name=mtbl2.name

fails with the following error:

org.apache.spark.sql.AnalysisException: mtbl1.id resolution failed given these columns: name,id,name,id; line 1 pos 32 at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) at org.apache.spark.sql.SqlUtils$$anonfun$resolveReferences$3.apply(SqlUtils.scala:61) at org.apache.spark.sql.SqlUtils$$anonfun$resolveReferences$3.apply(SqlUtils.scala:60) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.sql.SqlUtils$.resolveReferences(SqlUtils.scala:59) at org.apache.spark.sql.catalyst.parser.plans.logical.MergePlan$.resolve(MergePlan.scala:37) at com.qubole.spark.hiveacid.merge.MergeImpl.<init>(MergeImpl.scala:72) at com.qubole.spark.hiveacid.HiveAcidOperationDelegate.merge(AcidOperationDelegate.scala:442) at com.qubole.spark.hiveacid.HiveAcidTable$$anonfun$merge$1.apply(HiveAcidTable.scala:199) at com.qubole.spark.hiveacid.HiveAcidTable$$anonfun$merge$1.apply(HiveAcidTable.scala:199) at com.qubole.spark.hiveacid.HiveTxnWrapper.inTxnRetry$1(HiveAcidTable.scala:330) at com.qubole.spark.hiveacid.HiveTxnWrapper.inTxn(HiveAcidTable.scala:355) at com.qubole.spark.hiveacid.HiveAcidTable.merge(HiveAcidTable.scala:198) at com.qubole.spark.hiveacid.datasource.HiveAcidRelation.merge(HiveAcidRelation.scala:98) at com.qubole.spark.datasources.hiveacid.sql.catalyst.plans.command.MergeCommand.run(MergeCommand.scala:51) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79) at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:195) at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:195) at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3370) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:79) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:126) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369) at org.apache.spark.sql.Dataset.<init>(Dataset.scala:195) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:662) ... 49 elided

Support for anti-join based reader support for Hive ACID tables in Spark

Currently, the ACID reader uses shaded Hive ORC ACID readers to read Hive ACID tables. We can definitely improve performance if we move to native readers for those tables. Especially in the case of Parquet, Spark's native Parquet readers are optimized and we don't want to miss out on that. Another reason is that we need to convert every row from a Hive structure to an InternalRow, which might add a penalty too.

Following are the 2 important objectives for this exercise:

Performance: Figure out a performant way of using the native Spark readers.

Maintainability: Not make changes to Spark readers for Hive ACID. We would not like to make any changes to Spark readers or keep a fork of it which needs to be maintained later.

One of the solutions we are planning to evaluate is an anti-join between the base files + delta files (let's call it the base relation) and the delete_delta files (let's call it the delete relation). An anti-join of the base and delete relations on (rowId, bucket, transactionId) gives us the result to read, i.e.,

AntiJoin(base, delete)

However, a Sort Merge Join can lead to an extra shuffle that might cause a performance issue: if the delete relation is larger than the broadcast threshold, that can lead to SMJ. One way to ensure broadcast happens more frequently is to split the base and delete relations by bucket ID and do the anti-join per bucket ID, i.e.,

AntiJoin(base, delete) = Union(AntiJoin(base_bucket_1, delete_bucket_1), AntiJoin(base_bucket_2, delete_bucket_2), … AntiJoin(base_bucket_n, delete_bucket_n))

This way the joins are split up, making sure broadcast joins come into play.
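A minimal DataFrame sketch of the idea, assuming hypothetical baseDf and deleteDf relations that both expose the ACID row-identity columns (the column names writeId, bucketId and rowId are assumptions for illustration):

// Keep only base rows whose row identity does not appear in the delete deltas.
val keyCols = Seq("writeId", "bucketId", "rowId")
val visibleRows = baseDf.join(deleteDf, keyCols, "left_anti")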

Insert into TimeStamp and Date columns are failing for HiveAcid

Currently, Spark rows are converted to Hive rows when writing via Hive's RecordUpdater. During that conversion we pass java.sql.* types for Date and Timestamp, which causes a CastException during the write:
java.sql.Timestamp cannot be cast to com.qubole.shaded.hadoop.hive.common.type.Timestamp

ACID removing data

Hey everybody. I have a problem with removing data from HDFS with Spark (Hive context), so I have a question: does your library support ACID deletes from HDFS in Spark SQL (Hive context)?

Update/delete operation on acid table sometimes fetches wrong bucket ID to write to

One of the issues in #70 was that the update in the merge command was failing because multiple Spark tasks were trying to write to the same bucket file. Ideally this should not happen, because in the spark-acid writer we repartition the data frame on the rowId.bucketId column so that all rows with the same bucket ID go to the same task. But there is a bug in getting the bucket ID from each InternalRow of the table during update/delete operations: while fetching the bucket ID from the unsafe row, we pass the table schema, whereas we should have passed the rowId schema (a struct type containing bucketId, rowId and writeId). As a result, the unsafe row returns a wrong integer value for the rowId column.
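A minimal sketch of the distinction being described, using Spark's InternalRow API; the field order inside the rowId struct and the helper name are assumptions for illustration:

import org.apache.spark.sql.catalyst.InternalRow

// rowId is a nested struct column, e.g. struct<writeId: bigint, bucketId: int, rowId: bigint>.
// The bucket ID must be read with an ordinal from the struct's own schema,
// not with an ordinal taken from the full table schema.
def bucketIdOf(row: InternalRow, rowIdOrdinal: Int): Int = {
  val rowIdStruct = row.getStruct(rowIdOrdinal, 3) // 3 fields in the rowId struct
  rowIdStruct.getInt(1)                            // ordinal 1 = bucketId (assumed order)
}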

Heartbeat Runner is not running.

We are using the latest Spark-Acid master code, which contains commit 736ffa3 that you suggested we include for dynamic partitions. However, we have now come across this error:

In the driver error log:
NoSuchTxnException: No such transaction txnid:22521
at com.qubole.shaded.hadoop.hive.metastore.api.ThriftHiveMetastore$add_dynamic_partitions_result$add_dynamic_partitions_resultStandardScheme.read(ThriftHiveMetastore.java)
at com.qubole.shaded.hadoop.hive.metastore.api.ThriftHiveMetastore$add_dynamic_partitions_result$add_dynamic_partitions_resultStandardScheme.read(ThriftHiveMetastore.java)
at com.qubole.shaded.hadoop.hive.metastore.api.ThriftHiveMetastore$add_dynamic_partitions_result.read(ThriftHiveMetastore.java)
at com.qubole.shaded.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
at com.qubole.shaded.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_add_dynamic_partitions(ThriftHiveMetastore.java:5527)
...
(28 additional frame(s) were not displayed)

StreamingQueryException: Query hiveSink [id = 3267e047-4848-4821-a2f7-04d6b8dd84c9, runId = c94a4e2e-34bc-48cf-8f2f-266057cf4c84] terminated with exception: No such transaction txnid:22521
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
NoSuchTxnException: No such transaction txnid:22521
at com.qubole.shaded.hadoop.hive.metastore.api.ThriftHiveMetastore$add_dynamic_partitions_result$add_dynamic_partitions_resultStandardScheme.read(ThriftHiveMetastore.java)
at com.qubole.shaded.hadoop.hive.metastore.api.ThriftHiveMetastore$add_dynamic_partitions_result$add_dynamic_partitions_resultStandardScheme.read(ThriftHiveMetastore.java)
at com.qubole.shaded.hadoop.hive.metastore.api.ThriftHiveMetastore$add_dynamic_partitions_result.read(ThriftHiveMetastore.java)
at com.qubole.shaded.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
at com.qubole.shaded.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_add_dynamic_partitions(ThriftHiveMetastore.java:5527)
...
(28 additional frame(s) were not displayed)
In the worker error log:
ERROR AcidUtils: Failed to create hdfs://.../warehouse/tablespace/managed/hive/events/year=2020/month=6/day=4/delta_0000727_0000727/orc_acid_version due to: Failed to CREATE_FILE /warehouse/tablespace/managed/hive/events/year=2020/month=6/day=4/delta_0000727_0000727/orc_acid_version for DFSClient_NONMAPREDUCE-528742007_48 on 10.236.7.228 because DFSClient_NONMAPREDUCE-528742007_48 is already the current lease holder.
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2555)
at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.startFile(FSDirWriteFileOp.java:378)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2453)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2351)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:774)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:462)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:524)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1025)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:876)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:822)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2682)
Any clue regarding this? Can it be related to the latest dynamic partitioning changes?

'getPartition' computation optimization

Currently, every Hive partition has a corresponding HiveACID RDD created for it. Every RDD computes getPartitions (these are RDD partitions, not Hive partitions), which is basically the split computation. For partitioned tables, since every Hive partition is a different RDD, their getPartitions calls are currently invoked sequentially. So reading a large number of partitions can be time consuming, especially on cloud object stores like S3 where listing is expensive. We are therefore planning to parallelize it by spawning a Spark job, using the Spark framework itself to parallelize the getPartitions calls when multiple RDDs are involved.

Improvements observed on S3 for 2000 partitions are below:
[Chart: Improvements in Split Computation for Partitioned Tables]

We are observing an 8x improvement in split computation times, i.e., getPartitions. It will only increase with more parallelism in the cluster, which the Spark job can harness, and with more Hive partitions to parallelize.
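
A minimal sketch of the idea, with a hypothetical computeSplits helper standing in for the per-partition listing; the driver-side sequential loop is replaced by a small Spark job so executors do the listing concurrently:

import org.apache.spark.SparkContext

// Hypothetical types/helpers for illustration only.
case class HivePartitionSpec(location: String)
case class Split(path: String, start: Long, length: Long)
def computeSplits(p: HivePartitionSpec): Seq[Split] = ??? // expensive listing, e.g. on S3

def parallelSplits(sc: SparkContext, partitions: Seq[HivePartitionSpec]): Array[Split] =
  sc.parallelize(partitions, math.min(partitions.size, 1000))
    .flatMap(computeSplits)
    .collect()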

Not able to perform an action on Hive tables using the spark-acid jar

Running on HDI 4.0 with Spark 2.4.0 and Hive 3.1.2.

spark-shell --packages qubole:spark-acid:0.4.0-s_2.11
val df = spark.read.format("HiveAcid").options(Map("table" -> "default.college")).load()
df.count

--stack-trace--

Caused by: java.lang.NullPointerException
at org.apache.hadoop.conf.Configuration.<init>(Configuration.java:820)
at org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:440)
at com.qubole.spark.datasources.hiveacid.rdd.Hive3RDD.getJobConf(Hive3Rdd.scala:165)
at com.qubole.spark.datasources.hiveacid.rdd.Hive3RDD$$anon$1.<init>(Hive3Rdd.scala:257)
at com.qubole.spark.datasources.hiveacid.rdd.Hive3RDD.compute(Hive3Rdd.scala:252)
at com.qubole.spark.datasources.hiveacid.rdd.Hive3RDD.compute(Hive3Rdd.scala:86)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

JobConf is picked from hadoop-mapreduce-client-core-3.1.1.jar

Configuration is picked from hadoop-common-3.1.1.jar

val a: org.apache.hadoop.conf.Configuration = null
new JobConf(a)

java.lang.NullPointerException
at org.apache.hadoop.conf.Configuration.<init>(Configuration.java:820)
at org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:440)
... 49 elided

The value is coming as null in this line---> https://github.com/qubole/spark-acid/blob/f445eeef4416ee27192905e0e69a43076db7b2b1/src/main/scala/com/qubole/spark/datasources/hiveacid/rdd/Hive3Rdd.scala#L138

Even tried setting spark.hadoop.cloneConf; it seems to be breaking in both the if and else conditions.
Also tried HDI 3.6 with Spark 2.3.0, but there was an issue with the Guava jar version being 24.1.1, so it was throwing an error as the method was not found:

java.lang.NoSuchMethodError: com.google.common.collect.MapMaker.softValues()Lcom/google/common/collect/MapMaker;
at com.qubole.spark.datasources.hiveacid.rdd.Cache$.<init>(Hive3Rdd.scala:53)
at com.qubole.spark.datasources.hiveacid.rdd.Cache$.<clinit>(Hive3Rdd.scala)
at com.qubole.spark.datasources.hiveacid.rdd.Hive3RDD.getJobConf(Hive3Rdd.scala:160)
at com.qubole.spark.datasources.hiveacid.rdd.Hive3RDD.getPartitions(Hive3Rdd.scala:196)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:84)
at com.qubole.spark.datasources.hiveacid.rdd.AcidLockUnionRDD.getPartitions(AcidLockUnionRDD.scala:37)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:322)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:91)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2775)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2774)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
at org.apache.spark.sql.Dataset.count(Dataset.scala:2774)
... 49 elided

in this line--> https://github.com/qubole/spark-acid/blob/f445eeef4416ee27192905e0e69a43076db7b2b1/src/main/scala/com/qubole/spark/datasources/hiveacid/rdd/Hive3Rdd.scala#L51

Can you guys please help me with this?

Thanks and Regards,
Vinay K L

Adding a check for write transaction validity after it acquires locks, so that it works on a consistent state of the table

Currently these are the steps in a write transaction:

  1. Create the transaction.
  2. While creating the transaction, also store the valid transactions (validTxns) at that point.
  3. Acquire locks.
  4. Execute code within the transaction.
  5. End the transaction if Step 4 is successful, or else abort.

validTxns could have changed by Step 3 due to transactions getting committed between Steps 2 and 3. If that happened, abort the transaction and retry from Step 1; a sketch of this retry loop is below.
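
A minimal sketch of that retry loop; the TxnClient trait is a hypothetical stand-in for the metastore transaction-manager calls the library makes:

// Hypothetical interface for illustration only.
trait TxnClient {
  def openTxn(): Long
  def validTxns(): Set[Long]
  def acquireLocks(txnId: Long): Unit
  def abort(txnId: Long): Unit
  def commit(txnId: Long): Unit
}

def inConsistentTxn[T](client: TxnClient)(body: Long => T): T = {
  while (true) {
    val txnId    = client.openTxn()       // Step 1
    val snapshot = client.validTxns()     // Step 2: snapshot of valid transactions
    client.acquireLocks(txnId)            // Step 3
    if (client.validTxns() != snapshot) {
      client.abort(txnId)                 // snapshot changed while acquiring locks: retry
    } else {
      try { val r = body(txnId); client.commit(txnId); return r } // Steps 4-5
      catch { case e: Throwable => client.abort(txnId); throw e }
    }
  }
  sys.error("unreachable")
}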

HiveAcid writeStream with checkpoint throwing org.apache.hadoop.fs.FileAlreadyExistsException

In our Spark app, we use Spark Structured Streaming. It uses Kafka as the readStream and HiveAcid as the writeStream to a Hive table. Below is our code:

import za.co.absa.abris.avro.functions.from_confluent_avro
.....

val spark = SparkSession
  .builder()
  .appName("events")
  .config("spark.sql.streaming.metricsEnabled", true)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._

val input_stream_df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  .option("startingOffsets", '{"events":{"0":2310384922,"1":2280420020,"2":2278027233,"3":2283047819,"4":2285647440}}')
  .option("maxOffsetsPerTrigger", 10000)
  .option("subscribe", "events")
  .load()

// schema registry config
val srConfig = Map(
  "schema.registry.url"           -> "http://schema-registry:8081",
  "value.schema.naming.strategy"  -> "topic.name",
  "schema.registry.topic"         -> "events",
  "value.schema.id"               -> "latest"
)

val data = input_stream_df
  .withColumn("value", from_confluent_avro(col("value"), srConfig))
  .withColumn("timestamp_s", from_unixtime($"value.timestamp" / 1000))
  .select(
    $"value.*",
    year($"timestamp_s")       as 'year,
    month($"timestamp_s")      as 'month,
    dayofmonth($"timestamp_s") as 'day
  )

val output_stream_df = data.writeStream.format("HiveAcid")
  .queryName("hiveSink")
  .option("database", "default")
  .option("table", "events_sink")
  .option("checkpointLocation", "/user/spark/events/checkpoint")
  .option("spark.acid.streaming.log.metadataDir", "/user/spark/events/checkpoint/spark-acid")
  .option("metastoreUri", "thrift://hive-metastore:9083")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

output_stream_df.awaitTermination()

We were able to deploy the app to production and redeployed it several times (~10 times) without issue. Then it ran into the following error:

Query hiveSink [id = 080a9f25-23d2-4ec8-a8c0-1634398d6d29, runId = 990d3bba-0f7f-4bae-9f41-b43db6d1aeb3] terminated with exception: Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task 3.3 in stage 0.0 (TID 42, 10.236.7.228, executor 3): org.apache.hadoop.fs.FileAlreadyExistsException: /warehouse/tablespace/managed/hive/events/year=2020/month=5/day=18/delta_0020079_0020079/bucket_00003 for client 10.236.7.228 already exists
(...)
at com.qubole.shaded.orc.impl.PhysicalFsWriter.<init>(PhysicalFsWriter.java:95)
at com.qubole.shaded.orc.impl.WriterImpl.<init>(WriterImpl.java:177)
at com.qubole.shaded.hadoop.hive.ql.io.orc.WriterImpl.<init>(WriterImpl.java:94)
at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcFile.createWriter(OrcFile.java:334)
at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.initWriter(OrcRecordUpdater.java:602)
at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.addSimpleEvent(OrcRecordUpdater.java:423)
at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.addSplitUpdateEvent(OrcRecordUpdater.java:432)
at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.insert(OrcRecordUpdater.java:484)
at com.qubole.spark.hiveacid.writer.hive.HiveAcidFullAcidWriter.process(HiveAcidWriter.scala:295)
at com.qubole.spark.hiveacid.writer.TableWriter$$anon$1$$anonfun$6.apply(TableWriter.scala:153)
at com.qubole.spark.hiveacid.writer.TableWriter$$anon$1$$anonfun$6.apply(TableWriter.scala:153)
(...)
at com.qubole.spark.hiveacid.writer.TableWriter$$anon$1.apply(TableWriter.scala:153)
at com.qubole.spark.hiveacid.writer.TableWriter$$anon$1.apply(TableWriter.scala:139)

Each time the app is restarted, it shows a different "delta + bucket file already exists" error. However, those files are (most probably) newly created each time it starts, so we have no clue why the error is thrown.

Any pointer will be much appreciated.

Cannot filter on partitions which are not strings

Hi,
I have an issue filtering on the partitions of a table.

spark.sql("select * from symlinkacidtable").printSchema
root
 |-- name: string (nullable = true)
 |-- part: string (nullable = true)

Partitions are like yyyyMMdd.
When I try to filter with an integer it works:

spark.sql("select * from symlinkacidtable where part=20190905").show()
+-------+--------+
| name  |  part  |
+-------+--------+
|      a|20190905|

When I try to filter with a string I get this exception:

spark.sql("select * from symlinkacidtable where part='20190905'").show()
com.qubole.shaded.hadoop.hive.metastore.api.MetaException: Filtering is supported only on partition keys of type string
  at com.qubole.shaded.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partitions_by_filter_result$get_partitions_by_filter_resultStandardScheme.read(ThriftHiveMetastore.java)
  at com.qubole.shaded.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partitions_by_filter_result$get_partitions_by_filter_resultStandardScheme.read(ThriftHiveMetastore.java)
  at com.qubole.shaded.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partitions_by_filter_result.read(ThriftHiveMetastore.java)
  at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
  at com.qubole.shaded.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_partitions_by_filter(ThriftHiveMetastore.java:3295)
  at com.qubole.shaded.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_partitions_by_filter(ThriftHiveMetastore.java:3279)
  at com.qubole.shaded.hadoop.hive.metastore.HiveMetaStoreClient.listPartitionsByFilter(HiveMetaStoreClient.java:1540)
  at com.qubole.shaded.hadoop.hive.metastore.HiveMetaStoreClient.listPartitionsByFilter(HiveMetaStoreClient.java:1534)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at com.qubole.shaded.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:212)
  at com.sun.proxy.$Proxy29.listPartitionsByFilter(Unknown Source)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at com.qubole.shaded.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler.invoke(HiveMetaStoreClient.java:2956)
  at com.sun.proxy.$Proxy29.listPartitionsByFilter(Unknown Source)
  at com.qubole.shaded.hadoop.hive.ql.metadata.Hive.getPartitionsByFilter(Hive.java:3484)
  at com.qubole.spark.datasources.hiveacid.HiveAcidRelation.getRawPartitions(HiveAcidRelation.scala:267)
  at com.qubole.spark.datasources.hiveacid.HiveAcidRelation.buildScan(HiveAcidRelation.scala:177)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$10.apply(DataSourceStrategy.scala:293)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$10.apply(DataSourceStrategy.scala:293)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:338)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:337)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy.pruneFilterProjectRaw(DataSourceStrategy.scala:415)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy.pruneFilterProject(DataSourceStrategy.scala:333)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:289)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
  at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3254)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2489)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2703)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:723)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:682)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:691)
  ... 49 elided

Do you know why?

Thanks

Thomas

Decoupling Transaction state and operations from HiveAcidTable

Changes:
1. Making HiveAcidTable thread-safe by removing the global state of the transaction (isLocal). It would also allow nesting of transactions, although we do not recommend that.
2. Secondly, multiple operations were coupled with the creation of a transaction. They have now been decoupled, which allows the flexibility of using a single transaction for multiple operations, as in the case of MERGE.

spark-acid incorrectly reads/writes pre-Gregorian timestamps

In beeline:

0: jdbc:hive2://localhost:10000> create table ts_acid (ts TIMESTAMP) stored as orc TBLPROPERTIES ('transactional' = 'true');
No rows affected (0.132 seconds)
0: jdbc:hive2://localhost:10000> insert into ts_acid values ('1200-01-01 00:00:00.0');
No rows affected (2.339 seconds)
0: jdbc:hive2://localhost:10000> select * from ts_acid;
20/09/16 17:51:51 INFO lockmgr.DbTxnManager: Stopped heartbeat for query: bruce_20200916175151_d8bc218d-da7d-491a-8f97-e38fb2416748
+------------------------+
|       ts_acid.ts       |
+------------------------+
| 1200-01-01 00:00:00.0  |
+------------------------+
1 row selected (0.207 seconds)
0: jdbc:hive2://localhost:10000> 

In Spark:

scala> val df = spark.read.format("HiveAcid").options(Map("table" -> "default.ts_acid")).load
20/09/16 17:53:22 WARN HiveConf: HiveConf of name hive.metastore.local does not exist
df: org.apache.spark.sql.DataFrame = [ts: timestamp]

scala> df.show(truncate=false)
20/09/16 17:53:29 WARN HiveConf: HiveConf of name hive.metastore.local does not exist
20/09/16 17:53:31 ERROR AcidUtils: Failed to get files with ID; using regular API: Only supported for DFS; got class org.apache.hadoop.fs.LocalFileSystem
+-------------------+
|ts                 |
+-------------------+
|1199-12-24 16:00:00|
+-------------------+

scala> 

Conversely, in Spark:

scala> val df = Seq(java.sql.Timestamp.valueOf("1400-01-01 00:00:00.0")).toDF("ts")
df: org.apache.spark.sql.DataFrame = [ts: timestamp]

scala> df.write.format("HiveAcid").option("table", "default.ts_acid").mode("append").save()
20/09/16 18:05:30 WARN HiveConf: HiveConf of name hive.metastore.local does not exist
20/09/16 18:05:31 WARN HiveConf: HiveConf of name hive.metastore.local does not exist
20/09/16 18:05:32 WARN HiveConf: HiveConf of name hive.metastore.local does not exist

scala> 

Then, in beeline:

0: jdbc:hive2://localhost:10000> select * from ts_acid;
20/09/16 18:06:25 INFO lockmgr.DbTxnManager: Stopped heartbeat for query: bruce_20200916180625_03a152d1-2be1-41c7-b205-33d38e68ce2f
+------------------------+
|       ts_acid.ts       |
+------------------------+
| 1200-01-01 00:00:00.0  |
| 1400-01-09 08:00:00.0  |    <--- This should be 1400-01-01 00:00:00.0
+------------------------+
2 rows selected (0.283 seconds)
0: jdbc:hive2://localhost:10000> 

Fix the Cardinality check for Merge

Currently the cardinality check assumes rowId is unique across the table. In reality, rowIds are only unique within a partition, so that should be accounted for in the cardinality check.
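
A minimal sketch of a per-partition cardinality check with the DataFrame API; matched is assumed to be the MERGE target-source join result, carrying the target's partition columns plus its rowId column:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.count

def checkCardinality(matched: DataFrame, partitionCols: Seq[String]): Unit = {
  val dupes = matched
    .groupBy((partitionCols :+ "rowId").map(matched.col): _*) // key = partition columns + rowId
    .agg(count("*").as("matches"))
    .filter("matches > 1")
  if (!dupes.isEmpty)
    throw new IllegalStateException(
      "MERGE cardinality violation: a target row matched more than one source row")
}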

Add all dynamic partitions touched in write to hive metastore

For writes that touch dynamic partitions, we are supposed to do two things:

  1. While taking the lock for the write, specify that it is a dynamic partition query.
  2. For every dynamic partition written into, call HiveMetastoreClient.add_dynamic_partitions() (see the sketch after this list).
    The first call ensures that a dummy value is put into the TXN_COMPONENTS table.
    The second call ensures that the dummy value is replaced by the actual entries for the partitions touched. This ensures that compaction can be called on those partitions. Another use case is that if the query is aborted or fails after taking the lock, the dummy value ensures that the cleaner removes all the aborted delta files from the table. Without the fix, if the transaction is aborted between openTxn and addPartitions and data has been written to the table, the transaction manager will think it is an empty transaction and no cleaning will be done. The aborted delta files will not be read while the transaction is present in the TXN tables as an aborted transaction, but once it is removed from the TXNS table on the assumption that it was empty, those aborted delta files will still be present, leading to issues.

Revisit read Txn and Lock table for read operations

Currently, reads do not acquire locks because the lifetime of an RDD is unspecified and holding locks would block InsertOverwrites. So protecting reads against DDL / Truncate / Compaction is an operational issue as it stands.

Currently, the transactional snapshot for a read is acquired whenever the DataFrame first materialises.

The above two need rethinking.

orc_write ETA

Great work!

May I know the ETA of the orc_write feature?

Add Doc for Merge

Issue to update the README with the MERGE functionality. It should contain a detailed explanation of:

  1. SQL syntax
  2. Performance considerations
