opensearch-spark's Introduction

OpenSearch Flint

OpenSearch Flint is ... It consists of the following modules:

  • flint-core: a module that contains Flint specification and client.
  • flint-spark-integration: a module that provides Spark integration for Flint and derived dataset based on it.
  • ppl-spark-integration: a module that provides PPL query execution on top of Spark. See the PPL repository.

Documentation

Refer to the Flint Index Reference Manual for more information. For the PPL language, see the PPL Reference Manual.

Prerequisites

Version compatibility:

Flint version  JDK version  Spark version  Scala version  OpenSearch
0.1.0          11+          3.3.1          2.12.14        2.6+
0.2.0          11+          3.3.1          2.12.14        2.6+
0.3.0          11+          3.3.2          2.12.14        2.13+
0.4.0          11+          3.3.2          2.12.14        2.13+
0.5.0          11+          3.3.2          2.12.14        2.13+

Flint Extension Usage

To use this application, you can run Spark with Flint extension:

spark-sql --conf "spark.sql.extensions=org.opensearch.flint.spark.FlintSparkExtensions"

PPL Extension Usage

To use PPL to Spark translation, you can run Spark with PPL extension:

spark-sql --conf "spark.sql.extensions=org.opensearch.flint.spark.FlintPPLSparkExtensions"

Running With Both Extensions

spark-sql --conf "spark.sql.extensions=org.opensearch.flint.spark.FlintPPLSparkExtensions,org.opensearch.flint.spark.FlintSparkExtensions"

Build

To build and run this application with Spark, you can run:

sbt clean standaloneCosmetic/publishM2

Then add org.opensearch:opensearch-spark_2.12 when running a Spark application, for example:

bin/spark-shell --packages "org.opensearch:opensearch-spark_2.12:0.5.0-SNAPSHOT"

PPL Build & Run

To build and run PPL in Spark, you can run:

sbt clean sparkPPLCosmetic/publishM2

Then add org.opensearch:opensearch-spark-ppl_2.12 when running a Spark application, for example:

bin/spark-shell --packages "org.opensearch:opensearch-spark-ppl_2.12:0.5.0-SNAPSHOT"

Code of Conduct

This project has adopted an Open Source Code of Conduct.

Security

If you discover a potential security issue in this project we ask that you notify OpenSearch Security directly via email to [email protected]. Please do not create a public GitHub issue.

License

See the LICENSE file for our project's licensing. We will ask you to confirm the licensing of your contribution.

Copyright

Copyright OpenSearch Contributors. See NOTICE for details.

opensearch-spark's People

Contributors

asuresh8, dai-chen, dtaivpp, kaituo, nocharger, penghuo, rupal-bq, seankao-az, vamsi-amazon, varun-lodaya, yang-db

opensearch-spark's Issues

[FEATURE] Fail long waiting statement

When a Spark job picks up a statement that has been in the waiting state for more than 10 minutes since its submitTime, the job should set the statement state to FAIL with error = timeout.
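The check described above can be sketched in a few lines of Python. This is only an illustration under assumptions: the `state`, `submitTime` and `error` field names mirror the wording of the request, while the dictionary layout, the `WAITING` literal and the function name are hypothetical and not taken from the Flint code base.

```python
from datetime import datetime, timedelta, timezone

WAIT_TIMEOUT = timedelta(minutes=10)  # threshold taken from the request above


def fail_if_waiting_too_long(statement, now):
    """Return the statement, marked FAIL/timeout if it waited past the limit."""
    waited = now - statement["submitTime"]
    if statement["state"] == "WAITING" and waited > WAIT_TIMEOUT:
        # Hypothetical record layout: copy the statement and flag the timeout.
        return {**statement, "state": "FAIL", "error": "timeout"}
    return statement


submitted = datetime(2023, 10, 19, 22, 0, tzinfo=timezone.utc)
statement = {"state": "WAITING", "submitTime": submitted}

late = fail_if_waiting_too_long(statement, submitted + timedelta(minutes=11))
on_time = fail_if_waiting_too_long(statement, submitted + timedelta(minutes=5))
```

The comparison happens at pick-up time, so a statement that is never picked up would still need a separate sweep; that part is outside this sketch.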

[FEATURE] Redefine drop index semantic as logical deletion

Is your feature request related to a problem?

Currently the semantics of the DROP statement for every Flint index are:

  1. Terminate incremental refreshing job in current SparkContext
  2. Delete physical Flint index data in OpenSearch

This is inconvenient for users who want to do step #1 only (there is currently no STOP refresh job command).

What solution would you like?

Redefine the drop index statement as logical deletion with the following changes:

  1. Only stop the refreshing job without deleting Flint index data
  2. [TBD] Provide a new VACUUM index statement which deletes both index data and checkpoint data (if any)
  3. [TBD] Provide a new way (REFRESH or a new command) to revive the logically deleted index back to the ACTIVE state

What alternatives have you considered?

N/A

[BUG] Column names unescaped during index creation.

What is the bug?
When trying to work with tables that have escaped column names, this escaping is thrown away during index creation.

How can one reproduce the bug?
Steps to reproduce the behavior:

  1. Create a table with column names that require escaping:
CREATE TABLE
  mys3.default.sample_table_dot_cols (`fields.name` string, `fields.count` int) USING PARQUET LOCATION 's3://sample-location/sample-dotcol';
  2. Then attempt to create an index on that table:
CREATE INDEX sample_index ON mys3.default.sample_table_dot_cols (`fields.name`, `fields.count`)
WITH
  (auto_refresh = true);
  3. Wait for the query to fail and check the logs; you'll see the error:
23/10/19 22:30:42 ERROR FlintJob: Fail to verify existing mapping or write result
org.apache.spark.sql.catalyst.parser.ParseException: 
Syntax error at or near '.': extra input '.'(line 1, pos 6)

== SQL ==
fields.name string not null,fields.count int not null
------^^^

This indicates that at some point in internal processing (maybe in FlintSparkIndex.scala), the column names are being unescaped.

What is the expected behavior?
If the column name is escaped in the index creation query, it should be correctly handled. Alternatively, if escaping column names isn't supported, there should be an error on table creation.
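To make the expected behavior concrete, here is a minimal Python sketch of re-quoting column names when a schema string like the one in the log is assembled. It is not the Flint implementation; the function names are hypothetical, and the only rule relied on is that Spark SQL quotes identifiers with backticks and escapes an embedded backtick by doubling it.

```python
def quote_identifier(name):
    """Wrap a column name in backticks, doubling any embedded backtick."""
    return "`" + name.replace("`", "``") + "`"


def schema_ddl(columns):
    """Build a 'name type not null' list in the shape seen in the error log."""
    return ",".join(f"{quote_identifier(n)} {t} not null" for n, t in columns)


ddl = schema_ddl([("fields.name", "string"), ("fields.count", "int")])
print(ddl)
```

With the names quoted this way, the dot in `fields.name` stays inside the identifier instead of being parsed as a separator.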

What is your host/environment?

Do you have any screenshots?
N/A

Do you have any additional context?
N/A

[FEATURE] Add Flint's optimization rules to PPL FlintPPLSparkExtensions

Is your feature request related to a problem?
Add Flint's query optimization rules to FlintPPLSparkExtensions so that PPL queries can benefit from Flint's acceleration capabilities.

What solution would you like?
Add the optimizer to the PPL Spark extension class:

    extensions.injectOptimizerRule { spark =>
      new FlintSparkOptimizer(spark)
    }

Added FlintPPLSparkExtensions jar to the maven publish job

Do you have any additional context?

[BUG] Show materialized view returns full name in single column

What is the bug?

Currently SHOW MATERIALIZED VIEW returns a single column containing the full MV name. This doesn't align with SHOW TABLES, or with SHOW MATERIALIZED VIEWS in other databases.

How can one reproduce the bug?

SHOW MATERIALIZED VIEWS IN myglue;
materialized_view_name
myglue.default.http_logs_metrics
myglue.default.lineitem_metrics

What is the expected behavior?

SHOW MATERIALIZED VIEWS IN myglue;
materialized_view_name  catalog_name  database_name
http_logs_metrics       myglue        default
lineitem_metrics        myglue        default
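The expected output amounts to splitting the fully qualified name into its parts. A small Python sketch of that split follows, assuming names always have exactly three dot-separated parts and that no part contains a dot itself; the function name is hypothetical.

```python
def split_mv_name(full_name):
    """Split 'catalog.database.view' into the three expected columns."""
    catalog, database, view = full_name.split(".")
    return {
        "materialized_view_name": view,
        "catalog_name": catalog,
        "database_name": database,
    }


rows = [
    split_mv_name(name)
    for name in ["myglue.default.http_logs_metrics", "myglue.default.lineitem_metrics"]
]
```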

[FEATURE] REPL Job support Blue/Green upgrade

  • proposal
When the client launches a new job, it:
  copies the existing Spark conf,
  adds --conf spark.flint.deployment.excludeJobs="job-1"
  • prototype
REPLMain() {
  String sessionId = env.get(SESSION_ID)
  String myJobId = env.get(EMR_JOB_ID)
  Optional<String> excludeJobIds = env.get(EXCLUDE_JOB_ID)

  // take over the session from the job being replaced, if any
  excludeJobs(myJobId, excludeJobIds)

  while (true) {
    if (canPickUpNextStatement()) {
      execute(pickUpNewStatement())
    } else {
      break
    }
  }
}

void pickUpNewStatement() {
  statementState = running()
}

boolean canPickUpNextStatement() {
  (excludeJobIds, jobId) = SessionStore.of(sessionId).read()
  // only the job that currently owns the session may pick up statements
  if (myJobId != jobId) {
    return false
  }
  if (excludeJobIds == null) {
    return true
  }
  // a job listed as excluded must stop picking up statements
  if (myJobId in excludeJobIds) {
    return false
  }
  return true
}

void excludeJobs(myJobId, excludeJobIds) {
  (currentJobId, seqNo, primaryNo) = read(sessionId)
  // claim the session only if it is still owned by the job being excluded
  if (excludeJobIds == currentJobId) {
    update(myJobId, excludeJobIds, seqNo, primaryNo)
  }
}
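The hand-over in the prototype can be exercised with a small runnable Python sketch. It uses an in-memory dictionary in place of the session store and leaves out the seqNo/primaryNo concurrency check, so it only illustrates the ownership rule; the dictionary layout and function names are assumptions made for this example.

```python
def take_over(session, my_job_id, exclude_job_id):
    """Claim the session if it is still owned by the job being excluded."""
    if exclude_job_id is not None and session["jobId"] == exclude_job_id:
        session["jobId"] = my_job_id
        session["excludeJobs"] = [exclude_job_id]


def can_pick_up_next_statement(session, my_job_id):
    """Only the owning job, and never an excluded one, may pick up work."""
    if session["jobId"] != my_job_id:
        return False
    return my_job_id not in (session.get("excludeJobs") or [])


# job-1 owns the session; job-2 is launched with excludeJobs = "job-1".
session = {"jobId": "job-1", "excludeJobs": None}
old_job_before = can_pick_up_next_statement(session, "job-1")
take_over(session, "job-2", "job-1")
old_job_after = can_pick_up_next_statement(session, "job-1")
new_job_after = can_pick_up_next_statement(session, "job-2")
```

After the take-over the old job fails the ownership check on its next loop iteration and exits, while the new job keeps polling.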

[FEATURE] Support partial indexing for skipping and covering index

Is your feature request related to a problem?

Currently there is no way to provide a start timestamp or WHERE clause in the create index statement. That means skipping and covering indexes have to refresh data from the beginning, which may cause unnecessary computation and wasted storage.

What solution would you like?

Support partial indexing by either:

  1. Some index option like startTime
  2. Or a generic WHERE clause that accepts any filtering condition, e.g. CREATE INDEX ... WHERE status != 200 WITH (...)

Note that one challenge here is the correctness of query rewrite: the skipping index query rewriter has to compare this filtering condition with the one in the query and decide whether the query can be accelerated.
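For the simpler startTime variant, the correctness check can be sketched as a single comparison. The Python below is a hypothetical illustration, not the Flint rewriter: it assumes the index holds every row at or after `index_start_time` and that the query's time filter has already been reduced to a lower bound (or `None` when the query has no such filter).

```python
from datetime import datetime


def can_accelerate(index_start_time, query_lower_bound):
    """True only if the partial index covers every row the query may read."""
    if query_lower_bound is None:
        return False  # an unbounded query may need rows the index never saw
    return query_lower_bound >= index_start_time


index_start = datetime(2023, 10, 1)
covered = can_accelerate(index_start, datetime(2023, 10, 15))
too_old = can_accelerate(index_start, datetime(2023, 9, 1))
unbounded = can_accelerate(index_start, None)
```

A generic WHERE clause needs a real implication check between two predicates, which is considerably harder than this range comparison.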

[FEATURE] Support incremental refresh in refresh index statement

Is your feature request related to a problem?

Currently, every refresh index statement only supports manual refresh and always passes RefreshMode.FULL to the underlying refreshIndex() API. Re-triggering incremental refresh will also be useful once stopping incremental refresh is supported.

What solution would you like?

  1. Remove the RefreshMode param from the refreshIndex() API [TBD]
  2. The refreshIndex() API decides the RefreshMode according to the auto_refresh option in the index metadata
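The second step could look roughly like the Python sketch below. `RefreshMode.FULL` and the `auto_refresh` option come from the text above; the `INCREMENTAL` member, the options dictionary and the function name are assumptions made for illustration.

```python
from enum import Enum


class RefreshMode(Enum):
    FULL = "full"
    INCREMENTAL = "incremental"  # assumed counterpart of FULL


def resolve_refresh_mode(index_options):
    """Pick the refresh mode from the auto_refresh option in index metadata."""
    auto_refresh = str(index_options.get("auto_refresh", "false")).lower() == "true"
    return RefreshMode.INCREMENTAL if auto_refresh else RefreshMode.FULL


manual = resolve_refresh_mode({})
auto = resolve_refresh_mode({"auto_refresh": "true"})
```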

[RFC] OpenSearch and Apache Spark Integration

Introduction

We received a feature request for query execution on object stores in OpenSearch.

We investigated the possibility of building a new solution for OpenSearch that uses an object store as storage.

The challenges we found are:

  • The OpenSearch aggregation framework is a simplified MPP framework and does not support a shuffle stage.
  • The OpenSearch query framework is missing support for key features, e.g. JOIN and subqueries.

We found that these problems have already been solved by general-purpose data processing systems, e.g. Presto, Spark, and Trino, and that building such a platform takes years to mature.

Idea

The initial idea is

  1. Use SQL as the interface.
  2. Leverage Spark as the query/compute execution engine.

High level diagram:

[Figure: high-level diagram]

User Experience

  1. The user configures a Spark cluster as the computation resource, e.g. https://SPARK:7707.
  2. The user submits SQL to the OpenSearch cluster using the _plugins/_sql REST API.
    1. The SQL engine parses and analyzes the SQL query.
    2. The SQL engine decides whether to route the query to the Spark cluster or run it locally.
  3. In phase-1, we provide an interface that lets the user create a derived dataset from data on the object store and store it in OpenSearch. Queries are then optimized automatically based on the derived dataset at query time.
  4. In phase-2, we provide an opt-in optimization choice for the user. The derived dataset will be created automatically based on the query pattern.

Epic

[Feature] OpenSearch and Apache Spark Integration

More reading at #4

Phase-0: Proof of Concepts

Goals

  • Verify the solution for end to end production use case
  • Demo the solution

Tasks

Deliverables


Phase-1: Spark Connector and Flint API Support

Goals

  • OpenSearch Release: 2.9.0
    • Support creating a spark-submit data source in OpenSearch.
    • Support building visualizations in OpenSearch Dashboards using the Spark data source.
  • OpenSearch Spark Extension Release.
    • Support creating a skipping index for Hive tables.
// 1. define the context
USING FLINT
PROPERTIES (
  OPENSEARCH = "https://my-opensearch.com"
)

// 2. create skipping index
CREATE SKIPPING INDEX index_name ON TABLE [db_name].table_name
FOR COLUMNS (col1 INDEX_TYPE, ...)
[REFRESH_ON_CREATE]            <- auto refresh the table after table create
[AUTO_REFRESH TRUE]            <- auto refresh the index when new data appended.

Tasks

Deliverables


Phase-2: Covering Index and Materialized View Support

Goals

  • OpenSearch release: preview in 2.11
  • User story: opensearch-project/sql#1376
  • User guide: guidance for performance, cost estimation and OpenSearch/Spark cluster sizing

Non-Goals

  • Covering index and MV are not accessible in Spark and thus no query rewrite support

Tasks

[BUG] REPL Job should return meaningful error message.

What is the bug?

  • submit a query with a syntax error.
select * from mys3.default.http_logs limit1 1
  • query response
{
  "status": "FAILED",
  "error": "command fail: org.opensearch.flint.app.FlintCommand@1f13eabb"
}
  • expected
Syntax error at or near '1': extra input '1'(line 1, pos 44)

== SQL ==
select * from mys3.default.http_logs limit1 1
--------------------------------------------^^^

[BUG] Materialized view creation showing no error, but no view is found

What is the bug?
When creating a materialized view connected to Parquet data, no discoverable view is created, but creation cannot be re-attempted due to a collision.

How can one reproduce the bug?
Steps to reproduce the behavior:

  1. Create a Parquet table on EMR with some data in it, including a timestamp field.
CREATE EXTERNAL TABLE IF NOT EXISTS mys3.default.elb_logs_parquet (
  type string,
  time timestamp,
  elb string
)
USING parquet
LOCATION 's3://[sample_location]/data/elb-spark-parquet/';
  2. Attempt to create a materialized view on this table, aggregating count over time:
CREATE MATERIALIZED VIEW mys3.default.elb_logs_metrics_3
AS
SELECT
  window.start AS `start.time`,
  COUNT(*) AS count
FROM mys3.default.elb_logs_parquet
GROUP BY TUMBLE(`time`, '1 Minutes')
WITH (
  auto_refresh = true,
  checkpoint_location = "s3://[sample_location]/data/checkpoint/job-3",
  watermark_delay = '30 Minutes'
);
  3. Observe that the view creation is reported as successful
  4. Attempt to query the view:
SELECT * FROM mys3.default.elb_logs_metrics_3;

The query fails with the error:

-- Fail to analyze query. Cause: Table or view not found: mys3.default.elb_logs_metrics_3; line 1 pos 14; 'Project [*] +- 'UnresolvedRelation [mys3, default, elb_logs_metrics_3], [], false 
  5. Observe that if you try to recreate the view with the same query as above, it fails with the error:
Fail to write result, cause: Flint index elb_logs_metrics_3 already exists
  6. However, if you try to drop the view:
DROP TABLE mys3.default.elb_logs_metrics_3;

You get:

Fail to analyze query. Cause: Table or view not found: mys3.default.elb_logs_metrics_3; line 1 pos 11; 'DropTable false, false +- 'UnresolvedTableOrView [mys3, default, elb_logs_metrics_3], DROP TABLE, true 

After this, I'm not aware of any way to break out of the loop. You need to restart with a new view name and try again. (Incidentally, that's why we're on elb_logs_metrics_3.)

What is the expected behavior?
Either MV creation should fail gracefully, or DROP TABLE should work on the faultily created MV.

What is your host/environment?

  • EMR

Do you have any screenshots?
N/A

Do you have any additional context?
N/A

[FEATURE] Support COPY operation

Feature - COPY

Overview

OpenSearch is the search and analytics suite powering popular use cases such as application search, log analytics, and more. (1) Users use the _bulk indexing API to ingest and index. The current _bulk indexing API places a high configuration burden on users today to avoid RejectedExecutionException due to TOO_MANY_REQUESTS. (2) While OpenSearch is part of critical business and application workflows, it is seldom used as a primary data store because there are no strong guarantees on data durability as the cluster is susceptible to data loss in case of hardware failure. In this document, we propose providing a solution to let customers manage raw data on a highly reliable object storage (e.g. S3), then use the COPY command to transfer data to OpenSearch at any time.

COPY

SYNTAX

LOAD DATA index-name
FROM data_source LOCATION location
[ COMPRESSION file-compression ]
[ DATA FORMAT data-format ] [ parameter [ argument ] [, ... ] ]
[ AUTO ON | OFF [ notification ] ]

Overview

You can perform a COPY operation with as few as three parameters: an index name, a data source and a location.
The OpenSearch COPY command enables you to load data in several data formats from multiple data sources, control access to load data, manage data transformations, and manage the load operation.

Index name

The name of the index for the COPY command. The index must already exist in OpenSearch. The COPY command appends the new input data to any existing docs in the index.

FROM data_source LOCATION location

data source

The data source must already exist in OpenSearch. For more, see Datasource Metadata Management.

location - Amazon S3

E.g. object path to load data from Amazon S3.

["s3://objectpath"]

File compression

File compression parameters

  • BZIP2, A value that specifies that the input file or files are in compressed bzip2 format (.bz2 files). The COPY operation reads each compressed file and uncompresses the data as it loads.
  • GZIP, A value that specifies that the input file or files are in compressed gzip format (.gz files). The COPY operation reads each compressed file and uncompresses the data as it loads.

Data format

You can load data from text files in fixed-width, character-delimited, comma-separated values (CSV), or JSON format, or from Avro files.

JSON

The source data is in JSON format. The JSON data file contains a set of objects. COPY loads each JSON object into the index as a doc. The order of fields in a JSON object doesn't matter. Internally, the engine uses the _bulk API to index JSON objects.
For each error, OpenSearch records a row in the STL_LOAD_ERRORS system table. The LINE_NUMBER column records the last line of the JSON object that caused the error.
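As an illustration of the "one JSON object becomes one doc via _bulk" step, the Python sketch below builds a `_bulk` request body: newline-delimited JSON in which each document line is preceded by an `index` action line. The function name and sample documents are hypothetical, and sending the body to a cluster is left out.

```python
import json


def to_bulk_body(index_name, docs):
    """Build an NDJSON _bulk body: one action line, then one document line."""
    lines = []
    for doc in docs:
        lines.append(json.dumps({"index": {"_index": index_name}}))
        lines.append(json.dumps(doc))
    # The _bulk API expects the body to end with a newline.
    return "\n".join(lines) + "\n"


body = to_bulk_body("logs", [{"status": 200}, {"status": 500}])
print(body, end="")
```

Batching the documents into bounded chunks is one way a COPY implementation could limit request rejections, though the proposal does not specify how batching is done.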

AUTO

If AUTO is set to ON, the OpenSearch COPY operation detects newly added objects and indexes them automatically.
Users can enable Amazon S3 event notifications so that, instead of polling for new data regularly, the COPY operation pulls objects after receiving a notification.

Usage

Load data from Amazon S3 into logs index.

LOAD DATA logs
FROM myS3 LOCATION "s3://my_http_logs"
COMPRESSION gzip
DATA FORMAT json

Solution

Leverage opensearch-project/sql#948.

[BUG] Missing errors when connecting to a Parquet table with incorrect types

What is the bug?
When a table is created over Parquet data with a mismatched column type, no useful error message is returned on either table creation or querying.

How can one reproduce the bug?
Steps to reproduce the behavior:

  1. Create a parquet file with some data that has a mismatched type, in my case there was a time field that I expected to be a string but it was an int
  2. Create a table connected to that file
CREATE EXTERNAL TABLE IF NOT EXISTS sample_s3.default.elb_logs_parquet (
  type string,
  time string,
  elb string
)
USING parquet
LOCATION 's3://[sample-location]/data/truncated-elb-parquet/';
  3. At this point, the query should fail, but it returns a success, so we continue by trying to query it:
SELECT * FROM sample_s3.default.elb_logs_parquet;
  4. This fails with the error: Fail to write result, cause: null. If you dig deeper into the EMR logs, you find the error,
org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file s3://[location]/data/truncated-elb-parquet/logs.parquet. Column: [time], Expected: string, Found: INT64

What is the expected behavior?
Table validation should fail on creation, but barring that, there should at least be a helpful error returned by the API if this happens.
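A validation step of the kind requested could compare the declared column types with the types found in the file and report each mismatch. The Python sketch below assumes both schemas have already been expressed in the same type vocabulary (mapping Parquet physical types such as INT64 to SQL types is not shown); the function name and the `bigint` value are illustrative assumptions.

```python
def find_type_mismatches(declared, found):
    """Return one message per column whose declared and found types differ."""
    mismatches = []
    for column, expected in declared.items():
        actual = found.get(column)
        if actual is not None and actual != expected:
            mismatches.append(
                f"Column: [{column}], Expected: {expected}, Found: {actual}"
            )
    return mismatches


declared = {"type": "string", "time": "string", "elb": "string"}
found = {"type": "string", "time": "bigint", "elb": "string"}
errors = find_type_mismatches(declared, found)
```

Returning these messages from table creation, or at least from the failed query, would replace the opaque `cause: null`.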

What is your host/environment?

  • AWS EMR

Do you have any screenshots?
image

Do you have any additional context?
N/A

[FEATURE] Natively support PPL inside Spark

Is your feature request related to a problem?

This issue proposes a way to query Spark using the PPL query language.
The main goals of introducing this functionality are:

  • Making PPL the default OpenSearch query language (specifically for logs/traces/metrics signals)
  • Promoting PPL as a viable candidate for the proposed CNCF Observability universal query language.
  • Seamlessly interacting with different data sources (S3 / Prometheus / data-lake) from within OpenSearch
  • Improving and promoting PPL as an extensible, general-purpose query language to be adopted by the community

Spark is an excellent conduit for promoting these goals and showcasing PPL's ability to interact with and federate data across multiple sources and domains.

Another byproduct of introducing PPL on Spark would be the much-anticipated JOIN capability that emerges from using the Spark compute engine.

What solution would you like?

For PPL to become a library that is simple to import and extend, it needs a PPL client (a thin API layer) that provides a generic query composition framework usable in any type of application, independently of OpenSearch plugins.

PPL endpoint

As depicted in the image above, the protocol and AST (ANTLR-based language traversal) verticals should be detached and composed into a self-sustaining component that can be imported regardless of OpenSearch plugins.


PPL On Spark

Running PPL on Spark is a goal both for enabling simple adoption of the PPL query language and for simplifying the Flint project, allowing visualization of federated queries using the Observability dashboards capabilities.

Background

In Apache Spark, the DataFrame API serves as a programmatic interface for data manipulation and queries, allowing the construction of complex operations using a chain of method calls. This API can work in tandem with other query languages like SQL or PPL.

For instance, if you have a PPL query and a translator, you can convert it into DataFrame operations to generate an optimized execution plan. Spark's underlying Catalyst optimizer will convert these DataFrame transformations and actions into an optimized physical plan executed over RDDs or Datasets.

The following section describes the two main options for translating the PPL query (using the logical plan) into the corresponding Spark component (either the DataFrame API or the Spark logical plan).

Translation Process

Using Dataframe API

The following PPL query:
search source=t | where a=1

would produce the following chain of DataFrame API calls:

// Equivalent of 'search source=t'
val df = spark.read.format("some_format").load("some_path")
// Equivalent of '| where a=1'
val filteredDf = df.filter("a = 1")
filteredDf.show()

The following PPL query:
source=t | stats count(a) by b

would produce the following chain of DataFrame API calls:

import org.apache.spark.sql.functions.count

// Equivalent of 'source=t'
val df = spark.read.format("some_format").load("some_path")
// Equivalent of 'stats count(a) by b'
val aggregatedDf = df.groupBy("b").agg(count("a"))

Using Catalyst Logical Plan Grammar

Another option for translation is to use the Catalyst grammar to translate the logical plan steps directly.
Here is an example of such a translation outcome:

Our goal is to translate PPL into the unresolved logical plan so that the analysis phase behaves in a similar manner to a query that originated as SQL.

spark execution process

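The following PPL query:
search source=t | where a=1

translates into the PPL logical plan:
Relation(tableName=t, alias=null), Compare(operator==, left=Field(field=a, fieldArgs=[]), right=1)

which would be transformed into the following Catalyst plan:

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
import org.apache.spark.sql.catalyst.plans.logical.Filter

// Create an UnresolvedRelation for the table 't'
val table = UnresolvedRelation(TableIdentifier("t"))
// Create an EqualTo expression for "a == 1"
val equalToCondition = EqualTo(UnresolvedAttribute("a"), Literal(1))
// Create a Filter LogicalPlan
val filterPlan = Filter(equalToCondition, table)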

The following PPL query:
source=t | stats count(a) by b

would produce the following PPL logical plan:

Aggregation(aggExprList=[Alias(name=count(a), delegated=count(Field(field=a, fieldArgs=[])), alias=null)], 
sortExprList=[], groupExprList=[Alias(name=b, delegated=Field(field=b, fieldArgs=[]), alias=null)], span=null, argExprList=[Argument(argName=partitions, value=1), Argument(argName=allnum, value=false), Argument(argName=delim, value= ), Argument(argName=dedupsplit, value=false)], child=[Relation(tableName=t, alias=null)])

which would be transformed into the following Catalyst plan:

// Create an UnresolvedRelation for the table 't'
val table = UnresolvedRelation(TableIdentifier("t"))
// Create an Alias for the aggregation expression 'count(a)'
val aggExpr = Alias(Count(UnresolvedAttribute("a")), "count(a)")()
// Create an Alias for the grouping expression 'b'
val groupExpr = Alias(UnresolvedAttribute("b"), "b")()
// Create an Aggregate LogicalPlan
val aggregatePlan = Aggregate(Seq(groupExpr), Seq(groupExpr, aggExpr), table)

Design Options

In general when translating between two query languages we have the following options:

1) Source Grammar Tree To destination Dataframe API Translation
This option uses the syntax tree to translate directly from one language's syntax grammar tree to the other language's (DataFrame) API, eliminating the parsing phase and creating a strongly validated process that can be verified and tested with a high degree of confidence.

Advantages :

  • Simpler solution to develop, since the abstract structure of the query language is simpler to transform than with other transformation options, using the built-in traversal visitor API
  • Optimization potential from leveraging specific knowledge of the original language and being able to use specific grammar functions and commands directly.

Disadvantages :

  • Fully dependent on the source code of the target language, including potentially the internal structure of its grammatical components. In Spark's case this is not a severe disadvantage since it is a very well-known and well-structured API grammar.
  • Not sufficiently portable since this api is coupled with the

2) Source Logical Plan To destination Logical Plan (Catalyst) [Preferred Option]
This option uses the syntax tree to translate directly from one language's syntax grammar tree to the other language's syntax grammar tree, eliminating the parsing phase and creating a strongly validated process that can be verified and tested with a high degree of confidence.

Once the target plan is created, it can be analyzed and executed separately from the translation process (or location).

SparkSession spark = SparkSession.builder()
        .appName("SparkExecuteLogicalPlan")
        .master("local")
        .getOrCreate();

// catalyst logical plan - translated from PPL Logical plan
Seq<NamedExpression> scalaProjectList = //... your project list
LogicalPlan unresolvedTable = //... your unresolved table
LogicalPlan projectNode = new Project(scalaProjectList, unresolvedTable);

// Analyze and execute
Analyzer analyzer = new Analyzer(spark.sessionState().catalog(), spark.sessionState().conf());
LogicalPlan analyzedPlan = analyzer.execute(projectNode);
LogicalPlan optimizedPlan = spark.sessionState().optimizer().execute(analyzedPlan);

QueryExecution qe = spark.sessionState().executePlan(optimizedPlan);
Dataset<Row> result = new Dataset<>(spark, qe, RowEncoder.apply(qe.analyzed().schema()));

Advantages :

  • Slightly more complex to develop than the first option, but still relatively simple since the abstract structure of the query language is simpler to transform into another language's syntax grammar tree

  • Optimization potential from leveraging specific knowledge of the original language and being able to use specific grammar functions and commands directly.

Disadvantages :

  • Fully dependent on the source code of the target language, including potentially the internal structure of its grammatical components. In Spark's case this is not a severe disadvantage since it is a very well-known and well-structured API grammar.
  • Adds an additional phase for analyzing the logical plan and generating the physical plan, plus the execution itself.

3) Source Grammar Tree To destination Query Translation
This option uses the syntax tree to translate from the original query language into the target query language (SQL in our case). This is a more generalized solution that may be utilized for additional purposes such as direct queries to an RDBMS server.

Advantages :

  • A general purpose solution that may be utilized for other SQL compliant servers

Disadvantages :

  • This is a more complicated use case since it requires an additional layer of complexity to correctly translate the original syntax tree into a textual representation of the target language, which then has to be parsed and verified
  • The SQL plugin already supports SQL out of the box, so it's not clear what the advantage of translating PPL back to SQL would be.
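To give a feel for option 3, here is a toy Python sketch that turns the two PPL queries used earlier into SQL text. It is string-based rather than syntax-tree-based, supports only `source=`, `where` and `stats count(x) by y`, and splits naively on `|`, so it is an assumption-laden illustration of the idea and not how the PPL project performs translation.

```python
import re


def ppl_to_sql(ppl):
    """Translate a tiny PPL subset (source, where, stats count by) to SQL."""
    commands = [command.strip() for command in ppl.split("|")]
    first = commands[0]
    if first.startswith("search "):
        first = first[len("search "):].strip()
    if not first.startswith("source="):
        raise ValueError("query must start with source=<table>")
    table = first[len("source="):].strip()

    select, where, group_by = "*", [], None
    for command in commands[1:]:
        if command.startswith("where "):
            where.append(command[len("where "):].strip())
            continue
        match = re.fullmatch(r"stats\s+(count\(\w+\))\s+by\s+(\w+)", command)
        if match is None:
            raise ValueError(f"unsupported command: {command}")
        aggregate, group_by = match.groups()
        select = f"{group_by}, {aggregate}"

    sql = f"SELECT {select} FROM {table}"
    if where:
        sql += " WHERE " + " AND ".join(where)
    if group_by:
        sql += f" GROUP BY {group_by}"
    return sql


filter_sql = ppl_to_sql("search source=t | where a=1")
stats_sql = ppl_to_sql("source=t | stats count(a) by b")
```

Even this small example shows the cost named in the disadvantages: the generated text has to be parsed and validated again by the SQL engine.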

Do you have any additional context?

[RFC] Add support for PPL Correlation Language and engine

PPL Correlation Command

Overview

Over the past year, the OpenSearch Observability and Security teams have worked on many aspects of improving data monitoring and visibility.
The key idea behind this work is to enable users to dig into their data and surface the insights hidden within a massive corpus of logs, events and observations.

One fundamental concept that supports this process is the ability to correlate different data sources according to common dimensions and timeframe.
This subject is well documented, so this RFC will not dive into the necessity of correlation (the appendix refers to multiple related resources) but rather into structuring the linguistic support for such a capability.

Problem definition

In the appendix I’ll add some formal references to the problem domain in both Observability and Security, but the main takeaway is that such a capability is fundamental to the daily work of domain experts and SREs.
Every day they encounter huge amounts of data arriving from different verticals (data sources) which share the same timeframes but are not synchronized in a formal manner.

The correlation capability to intersect these different verticals according to the timeframe and the similar dimensions will enrich the data and allow the desired insight to surface.

Example
Let's take the Observability domain, for which we have 3 distinct data sources:
- Logs
- Metrics
- Traces

Each data source may share many common dimensions, but to transition from one data source to another it's necessary to be able to correctly correlate them.
According to the semantic naming conventions we know that both logs, traces and metrics

Let's take the following examples:

Log

{
  "@timestamp": "2018-07-02T22:23:00.186Z",
  "aws": {
    "elb": {
      "backend": {
        "http": {
          "response": {
            "status_code": 500
          }
        },
        "ip": "10.0.0.1",
        "port": "80"
      },
      ...
     "target_port": [
        "10.0.0.1:80"
      ],
      "target_status_code": [
        "500"
      ],
      "traceId": "Root=1-58337262-36d228ad5d99923122bbe354",
      "type": "http"
    }
  },
  "cloud": {
    "provider": "aws"
  },
  "http": {
    "request": {
    ...
  },
  "communication": {
    "source": {
      "address": "192.168.131.39",
      "ip": "192.168.131.39",
      "port": 2817
    }
  },
  "traceId": "Root=1-58337262-36d228ad5d99923122bbe354"
}

This is an AWS ELB log arriving from a service residing on AWS.
It shows that backend.http.response.status_code was 500, which is an error.

This may come up as part of a monitoring process or an alert triggered by some rule. Once it is identified, the next step is to collect as much data surrounding this event as possible so that the investigation can be done in the most intelligent and thorough way.

The most obvious step would be to create a query that brings back all data related to that timeframe, but in many cases this is too much of a brute-force action.

The data may be too large to analyze, and most of the time would be spent filtering out the non-relevant data instead of actually trying to locate the root cause of the problem.

Suggest Correlation command

The following approach allows searching in a much more fine-grained manner and further simplifies the analysis stage.

Let's review the known facts: we have multiple dimensions that can be used to correlate data from other sources:

  • IP - "ip": "10.0.0.1" | "ip": "192.168.131.39"

  • Port - "port": 2817 | "target_port": "10.0.0.1:80"

So assuming we have the additional traces / metrics indices available and using the fact that we know our schema structure (see appendix with relevant schema references) we can generate a query for getting all relevant data bearing these dimensions during the same timeframe.

Here is a snippet of the trace index document that has HTTP information that we would like to correlate with:

{
  "traceId": "c1d985bd02e1dbb85b444011f19a1ecc",
  "spanId": "55a698828fe06a42",
  "traceState": [],
  "parentSpanId": "",
  "name": "mysql",
  "kind": "CLIENT",
  "@timestamp": "2021-11-13T20:20:39+00:00",
  "events": [
    {
      "@timestamp": "2021-03-25T17:21:03+00:00",
       ...
    }
  ],
  "links": [
    {
      "traceId": "c1d985bd02e1dbb85b444011f19a1ecc",
      "spanId": "55a698828fe06a42w2",
      },
      "droppedAttributesCount": 0
    }
  ],
  "resource": {
    "service@name": "database",
    "telemetry@sdk@name": "opentelemetry",
    "host@hostname": "ip-172-31-10-8.us-west-2.compute.internal"
  },
  "status": {
    ...
  },
  "attributes": {
    "http": {
      "user_agent": {
        "original": "Mozilla/5.0"
      },
      "network": {
         ...
        }
      },
      "request": {
         ...
        }
      },
      "response": {
        "status_code": "200",
        "body": {
          "size": 500
        }
      },
      "client": {
        "server": {
          "socket": {
            "address": "192.168.0.1",
            "domain": "example.com",
            "port": 80
          },
          "address": "192.168.0.1",
          "port": 80
        },
        "resend_count": 0,
        "url": {
          "full": "http://example.com"
        }
      },
      "server": {
        "route": "/index",
        "address": "192.168.0.2",
        "port": 8080,
        "socket": {
         ...
        },
        "client": {
         ...
         }
        },
        "url": {
         ...
        }
      }
    }
  }
}

In the above document we can see both the traceId and the HTTP client/server IP that can be correlated with the ELB logs to better understand the system's behaviour and condition.

New Correlation Query Command

Here is the new command that would allow this type of investigation:

source alb_logs, traces | where alb_logs.ip="10.0.0.1" AND alb_logs.cloud.provider="aws"|
correlate exact fields(traceId, ip) scope(@timestamp, 1D) mapping(alb_logs.ip = traces.attributes.http.server.address, alb_logs.traceId = traces.traceId )

Let's break this down a bit:

1. source alb_logs, traces selects all the data sources that will be correlated to one another

2. where ip="10.0.0.1" AND cloud.provider="aws" is a predicate clause that constrains the scope of the search corpus

3. correlate exact fields(traceId, ip) expresses the correlation operation on the following list of fields:

- ip has an explicit filter condition, so this will be propagated into the correlation condition for all the data sources
- traceId has no explicit filter, so the correlation will only match the same traceId values across all the data sources

The field names indicate the logical meaning of each field within the correlation command; the actual join condition is taken from the mapping statement described below.

The term exact means that the correlation requires all the fields to match in order to fulfill the query statement.

The alternative is approximate, which attempts to match on a best-effort basis and does not reject rows that match only partially.

Addressing different field mapping

In cases where the same logical field (such as ip) has a different mapping in each data source, an explicit mapping of field paths is expected.

The following syntax extends the correlation conditions to allow matching different field names with a similar logical meaning:
alb_logs.ip = traces.attributes.http.server.address, alb_logs.traceId = traces.traceId

For each field that participates in the correlation join, there should be a mapping statement that includes all the tables to be joined by this correlation command.

Example:
In our case there are 2 sources: alb_logs, traces
There are 2 fields: traceId, ip
There are 2 mapping statements: alb_logs.ip = traces.attributes.http.server.address, alb_logs.traceId = traces.traceId
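
The difference between exact and approximate matching over the mapping statements can be illustrated with a small Python sketch over in-memory rows. The correlate function, its mode parameter and the flat dotted-path row layout are assumptions made for illustration; they are not part of PPL or of any driver. In particular, approximate is interpreted here as "at least one mapped field matches", which is only one possible reading.

```python
from itertools import product

# Mapping statements from the example: (alb_logs field, traces field).
# Rows are modeled as flat dicts keyed by dotted field path (an assumption).
MAPPING = [
    ("ip", "attributes.http.server.address"),
    ("traceId", "traceId"),
]


def correlate(left_rows, right_rows, mapping, mode="exact"):
    """Return (left, right) pairs whose mapped fields match.

    exact:       every mapped field pair must be equal.
    approximate: at least one mapped field pair must be equal
                 (an assumed reading of best-effort matching).
    """
    pairs = []
    for left, right in product(left_rows, right_rows):
        matches = [
            l_field in left and r_field in right and left[l_field] == right[r_field]
            for l_field, r_field in mapping
        ]
        keep = all(matches) if mode == "exact" else any(matches)
        if keep:
            pairs.append((left, right))
    return pairs


alb_logs = [{"ip": "10.0.0.1", "traceId": "t-1"}]
traces = [
    {"attributes.http.server.address": "10.0.0.1", "traceId": "t-1"},
    {"attributes.http.server.address": "10.0.0.1", "traceId": "t-2"},
]

exact = correlate(alb_logs, traces, MAPPING, mode="exact")
approximate = correlate(alb_logs, traces, MAPPING, mode="approximate")
print(len(exact), len(approximate))
```

With this sample data only the first trace matches on both fields, while the second matches on ip alone and is therefore kept only in approximate mode.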

Scoping the correlation timeframes

To simplify the work done by the execution engine (driver), the scope statement explicitly directs the join query to the timeframe it should search.

In scope(@timestamp, 1D), the search is scoped on a daily basis, so that correlations appearing in the same day are grouped together. This scoping mechanism simplifies the results, gives better control over them, and allows incremental search resolution based on the user's needs.
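
One way to picture the scope statement is as fixed-width time buckets over @timestamp. The sketch below is a hedged illustration in Python, not the driver's implementation: scope_bucket and group_by_scope are hypothetical helpers, and aligning buckets to the Unix epoch in UTC is an assumption.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def scope_bucket(timestamp, width):
    """Return the start of the fixed-width window that contains `timestamp`."""
    index = (timestamp - EPOCH) // width  # timedelta // timedelta gives an int
    return EPOCH + index * width


def group_by_scope(rows, width=timedelta(days=1)):
    """Group rows by the scope window their @timestamp falls into."""
    buckets = defaultdict(list)
    for row in rows:
        ts = datetime.fromisoformat(row["@timestamp"])
        buckets[scope_bucket(ts, width)].append(row)
    return dict(buckets)


rows = [
    {"@timestamp": "2018-07-02T22:23:00+00:00", "source": "alb_logs"},
    {"@timestamp": "2018-07-02T01:00:00+00:00", "source": "traces"},
    {"@timestamp": "2018-07-03T00:05:00+00:00", "source": "traces"},
]
groups = group_by_scope(rows)
for start, members in sorted(groups.items()):
    print(start.date(), [m["source"] for m in members])
```

Only rows that land in the same bucket are candidates for correlation with each other, which is what keeps the join from spanning the whole corpus.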

Diagram
These are the correlation conditions that explicitly state how the sources are going to be joined.
[Image: Screenshot 2023-10-06 at 12.23.59 PM.png]

Supporting Drivers

The new correlation command is actually a 'hidden' join command, therefore only the following PPL drivers support it:

  • ppl-spark
    In this driver the correlation command is translated directly into the appropriate Catalyst Join logical plan

Example:
source alb_logs, traces, metrics | where ip="10.0.0.1" AND cloud.provider="aws"| correlate exact on (ip, port) scope(@timestamp, 2018-07-02T22:23:00, 1 D)

Logical Plan:

'Project [*]
+- 'Join Inner, ('ip && 'port)
   :- 'Filter (('ip === "10.0.0.1" && 'cloud.provider === "aws") && inTimeScope('@timestamp, "2018-07-02T22:23:00", "1 D"))
      +- 'UnresolvedRelation [alb_logs]
   +- 'Join Inner, ('ip && 'port)
      :- 'Filter (('ip === "10.0.0.1" && 'cloud.provider === "aws") && inTimeScope('@timestamp, "2018-07-02T22:23:00", "1 D"))
         +- 'UnresolvedRelation [traces]
      +- 'Filter (('ip === "10.0.0.1" && 'cloud.provider === "aws") && inTimeScope('@timestamp, "2018-07-02T22:23:00", "1 D"))
         +- 'UnresolvedRelation [metrics]

The Catalyst engine will optimize this query according to the most efficient join ordering.


Appendix

Derived dataset auto refresh benchmark

Goals

  • Size the cluster to auto refresh derived dataset
  • Latency of auto refresh derived dataset.
  • Cost of auto refresh derived dataset

Test Plan

Dimensions

  • Queries
    • Auto refresh Skipping Index
    • Auto refresh Covering Index
    • Auto refresh Materialized View
  • Dataset. http_logs dataset
  • Ingestion
    • Ingestion traffic pattern.
      • StreamingIngestion, 10 files per 10 seconds, file size is 1MB (24MB uncompressed) - streaming ingestion
      • BatchIngestion, 10 files per 30 minutes, file size is 100MB - log case
  • Configuration
    • streaming source model
      • pull mode: MicroBatch pull files from s3.
      • push mode: MicroBatch read from SQS/SNS. (TBD)
    • configure job with dynamicAllocation enabled / disabled
    • configure job with different executors configuration
      • executors = 3 (default)
      • executors = 10
      • executors = 30
    • spark streaming job interval
      • default
      • 10mins

Measurement

  • Latency: p90 and p75 of Latency. Latency is the time difference between the moment of data production at the source (PUT object on S3) and the moment that the data has produced an output.
  • Cost: p90 and p75 Billed resource utilization

Latency

We measure the event-time latency.
For skipping index, we define event-time latency to be the interval between a file's event-time and its emission time from the output operator.

  1. The generator appends eventTime to the file name.
  2. The streaming system calculates latency = processTime - extractTimeFromFileName.
    [Image: Screenshot 2023-09-20 at 11 12 06 AM]

For covering index, we define event-time latency to be the interval between a file's event-time and its emission time from the output operator.

  1. The generator appends eventTime to each tuple.
  2. The streaming system calculates latency = processTime - eventTime.

For MV, we define event-time latency to be the interval between a tuple's event-time and its emission time from the output operator.

  1. The generator appends eventTime to each tuple.
  2. The streaming system recalculates eventTime = max(eventTime of the tuples contributing to the window).
  3. latency = processingTime - eventTime
[Image: Screen Shot 2022-11-17 at 3 52 49 PM]
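
The skipping index latency measurement and the p75/p90 summary can be sketched in Python as follows. This is an illustration only: the file naming scheme (prefix, underscore, ISO-8601 timestamp), the sample observations and the nearest-rank percentile definition are all assumptions, not what the benchmark harness actually uses.

```python
import math
from datetime import datetime


def extract_event_time(file_name):
    """Parse the event time the generator appended to the file name.

    Assumed (hypothetical) naming scheme: <prefix>_<ISO-8601 timestamp>.json
    """
    stem = file_name.rsplit(".", 1)[0]
    return datetime.fromisoformat(stem.rsplit("_", 1)[1])


def percentile(values, p):
    """Nearest-rank percentile for p in (0, 100]."""
    ordered = sorted(values)
    rank = math.ceil(p / 100 * len(ordered))
    return ordered[rank - 1]


# (file name, time at which the streaming job emitted its output): made-up samples
observations = [
    ("http_logs_2023-09-20T11:00:00.json", datetime(2023, 9, 20, 11, 0, 12)),
    ("http_logs_2023-09-20T11:00:10.json", datetime(2023, 9, 20, 11, 0, 30)),
    ("http_logs_2023-09-20T11:00:20.json", datetime(2023, 9, 20, 11, 0, 28)),
    ("http_logs_2023-09-20T11:00:30.json", datetime(2023, 9, 20, 11, 1, 10)),
]

# latency = processTime - extractTimeFromFileName, in seconds
latencies = [
    (process_time - extract_event_time(name)).total_seconds()
    for name, process_time in observations
]
print("p75:", percentile(latencies, 75), "p90:", percentile(latencies, 90))
```

For covering index and MV the same percentile helper applies; only the source of eventTime changes (a tuple field, or the maximum eventTime in the window).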

[FEATURE] Checkpoint folder data cleanup limitation

Is your feature request related to a problem?

Currently Flint passes the exact checkpoint location path given by the user to Spark. It's the user's responsibility to:

  1. Make sure the checkpoint location is used by only 1 Flint index refresh
  2. Clean up the checkpoint folder after the Flint index is dropped

What solution would you like?

Flint cleans up the folder when the user drops the index. Add this to the limitation doc if there is no time to improve this in the 0.1.1 release.

[FEATURE] Add Flint config for mandatory checkpoint location option

Is your feature request related to a problem?

Currently the checkpoint location option is optional or mandatory depending on the Spark stream's requirement. In some cases, a Flint library user may want to make the checkpoint mandatory for strong resilience.

What solution would you like?

Add a new Flint configuration spark.flint.index.checkpoint.mandatory with default true.
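
A minimal sketch of how such a check could behave, written in Python for illustration. The configuration key and the checkpoint_location option name come from this document; the validate_checkpoint_option helper, the plain-dict configuration and the error type are assumptions, and the sketch deliberately ignores whether the index is auto-refreshed.

```python
CHECKPOINT_MANDATORY_KEY = "spark.flint.index.checkpoint.mandatory"


def validate_checkpoint_option(spark_conf, index_options):
    """Raise if the checkpoint is mandatory but no location was given.

    spark_conf and index_options are plain dicts here (an assumption);
    the proposed default for the config is true.
    """
    raw = spark_conf.get(CHECKPOINT_MANDATORY_KEY, "true")
    mandatory = raw.strip().lower() == "true"
    if mandatory and not index_options.get("checkpoint_location"):
        raise ValueError(
            f"checkpoint_location is required when {CHECKPOINT_MANDATORY_KEY} is true"
        )


# Passes: location provided
validate_checkpoint_option({}, {"checkpoint_location": "s3://test/"})
# Passes: the user opted out of the mandatory check
validate_checkpoint_option({CHECKPOINT_MANDATORY_KEY: "false"}, {})
```

Defaulting to true fails fast at index creation time instead of leaving a streaming job without recovery state.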

What alternatives have you considered?

Make checkpoint location option required without flexibility.

[FEATURE] Push mode streaming support

Is your feature request related to a problem?

Currently, the refreshing of the Flint index is dependent on "polling" within the Spark FileStreamSource operator. This approach can potentially lead to performance issues, especially when dealing with a source table containing a substantial number of partitions and files.

What solution would you like?

The proposal is to allow the user to provide an SNS topic for the S3 data source. In this way, the streaming execution can find the "delta" (changed file list) efficiently.

Questions to think about:

  1. Is this option provided on source table or Flint index DDL statement?
  2. Do we only handle new changes via notification, or can we also load cold data?

What alternatives have you considered?

Provide some way for the user to refresh source table metadata periodically. How to do this still needs to be figured out because:

  1. Spark Hive table: the MSCK REPAIR statement works for this purpose, but Hive tables don't support Spark structured streaming
  2. Spark data source table: as mentioned above, FileStreamSource polls the S3 file list

Do you have any additional context?

N/A

[FEATURE] Covering index and materialized view refresh idempotency

Is your feature request related to a problem?

Skipping index incremental refresh uses the unique file path to generate the OpenSearch doc ID for idempotency. Covering index and materialized view need a similar mechanism to deduplicate when a streaming job restarts.

What solution would you like?

TBD:

  1. May provide index option to ask user to provide column or multiple columns to generate ID
  2. By default may use current file path and timestamp column (if any) to generate ID
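
The two options above can be sketched as a deterministic ID function. This is a Python illustration under stated assumptions: generate_doc_id and its parameters are hypothetical, SHA-1 is an arbitrary choice of stable hash, and the design itself is still TBD.

```python
import hashlib


def generate_doc_id(row, id_columns=None, file_path=None, timestamp_column=None):
    """Build a deterministic document ID for a row.

    Option 1: hash the user-provided ID column(s).
    Option 2 (default): hash the source file path plus the timestamp column, if any.
    """
    if id_columns:
        parts = [str(row[column]) for column in id_columns]
    else:
        parts = [file_path or ""]
        if timestamp_column and timestamp_column in row:
            parts.append(str(row[timestamp_column]))
    # A separator that cannot appear in normal values avoids accidental collisions
    return hashlib.sha1("\x00".join(parts).encode("utf-8")).hexdigest()


row = {"client_ip": "10.1.2.3", "time": "2023-04-01 00:00:00"}
first = generate_doc_id(row, file_path="s3://bucket/a.json", timestamp_column="time")
replayed = generate_doc_id(row, file_path="s3://bucket/a.json", timestamp_column="time")
print(first == replayed)
```

Because a replayed row produces the same ID, a restarted streaming job overwrites the existing document instead of adding a duplicate. Note that the default in option 2 is only unique if no two rows in a file share a timestamp.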

[RFC] Automatic Workload-Driven Query Acceleration by OpenSearch

Is your feature request related to a problem?

In a database engine, there are different ways to optimize query performance. For instance, a rule-based/cost-based optimizer and a distributed execution layer try to find the best execution plan by cost estimation and equivalent transformation of the query plan. Here we propose an alternative approach: accelerating query execution with materialized views, as a time-space tradeoff.

What solution would you like?

Architecture

Here is a reference architecture that illustrates components and the entire workflow which essentially is a workload-driven feedback loop:

  1. Input: Query plan telemetry collected
  2. Generating feedback: Feeding it into a workload-driven feedback generator
  3. Output: Feedback for optimizer to rewrite query plan in future

Basically, feedback refers to the various materialized views prebuilt (either online or offline), which hint acceleration opportunities to the query optimizer.

[Image: AutoMV (1) (1)]

There are 2 areas and paths moving forward, both of which lack open source solutions:

  • OpenSearch engine acceleration: accelerate DSL or SQL/PPL engine execution
  • MPP/Data Lake engine acceleration: accelerate Spark, Presto, Trino

General Acceleration Workflow

1. Workload Telemetry Collecting

Collect query plan telemetry generated in query execution and emit it as feedback generation input.

  • Query Plan Telemetry: Specifically, query plan telemetry means statistics collected on each physical node (sub-query or sub-expression) during execution. Generally, the statistics include input/output size, column cardinality, running time, etc. Eventually the logical plan is rewritten to reuse a materialized view, so the statistics from execution may need to be linked to the logical plan before emitting telemetry data.
  • Challenge: The effort required in this stage depends on how observable the query engine is and how easily telemetry can be collected.

2. Workload Telemetry Preprocessing

Preprocess query plan telemetry into uniform workload representation.

  • Workload Representation: uniform workload representation decouples subsequent stages from specific telemetry data format and store.

3. View Selection

Analyze workload data and select sub-queries as materialization candidates according to a view selection algorithm.

  • Algorithm
    • The view selection algorithm can be a heuristic rule, such as estimated high performance boost and low materialization cost, or a more complex learning algorithm.
    • Alternatively, the selection can be done manually by customers with access to all workload statistics.
    • In between, an advisor gives acceleration suggestions and allows the customer to intervene and change the default acceleration strategy.
  • Select Timing
    • Online: analyze and select view candidates at query processing time, which benefits interactive/ad-hoc queries
    • Offline: as shown in the figure above
  • Challenge: Automatic workload analysis and view selection is challenging and may require machine learning capability. The simple heuristic rules mentioned above may be acceptable. Alternative options include a view recommendation advisor or manual selection by customers.

4. View Materialization and Maintenance

Materialize the selected view and maintain consistency between source data and materialized view data, for example by refreshing incrementally.

  • Materialized View: a database object that contains the results of a query. The result may be a subset of a single table or a multi-table join, or may be a summary using an aggregate function
    • Query Result Cache
      • Full Result: MV that stores the entire result set and can only be reused by the same deterministic query
      • Intermediate Result: MV that stores the result of a subquery (similar to a Covering Index if it contains only filtering operations)
    • Secondary Index
      • Data Skipping Index: MV that stores column statistics in a coarse-grained way (Small Materialized Aggregate, SMA) and thus allows skipping data that cannot produce a match. Common SMAs include Min-Max, Bloom Filter, Value List.
      • Covering Index: MV that stores indexed column value(s) and included column value(s) so that the index itself can answer the query without accessing the original data. Common index implementations include the B-tree family, Hash, Z-Ordering index.
    • Approximate Data Structure
  • Materialization Timing
    • Ingestion Time: a view defined and materialized at ingestion time can be "registered" to the Selected View table in the figure above (e.g. by DDL CREATE MV). In this way the query acceleration framework can take care of query plan optimization
      • Parquet: min-max SMA
      • S3 query federation: user defined transformation as final resulting MV. More details in opensearch-project/sql#595
      • Loki: labels as skipping hash index
      • DataSketch: approximate data structure
    • Online (Query Processing Time): adds materialization overhead to the first query, for the benefit of future queries
    • Offline: as shown in the figure above
  • Challenge: To ensure consistency, the materialized view needs to be in sync with the source data. Without real-time notification to refresh or invalidate, a hybrid scan or similar mechanism is required to reuse a partially stale materialized view.
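
To illustrate how a Min-Max data skipping index avoids reading files that cannot match, here is a small Python sketch. It is a toy model under stated assumptions: files are in-memory lists of rows, build_min_max_index and files_to_scan are hypothetical helpers, and every file is assumed to be non-empty.

```python
def build_min_max_index(files, column):
    """Store one (min, max) pair per file for the given column.

    files: dict mapping file path -> non-empty list of row dicts.
    """
    index = {}
    for path, rows in files.items():
        values = [row[column] for row in rows]
        index[path] = (min(values), max(values))
    return index


def files_to_scan(index, value):
    """Keep only files whose [min, max] range may contain `value`."""
    return [path for path, (lo, hi) in index.items() if lo <= value <= hi]


FILES = {
    "s3://bucket/part-0.parquet": [{"status": 200}, {"status": 204}],
    "s3://bucket/part-1.parquet": [{"status": 404}, {"status": 500}],
    "s3://bucket/part-2.parquet": [{"status": 200}, {"status": 503}],
}

index = build_min_max_index(FILES, "status")
candidates = files_to_scan(index, 500)
print(candidates)
```

The index can only rule files out: part-2 is still scanned even though it contains no row with status 500, because 500 falls inside its range.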

5. Query Plan Optimization

Finally, the query optimizer checks the existing materialized views and replaces the original operator with a scan on the materialized view.

  • View Matching: match sub-query with materialized view
  • Query Rewrite: replace query plan node with materialized view scan operator

[FEATURE] Support existing Index usage in Flint

Is your feature request related to a problem?
As a Flint user, I'd like to use existing indices / index templates as the index targets of the Flint accelerated tables

What solution would you like?
Use an existing index name to create the acceleration process. This will not actually create an index but will use the given name as the target of the acceleration ETL store.

The following SQL syntax is suggested:

CREATE (SKIPPING/COVERING/MV) INDEX
ON alb_logs USING ss4o_logs-elb-prod
WITH (
  auto_refresh = true,
  refresh_interval = '1 minute',
  checkpoint_location = 's3://test/'
)

This would initiate the acceleration ETL sync process without actually creating a new index in OpenSearch; it will use the ss4o_logs-elb-prod index (index template) as the data store for the acceleration content.

It may validate the following:

  • the source table (catalog) columns exist in the given index_mapping
  • the index has an alias with the correct Flint naming convention
  • the index_template (mapping) has the correct covering - metadata fields exist (covering / MV columns)
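
The first validation (source table columns exist in the index mapping) could look roughly like the Python sketch below. It assumes the mapping is available as a dict shaped like an OpenSearch mapping body (mappings, then properties) and only checks top-level fields; missing_columns and the sample field types are hypothetical.

```python
def missing_columns(table_columns, index_mapping):
    """Return source table columns that have no field in the index mapping.

    Only top-level properties are checked; nested objects are out of scope here.
    """
    properties = index_mapping.get("mappings", {}).get("properties", {})
    return sorted(column for column in table_columns if column not in properties)


# Hypothetical mapping of an existing index
existing_mapping = {
    "mappings": {
        "properties": {
            "client_ip": {"type": "ip"},
            "elb_status": {"type": "keyword"},
        }
    }
}

missing = missing_columns(["client_ip", "elb_status", "request_uri"], existing_mapping)
print(missing)
```

An empty result means the existing index can store every source column; a non-empty one is a reason to reject the statement before starting the ETL process.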

Do you have any additional context?
Using existing SS4O schema definition

Add data skipping index support

Flint Skipping Index Detailed Design

The detailed design is still in progress and subject to change. Once finalized, this will be added to dev docs.

1. Flint Skipping Index

What is provided:

  • Flint skipping index provides user-friendly SQL support as well as low level API support
  • Skipping index can be auto refreshed and guarantees eventual consistency

Constraints:

  • Every base table can only have 1 skipping index
  • Skipping index cannot be altered after creation

2. User Interfaces

2.1 SQL

Flint configuration:

USING FLINT    # TBD
PROPERTIES (
  OPENSEARCH = "https://my-opensearch.com"
)

Flint skipping index SQL support provides CREATE, DESCRIBE and DROP statements. For the CREATE statement, the user can provide a filter predicate to reduce index maintenance and storage cost.

CREATE SKIPPING INDEX
ON <object>
FOR COLUMNS ( column <index_type> [, ...] )
WHERE <filter_predicate>

DESCRIBE SKIPPING INDEX ON <object>

DROP SKIPPING INDEX ON <object>

<object> ::= [db_name].[schema_name].table_name

Skipping index type includes:

<index_type> ::= { <bloom_filter>, <min_max>, <value_list> }

<bloom_filter> ::= BLOOM_FILTER( bitCount, numOfHashFunctions ) #TBD
<min_max> ::= MIN_MAX
<value_list> ::= VALUE_LIST

Example:

CREATE SKIPPING INDEX ON alb_logs
FOR COLUMNS (
  client_ip BLOOM_FILTER,
  elb_status VALUE_LIST
)
WHERE time > '2023-04-01 00:00:00'

DESCRIBE SKIPPING INDEX ON alb_logs

DROP SKIPPING INDEX ON alb_logs

2.2 API

object FlintSpark(conf: Map[String, String]):

  def createIndex(index: FlintIndex)
  
  def describeIndex(indexName: String)
  
  def dropIndex(indexName: String)
  
  def explain(query: DataFrame)  // explains in detail why an index was or was not applied

Example (corresponding to SQL example above):

// Initialize Flint object
val conf = Map("opensearch.location" -> "localhost:9200")
val flint = FlintSpark(conf)

// Create and manage index lifecycle
flint.createIndex(
  FlintSkippingIndex(
    tableName = "alb_logs",
    filterPred = "time > '2023-04-01 00:00:00'",
    indexedColumns = Seq(
      BloomFilterSketch("client_ip"),
      ValueListSketch("elb_status")
    )
  )
)

flint.describeIndex(FlintSkippingIndex.onTable("alb_logs"))

flint.dropIndex(FlintSkippingIndex.onTable("alb_logs"))

val query = spark.table("alb_logs").filter("client_ip = '10.1.2.3'")
flint.explain(query)

3. Detailed Design

3.1 Components

[Image: Screenshot 2023-05-01 at 9 58 54 AM]

3.2 Core Classes

[Image: Screenshot 2023-05-01 at 11 55 48 AM]

3.3 Class Interactions

TODO: sequence diagram

[FEATURE] Support IF NOT EXISTS in create index statement

Is your feature request related to a problem?

Currently a "Flint index already exists" error occurs if a Spark application job with a CREATE statement restarts. This makes it inconvenient for the user to restart a streaming job.

What solution would you like?

For example, support CREATE SKIPPING INDEX IF NOT EXISTS ... so user can simply restart the application.

What alternatives have you considered?

Require user to submit REFRESH statement to restart streaming job

Do you have any additional context?

N/A

[FEATURE] Add index refresh job management commands

Is your feature request related to a problem?

Currently, there are no dedicated commands for index refresh management. For restart, we overload CREATE IF NOT EXISTS to serve this purpose. However, this causes problems: for example, there is no way to differentiate a create from a restart.

What solution would you like?

The proposed solution is to provide dedicated commands as below (spec will be finalized later):

SHOW INDEX JOBS [ON tableName]

Description: Show Flint index info including properties. This is like an enhanced SHOW command with some DESC info for caller convenience.

# Example: Show all jobs
SHOW INDEX JOBS
Flint job name | kind | auto_refresh | properties
-----------------------------------------------------------
flint_http_logs_skipping_index | skipping | false | {}
flint_alb_logs_elb_and_requestUri_index | covering | true | { ... }
flint_alb_logs_elb_and_clientIp_index | covering | true | { ... }

# Example: Show specific job
SHOW INDEX JOBS ON alb_logs
Flint job name | kind | auto_refresh | properties
-----------------------------------------------------------
flint_alb_logs_elb_and_requestUri_index | covering | true | { ... }
flint_alb_logs_elb_and_clientIp_index | covering | true | { ... }

# Example: Show MV job
SHOW INDEX JOBS ON alb_logs_metrics
Flint job name | kind | auto_refresh | properties
-----------------------------------------------------------
flint_alb_logs_metrics | mv | true | { ... }

RECOVER INDEX JOB flintJobName

Description: Restart the index streaming job. It's the caller's responsibility to make sure the previous streaming job has already stopped (because it may be in another SparkContext).

# Examples: Recover skipping index job
RECOVER INDEX JOB flint_http_logs_skipping_index
success

# Examples: Recover covering index job
RECOVER INDEX JOB flint_alb_logs_elb_and_requestUri_index
success

# Examples: Recover MV job
RECOVER INDEX JOB flint_alb_logs_metrics
success

[BUG] Find correct catalog name in query rewriter

Is your feature request related to a problem?

In #48, there is a limitation that the Spark query rewriter loses access to the catalog name. There is no such information in the logical plan, probably because LogicalRelation already has a concrete implementation of BaseRelation after analysis.

As a result, the query rewriter always uses the current catalog name regardless of the table name in FROM.

What solution would you like?

Unknown for now.

[BUG] Streaming job application exits unexpectedly

What is the bug?

The Spark application that submits a create index statement exits shortly afterwards, before any index data is refreshed.

What is the expected behavior?

A create index statement submitted via a Spark application is expected to be long running. In Spark, this is done by StreamingQuery.awaitTermination(). However, the Spark application executes the create statement by spark.sql() without access to the StreamingQuery instance. We need to figure out how to make the Spark application main thread block and wait for all streaming job threads to complete.

[BUG] Preserve dots in table name when generate index name

What is the bug?

Currently Flint replaces all dots in qualified table name with underscore. However, this doesn't align with PPL.

Flint:

https://github.com/opensearch-project/opensearch-spark/blob/main/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala#L91

PPL:

https://github.com/opensearch-project/sql/blob/main/spark/src/test/java/org/opensearch/sql/spark/flint/IndexQueryDetailsTest.java#L67

What is the expected behavior?

Modify the index name generation logic to align with PPL behavior.

[FEATURE] Incremental refresh index on Hive source table

Is your feature request related to a problem?

Currently Spark structured streaming only supports Spark Data Source tables. For a Hive table, it throws an exception when starting the streaming job.

What solution would you like?

Opt-1: Support streaming on Hive tables, either by enhancing the stream source operator or by converting the Hive table internally (not sure if possible)
Opt-2: Upon error, simply give the user a clear message or a guide on how to create a Spark DS table. Related to #65

[BUG] Query error details should reflect the underlying bug.

What is the bug?
23/10/18 21:34:51 ERROR FlintJob: Fail to verify existing mapping or write result
org.apache.spark.sql.AnalysisException: Table default.http_logs_non_vpc already exists.

at org.apache.spark.sql.errors.QueryCompilationErrors$.tableAlreadyExistsError(QueryCompilationErrors.scala:1778) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.execution.command.CreateDataSourceTableCommand.run(createDataSourceTables.scala:57) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:104) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) ~[spark-sql_2.12-3.3.2-amzn-0.jar:0.1.0-SNAPSHOT]
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:101) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:626) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:179) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:626) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]

The current execution throws the generic error ERROR FlintJob: Fail to verify existing mapping or write result, which does not reflect the underlying bug.

[BUG] Flint index name doesn't include catalog name

What is the bug?

Currently the Flint index name (OpenSearch index) consists of the Flint prefix, db name, index name and suffix. This may cause conflicts if OpenSearch is used by Spark with different catalog names (the default name is spark_catalog, but it is configurable) or if multi-catalog is enabled in a Spark session.

What is the expected behavior?

We can encode catalog name in Flint index name, such as:

  1. CREATE SKIPPING INDEX myS3.default.alb_logs => flint_myS3_default_alb_logs_skipping_index
  2. CREATE INDEX idx_elb ON myS3.default.alb_logs => flint_myS3_default_alb_logs_idx_elb_index
  3. CREATE MV myS3.default.alb_logs_metrics AS => flint_myS3_default_alb_logs_metrics
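
The three naming examples above follow one pattern, sketched here in Python. The flint_index_name helper and its kind argument are hypothetical; the sketch only reproduces the names listed above and is not the actual Flint implementation.

```python
def flint_index_name(qualified_table, kind, index_name=None):
    """Build a Flint index name that includes the catalog name.

    qualified_table: catalog.database.table (or MV name), e.g. myS3.default.alb_logs
    kind: "skipping", "covering" or "mv"
    """
    prefix = "flint_" + qualified_table.replace(".", "_")
    if kind == "skipping":
        return prefix + "_skipping_index"
    if kind == "covering":
        if not index_name:
            raise ValueError("covering index requires an index name")
        return f"{prefix}_{index_name}_index"
    if kind == "mv":
        return prefix
    raise ValueError(f"unknown index kind: {kind}")


print(flint_index_name("myS3.default.alb_logs", "skipping"))
print(flint_index_name("myS3.default.alb_logs", "covering", "idx_elb"))
print(flint_index_name("myS3.default.alb_logs_metrics", "mv"))
```

Because the catalog is part of the prefix, the same db.table in two catalogs no longer maps to the same OpenSearch index.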

[FEATURE] Metrics Requirements - REPL

  • Statement executionTime
    • opt-1 REPL updates the lastUpdateTime field of the statement doc whenever it updates the statement state. The client calculates executionTime = lastUpdateTime - submitTime.
    • opt-2 REPL updates executionTime.

[BUG] CODEOWNERS is out of sync

What is the bug?

CODEOWNERS should match MAINTAINERS

Do you have any additional context?

Caught by some automation. Create a CODEOWNERS file with the list of maintainer aliases please.

[FEATURE] Add Flint config for env to persist in metadata

Is your feature request related to a problem?

In https://github.com/opensearch-project/opensearch-spark/pull/58/files, we hardcoded the runtime env variable name and persisted them to Flint metadata. The hardcoding makes Flint codebase aware of underlying runtime env like EMR-S and inflexible if we want to persist

What solution would you like?

Provide another Flint configuration spark.flint.index.metadata.env. Example: with --conf spark.flint.index.metadata.env=envKey1,envKey2,..., the Flint extension will persist these env variables to metadata when creating an index.
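
A rough Python sketch of the proposed behavior: read the comma-separated key list from the configuration and copy the matching environment variables into the metadata. The configuration key comes from this proposal; collect_metadata_env, the dict-based configuration, the sample variable names and the choice to silently skip unset variables are assumptions.

```python
import os

METADATA_ENV_KEY = "spark.flint.index.metadata.env"


def collect_metadata_env(spark_conf, environ=None):
    """Return the env variables named in the config, ready to persist in metadata.

    Variables that are not set in the environment are skipped (an assumption).
    """
    environ = os.environ if environ is None else environ
    raw = spark_conf.get(METADATA_ENV_KEY, "")
    keys = [key.strip() for key in raw.split(",") if key.strip()]
    return {key: environ[key] for key in keys if key in environ}


conf = {METADATA_ENV_KEY: "envKey1, envKey2"}
fake_environ = {"envKey1": "app-123", "OTHER": "ignored"}
print(collect_metadata_env(conf, fake_environ))
```

Keeping the list in configuration means the Flint codebase no longer needs to know which runtime it is deployed on.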

Test Plan

Scalability Test

  1. Skipping index includes 100K files

[FEATURE] Large backlog processing when materialized view cold start

Is your feature request related to a problem?

When auto refresh on a materialized view starts, an initial source file list is generated by FileStreamSource. By default, it just returns files sorted by modified time. In the case of an aggregate MV, if the timestamps of events in these files are not strictly aligned with the file modified time, out-of-order (late) data may be dropped because the watermark has already advanced.

What solution would you like?

Provide a source option for the user to enable MV cold start to process the backlog sorted by the event timestamp in the data rather than the file modified time in the file metadata.

What alternatives have you considered?

Guide the user to give an initial starting point with a WHERE clause and meanwhile configure a large watermark to avoid dropping out-of-order data. But this may consume more memory because aggregate windows stay open for a long time waiting for possible late data.
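
The effect of backlog ordering on late data can be shown with a simplified Python model. It is not how Spark implements watermarks: each file is treated as one micro-batch, the watermark is the maximum event time seen in earlier batches minus a delay, and count_dropped and the sample files are hypothetical.

```python
def count_dropped(files, watermark_delay, sort_key):
    """Replay files in `sort_key` order and count events behind the watermark."""
    max_event_time = float("-inf")
    dropped = 0
    for f in sorted(files, key=sort_key):
        # The watermark is derived from batches processed so far
        watermark = max_event_time - watermark_delay
        dropped += sum(1 for t in f["event_times"] if t < watermark)
        max_event_time = max(max_event_time, max(f["event_times"]))
    return dropped


# File "b" holds old events but was written after file "a"
FILES = [
    {"path": "a", "modified": 1, "event_times": [100, 110]},
    {"path": "b", "modified": 2, "event_times": [10, 20]},
    {"path": "c", "modified": 3, "event_times": [120]},
]

by_modified_time = count_dropped(FILES, 30, lambda f: f["modified"])
by_event_time = count_dropped(FILES, 30, lambda f: min(f["event_times"]))
print(by_modified_time, by_event_time)
```

In this toy data, replaying by modified time drops both events of file "b", while replaying by event time drops none, without having to enlarge the watermark delay.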

[FEATURE] Flint index state management

  • CREATE INDEX, Spark streaming Job,
    1. CREATE NEW DOC in .query_execution_request_{mys3}, write docId into system.env.
    2. Flint-Extension, read system.env.

    type: streaming
      state = "running"
      jobId = "job1"
      appId = "app1"
      lastupdateTime = 0
  • RECOVER INDEX
    1. CREATE NEW DOC in .query_execution_request_{mys3}, write docId into system.env.
    2. Flint-Extension, read system.env.

    docId = FlintExt.state().docId()
    if (state == "running") {
      // occ (optimistic concurrency control)
      updateState(indices, docId): {
        type: streaming
        state = "running"
        jobId = "job2"
        appId = "app2"
        lastupdateTime = 0
      }
    }
  • MANUAL REFRESH
    1. CREATE NEW DOC in .query_execution_request_{mys3}, write docId into system.env.
    2. Flint-Extension, read system.env.

    type: streaming
      state = "running"
      jobId = "job3"
      appId = "app3"
      lastupdateTime = 0
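
The RECOVER INDEX step above can be sketched as follows, assuming "occ" refers to optimistic concurrency control. Everything here is illustrative: `StateStore` is an in-memory stand-in for the state index, the integer version is a simplification of whatever concurrency token the real store provides, and `recover_index` is a hypothetical helper. Only the field names and values come from the outline above.

```python
from typing import Dict, Tuple


class VersionConflict(Exception):
    """Raised when a doc changed since it was read."""


class StateStore:
    """In-memory stand-in for the state index; each doc carries a version."""

    def __init__(self) -> None:
        self._docs: Dict[str, Tuple[int, dict]] = {}

    def create(self, doc_id: str, doc: dict) -> int:
        self._docs[doc_id] = (1, dict(doc))
        return 1

    def get(self, doc_id: str) -> Tuple[int, dict]:
        version, doc = self._docs[doc_id]
        return version, dict(doc)

    def update(self, doc_id: str, doc: dict, expected_version: int) -> int:
        # occ: reject the write if another writer updated the doc in between.
        current, _ = self._docs[doc_id]
        if current != expected_version:
            raise VersionConflict(f"expected v{expected_version}, found v{current}")
        self._docs[doc_id] = (current + 1, dict(doc))
        return current + 1


def recover_index(store: StateStore, doc_id: str, job_id: str, app_id: str) -> bool:
    """Take over a running index; return False if it is not running or the race is lost."""
    version, doc = store.get(doc_id)
    if doc["state"] != "running":
        return False
    doc.update(jobId=job_id, appId=app_id, lastupdateTime=0)
    try:
        store.update(doc_id, doc, expected_version=version)
        return True
    except VersionConflict:
        return False


store = StateStore()
store.create("doc1", {"type": "streaming", "state": "running",
                      "jobId": "job1", "appId": "app1", "lastupdateTime": 0})
print(recover_index(store, "doc1", "job2", "app2"))  # True
print(store.get("doc1")[1]["jobId"])                 # job2
```

If two recovery jobs read the same version, only the first write succeeds; the second sees a conflict and backs off instead of overwriting the winner's jobId.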

Refactoring Flint metadata and build index API

The code needs refactoring after materialized view support is finally added:

  • Finalize FlintMetadata and refactor serde code:
    • move common code into flint-core and leave the rest in flint-spark-integration
    • update the Flint user reference doc accordingly
  • Refactor FlintSpark.createIndex, which always uses foreach sink:
    • MV build DataFrame doesn't use foreach sink
    • check whether covering index can still use foreach sink

[BUG] Show covering index runs into error when index is not present

What is the bug?
Show covering index runs into error when index is not present.
{"data":{"ok":true,"resp":{"status":"FAILED","error":"Fail to run query or write result"}}}

How can one reproduce the bug?
Steps to reproduce the behavior:

  1. Make sure a covering index is not present in the table, e.g. alb_logs.
  2. Run SHOW INDEX ON myS3.default.alb_logs

What is the expected behavior?
This should return success with an empty [] response.
An error/failed state should only be sent when query execution runs into an error.
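
The expected behavior could look roughly like the sketch below. It is not the actual query handler: `run_show_index` and the two fake backends are hypothetical, and the "SUCCESS" status name and "result" key are assumptions. Only the FAILED response shape comes from the report above.

```python
from typing import Callable, Dict, List


def run_show_index(list_indexes: Callable[[str], List[dict]],
                   table: str) -> Dict[str, object]:
    """Wrap a SHOW INDEX lookup; only a raised exception yields FAILED."""
    try:
        rows = list_indexes(table)
    except Exception as e:
        return {"status": "FAILED", "error": str(e)}
    # An empty list is a valid answer, not an error.
    # "SUCCESS" and "result" are assumed names for this sketch.
    return {"status": "SUCCESS", "result": rows}


def no_indexes(table: str) -> List[dict]:
    # Backend that finds no covering index on the table.
    return []


def broken_backend(table: str) -> List[dict]:
    # Backend that genuinely fails during execution.
    raise RuntimeError("Fail to run query or write result")


print(run_show_index(no_indexes, "myS3.default.alb_logs"))
print(run_show_index(broken_backend, "myS3.default.alb_logs"))
```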

[FEATURE] Improve validation for SQL statement

Is your feature request related to a problem?

Improve validation for SQL create statement:

  1. For DDL statement (create):
    a. Validate WITH options and report an error if an invalid option is given
    b. Check whether a given column is unsupported by the skipping/covering index, and report the error early instead of when the DataFrame job is submitted in the background
  2. For DML statement (show/desc/refresh), report an error if the given table name is invalid

What solution would you like?

  • For 1a) WITH options, add validation logic in FlintSparkIndexOptions
  • For 1b) streaming job, need to figure out how to validate it early
  • For 2, add IT to verify whether this is the current behavior
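
For 1a, the validation could be as simple as the sketch below. This is only an illustration in Python, not the FlintSparkIndexOptions code: `validate_options` is a hypothetical helper, and the set of known option names is taken from the CREATE statements quoted on this page, so the real supported set may differ.

```python
from typing import Dict, List

# Option names seen in the CREATE statements quoted on this page;
# the real set supported by FlintSparkIndexOptions may differ.
KNOWN_OPTIONS = {"auto_refresh", "refresh_interval", "checkpoint_location"}


def validate_options(options: Dict[str, str]) -> List[str]:
    """Return a list of error messages; an empty list means the options look valid."""
    errors = [f"Unknown option: {name}"
              for name in sorted(options) if name not in KNOWN_OPTIONS]
    auto_refresh = options.get("auto_refresh")
    if auto_refresh is not None and auto_refresh.lower() not in ("true", "false"):
        errors.append(f"auto_refresh must be true or false, got: {auto_refresh}")
    return errors


print(validate_options({"auto_refresh": "true", "checkpoint_location": "s3://..."}))
# []
print(validate_options({"auto_refersh": "true", "auto_refresh": "yes"}))
# ['Unknown option: auto_refersh', 'auto_refresh must be true or false, got: yes']
```

Collecting all errors instead of failing on the first one lets the user fix a statement in a single round trip.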

[FEATURE] Support more options in create index statement

Is your feature request related to a problem?

Currently, the Flint streaming job uses a Foreach Batch sink, which creates a temporary checkpoint folder by default. The checkpoint will be lost after restart.

What solution would you like?

Add another option to the create statement that lets the user configure where to store the checkpoint data. If no location is provided, we can assume the user isn't concerned about this and stick with the temporary checkpoint as before.

For example:

CREATE SKIPPING INDEX ON alb_logs
(
  client_ip VALUE_SET
)
WITH (
  auto_refresh = true,
  checkpoint_location = 's3://...'
)
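
The fallback described above might be sketched like this. It is an illustration only: `resolve_checkpoint` is a hypothetical helper, the `flint-checkpoint-` prefix and the sample bucket path are made up, and a local temp directory stands in for whatever temporary folder the streaming job would otherwise create.

```python
import tempfile
from typing import Mapping, Tuple


def resolve_checkpoint(options: Mapping[str, str]) -> Tuple[str, bool]:
    """Return (location, is_temporary) for a streaming refresh job."""
    location = options.get("checkpoint_location")
    if location:
        # User-provided location survives restarts.
        return location, False
    # No location given: fall back to a throwaway directory, as before.
    return tempfile.mkdtemp(prefix="flint-checkpoint-"), True


# "s3://bucket/cp" is a placeholder path for this example.
print(resolve_checkpoint({"auto_refresh": "true",
                          "checkpoint_location": "s3://bucket/cp"}))
location, is_temporary = resolve_checkpoint({"auto_refresh": "true"})
print(is_temporary)
```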

What alternatives have you considered?

Use some default location for the current database. For example, because we map the data source concept to a SparkSQL catalog, we may use the root folder of an S3 data source. But this requires user awareness and write permission.

Do you have any additional context?

N/A

[FEATURE] Skipping index and materialized view refresh synchronization

Is your feature request related to a problem?

Currently there is no coordination between skipping index and materialized view auto refresh. In the case of a materialized view with a filtering condition that can be accelerated by a skipping index, the initial backlog processing should be fine. However, once MV refresh catches up, each micro batch of skipping index refresh on the same source file list is not guaranteed to finish before the MV's. If hybrid scan is not enabled, the skipping index may tell the MV that no file matches the filtering condition for the current micro batch.

What solution would you like?

  1. Add basic IT to make sure MV refresh can be accelerated by skipping index
  2. Look into code to verify if the issue above may happen
  3. Quick solution would be enabling hybrid scan mode in this case
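
The suspected race and the effect of hybrid scan can be shown with a toy model. This is a simplified illustration, not Flint's query rewrite: `select_files` is a hypothetical function, the skipping index is modeled as a map from already-indexed files to the set of values they contain, and hybrid scan is modeled as "also scan files the index has not seen yet".

```python
from typing import Dict, List, Set


def select_files(source_files: List[str],
                 skipping_index: Dict[str, Set[int]],
                 wanted_status: int,
                 hybrid_scan: bool) -> List[str]:
    """Pick the files to scan for `status = wanted_status`."""
    selected = []
    for f in source_files:
        if f in skipping_index:
            # Indexed file: trust the value set.
            if wanted_status in skipping_index[f]:
                selected.append(f)
        elif hybrid_scan:
            # Not indexed yet: scan it anyway rather than silently skip it.
            selected.append(f)
    return selected


files = ["f1", "f2", "f3"]
index = {"f1": {200, 404}, "f2": {200}}  # f3 arrived but is not indexed yet

print(select_files(files, index, 404, hybrid_scan=False))  # ['f1']
print(select_files(files, index, 404, hybrid_scan=True))   # ['f1', 'f3']
```

Without hybrid scan, any matching rows in `f3` would be missing from the MV micro batch, which is the failure mode described above.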

[BUG] Covering index not present inside query

What is the bug?

Query on S3 based data after defining a covering index doesn’t use the covering index during the execution ...

How to Reproduce
Run the following query against the following table:

Table

CREATE EXTERNAL TABLE mys3.default.http_logs (
  `@timestamp` TIMESTAMP,
  clientip STRING,
  request STRING,
  status INT,
  size INT,
  year INT,
  month INT,
  day INT
)
USING json
PARTITIONED BY (year, month, day)
OPTIONS (path 's3://flint-data-dp-eu-west-1-beta/data/http_log/http_logs_partitioned_json_bz2/', compression 'bzip2')

Covering Index

CREATE INDEX status_and_day
ON mys3.default.http_logs ( status, day )
WITH (
  auto_refresh = true,
  refresh_interval = '1 minute',
  checkpoint_location = 's3://flint-data-dp-eu-west-1-beta/data/http_log/checkpoint_status_and_day'
)

Query

SELECT
    day,
    status
FROM mys3.default.http_logs
WHERE status >= 400
GROUP BY day, status
LIMIT 100;

Explain Query

-- Explain:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- CollectLimit 100
   +- HashAggregate(keys=[day#107, status#103], functions=[])
      +- Exchange hashpartitioning(day#107, status#103, 1000), ENSURE_REQUIREMENTS, [plan_id=75]
         +- HashAggregate(keys=[day#107, status#103], functions=[])
            +- Project [status#103, day#107]
               +- Filter (isnotnull(status#103) AND (status#103 >= 400))
                  +- FileScan json default.http_logs[status#103,year#105,month#106,day#107] Batched: false, DataFilters: [isnotnull(status#103), (status#103 >= 400)], Format: JSON, Location: CatalogFileIndex(1 paths)[s3://flint-data-dp-eu-west-1-beta/data/http_log/http_logs_partitioned_j..., PartitionFilters: [], PushedFilters: [IsNotNull(status), GreaterThanOrEqual(status,400)], ReadSchema: struct<status:int>

What is the expected behavior?
I'm expecting the covering index rule to kick in and be part of the physical execution plan.
