
flock's People

Contributors

dependabot[bot], gangliao, sunisdown, zejunliu0, zhenghanghu


flock's Issues

Feed data to the first node of the subplan

Since the physical plan is a nested structure, we need to recursively find all of its input (leaf) nodes and correctly map the data obtained from the parent lambda to each of them.
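A minimal sketch of the traversal (an illustrative helper, not the repository's code): walk the nested plan and collect every node that has no children; these are the input nodes that the data from the parent lambda must be fed into.

use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;

// Recursively collect every leaf node of the (sub-)plan.
fn collect_input_nodes(plan: &Arc<dyn ExecutionPlan>, leaves: &mut Vec<Arc<dyn ExecutionPlan>>) {
    let children = plan.children();
    if children.is_empty() {
        leaves.push(plan.clone());
    } else {
        for child in children {
            collect_input_nodes(&child, leaves);
        }
    }
}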

Add seed to the payload of lambda functions

After lambda functions with concurrency > 1 finish executing, their outputs are aggregated by the same subsequent lambda function; this is achieved by applying consistent hashing with the same seed carried in their payloads.

During distributed execution, lambda functions with concurrency > 1 send their payloads to a subsequent lambda function with concurrency = 1. To ensure that a single function receives all of the payloads (rather than having some of them land on other functions in the same group under different names), we use consistent hashing to deterministically select one function name from the group. Seeding the hash makes this selection reproducible across all senders, which addresses the potential problems.
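A minimal sketch of the selection step, assuming the group's function names (hypothetical below) and the seed are both carried in the payload. The standard library's DefaultHasher is used only for brevity; in practice a hash with a stable, versioned algorithm is needed so that every sender computes the same result.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Every upstream function that carries the same seed computes the same index,
// so all of their payloads converge on one downstream function in the group.
fn select_function(group: &[String], seed: u64) -> &str {
    let mut hasher = DefaultHasher::new();
    seed.hash(&mut hasher);
    let idx = (hasher.finish() % group.len() as u64) as usize;
    &group[idx]
}

fn main() {
    let group = vec![
        "flock-agg-00".to_string(),
        "flock-agg-01".to_string(),
        "flock-agg-02".to_string(),
    ];
    // Every producer sharing the seed selects the same aggregator.
    assert_eq!(select_function(&group, 42), select_function(&group, 42));
    println!("selected: {}", select_function(&group, 42));
}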

RecordBatch Error

    Finished test [unoptimized + debuginfo] target(s) in 0.38s
     Running /home/gangliao/ServerlessCQ/target/debug/deps/aws-a69112207b9ac015

running 2 tests
test tests::generic_lambda ... ignored
Object({"Records": Array([Object({"awsRegion": Null, "eventID": String("3hnr7qHLGDq06"), "eventName": String("IlyOM8ax4B"), "eventSource": String("FMNn0INVZ5fvSKBp"), "eventSourceARN": Null, "eventVersion": Null, "invokeIdentityArn": Null, "kinesis": Object({"approximateArrivalTimestamp": Number(9175183041.66), "data": String("eyJjMSI6NzYsImMyIjo3LCJjMyI6Ikp6VHY1YmkzWFNOVCJ9"), "encryptionType": String("lw6Wno4dWTs"), "partitionKey": Null, "sequenceNumber": Null, "kinesisSchemaVersion": String("yl41SRAm")})}), Object({"awsRegion": Null, "eventID": String("aNx4RRFsXZOI6ZzOG"), "eventName": String("xbPpX"), "eventSource": Null, "eventSourceARN": Null, "eventVersion": Null, "invokeIdentityArn": String("4A94IjQCvzGERX3S2zp"), "kinesis": Object({"approximateArrivalTimestamp": Number(3633130223.529), "data": String("eyJjMSI6NzcsImMyIjo2LCJjMyI6InNETllzRkJ3cE5aIn0="), "encryptionType": String("snBCj3Woxv3xGkU"), "partitionKey": String("RHslkOPKM5uy8gEHu"), "sequenceNumber": Null, "kinesisSchemaVersion": Null})})])})
KinesisEvent { records: [KinesisEventRecord { aws_region: None, event_id: Some("3hnr7qHLGDq06"), event_name: Some("IlyOM8ax4B"), event_source: Some("FMNn0INVZ5fvSKBp"), event_source_arn: None, event_version: None, invoke_identity_arn: None, kinesis: KinesisRecord { approximate_arrival_timestamp: SecondTimestamp(2260-10-01T05:57:21.066Z), data: Base64Data([123, 34, 99, 49, 34, 58, 55, 54, 44, 34, 99, 50, 34, 58, 55, 44, 34, 99, 51, 34, 58, 34, 74, 122, 84, 118, 53, 98, 105, 51, 88, 83, 78, 84, 34, 125]), encryption_type: Some("lw6Wno4dWTs"), partition_key: None, sequence_number: None, kinesis_schema_version: Some("yl41SRAm") } }, KinesisEventRecord { aws_region: None, event_id: Some("aNx4RRFsXZOI6ZzOG"), event_name: Some("xbPpX"), event_source: None, event_source_arn: None, event_version: None, invoke_identity_arn: Some("4A94IjQCvzGERX3S2zp"), kinesis: KinesisRecord { approximate_arrival_timestamp: SecondTimestamp(2085-02-16T02:50:23.529Z), data: Base64Data([123, 34, 99, 49, 34, 58, 55, 55, 44, 34, 99, 50, 34, 58, 54, 44, 34, 99, 51, 34, 58, 34, 115, 68, 78, 89, 115, 70, 66, 119, 112, 78, 90, 34, 125]), encryption_type: Some("snBCj3Woxv3xGkU"), partition_key: Some("RHslkOPKM5uy8gEHu"), sequence_number: None, kinesis_schema_version: None } }] }
[123, 34, 99, 49, 34, 58, 55, 54, 44, 34, 99, 50, 34, 58, 55, 44, 34, 99, 51, 34, 58, 34, 74, 122, 84, 118, 53, 98, 105, 51, 88, 83, 78, 84, 34, 125, 123, 34, 99, 49, 34, 58, 55, 55, 44, 34, 99, 50, 34, 58, 54, 44, 34, 99, 51, 34, 58, 34, 115, 68, 78, 89, 115, 70, 66, 119, 112, 78, 90, 34, 125]
batch_size: 2
input.len: 71
utf: {"c1":76,"c2":7,"c3":"JzTv5bi3XSNT"}{"c1":77,"c2":6,"c3":"sDNYsFBwpNZ"}
thread 'tests::centralized_execution' panicked at 'called `Result::unwrap()` on an `Err` value: JsonError("Not valid JSON: trailing characters at line 1 column 37")', src/runtime/src/datasource/kinesis.rs:115:19
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
test tests::centralized_execution ... FAILED

failures:

failures:
    tests::centralized_execution

test result: FAILED. 0 passed; 1 failed; 1 ignored; 0 measured; 0 filtered out; finished in 0.02s
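The `utf` dump shows the two decoded payloads concatenated on a single line: the first JSON object ends right before column 37, which is exactly where the parser reports "trailing characters". One possible fix, sketched with an illustrative helper (not the repository's code), is to join the decoded Kinesis records with newlines so the buffer becomes valid newline-delimited JSON before it is handed to the JSON reader.

// Join the Base64-decoded Kinesis record payloads with '\n' so the result is
// newline-delimited JSON rather than a single line of concatenated objects.
fn to_ndjson(records: &[Vec<u8>]) -> String {
    records
        .iter()
        .map(|r| String::from_utf8_lossy(r).into_owned())
        .collect::<Vec<_>>()
        .join("\n")
}

fn main() {
    let records = vec![
        br#"{"c1":76,"c2":7,"c3":"JzTv5bi3XSNT"}"#.to_vec(),
        br#"{"c1":77,"c2":6,"c3":"sDNYsFBwpNZ"}"#.to_vec(),
    ];
    println!("{}", to_ndjson(&records));
}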

Customize max_concurrency for SortExec and GlobalLimitExec at cloud runtime

/// Limit execution plan
#[derive(Debug, Serialize, Deserialize)]
pub struct GlobalLimitExec {
    /// Input execution plan
    input: Arc<dyn ExecutionPlan>,
    /// Maximum number of rows to return
    limit: usize,
    /// Number of threads to run parallel LocalLimitExec on
    concurrency: usize,
}

/// Sort execution plan
#[derive(Debug, Serialize, Deserialize)]
pub struct SortExec {
    /// Input schema
    input: Arc<dyn ExecutionPlan>,
    /// Sort expressions
    expr: Vec<PhysicalSortExpr>,
    /// Number of threads to execute input partitions on before combining into a single partition
    concurrency: usize,
}

We need to set the concurrency value at runtime because the Lambda instances are dynamic; their number changes depending on many factors.
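A minimal sketch of reading the value at runtime, assuming it is passed to the Lambda function through an environment variable (the variable name is hypothetical); the deserialized SortExec / GlobalLimitExec would then be rebuilt with this value instead of the one fixed on the client.

use std::env;

// Read the concurrency from the Lambda environment, falling back to 1.
fn runtime_concurrency() -> usize {
    env::var("FLOCK_CONCURRENCY")
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .unwrap_or(1)
}

fn main() {
    println!("concurrency = {}", runtime_concurrency());
}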

Data Source

  1. New York City Taxi & Limousine Commission Trip Record Data [Link]

The yellow and green taxi trip records include fields capturing pick-up and drop-off dates/times,
pick-up and drop-off locations, trip distances, itemized fares, rate types, payment types, and driver-reported passenger counts. The data is provided in CSV format.

  2. US Accidents (3.5 million records) [Link]

This is a countrywide car accident dataset, which covers 49 states of the USA. The accident data are collected from February 2016 to June 2020, using two APIs that provide streaming traffic incident (or event) data. These APIs broadcast traffic data captured by a variety of entities, such as the US and state departments of transportation, law enforcement agencies, traffic cameras, and traffic sensors within the road-networks. Currently, there are about 3.5 million accident records in this dataset.

The customer applies a continuous filter to retain only the records of interest.

Forked Arrow

We need to modify some features of the open-source Arrow project, such as the serialization and deserialization of a query's physical plan. We will regularly merge the latest official code into this forked version.

We will push all our changes to the branch scq:
https://github.com/DSLAM-UMD/arrow/tree/scq

RFC: S3 - Near Cloud Storage Computing

In ServerlessCQ, if a cache miss occurs due to Lambda rescheduling, we can use the existing S3 Select feature to accelerate not only simple database operators such as select and project, but also complex operators such as join, group-by, and top-K.

Furthermore, starting today [1][2], Amazon S3 delivers strong read-after-write consistency automatically, which makes many of Starling's (SIGMOD'20) designs obsolete.

  1. https://aws.amazon.com/s3/consistency/
  2. https://news.ycombinator.com/item?id=25271791

ServerlessCQ CLI

Users can issue continuous queries via either a high-level language or the CLI.

CLI features:

  • Parse streaming SQL queries and generate multiple optimized physical plans expressed in the Amazon States Language.
  • Deploy AWS Step Functions automatically.
  • Collect CloudWatch metrics on Lambda functions (for adaptive optimization).

https://github.com/clap-rs/clap
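A minimal sketch of what the CLI entry point could look like with clap (assuming clap 2.x; the command and flag names are hypothetical, not the project's actual interface):

use clap::{App, Arg};

fn main() {
    // Hypothetical flags: a SQL query to compile and an optional deploy switch.
    let matches = App::new("scq")
        .about("ServerlessCQ command-line interface")
        .arg(
            Arg::with_name("query")
                .short("q")
                .long("query")
                .takes_value(true)
                .help("Continuous SQL query to parse, optimize, and deploy"),
        )
        .arg(
            Arg::with_name("deploy")
                .long("deploy")
                .help("Deploy the generated AWS Step Functions state machine"),
        )
        .get_matches();

    if let Some(sql) = matches.value_of("query") {
        println!("parsing query: {}", sql);
    }
    if matches.is_present("deploy") {
        println!("deploying step functions...");
    }
}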

Physical Plan Partition

Through the client's query engine, we can get the physical plan of the query statement. Before generating, compiling, and deploying lambda function code, we need to divide the entire plan into independent sub-plans, where each sub-plan corresponds to a lambda function.

The implementation logic of this part is:

input: physical plan tree

output: topologically sorted sub-plans
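A minimal sketch of the traversal on a simplified tree (the real code operates on Arc<dyn ExecutionPlan>; the boundary criterion below is just a placeholder):

// Simplified stand-in for the physical plan tree.
#[derive(Debug)]
struct PlanNode {
    name: String,
    children: Vec<PlanNode>,
    // true if this node starts a new sub-plan (e.g., a pipeline breaker)
    is_boundary: bool,
}

// Children are visited before their parent, so upstream sub-plans are emitted
// before downstream ones, i.e., the output is topologically sorted.
fn partition(node: &PlanNode, out: &mut Vec<String>) {
    for child in &node.children {
        partition(child, out);
    }
    if node.is_boundary {
        out.push(node.name.clone());
    }
}

fn main() {
    let plan = PlanNode {
        name: "HashAggregateExec(Final)".into(),
        is_boundary: true,
        children: vec![PlanNode {
            name: "HashAggregateExec(Partial)".into(),
            is_boundary: true,
            children: vec![PlanNode {
                name: "MemoryExec".into(),
                is_boundary: false,
                children: vec![],
            }],
        }],
    };
    let mut subplans = Vec::new();
    partition(&plan, &mut subplans);
    // ["HashAggregateExec(Partial)", "HashAggregateExec(Final)"]
    println!("{:?}", subplans);
}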

serde trait vectors

We have to find a way to serialize and deserialize the DataFusion execution plans that contain fields of type Vec<Arc<dyn Trait>>.

/// Hash aggregate execution plan
#[derive(Debug)]
pub struct HashAggregateExec {
    mode: AggregateMode,
    group_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
    aggr_expr: Vec<Arc<dyn AggregateExpr>>,
    input: Arc<dyn ExecutionPlan>,
    schema: SchemaRef,
}

/// Execution plan for a projection
#[derive(Debug)]
pub struct ProjectionExec {
    /// The projection expressions stored as tuples of (expression, output column name)
    expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
    /// The schema once the projection has been applied to the input
    schema: SchemaRef,
    /// The input plan
    input: Arc<dyn ExecutionPlan>,
}
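One possible direction (not necessarily what the project ended up doing) is the typetag crate, which lets serde handle boxed trait objects by tagging each concrete type. A toy trait is used below in place of PhysicalExpr:

use serde::{Deserialize, Serialize};

#[typetag::serde]
trait Expr: std::fmt::Debug {
    fn evaluate(&self) -> i64;
}

#[derive(Debug, Serialize, Deserialize)]
struct Literal {
    value: i64,
}

#[typetag::serde]
impl Expr for Literal {
    fn evaluate(&self) -> i64 {
        self.value
    }
}

fn main() {
    // A vector of trait objects round-trips through JSON with a type tag.
    let exprs: Vec<Box<dyn Expr>> = vec![Box::new(Literal { value: 42 })];
    let json = serde_json::to_string(&exprs).unwrap();
    println!("{}", json); // [{"Literal":{"value":42}}]
    let back: Vec<Box<dyn Expr>> = serde_json::from_str(&json).unwrap();
    println!("{}", back[0].evaluate());
}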

How to get RecordBatch from the data source

If the input looks like the following JSON content:

{"a":1, "b":2.0, "c":false, "d":"4"}
{"a":-10, "b":-3.5, "c":true, "d":"4"}
{"a":2, "b":0.6, "c":false, "d":"text"}
{"a":1, "b":2.0, "c":false, "d":"4"}
{"a":7, "b":-3.5, "c":true, "d":"4"}
{"a":1, "b":0.6, "c":false, "d":"text"}
{"a":1, "b":2.0, "c":false, "d":"4"}
{"a":5, "b":-3.5, "c":true, "d":"4"}
{"a":1, "b":0.6, "c":false, "d":"text"}
{"a":1, "b":2.0, "c":false, "d":"4"}
{"a":1, "b":-3.5, "c":true, "d":"4"}
{"a":100000000000000, "b":0.6, "c":false, "d":"text"}
use arrow::json;
use arrow::json::reader::infer_json_schema;
use std::io::BufReader;
use std::sync::Arc;

// `some_records` and `whole_batch_records` are readers over the NDJSON input shown above.
let inferred_schema = infer_json_schema(&mut BufReader::new(some_records), None).unwrap();

// Batch (window) size of 1024 rows; `projection` is `None` when all columns are wanted.
let mut json_reader = json::Reader::new(
    BufReader::new(whole_batch_records),
    Arc::new(inferred_schema),
    1024,
    projection,
);
let batch = json_reader.next().unwrap().unwrap(); // `batch` is a RecordBatch!

If we use Arrow's API directly, it seems that we don't need to convert the JSON data into a RecordBatch ourselves.

RFC: New Proposal

The investigation is finished; I plan to write a new proposal with a feasible technical solution.

Give a try: AWS Step Functions

AWS Step Functions looks like a fantastic solution for the dataflow stream model.

  • Passing derived streams between state machines in AWS Step Functions

Questions:

  • What types of parameters does it support?
  • What is its payload size limit?
  • How does it pass information between functions?

RFC: SQL on cloud function services

Continuous query using SQL is one requirement of real-time stream processing [1].

This project has great potential to make another contribution to simplifying the serverless programming model by directly recasting stream queries into dataflow models, where each operator or node in the graph is a cloud function managed by AWS Step Functions [3]. In this way, we can support cyclic dataflow graphs and iterations on streams.

Materialize does all of this by recasting SQL92 queries as dataflows. We can go further and directly convert SQL into a FaaS dataflow. It is quite easy to parse SQL and support custom dialects [2].
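A minimal sketch of the parsing step with sqlparser-rs [2] (assuming a recent release where Parser::parse_sql takes a &str; the query and table name are just examples):

use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;

fn main() {
    // Parse a SQL query into an AST; a custom streaming dialect could extend
    // this with window / watermark clauses.
    let sql = "SELECT c1, COUNT(*) FROM taxi_trips GROUP BY c1";
    let dialect = GenericDialect {};
    let ast = Parser::parse_sql(&dialect, sql).unwrap();
    println!("{:#?}", ast);
}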

References

  1. The 8 Requirements of Real-Time Stream Processing. Michael Stonebraker, Uğur Çetintemel and Stan Zdonik
  2. https://github.com/ballista-compute/sqlparser-rs
  3. https://docs.aws.amazon.com/step-functions/latest/dg/welcome.html

Transmuting u8 buffer to/from struct

use std::slice;
use std::mem;

#[repr(C, packed)]
#[derive(Debug, Copy, Clone)]
struct Header {
    some: u8,
    thing: u8,
}


fn main() {
    let header = Header {some: 1, thing: 2};
    let p: *const Header = &header;     // the same operator is used as with references
    let p: *const u8 = p as *const u8;  // convert between pointer types
    let s: &[u8] = unsafe { 
        slice::from_raw_parts(p, mem::size_of::<Header>())
    };
    
    println!("{:?}", s);
    
    let (head, body, _tail) = unsafe { s.align_to::<Header>() };
    assert!(head.is_empty(), "Data was not aligned");
    let my_struct = &body[0];

    println!("{:?}", my_struct);
}

To do list

  • Parse the time schedule and asynchronously trigger the repeated continuous query. @gangliao
  • Fetch window data (avoiding duplicate reads) from data sources such as Kinesis and Kafka in the first lambda function.
  • Read offline data (Parquet and CSV) from S3.
  • Partition window data for distributed dataflow processing --- identify the window number, sequence number, total number, and so forth.
  • Partition record batches for multithreaded processing inside the lambda function.
  • Understand Flink, Spark, and Presto (and their related cloud services) for experimental evaluation. @ZejunLiu0
  • Collect useful open data sets for both stream processing and batch processing.
  • Handle the lambda instance timeout (~15 min).
  • Compress/decompress and serialize/deserialize the lambda context for the cloud environment (4 KB size limit). @gangliao
  • Feed the record batch to the leaf node of the sub-plan. @gangliao

CoalesceBatchesExec

In order to coalesce batches in the lambda function with concurrency = 1, shall we add a CoalesceBatchesExec to the head of the sub-plan?
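A minimal sketch of the idea, using DataFusion's CoalesceBatchesExec to wrap the sub-plan's input so that many small incoming batches are combined before the remaining operators run (treat the wiring as illustrative):

use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;

// Prepend a CoalesceBatchesExec to the head of the sub-plan.
fn coalesce_head(
    input: Arc<dyn ExecutionPlan>,
    target_batch_size: usize,
) -> Arc<dyn ExecutionPlan> {
    Arc::new(CoalesceBatchesExec::new(input, target_batch_size))
}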

Example: how to manually serialize and deserialize Fn in struct

Arrow Pull Request: umd-dslam/arrow#1

/// Physical expression of a scalar function
#[derive(Serialize, Deserialize)]
pub struct ScalarFunctionExpr {
    fun: ScalarFunctionImplementation,
    name: String,
    args: Vec<Arc<dyn PhysicalExpr>>,
    return_type: DataType,
}

/// Scalar function
pub type ScalarFunctionImplementation =
    Arc<dyn Fn(&[ArrayRef]) -> Result<ArrayRef> + Send + Sync>;
error[E0277]: the trait bound `dyn for<'r> Fn(&'r [Arc<(dyn arrow::array::Array + 'static)>]) -> std::result::Result<Arc<(dyn arrow::array::Array + 'static)>, DataFusionError> + Sync + std::marker::Send: physical_plan::_::_serde::Serialize` is not satisfied
   --> datafusion/src/physical_plan/functions.rs:391:5
    |
391 |     fun: ScalarFunctionImplementation,
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `physical_plan::_::_serde::Serialize` is not implemented for `dyn for<'r> Fn(&'r [Arc<(dyn arrow::array::Array + 'static)>]) -> std::result::Result<Arc<(dyn arrow::array::Array + 'static)>, DataFusionError> + Sync + std::marker::Send`
    |
    = note: required because of the requirements on the impl of `physical_plan::_::_serde::Serialize` for `Arc<dyn for<'r> Fn(&'r [Arc<(dyn arrow::array::Array + 'static)>]) -> std::result::Result<Arc<(dyn arrow::array::Array + 'static)>, DataFusionError> + Sync + std::marker::Send>`
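A common workaround, sketched below (illustrative only, not necessarily what the linked pull request implements), is to skip the closure during (de)serialization and rebuild it from the function name afterwards, since built-in scalar functions can be looked up by name:

use serde::{Deserialize, Serialize};
use std::sync::Arc;

// Simplified closure type standing in for ScalarFunctionImplementation.
type ScalarFn = Arc<dyn Fn(i64) -> i64 + Send + Sync>;

#[derive(Serialize, Deserialize)]
struct ScalarFunctionExpr {
    // Not serializable; skipped and re-created from `name` after deserialization.
    #[serde(skip, default = "placeholder_fn")]
    fun: ScalarFn,
    name: String,
}

fn placeholder_fn() -> ScalarFn {
    Arc::new(|x| x)
}

fn main() {
    let expr = ScalarFunctionExpr {
        fun: Arc::new(|x| x * x),
        name: "square".to_string(),
    };
    let json = serde_json::to_string(&expr).unwrap();
    let mut back: ScalarFunctionExpr = serde_json::from_str(&json).unwrap();
    // Rebuild the real implementation by looking up `back.name` in a registry.
    back.fun = Arc::new(|x| x * x);
    println!("{}({}) = {}", back.name, 3, (back.fun)(3));
}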
