flock-lab / flock
Flock: A Low-Cost Streaming Query Engine on FaaS Platforms
Home Page: https://flock-lab.github.io/flock/
License: GNU Affero General Public License v3.0
For serverless stream processing, triggering the system for each new record does not seem to be a good solution. We had better use Spark-like micro-batch processing instead of record-by-record processing; only then can the system save money.
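To make the cost argument concrete, here is a minimal sketch of size-based micro-batching. `MicroBatcher`, `invocations`, and the `record-N` payloads are hypothetical names for illustration, not part of the project; the sketch only counts how many downstream invocations a stream would trigger under a given batch size.

```rust
/// Hypothetical micro-batch buffer: collects records and releases them
/// as one batch once `max_records` have accumulated.
pub struct MicroBatcher {
    max_records: usize,
    buffer: Vec<String>,
}

impl MicroBatcher {
    pub fn new(max_records: usize) -> Self {
        assert!(max_records > 0, "batch size must be positive");
        Self { max_records, buffer: Vec::with_capacity(max_records) }
    }

    /// Adds one record. Returns a full batch when the size threshold is
    /// reached, otherwise `None` (no downstream invocation is needed yet).
    pub fn push(&mut self, record: String) -> Option<Vec<String>> {
        self.buffer.push(record);
        if self.buffer.len() >= self.max_records {
            Some(std::mem::take(&mut self.buffer))
        } else {
            None
        }
    }

    /// Releases whatever is buffered, e.g. when a time window closes.
    pub fn flush(&mut self) -> Option<Vec<String>> {
        if self.buffer.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.buffer))
        }
    }
}

/// Counts how many invocations a stream of `n` records would trigger.
pub fn invocations(n: usize, max_records: usize) -> usize {
    let mut batcher = MicroBatcher::new(max_records);
    let mut count = 0;
    for i in 0..n {
        if batcher.push(format!("record-{i}")).is_some() {
            count += 1;
        }
    }
    // A final partial batch still costs one invocation.
    if batcher.flush().is_some() {
        count += 1;
    }
    count
}
```

With a batch size of 1 this degenerates to record-by-record processing, so the number of invocations equals the number of records; larger batch sizes reduce it roughly proportionally.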
Since the physical plan is a nested structure, we need to recursively find all input nodes, and correctly map the data obtained from the parent lambda to each of them.
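The recursive walk could look roughly like the sketch below. It uses a toy `Plan` enum rather than DataFusion's `ExecutionPlan`, and it assumes that payloads from the parent lambda arrive in the same left-to-right order as the input leaves; both are assumptions made for illustration.

```rust
use std::collections::VecDeque;

/// Toy stand-in for a physical plan; not DataFusion's `ExecutionPlan`.
#[derive(Debug)]
pub enum Plan {
    /// Leaf that reads data handed over by the parent lambda.
    Input { name: String, batches: Vec<String> },
    /// Any operator with child plans.
    Operator { name: String, children: Vec<Plan> },
}

/// Depth-first walk that fills each input leaf, in left-to-right order,
/// with the next payload from the queue. Returns how many leaves were fed.
pub fn feed_inputs(plan: &mut Plan, payloads: &mut VecDeque<Vec<String>>) -> usize {
    match plan {
        Plan::Input { batches, .. } => match payloads.pop_front() {
            Some(data) => {
                *batches = data;
                1
            }
            // More leaves than payloads: leave this leaf untouched.
            None => 0,
        },
        Plan::Operator { children, .. } => {
            let mut fed = 0;
            for child in children.iter_mut() {
                fed += feed_inputs(child, payloads);
            }
            fed
        }
    }
}
```

A caller can compare the returned count with the number of payloads it received to detect a mismatch between the plan shape and the incoming data.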
Automate account creation, and resource provisioning using AWS Service Catalog, AWS Organizations, and AWS Lambda
After lambda functions with concurrency > 1 are dynamically executed, they are aggregated into the same subsequent lambda function through consistent hashing of the same seed in their payloads.
In distributed computing, lambda functions with concurrency > 1 send their payloads to a subsequent lambda function with concurrency = 1. To ensure that this function receives all the payloads (and that none are received by other functions of the same group with different names), we need to use consistent hashing to deterministically select a subsequent function name from the same group. Consistent hashing with a seed can address all of these potential problems.
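As a rough illustration of the idea, the sketch below builds a small hash ring over the function names of one group and picks a name from a seed. The `Ring` type, the replica count, and the use of the standard library's `DefaultHasher` are assumptions for this example; the project may hash differently.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

/// Hashes any value to a 64-bit position on the ring.
fn position<T: Hash>(value: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}

/// Hash ring over the function names of one group.
pub struct Ring {
    points: BTreeMap<u64, String>,
}

impl Ring {
    /// Places `replicas` virtual nodes per function name on the ring.
    pub fn new(function_names: &[&str], replicas: u32) -> Self {
        let mut points = BTreeMap::new();
        for name in function_names {
            for replica in 0..replicas {
                points.insert(position(&(name, replica)), name.to_string());
            }
        }
        Self { points }
    }

    /// Picks the first virtual node clockwise from the seed's position,
    /// wrapping around to the start of the ring if necessary.
    pub fn pick(&self, seed: u64) -> Option<&str> {
        let key = position(&seed);
        self.points
            .range(key..)
            .next()
            .or_else(|| self.points.iter().next())
            .map(|(_, name)| name.as_str())
    }
}
```

Every upstream function that carries the same seed in its payload and builds the ring from the same group of names arrives at the same downstream name, which is the property the paragraph above relies on.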
Data Source
We can start with this PR in the future.
Finished test [unoptimized + debuginfo] target(s) in 0.38s
Running /home/gangliao/ServerlessCQ/target/debug/deps/aws-a69112207b9ac015
running 2 tests
test tests::generic_lambda ... ignored
Object({"Records": Array([Object({"awsRegion": Null, "eventID": String("3hnr7qHLGDq06"), "eventName": String("IlyOM8ax4B"), "eventSource": String("FMNn0INVZ5fvSKBp"), "eventSourceARN": Null, "eventVersion": Null, "invokeIdentityArn": Null, "kinesis": Object({"approximateArrivalTimestamp": Number(9175183041.66), "data": String("eyJjMSI6NzYsImMyIjo3LCJjMyI6Ikp6VHY1YmkzWFNOVCJ9"), "encryptionType": String("lw6Wno4dWTs"), "partitionKey": Null, "sequenceNumber": Null, "kinesisSchemaVersion": String("yl41SRAm")})}), Object({"awsRegion": Null, "eventID": String("aNx4RRFsXZOI6ZzOG"), "eventName": String("xbPpX"), "eventSource": Null, "eventSourceARN": Null, "eventVersion": Null, "invokeIdentityArn": String("4A94IjQCvzGERX3S2zp"), "kinesis": Object({"approximateArrivalTimestamp": Number(3633130223.529), "data": String("eyJjMSI6NzcsImMyIjo2LCJjMyI6InNETllzRkJ3cE5aIn0="), "encryptionType": String("snBCj3Woxv3xGkU"), "partitionKey": String("RHslkOPKM5uy8gEHu"), "sequenceNumber": Null, "kinesisSchemaVersion": Null})})])})
KinesisEvent { records: [KinesisEventRecord { aws_region: None, event_id: Some("3hnr7qHLGDq06"), event_name: Some("IlyOM8ax4B"), event_source: Some("FMNn0INVZ5fvSKBp"), event_source_arn: None, event_version: None, invoke_identity_arn: None, kinesis: KinesisRecord { approximate_arrival_timestamp: SecondTimestamp(2260-10-01T05:57:21.066Z), data: Base64Data([123, 34, 99, 49, 34, 58, 55, 54, 44, 34, 99, 50, 34, 58, 55, 44, 34, 99, 51, 34, 58, 34, 74, 122, 84, 118, 53, 98, 105, 51, 88, 83, 78, 84, 34, 125]), encryption_type: Some("lw6Wno4dWTs"), partition_key: None, sequence_number: None, kinesis_schema_version: Some("yl41SRAm") } }, KinesisEventRecord { aws_region: None, event_id: Some("aNx4RRFsXZOI6ZzOG"), event_name: Some("xbPpX"), event_source: None, event_source_arn: None, event_version: None, invoke_identity_arn: Some("4A94IjQCvzGERX3S2zp"), kinesis: KinesisRecord { approximate_arrival_timestamp: SecondTimestamp(2085-02-16T02:50:23.529Z), data: Base64Data([123, 34, 99, 49, 34, 58, 55, 55, 44, 34, 99, 50, 34, 58, 54, 44, 34, 99, 51, 34, 58, 34, 115, 68, 78, 89, 115, 70, 66, 119, 112, 78, 90, 34, 125]), encryption_type: Some("snBCj3Woxv3xGkU"), partition_key: Some("RHslkOPKM5uy8gEHu"), sequence_number: None, kinesis_schema_version: None } }] }
[123, 34, 99, 49, 34, 58, 55, 54, 44, 34, 99, 50, 34, 58, 55, 44, 34, 99, 51, 34, 58, 34, 74, 122, 84, 118, 53, 98, 105, 51, 88, 83, 78, 84, 34, 125, 123, 34, 99, 49, 34, 58, 55, 55, 44, 34, 99, 50, 34, 58, 54, 44, 34, 99, 51, 34, 58, 34, 115, 68, 78, 89, 115, 70, 66, 119, 112, 78, 90, 34, 125]
batch_size: 2
input.len: 71
utf: {"c1":76,"c2":7,"c3":"JzTv5bi3XSNT"}{"c1":77,"c2":6,"c3":"sDNYsFBwpNZ"}
thread 'tests::centralized_execution' panicked at 'called `Result::unwrap()` on an `Err` value: JsonError("Not valid JSON: trailing characters at line 1 column 37")', src/runtime/src/datasource/kinesis.rs:115:19
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
test tests::centralized_execution ... FAILED
failures:
failures:
tests::centralized_execution
test result: FAILED. 0 passed; 1 failed; 1 ignored; 0 measured; 0 filtered out; finished in 0.02s
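One plausible reading of this failure: the first record is 36 bytes long, so "column 37" is exactly where the second record begins, which suggests the two payloads were concatenated without a separator. Assuming the downstream JSON reader expects line-delimited JSON, a minimal sketch of a fix is to join the records with newlines. `join_records` is a hypothetical helper, not the code in `kinesis.rs`.

```rust
/// Concatenates raw record payloads into line-delimited JSON by placing
/// a newline between consecutive records.
pub fn join_records(records: &[Vec<u8>]) -> Vec<u8> {
    let capacity: usize = records.iter().map(|r| r.len() + 1).sum();
    let mut out = Vec::with_capacity(capacity);
    for (i, record) in records.iter().enumerate() {
        if i > 0 {
            out.push(b'\n');
        }
        out.extend_from_slice(record);
    }
    out
}
```

Applied to the two records in the log above, the joined buffer has one JSON object per line instead of a single 71-byte run.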
/// Limit execution plan
#[derive(Debug, Serialize, Deserialize)]
pub struct GlobalLimitExec {
    /// Input execution plan
    input: Arc<dyn ExecutionPlan>,
    /// Maximum number of rows to return
    limit: usize,
    /// Number of threads to run parallel LocalLimitExec on
    concurrency: usize,
}

/// Sort execution plan
#[derive(Debug, Serialize, Deserialize)]
pub struct SortExec {
    /// Input execution plan
    input: Arc<dyn ExecutionPlan>,
    /// Sort expressions
    expr: Vec<PhysicalSortExpr>,
    /// Number of threads to execute input partitions on before combining into a single partition
    concurrency: usize,
}
We need to set the concurrency value at runtime because the number of lambda instances is dynamic and depends on many factors.
Provisioned Concurrency is the end of cold starts, but at what cost for ServerlessCQ? [1]
The yellow and green taxi trip records include fields capturing pick-up and drop-off dates/times, pick-up and drop-off locations, trip distances, itemized fares, rate types, payment types, and driver-reported passenger counts. The data is provided in CSV format.
US Accidents (3.5 million records)
This is a countrywide car accident dataset, which covers 49 states of the USA. The accident data are collected from February 2016 to June 2020, using two APIs that provide streaming traffic incident (or event) data. These APIs broadcast traffic data captured by a variety of entities, such as the US and state departments of transportation, law enforcement agencies, traffic cameras, and traffic sensors within the road-networks. Currently, there are about 3.5 million accident records in this dataset.
The customer is applying a continuous filter to only retain records of interest.
We need to modify some features of Arrow, the open-source software, such as serialization and deserialization of a query's physical plan. We will regularly merge the latest official code into this forked version.
We will push all our changes to the branch scq: https://github.com/DSLAM-UMD/arrow/tree/scq
In ServerlessCQ, if a cache miss occurs due to Lambda rescheduling, we can use the existing S3 Select feature to accelerate not only simple database operators like select and project, but also complex operators like join, group-by, and top-K.
Furthermore, starting today [1][2], Amazon S3 delivers strong read-after-write consistency automatically, which makes many of Starling's (SIGMOD'20) designs obsolete.
We need to upgrade the lambda crates; the current versions do not support the event source mapping for Kafka.
We should merge Arrow's official codebase at least once a month so that we can get the latest features.
Users can issue continuous queries via either a high-level language or the CLI.
CLI features:
Through the client's query engine, we can get the physical plan of the query statement. Before generating, compiling, and deploying lambda function code, we need to divide the entire plan into independent sub-plans, where each sub-plan corresponds to a lambda function.
The implementation logic of this part is:
input: physical plan tree
output: return topologically sorted sub plans
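A minimal sketch of this input/output contract follows. It uses a toy `Node` tree in which a `boundary` flag marks operators that start a new lambda function; the flag, the type names, and the operator names in the usage note are assumptions for illustration, since the source does not say how the cut points are chosen.

```rust
/// Toy plan node; `boundary` marks operators that start a new lambda
/// function. Both the type and the flag are assumptions for illustration.
#[derive(Debug, Clone)]
pub struct Node {
    pub name: String,
    pub boundary: bool,
    pub children: Vec<Node>,
}

/// One sub-plan: the operators that run together in a single function.
#[derive(Debug, PartialEq)]
pub struct SubPlan {
    pub operators: Vec<String>,
}

/// Splits the tree at boundary nodes. Sub-plans are emitted in post-order,
/// so every sub-plan appears after the sub-plans that feed it.
pub fn split(root: &Node) -> Vec<SubPlan> {
    let mut done = Vec::new();
    let mut top = SubPlan { operators: Vec::new() };
    visit(root, &mut top, &mut done);
    done.push(top);
    done
}

fn visit(node: &Node, current: &mut SubPlan, done: &mut Vec<SubPlan>) {
    current.operators.push(node.name.clone());
    for child in &node.children {
        if child.boundary {
            // The child subtree becomes its own sub-plan and is finished
            // before the sub-plan that consumes its output.
            let mut next = SubPlan { operators: Vec::new() };
            visit(child, &mut next, done);
            done.push(next);
        } else {
            visit(child, current, done);
        }
    }
}
```

For a chain such as projection, final aggregate, partial aggregate (boundary), scan, this yields two sub-plans: the partial aggregate with its scan first, then the projection with the final aggregate. Producers always precede their consumers, which is one valid topological order.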
We have to find a way to serialize and deserialize some execution plans from DataFusion that contain Vec<Arc<dyn traits>>.
/// Hash aggregate execution plan
#[derive(Debug)]
pub struct HashAggregateExec {
    mode: AggregateMode,
    group_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
    aggr_expr: Vec<Arc<dyn AggregateExpr>>,
    input: Arc<dyn ExecutionPlan>,
    schema: SchemaRef,
}

/// Execution plan for a projection
#[derive(Debug)]
pub struct ProjectionExec {
    /// The projection expressions stored as tuples of (expression, output column name)
    expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
    /// The schema once the projection has been applied to the input
    schema: SchemaRef,
    /// The input plan
    input: Arc<dyn ExecutionPlan>,
}
If the input looks like the following JSON content:
{"a":1, "b":2.0, "c":false, "d":"4"}
{"a":-10, "b":-3.5, "c":true, "d":"4"}
{"a":2, "b":0.6, "c":false, "d":"text"}
{"a":1, "b":2.0, "c":false, "d":"4"}
{"a":7, "b":-3.5, "c":true, "d":"4"}
{"a":1, "b":0.6, "c":false, "d":"text"}
{"a":1, "b":2.0, "c":false, "d":"4"}
{"a":5, "b":-3.5, "c":true, "d":"4"}
{"a":1, "b":0.6, "c":false, "d":"text"}
{"a":1, "b":2.0, "c":false, "d":"4"}
{"a":1, "b":-3.5, "c":true, "d":"4"}
{"a":100000000000000, "b":0.6, "c":false, "d":"text"}
use arrow::datatypes::{DataType, Field, Schema};
use arrow::json;
use std::io::BufReader;
use std::sync::Arc;
use arrow::json::reader::infer_json_schema;
let inferred_schema = infer_json_schema(&mut BufReader::new(some_records), None).unwrap();
// if the window size is 1024
let mut json_reader = json::Reader::new(BufReader::new(whole_batch_records), Arc::new(inferred_schema), 1024, projection);
let batch = json_reader.next().unwrap().unwrap(); // batch is RecordBatch!
If we use Arrow's API directly, it seems that we don't need to convert JSON data into RecordBatch by ourselves.
The investigation is finished; I plan to write a new proposal with a feasible technical solution.
AWS Step Functions look like a fantastic solution to cope with the data flow stream model.
Question:
crate:
Codegen: generating lambda functions from a Handlebars template is a more efficient approach.
Compile code from code:
Continuous query using SQL is one requirement of real-time stream processing [1].
This project has great potential to make another contribution to simplifying the serverless programming model by directly recasting stream queries into dataflow models where each operator or node in the graph is a cloud function service managed by AWS Step Functions [3]. In this way, we can support cyclic dataflow graphs and iterations on streams.
Materialize does all of this by recasting SQL92 queries as dataflows. We can go further and directly convert SQL into a FaaS dataflow. It's quite easy to parse SQL and support custom dialects [2].
People have gradually begun to explore this direction, although not yet in depth.
https://aws.amazon.com/blogs/compute/using-aws-lambda-for-streaming-analytics/
use std::slice;
use std::mem;

#[repr(C, packed)]
#[derive(Debug, Copy, Clone)]
struct Header {
    some: u8,
    thing: u8,
}

fn main() {
    let header = Header {some: 1, thing: 2};
    let p: *const Header = &header; // the same operator is used as with references
    let p: *const u8 = p as *const u8; // convert between pointer types
    let s: &[u8] = unsafe {
        slice::from_raw_parts(p, mem::size_of::<Header>())
    };
    println!("{:?}", s);
    let (head, body, _tail) = unsafe { s.align_to::<Header>() };
    assert!(head.is_empty(), "Data was not aligned");
    let my_struct = &body[0];
    println!("{:?}", my_struct);
}
Based on the observation from #203
An embedded key-value store to log queries and their cloud context to disk.
In order to coalesce batches in the lambda function with concurrency = 1, shall we add a CoalesceBatchesExec to the head of the sub-plan?
Arrow Pull Request: umd-dslam/arrow#1
/// Physical expression of a scalar function
#[derive(Serialize, Deserialize)]
pub struct ScalarFunctionExpr {
    fun: ScalarFunctionImplementation,
    name: String,
    args: Vec<Arc<dyn PhysicalExpr>>,
    return_type: DataType,
}

/// Scalar function
pub type ScalarFunctionImplementation =
    Arc<dyn Fn(&[ArrayRef]) -> Result<ArrayRef> + Send + Sync>;
error[E0277]: the trait bound `dyn for<'r> Fn(&'r [Arc<(dyn arrow::array::Array + 'static)>]) -> std::result::Result<Arc<(dyn arrow::array::Array + 'static)>, DataFusionError> + Sync + std::marker::Send: physical_plan::_::_serde::Serialize` is not satisfied
--> datafusion/src/physical_plan/functions.rs:391:5
|
391 | fun: ScalarFunctionImplementation,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `physical_plan::_::_serde::Serialize` is not implemented for `dyn for<'r> Fn(&'r [Arc<(dyn arrow::array::Array + 'static)>]) -> std::result::Result<Arc<(dyn arrow::array::Array + 'static)>, DataFusionError> + Sync + std::marker::Send`
|
= note: required because of the requirements on the impl of `physical_plan::_::_serde::Serialize` for `Arc<dyn for<'r> Fn(&'r [Arc<(dyn arrow::array::Array + 'static)>]) -> std::result::Result<Arc<(dyn arrow::array::Array + 'static)>, DataFusionError> + Sync + std::marker::Send>`
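The error arises because a closure behind `Arc<dyn Fn ...>` has no serializable representation. One common workaround, sketched here under simplifying assumptions, is to serialize only the function's name and look the implementation up again on the receiving side. The `ScalarFn` alias, the two sample functions, and the `registry`, `encode`, and `decode` helpers are hypothetical and operate on plain `f64` slices rather than Arrow arrays; this is not necessarily the approach taken in the Arrow fork.

```rust
use std::collections::HashMap;

/// Simplified stand-in for a scalar function over a column of values.
pub type ScalarFn = fn(&[f64]) -> Vec<f64>;

fn sqrt_all(xs: &[f64]) -> Vec<f64> {
    xs.iter().map(|x| x.sqrt()).collect()
}

fn abs_all(xs: &[f64]) -> Vec<f64> {
    xs.iter().map(|x| x.abs()).collect()
}

/// Registry that both sides (client and lambda) are assumed to share.
pub fn registry() -> HashMap<&'static str, ScalarFn> {
    let mut map: HashMap<&'static str, ScalarFn> = HashMap::new();
    map.insert("sqrt", sqrt_all);
    map.insert("abs", abs_all);
    map
}

/// The serializable form carries only the function name.
pub fn encode(name: &str) -> String {
    name.to_string()
}

/// Rebuilds the callable by looking the name up again.
/// Returns `None` for names the registry does not know.
pub fn decode(encoded: &str) -> Option<ScalarFn> {
    registry().get(encoded).copied()
}
```

Since `ScalarFunctionExpr` already stores a `name` field, a scheme like this would only need the `fun` field to be skipped during serialization and restored from the name afterwards.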
Tumbling Window
Stagger Window
Sliding Window
Session Window
Support window function in SQL
By applying binary size optimizations (https://github.com/johnthagen/min-sized-rust#), we were able to reduce the binary size from 60MB to 665KB (with dependency libraries)!
-rw-r--r--. 1 gangliao gangliao 665K Oct 20 19:14 rust.zip
It's very easy to trigger its event on AWS.
In order to make performance comparisons more convenient, we MUST support Kafka.