
dai-chen commented on August 23, 2024

I quickly tested joined queries with a windowing function, but they failed due to missing support in Spark 3.3.1:

  test("create materialized view with join") {
    withTempDir { checkpointDir =>
      sql(s"""
           | CREATE MATERIALIZED VIEW $testMvName
           | AS
           | SELECT
           |   t1.startTime AS eventTime,
           |   t1.name,
           |   t2.productId
           | FROM (
           |   SELECT
           |     window.start AS startTime,
           |     name
           |   FROM $testTable
           |   GROUP BY
           |     TUMBLE(time, '10 Minutes'),
           |     name
           | ) AS t1
           | JOIN (
           |   SELECT
           |     window.start AS startTime,
           |     productId
           |   FROM $testTable2
           |   GROUP BY
           |     TUMBLE(transactionDate, '10 Minutes'),
           |     productId
           | ) AS t2
           | ON t1.startTime = t2.startTime
           | WITH (
           |   auto_refresh = true,
           |   checkpoint_location = '${checkpointDir.getAbsolutePath}',
           |   watermark_delay = '1 Second'
           | )
           |""".stripMargin)

      val job = spark.streams.active.find(_.name == testFlintIndex)
      job shouldBe defined
      failAfter(streamingTimeout) {
        job.get.processAllAvailable()
      }

      flint.queryIndex(testFlintIndex).show()
    }
  }


org.apache.spark.sql.AnalysisException: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets; line 1 pos 0;
Project [startTime#160 AS eventTime#162, name#173, productId#178]
+- Join Inner, (startTime#160 = startTime#161)
   :- SubqueryAlias t1
   :  +- Aggregate [window#187-T1000ms, name#173], [window#187-T1000ms.start AS startTime#160, name#173]
   :     +- Project [named_struct(start, precisetimestampconversion(((precisetimestampconversion(time#172-T1000ms, TimestampType, LongType) - (((precisetimestampconversion(time#172-T1000ms, TimestampType, LongType) - 0) + 600000000) % 600000000)) - 0), LongType, TimestampType), end, precisetimestampconversion((((precisetimestampconversion(time#172-T1000ms, TimestampType, LongType) - (((precisetimestampconversion(time#172-T1000ms, TimestampType, LongType) - 0) + 600000000) % 600000000)) - 0) + 600000000), LongType, TimestampType)) AS window#187-T1000ms, time#172-T1000ms, name#173, age#174, address#175]
   :        +- Filter isnotnull(time#172-T1000ms)
   :           +- EventTimeWatermark time#172: timestamp, 1 seconds
   :              +- SubqueryAlias spark_catalog.default.mv_test
   :                 +- StreamingRelation DataSource(org.apache.spark.sql.test.TestSparkSession@3db2665d,CSV,List(),Some(StructType(StructField(time,TimestampType,true),StructField(name,StringType,true),StructField(age,IntegerType,true),StructField(address,StringType,true))),List(),None,Map(header -> false, delimiter -> 	, path -> file:/Users/daichen/IdeaProjects/opensearch-spark/spark-warehouse/mv_test),None), FileSource[file:/Users/daichen/IdeaProjects/opensearch-spark/spark-warehouse/mv_test], [time#172, name#173, age#174, address#175]
   +- SubqueryAlias t2
      +- Aggregate [window#188-T1000ms, productId#178], [window#188-T1000ms.start AS startTime#161, productId#178]
         +- Project [named_struct(start, precisetimestampconversion(((precisetimestampconversion(transactionDate#177-T1000ms, TimestampType, LongType) - (((precisetimestampconversion(transactionDate#177-T1000ms, TimestampType, LongType) - 0) + 600000000) % 600000000)) - 0), LongType, TimestampType), end, precisetimestampconversion((((precisetimestampconversion(transactionDate#177-T1000ms, TimestampType, LongType) - (((precisetimestampconversion(transactionDate#177-T1000ms, TimestampType, LongType) - 0) + 600000000) % 600000000)) - 0) + 600000000), LongType, TimestampType)) AS window#188-T1000ms, transactionId#176, transactionDate#177-T1000ms, productId#178, productsAmount#179, customerId#180, year#181, month#182]
            +- Filter isnotnull(transactionDate#177-T1000ms)
               +- EventTimeWatermark transactionDate#177: timestamp, 1 seconds
                  +- SubqueryAlias spark_catalog.default.mv_test_2
                     +- StreamingRelation DataSource(org.apache.spark.sql.test.TestSparkSession@3db2665d,CSV,List(),Some(StructType(StructField(transactionId,StringType,true),StructField(transactionDate,TimestampType,true),StructField(productId,StringType,true),StructField(productsAmount,IntegerType,true),StructField(customerId,StringType,true),StructField(year,IntegerType,true),StructField(month,IntegerType,true))),List(),None,Map(header -> false, delimiter -> 	, path -> file:/Users/daichen/IdeaProjects/opensearch-spark/spark-warehouse/mv_test_2),None), FileSource[file:/Users/daichen/IdeaProjects/opensearch-spark/spark-warehouse/mv_test_2], [transactionId#176, transactionDate#177, productId#178, productsAmount#179, customerId#180, year#181, month#182]
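For context, this is a general Structured Streaming restriction rather than something specific to the MV layer: a single streaming query plan may contain at most one streaming aggregation, and joining two aggregated subqueries (as above) puts two into one plan. A minimal sketch of the same failure with plain DataFrames, assuming a local SparkSession and a rate source (all names here are illustrative, not from the test suite):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Streaming source (built-in rate source, used only for illustration).
val events = spark.readStream.format("rate").load()

// First (and only allowed) streaming aggregation: a tumbling window count.
val firstAgg = events
  .withWatermark("timestamp", "1 second")
  .groupBy(window($"timestamp", "10 minutes"))
  .count()

// A second aggregation over the already-aggregated stream is rejected at
// analysis time with "Multiple streaming aggregations are not supported".
val secondAgg = firstAgg.groupBy($"window.start").sum("count")
secondAgg.writeStream.format("console").start() // throws AnalysisException
```

Joining two windowed-aggregation subqueries hits the same analyzer check, since each side contributes a streaming `Aggregate` node to the one query plan.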

from opensearch-spark.

dai-chen commented on August 23, 2024

I quickly tested joined queries without windowing functions (with the time conditions specified in the join clause instead) and observed the following limitations:

  1. Watermark delay: It is not possible to specify a watermark delay for each table, as the watermark_delay option in the WITH clause does not take effect.
  2. Streaming join type: Both sides of the join are treated as streaming sources, resulting in a stream-stream join, with no current way to express a stream-static join.
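Regarding the second point: plain Structured Streaming does support stream-static joins when one side is an ordinary batch DataFrame; the gap is that the MV DDL gives no way to mark one table as static, so both relations become streaming sources. A hedged sketch of what the underlying Spark API allows (paths, schema, and table contents are assumed for illustration):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Streaming side: read the fact table as a stream (path and schema assumed).
val txnStream = spark.readStream
  .schema("transactionId STRING, transactionDate TIMESTAMP, productId STRING")
  .format("csv")
  .load("/path/to/transactions")

// Static side: an ordinary batch read of a dimension table (path assumed).
val products = spark.read
  .option("header", "true")
  .format("csv")
  .load("/path/to/products")

// Stream-static inner join: supported by Spark, and no watermark is required
// on the static side. The current MV syntax cannot express this distinction,
// so both tables end up as streaming relations in a stream-stream join.
val joined = txnStream.join(products, Seq("productId"))
```

This is why a per-table hint (or a way to opt a table out of streaming) would be needed before stream-static MV joins can work.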

Despite these limitations, the MV with joined queries seems to function as intended:

  protected def createTimeSeriesTransactionTable(testTable: String): Unit = {
    sql(s"""
      | CREATE TABLE $testTable
      | (
      |   transactionId STRING,
      |   transactionDate TIMESTAMP,
      |   productId STRING,
      |   productsAmount INT,
      |   customerId STRING
      | )
      | USING $tableType $tableOptions
      | PARTITIONED BY (
      |    year INT,
      |    month INT
      | )
      |""".stripMargin)

    // Update data insertion
    // -- Inserting records into the testTable for April 2023
    sql(s"""
      | INSERT INTO $testTable PARTITION (year=2023, month=4)
      | VALUES
      | ('txn001', CAST('2023-04-01 10:30:00' AS TIMESTAMP), 'prod1', 2, 'cust1'),
      | ('txn001', CAST('2023-04-01 14:30:00' AS TIMESTAMP), 'prod1', 4, 'cust1'),
      | ('txn002', CAST('2023-04-02 11:45:00' AS TIMESTAMP), 'prod2', 1, 'cust2'),
      | ('txn003', CAST('2023-04-03 12:15:00' AS TIMESTAMP), 'prod3', 3, 'cust1'),
      | ('txn004', CAST('2023-04-04 09:50:00' AS TIMESTAMP), 'prod1', 1, 'cust3')
      |  """.stripMargin)

    // Update data insertion
    // -- Inserting records into the testTable for May 2023
    sql(s"""
      | INSERT INTO $testTable PARTITION (year=2023, month=5)
      | VALUES
      | ('txn005', CAST('2023-05-01 08:30:00' AS TIMESTAMP), 'prod2', 1, 'cust4'),
      | ('txn006', CAST('2023-05-02 07:25:00' AS TIMESTAMP), 'prod4', 5, 'cust2'),
      | ('txn007', CAST('2023-05-03 15:40:00' AS TIMESTAMP), 'prod3', 1, 'cust3'),
      | ('txn007', CAST('2023-05-03 19:30:00' AS TIMESTAMP), 'prod3', 2, 'cust3'),
      | ('txn008', CAST('2023-05-04 14:15:00' AS TIMESTAMP), 'prod1', 4, 'cust1')
      |  """.stripMargin)
  }

  test("create materialized view with join") {
    withTempDir { checkpointDir =>
      sql(s"""
           | CREATE MATERIALIZED VIEW $testMvName
           | AS
           | SELECT
           |   t1.startTime AS eventTime,
           |   t1.transactionId,
           |   t2.productId
           | FROM (
           |   SELECT
           |     transactionId,
           |     transactionDate AS startTime,
           |     productId
           |   FROM $testTable2
           | ) AS t1
           | JOIN (
           |   SELECT
           |     transactionId,
           |     transactionDate AS startTime,
           |     productId
           |   FROM $testTable3
           | ) AS t2
           | ON
           |   t1.transactionId = t2.transactionId
           |   AND
           |   t1.startTime BETWEEN t2.startTime AND t2.startTime + INTERVAL 1 DAY
           | WITH (
           |   auto_refresh = true,
           |   checkpoint_location = '${checkpointDir.getAbsolutePath}',
           |   watermark_delay = '1 Second'
           | )
           |""".stripMargin)

      val job = spark.streams.active.find(_.name == testFlintIndex)
      job shouldBe defined
      failAfter(streamingTimeout) {
        job.get.processAllAvailable()
      }

      flint.queryIndex(testFlintIndex)
        .select("eventTime", "transactionId", "productId")
        .orderBy("eventTime", "transactionId").show()
    }
  }

+-------------------+-------------+---------+
|          eventTime|transactionId|productId|
+-------------------+-------------+---------+
|2023-04-01 10:30:00|       txn001|    prod1|
|2023-04-01 14:30:00|       txn001|    prod1|
|2023-04-01 14:30:00|       txn001|    prod1|
|2023-04-02 11:45:00|       txn002|    prod2|
|2023-04-03 12:15:00|       txn003|    prod3|
|2023-04-04 09:50:00|       txn004|    prod1|
|2023-05-01 08:30:00|       txn005|    prod2|
|2023-05-02 07:25:00|       txn006|    prod4|
|2023-05-03 15:40:00|       txn007|    prod3|
|2023-05-03 19:30:00|       txn007|    prod3|
|2023-05-03 19:30:00|       txn007|    prod3|
|2023-05-04 14:15:00|       txn008|    prod1|
+-------------------+-------------+---------+

