ds2's Introduction

DS2: fast, accurate, automatic scaling decisions for distributed streaming dataflows.

DS2 is a low-latency controller for dynamic scaling of streaming analytics applications. It can accurately estimate parallelism for all dataflow operators within a single scaling decision, and operates reactively online. DS2 bases scaling decisions on real-time performance traces which it collects through lightweight system-level instrumentation.

This repository contains the following DS2 components:

  • The Scaling policy implements the scaling model and estimates operator parallelism using metrics collected by the reference system instrumentation.
  • The Scaling manager periodically invokes the policy when metrics are available and sends scaling commands to the reference stream processor.
  • The Apache Flink 1.4.1 instrumentation patch contains the necessary instrumentation for Flink to be integrated with DS2.
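To give some intuition for what "estimating operator parallelism" means, the sketch below applies a deliberately simplified per-operator rule. The number of instances an operator needs is its target input rate divided by the true processing rate of one instance, rounded up. The function name `estimate_parallelism` is hypothetical, and this is not the code in the Scaling policy. The real policy reasons over the whole dataflow graph so that it can size every operator within a single decision.

```rust
/// Hypothetical helper (not DS2's API): the smallest number of instances whose
/// combined true processing capacity covers the target input rate.
/// Both rates are in records per second.
fn estimate_parallelism(target_input_rate: f64, true_rate_per_instance: f64) -> Option<u32> {
    // Reject inputs that cannot yield a meaningful estimate (NaN, zero, negative).
    if true_rate_per_instance.is_nan() || true_rate_per_instance <= 0.0 {
        return None;
    }
    if !target_input_rate.is_finite() || target_input_rate < 0.0 {
        return None;
    }
    // Round up: a fractional instance still requires a whole instance.
    let needed = (target_input_rate / true_rate_per_instance).ceil() as u32;
    // Keep at least one instance running even when there is no input.
    Some(needed.max(1))
}

fn main() {
    // 100k records/s must be absorbed; one instance can truly process 30k records/s.
    println!("{:?}", estimate_parallelism(100_000.0, 30_000.0)); // Some(4)
}
```

The sketch divides by the *true* per-instance rate rather than the observed one. An instance that spends most of its time waiting for input reports a low observed rate, even though it could handle much more work.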

DS2 can be integrated with any dataflow stream processor as long as it can provide the instrumentation and metrics that its scaling policy requires. For more details on required metrics and integration, please see the OSDI'18 paper.

Building DS2

These instructions assume a Unix-like system.

  1. DS2 is mainly developed in Rust. You can install Rust by following these instructions.

  2. Compile the code (dependencies will be fetched automatically):

    $ cd controller/
    $ cargo build --release --all

Executing DS2

There are two ways to execute DS2: online and offline. To run DS2 online, follow the steps below:

  1. Set up the DS2 configuration parameters in the ds2.toml file (the manager reads config/ds2.toml relative to controller/).

  2. Go to controller/ and start the scaling manager:

    $ cargo run --release --bin manager

On success, the scaling manager starts monitoring the specified metrics repository and performs the following actions:

  • Updates its state every time a new rate file is created by the instrumented system. The scaling manager expects exactly one rate file per operator instance and time window (epoch) following the naming convention "some_name-epoch_number.file_extension". Example rate files generated with this patch for Flink can be found here.
  • Invokes the scaling policy periodically according to the particular configuration.
  • Re-configures the streaming system automatically. To start and re-configure the running system, you must provide two bash scripts, one that starts the system and one that re-configures it, such as these scripts for Flink.
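The naming convention and the "one rate file per operator instance and epoch" rule can be sketched as follows. This is a minimal illustration that assumes the manager knows how many operator instances it expects per epoch. The names `parse_rate_file` and `EpochTracker` are hypothetical, and the actual logic in src/bin/manager.rs may differ.

```rust
use std::collections::{HashMap, HashSet};
use std::path::Path;

/// Hypothetical parser for "some_name-epoch_number.file_extension".
/// Splits on the last '-' so hyphens inside operator names (such as
/// "Operator 1 -> Operator 2") stay part of the name.
fn parse_rate_file(file_name: &str) -> Option<(String, u64)> {
    // file_stem drops the text after the final '.', i.e. the extension, if any.
    let stem = Path::new(file_name).file_stem()?.to_str()?;
    let (name, epoch) = stem.rsplit_once('-')?;
    Some((name.to_string(), epoch.parse().ok()?))
}

/// Hypothetical bookkeeping: an epoch is complete once one rate file per
/// operator instance has been seen.
struct EpochTracker {
    expected_files: usize,
    seen: HashMap<u64, HashSet<String>>,
}

impl EpochTracker {
    fn new(expected_files: usize) -> Self {
        EpochTracker { expected_files, seen: HashMap::new() }
    }

    /// Records a newly created rate file and returns Some(epoch) exactly once,
    /// when the last missing file for that epoch arrives.
    fn record(&mut self, file_name: &str) -> Option<u64> {
        let (_, epoch) = parse_rate_file(file_name)?;
        let files = self.seen.entry(epoch).or_default();
        // Duplicate notifications for the same file do not count twice.
        let is_new = files.insert(file_name.to_string());
        if is_new && files.len() == self.expected_files {
            Some(epoch)
        } else {
            None
        }
    }
}

fn main() {
    // Two operator instances are expected per epoch in this toy example.
    let mut tracker = EpochTracker::new(2);
    println!("{:?}", tracker.record("Source: Source-1-3.csv")); // None
    println!("{:?}", tracker.record("Sink: Sink-1-3.csv")); // Some(3): epoch 3 complete
}
```

Once an epoch is reported complete, a manager built this way could hand that epoch's rates to the scaling policy. Whether the policy actually runs then would depend on the configured policy interval.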

The DS2 scaling policy can also be invoked offline on a collection of metrics generated during the execution of a dataflow. To do so, you need:

  • A file containing the configuration of the executed dataflow.
  • A log file with the collected operator rates for one or more policy intervals (epochs).

As an example, go to controller/ and run:

    $ cargo run --release --bin policy -- --topo examples/offline/flink_wordcount_topology.csv --rates examples/offline/flink_rates.log --system flink

This command evaluates the scaling policy on the --topo topology for each epoch included in the --rates file assuming Flink as the streaming system.

For more information about offline execution parameters, try --help as follows:

    $ cargo run --release --bin policy -- --help

Example input files for both online and offline DS2 execution are provided here, along with details on their format.

Documentation

The complete DS2 documentation can be found here.

License

DS2 is primarily distributed under the terms of both the MIT license and the Apache License (Version 2.0). See LICENSE-APACHE and LICENSE-MIT for details.

ds2's People

Contributors

antiguru, dependabot[bot], jliagouris, mattforshaw, vasia

ds2's Issues

Hi, I have a problem with manager.rs.

Hi, thanks for sharing the code.
As I understand it (I am a beginner in Rust), the manager first sets up a watcher that reports file changes, and it enters the loop on the condition Ok(DebouncedEvent::Create(p)).
My file has actually changed (first removed with rm, then created again), but this condition is never satisfied. Has anyone seen this problem before?

My rustc version is 1.57.0

Any help is greatly appreciated.

Compilation Warning/Errors

I'm trying to compile and run DS2 and am encountering some issues.

system: Ubuntu 20.04
rustc --version: rustc 1.64.0 (a55dd71d5 2022-09-19)

Some of the warnings look ignorable, but the errors appear to be caused by API changes in an upstream dependency (the config crate).

Output of $ cargo run --release --bin manager:

warning: value assigned to `true_processing_rate` is never read
   --> src/policy/scaling.rs:320:17
    |
320 |         let mut true_processing_rate = Rate::default();
    |                 ^^^^^^^^^^^^^^^^^^^^
    |
    = note: `#[warn(unused_assignments)]` on by default
    = help: maybe it is overwritten before being read?

warning: value assigned to `estimated_true_output_rate` is never read
   --> src/policy/scaling.rs:328:17
    |
328 |         let mut estimated_true_output_rate = Rate::default();
    |                 ^^^^^^^^^^^^^^^^^^^^^^^^^^
    |
    = help: maybe it is overwritten before being read?

warning: value assigned to `observed_output_rate` is never read
   --> src/policy/scaling.rs:329:17
    |
329 |         let mut observed_output_rate = Rate::default();    // Used only in estimating source rate ratios
    |                 ^^^^^^^^^^^^^^^^^^^^
    |
    = help: maybe it is overwritten before being read?

warning: unused variable: `threshold`
   --> src/policy/scaling.rs:293:62
    |
293 | pub fn evaluate_scaling_policy_at_epoch(topo: &mut Topology, threshold: f64, factor: f64, epoch: Epoch, verbose: bool) -> String
    |                                                              ^^^^^^^^^ help: if this is intentional, prefix it with an underscore: `_threshold`
    |
    = note: `#[warn(unused_variables)]` on by default

warning: variable does not need to be mutable
   --> src/dataflow/parse.rs:161:16
    |
161 |         for (_,mut logs) in instance_logs.iter_mut()
    |                ----^^^^
    |                |
    |                help: remove this `mut`
    |
    = note: `#[warn(unused_mut)]` on by default

warning: variable does not need to be mutable
   --> src/dataflow/parse.rs:209:24
    |
209 |                 for (_,mut logs) in instance_logs.iter_mut()
    |                        ----^^^^
    |                        |
    |                        help: remove this `mut`

warning: variable does not need to be mutable
   --> src/dataflow/parse.rs:505:16
    |
505 |         for (_,mut logs) in instance_logs.iter_mut()
    |                ----^^^^
    |                |
    |                help: remove this `mut`

warning: variable does not need to be mutable
   --> src/policy/scaling.rs:345:21
    |
345 |         if let Some(mut input_rate) = true_output_rate.remove(&idx)
    |                     ----^^^^^^^^^^
    |                     |
    |                     help: remove this `mut`

warning: `ds2` (lib) generated 8 warnings
   Compiling ds2 v0.1.0 (/home/pbangert/Code/msc_thesis/ds2/controller)
warning: use of deprecated macro `try`: use the `?` operator instead
   --> src/bin/manager.rs:163:5
    |
163 |     try!(watcher.watch(metrics_repo_path.as_path(), RecursiveMode::Recursive));
    |     ^^^
    |
    = note: `#[warn(deprecated)]` on by default

warning: use of deprecated macro `try`: use the `?` operator instead
   --> src/bin/manager.rs:161:43
    |
161 |     let mut watcher: RecommendedWatcher = try!(Watcher::new(tx.clone(), Duration::from_secs(policy_interval as u64)));
    |                                           ^^^

error[E0624]: associated function `new` is private
  --> src/bin/manager.rs:63:28
   |
63 |     let mut conf = Config::new();
   |                            ^^^ private associated function
   |
  ::: /home/pbangert/.cargo/registry/src/github.com-1ecc6299db9ec823/config-0.13.2/src/config.rs:39:5
   |
39 |     pub(crate) fn new(value: Value) -> Self {
   |     --------------------------------------- private associated function defined here

error[E0061]: this function takes 1 argument but 0 arguments were supplied
  --> src/bin/manager.rs:63:20
   |
63 |     let mut conf = Config::new();
   |                    ^^^^^^^^^^^-- an argument of type `Value` is missing
   |
note: associated function defined here
  --> /home/pbangert/.cargo/registry/src/github.com-1ecc6299db9ec823/config-0.13.2/src/config.rs:39:19
   |
39 |     pub(crate) fn new(value: Value) -> Self {
   |                   ^^^
help: provide the argument
   |
63 |     let mut conf = Config::new(/* Value */);
   |                    ~~~~~~~~~~~~~~~~~~~~~~~~

warning: use of deprecated associated function `config::Config::merge`: please use 'ConfigBuilder' instead
  --> src/bin/manager.rs:64:16
   |
64 |     match conf.merge(File::with_name("config/ds2.toml")) {
   |                ^^^^^

Some errors have detailed explanations: E0061, E0624.
For more information about an error, try `rustc --explain E0061`.
warning: `ds2` (bin "manager") generated 3 warnings
error: could not compile `ds2` due to 2 previous errors; 3 warnings emitted
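The two hard errors (E0624, E0061) come from the config crate (0.13.2 in the log). In that version `Config::new` is `pub(crate)`, and the compiler says `merge` is deprecated in favour of `ConfigBuilder`. Fixing them means adapting manager.rs to that crate's API. The `try!` deprecation warnings are mechanical to fix, and the sketch below shows the pattern. `WatchError`, `watch` and `start_watching` are hypothetical stand-ins for the watcher calls at manager.rs lines 161 and 163, not the notify crate's API.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical error type standing in for a watcher error.
#[derive(Debug)]
struct WatchError(String);

impl fmt::Display for WatchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "watch failed: {}", self.0)
    }
}

impl Error for WatchError {}

/// Hypothetical stand-in for a fallible call such as `watcher.watch(...)`.
fn watch(path: &str) -> Result<(), WatchError> {
    if path.is_empty() {
        Err(WatchError("empty path".to_string()))
    } else {
        Ok(())
    }
}

fn start_watching(path: &str) -> Result<(), Box<dyn Error>> {
    // Deprecated form:  try!(watch(path));
    // Preferred form: `?` returns early on Err, converting the error via From.
    watch(path)?;
    Ok(())
}

fn main() {
    println!("{}", start_watching("/home/ubuntu/rates").is_ok()); // true
    if let Err(e) = start_watching("") {
        println!("{}", e); // watch failed: empty path
    }
}
```

`?` only compiles inside a function whose return type can absorb the error (here `Result<(), Box<dyn Error>>`), so the enclosing function in manager.rs must return a compatible `Result`.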


Question about file extension

Hi,

thank you for releasing the code! I'm having trouble running some of the experiments. In particular, the online manager expects to listen to a metrics repository folder containing files with a certain extension. However, the patch seems to write files without any extension.

Is this an issue with my configuration? Right now, I'm able to produce the following files in the $HOME/rates folder:

'Operator 1 -> Operator 2-2-5'  'Sink: Sink-1-24'  'Source: Source-1-2'
'Sink: Sink-1-0'                'Sink: Sink-1-25'  'Source: Source-1-3'
...

They are correctly formatted as CSV:

Sink: Sink,1,1,1596485970170338,26923.039268476547,0.0,2.132279033470391,0.0

However, even when I put the following in the TOML file:

metrics_repo = "/home/ubuntu/rates"
file_extension = ""

the manager seems to hang forever after starting the job. It prints

Submitted Flink job with id: 2aee4606a4cc421ed35757a0707fd398

and never prints anything again.
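The issue does not show how manager.rs filters files by extension. One possible explanation rests on an assumption: that the filter compares `Path::extension()` with the configured `file_extension`. In Rust's standard library, a file name with no '.' has no extension at all (`None`) rather than an empty one (`Some("")`). An empty `file_extension` would therefore never match files like "Sink: Sink-1-24". The helper `matches_extension` below is hypothetical and treats an empty setting as "no extension".

```rust
use std::ffi::OsStr;
use std::path::Path;

/// Hypothetical filter: does `path` match the configured `file_extension`?
/// An empty configured extension is treated as "the file has no extension".
fn matches_extension(path: &Path, configured: &str) -> bool {
    if configured.is_empty() {
        path.extension().is_none()
    } else {
        path.extension() == Some(OsStr::new(configured))
    }
}

fn main() {
    let p = Path::new("/home/ubuntu/rates/Sink: Sink-1-24");
    // No '.' in the file name, so there is no extension (None), not an empty one.
    println!("{:?}", p.extension()); // None
    // A naive comparison with an empty extension therefore never matches.
    println!("{}", p.extension() == Some(OsStr::new(""))); // false
    println!("{}", matches_extension(p, "")); // true
}
```

If the manager does compare extensions this way, it would silently ignore every rate file and appear to hang. Changing the filter or giving the patch's output files an extension would then resolve the problem.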

Implementation of MetricsManager for Flink

Hello, I'm trying to build DS2 and adapt it to my scenario. Everything works when following the steps in the README.
I found examples of rates for each operator instance and epoch. I would like to know how the true_processing_rate, true_output_rate, observed_processing_rate and observed_output_rate are measured. In my previous work, I used Flink's metric system to get metrics information, but that approach now seems too naive.
Could you give some hints for implementing MetricsManager? Should I insert stubs into Flink's source code?
Thanks!
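As we understand the OSDI'18 paper, "true" rates are normalised by the time an operator instance spends doing useful work, while "observed" rates are normalised by the whole wall-clock window. Useful work here means deserialization, processing and serialization, and excludes time spent waiting. The README says the instrumentation ships as a patch to Flink 1.4.1, which suggests that measuring useful time needs hooks inside Flink itself. The sketch below only shows the arithmetic. `InstanceCounters`, `Rates` and `compute_rates` are hypothetical and do not describe the patch's file format.

```rust
/// Hypothetical per-instance counters for one epoch; this is not the format
/// written by the Flink patch.
struct InstanceCounters {
    records_in: u64,
    records_out: u64,
    /// Wall-clock length of the observation window, in seconds.
    window_secs: f64,
    /// Seconds spent on useful work (deserialization, processing, serialization),
    /// excluding time spent waiting for input or for output buffers.
    useful_secs: f64,
}

#[derive(Debug, PartialEq)]
struct Rates {
    true_processing_rate: f64,
    true_output_rate: f64,
    observed_processing_rate: f64,
    observed_output_rate: f64,
}

/// True rates divide by useful time; observed rates divide by the whole window.
fn compute_rates(c: &InstanceCounters) -> Option<Rates> {
    // Useful time must be positive and cannot exceed the window it was measured in
    // (this check also rejects NaN values).
    let valid = c.useful_secs > 0.0 && c.window_secs >= c.useful_secs;
    if !valid {
        return None;
    }
    Some(Rates {
        true_processing_rate: c.records_in as f64 / c.useful_secs,
        true_output_rate: c.records_out as f64 / c.useful_secs,
        observed_processing_rate: c.records_in as f64 / c.window_secs,
        observed_output_rate: c.records_out as f64 / c.window_secs,
    })
}

fn main() {
    // An instance that was busy for only 2.5 s of a 10 s window.
    let c = InstanceCounters { records_in: 10_000, records_out: 5_000, window_secs: 10.0, useful_secs: 2.5 };
    // True rates: 4000 and 2000 records/s; observed rates: 1000 and 500 records/s.
    println!("{:?}", compute_rates(&c));
}
```

The gap between true and observed rates shows how much headroom an instance has. In this example, the instance could process four times more input than it currently receives.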
