dai-chen commented on August 23, 2024

4. Implementation

4.1 High-Level DDL Command

class CreateSkippingIndexCommand(tableName, indexedCols, filterPred):

  def run(session):
    val skippingIndex = FlintSkippingIndex(
      tableName,
      indexedCols.map((col, indexType) -> (col, indexProviders.get(indexType))),
      filterPred)
    
    flintSpark.createIndex(skippingIndex)
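
As a usage sketch, the DDL below would be parsed into this command roughly as follows (the SQL grammar and the provider keys are assumptions for illustration, not the final syntax):

// Hypothetical DDL and resulting command; grammar and key names are assumptions.
//   CREATE SKIPPING INDEX ON alb_logs (client_ip BLOOM_FILTER, elb_status VALUE_LIST)
//   WHERE time > '2023-04-01 00:00:00'
val command = CreateSkippingIndexCommand(
  tableName   = "alb_logs",
  indexedCols = Seq("client_ip" -> "bloom_filter", "elb_status" -> "value_list"),
  filterPred  = "time > '2023-04-01 00:00:00'")

command.run(spark)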

4.2 Index Data Physical Layout

Consider the partition index and the file-level skipping index together. Three physical layout options:

  1. Denormalized: partition values are duplicated per file
  2. Denormalized by nested field:
    1. Relies on nested field performance;
    2. The nested field has to keep being updated until no new files arrive
  3. Normalized by separate index (see the join sketch after the examples below):
    1. Relies on a Spark/OpenSearch SQL join at query time;
    2. The low-level Flint API needs to be aware that more than one index is created

Denormalized example:

{
  "year": 2023,
  "month": 5,
  "day": 1,
  "hour": 1,
  "file": "s3://.../a.gz",
  "client_ip": {...},
  "elb_status": [200]
},
{
  "year": 2023,
  "month": 5,
  "day": 1,
  "hour": 1,
  "file": "s3://.../b.gz",
  "client_ip": {...},
  "elb_status": [200,404]
},
{
  "year": 2023,
  "month": 5,
  "day": 1,
  "hour": 1,
  "file": "s3://.../c.gz",
  "client_ip": {...},
  "elb_status": [200,503]
},
{
  "year": 2023,
  "month": 5,
  "day": 1,
  "hour": 2,
  ...
}
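
With the denormalized layout, file pruning is a single filter on the index and no join is needed. A rough sketch (the index name, taken from the INSERT example further below, and the "flint.indexName" option are assumptions here):

// Hypothetical lookup on the flat (denormalized) index
val candidateFiles = spark.read.format("flint")
  .option("flint.indexName", "alb_logs_skipping_index")
  .load()
  .filter("year = 2023 AND month = 5 AND day = 1 AND hour = 1")  // partition pruning
  .filter("array_contains(elb_status, 404)")                     // file-level skipping
  .select("file")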

Denormalized by nested field example:

{
  "year": 2023,
  "month": 5,
  "day": 1,
  "hour": 1,
  "files": [ # Nested field
    {
      "file_path": "s3://.../a.gz",
      "client_ip": {...},  # BloomFilter field
      "elb_status": [200]  # Value list
    },
    {
      "file_path": "s3://.../b.gz",
      "client_ip": {...},
      "elb_status": [200,404]
    },
    {
      "file_path": "s3://.../c.gz",
      "client_ip": {...},
      "elb_status": [200,503]
    }
  ]
},
{
  "year": 2023,
  "month": 5,
  "day": 1,
  "hour": 2,
  ...
}

Normalized by separate index example:

# Partition index
{
  "year": 2023,
  "month": 5,
  "day": 1,
  "hour": 1,
  "files": ["1", "2", "3"]
},
{
  "year": 2023,
  "month": 5,
  "day": 1,
  "hour": 2,
  "files": ...
}

# File index
{
  "file_id": "1",
  "file_path": "s3://.../a.gz",
  "client_ip": {...},
  "elb_status": [200]
},
{
  "file_id": "2",
  "file_path": "s3://.../b.gz",
  "client_ip": {...},
  "elb_status": [200,404]
},
{
  "file_id": "3",
  "file_path": "s3://.../c.gz",
  "client_ip": {...},
  "elb_status": [200,503]
}
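
For the normalized option, a rough sketch of the query-time join the low-level API would have to issue; the index names and the "flint.indexName" option usage are assumptions for illustration:

// Hypothetical join across the partition index and file index
val partitions = spark.read.format("flint")
  .option("flint.indexName", "alb_logs_partition_index")
  .load()
  .filter("year = 2023 AND month = 5 AND day = 1 AND hour = 1")  // partition pruning
  .select(explode(col("files")).as("file_id"))

val files = spark.read.format("flint")
  .option("flint.indexName", "alb_logs_file_index")
  .load()

// Candidate source files that survive both partition and file-level pruning
val candidateFiles = partitions
  .join(files, "file_id")
  .filter("array_contains(elb_status, 404)")
  .select("file_path")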

4.3 Low-Level API Implementation

trait FlintIndex {
  def indexName: String
  def metadata: FlintMetadata
  def refreshJob(spark: SparkSession): DataFrame
}

class FlintSkippingIndex(
  val tableName: String,
  val indexedCols: Seq[FlintSkippingIndexProvider],
  val filterPred: String
) extends FlintIndex {

  lazy val indexName: String = FlintSkippingIndex.onTable(tableName)

#  "derivedDataset": {
#    "kind": "SkippingIndex",
#    "properties": {
#      "filterCondition": "time > '2023-04-01 00:00:00'",
#      "indexedColumns": { <-- can be found in mapping schema?
#        "client_ip": "bloom_filter",
#        "elb_status": "value_list"
#      }
#    }
#  }
  lazy val metadata: FlintMetadata = {
    FlintMetadata(
      kind = "SkippingIndex",
      properties = Map(
        "filterCondition" -> filterPred,
        "indexedColumns" -> indexedCols),
      source = tableName)
  }

# Equivalent to:
# -----------------------------------
# INSERT INTO alb_logs_skipping_index
# SELECT
#   year, month, day, hour,
#   input_file_name(),
#   BLOOM_FILTER(client_ip),
#   VALUE_LIST(elb_status)
# FROM alb_logs
# WHERE time > '2023-04-01 00:00:00'
# GROUP BY 
#   year, month, day, hour,
#   input_file_name()

  def refreshJob(spark: SparkSession): DataFrame = {
    spark
      .readStream
      .table(tableName)
      .filter(filterPred)
      .groupBy(input_file_name())
      .agg(
        indexedCols.map(sketch => sketch.aggregator()): _*)
  }
}
  
object FlintSkippingIndex {

  def onTable(tableName: String) = tableName + "-skipping-index"
}

FlintSpark entry point:

// Holds global config
class FlintSpark(
  spark: SparkSession,
  conf: Map[String, String],
  flintClient: FlintClient
) {

  def createIndex(index: FlintIndex) = {
    // create index with metadata
    flintClient.createIndex(index.metadata)
    
    // start refreshing data
    index.refreshJob(spark)
      .writeStream
      .queryName(index.indexName)
      .format("flint")
      .option("flint.indexName", index.indexName)
      .option(...) // Copy other options from Flint config?
      .outputMode("append")
      .start()
  }
  
  def describeIndex(indexName: String) {
    flintClient.getIndex(indexName)
  }
  
  def dropIndex(indexName: String) {
    flintClient.deleteIndex(indexName)
  }
  
  def explain(query: DataFrame) {
    // ???
  }
}
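
A minimal usage sketch tying the 4.1 command to the lifecycle calls above; constructor arguments and the sketch instances are placeholders, not the final API:

// Illustrative only
val flint = new FlintSpark(spark, conf = Map.empty, flintClient = flintClient)

val index = new FlintSkippingIndex(
  tableName   = "alb_logs",
  indexedCols = Seq(new BloomFilterSketch(), new ValueListSketch()),  // providers from 4.5
  filterPred  = "time > '2023-04-01 00:00:00'")

flint.createIndex(index)                                    // write metadata + start refresh job
flint.describeIndex(FlintSkippingIndex.onTable("alb_logs")) // look up index metadata
flint.dropIndex(FlintSkippingIndex.onTable("alb_logs"))     // delete the index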

4.4 Query Rewrite Rule

class ApplyFlintSkippingIndex:

  def apply(plan):
    case filter @ Filter(pred, Relation(tableName)):
      # Option 1:
      spark.read.format("flint")
        .filter("""
          _meta.derivedDataset.kind = 'SkippingIndex'
          AND _meta.state = 'active'
          AND _meta.enabled = true
        """)
      
      # Option 2:
      val index = FlintSpark.describeIndex(FlintSkippingIndex.onTable(tableName))
      if not index or not index.enabled:
        return plan
     
      if not index.filterPred covers filter: <-- how to check? how to surface in explain output?
        return plan

      # Index can be applied
      return filter.copy(child = HadoopFsRelation(DataSkippingFileIndex(index, pred)))


# Use file list in skipping index and ignore base table
# Suppose FOR COLUMNS (X MIN_MAX)
# WHERE X = 30 --> WHERE X_MIN <= 30 AND 30 <= X_MAX
class DataSkippingFileIndex(index):
  
  def listFiles(pred):
    val rewrittenPred = index.rewritePredicate(pred)
    
    spark.read.format("flint")
      .filter(rewrittenPred)
      .select("file_path")

4.5 Index Provider SPI

# Provider SPI for single-column skipping index
trait FlintSkippingIndexProvider {
 
    // SQL datatype
    def getType(): DataType
    
    // Aggregator for index building
    def getAggregator(): Seq[AggregateFunction]
    
    // Predicate rewrite for query rewrite
    def rewritePredicate(): Option[Expression]
}


# --- ValueList ---

class ValueListSketch extends FlintSkippingIndexProvider {
    def getType(): DataType = {
    }
    
    def getAggregator(): Seq[AggregateFunction] = {
    }
    
    def rewritePredicate(): Option[Expression] = {
    }
}

class MinMaxSketch extends FlintSkippingIndexProvider {
    ...
}

class BloomFilterSketch extends FlintSkippingIndexProvider {
    ...
}
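
To make the SPI concrete, a rough fill-in of the MIN_MAX case based on the rewrite noted in 4.4 (WHERE X = 30 --> WHERE X_MIN <= 30 AND 30 <= X_MAX). It uses Spark's Column API instead of raw AggregateFunction/Expression for brevity, and the <col>_min / <col>_max naming is an assumption:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit, max, min}

// Sketch only: method shapes and column naming are assumptions, not the final SPI
class MinMaxSketchExample(colName: String) {

  // Index building: keep the min and max value of the column per source file
  def getAggregator(): Seq[Column] = Seq(
    min(col(colName)).as(s"${colName}_min"),
    max(col(colName)).as(s"${colName}_max"))

  // Query rewrite against the index: X = v  -->  X_min <= v AND v <= X_max
  def rewritePredicate(value: Any): Column =
    col(s"${colName}_min") <= lit(value) && lit(value) <= col(s"${colName}_max")
}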
