
stratosphere's Introduction

Repository Has Moved

Stratosphere is now an Apache incubator project and has been renamed to Apache Flink.

This repository will not be maintained anymore.

Please move to the following GitHub repository:

git clone https://github.com/apache/incubator-flink.git

Thanks!


If you have an existing clone of the old Stratosphere repository, you can update your remote to point to the new repository:

git remote set-url origin https://github.com/apache/incubator-flink.git

stratosphere's People

Contributors

aalexandrov, aaronchlam, alexff91, aljoscha, andrehacker, asteriosk, asubmissions, carabolic, dersascha, dimalabs, enijkamp, faisalmoeen, fhueske, filiphaase, jcamachor, ktzoumas, markus-h, mathiaspet, mleich, moewex, qmlmoon, rmetzger, rwaury, skunert, supermegaciaccount, tillrohrmann, tommy-neubert, twalthr, vasia, warneke

stratosphere's Issues

Implement CoGrouping with Solution Set in Workset Iterations

Currently, only Matching with the Solution Set is enabled. Implement a CoGroup with the Solution Set.

NOTE: Unlike the regular CoGroup, which is much like a full outer join and gives empty iterators for a side that does not have a candidate for the respective key, the CoGroup with the solution set is naturally only outer on the non-solution set side.
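As a rough sketch of the intended semantics (names and signatures are illustrative, loosely following the PACT stub style; this is not the actual solution-set wiring):

import java.util.Iterator;

import eu.stratosphere.pact.common.stubs.CoGroupStub;
import eu.stratosphere.pact.common.stubs.Collector;
import eu.stratosphere.pact.common.type.PactRecord;

public class SolutionSetCoGroup extends CoGroupStub {

    @Override
    public void coGroup(Iterator<PactRecord> solutionSide, Iterator<PactRecord> worksetSide,
            Collector<PactRecord> out) {
        // Outer only on the non-solution-set side: the workset side always has
        // records for the current key, but the solution set side may be empty.
        if (solutionSide.hasNext()) {
            PactRecord solutionEntry = solutionSide.next();
            while (worksetSide.hasNext()) {
                PactRecord worksetRecord = worksetSide.next();
                // ... combine solutionEntry with worksetRecord ...
            }
        } else {
            // no solution set entry exists for this key yet
            while (worksetSide.hasNext()) {
                out.collect(worksetSide.next());
            }
        }
    }
}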

Class loading for user-defined aggregates fails

Class loading of user-defined data types must always happen with the user-code class loader. The instantiation of the user-defined aggregate types happens in the read() method of the iteration event that transmits them.

Since the event has no access to that particular class loader, it can only receive the serialized data. The actual instantiation of these types must happen lazily when they are accessed by the iteration head or sync tasks, which have the required class loader.
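A minimal sketch of that lazy pattern (illustrative class names, not the actual event types): the event only carries bytes, and whoever has the user-code class loader deserializes on first access.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

public class SerializedAggregateValue {

    private final byte[] serializedData; // received in read(), no user class loader needed

    public SerializedAggregateValue(byte[] serializedData) {
        this.serializedData = serializedData;
    }

    /** Called lazily by the iteration head or sync task, which has the user-code class loader. */
    public Object deserialize(final ClassLoader userCodeClassLoader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serializedData)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                // resolve user-defined aggregate classes against the user-code class loader
                return Class.forName(desc.getName(), false, userCodeClassLoader);
            }
        }) {
            return in.readObject();
        }
    }
}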

ozone build fails for non-yarn hadoop (pact-hbase fails)

Compiling ozone against a non-yarn Hadoop version is no longer possible. I tried with the latest stable version, 1.2.1.

mvn -P hadoop_v1 -D hadoop.version=1.2.1 -DskipTests clean compile

The reason is that pact-hbase uses the constructor TaskAttemptID(String jtIdentifier, int jobId, TaskType type, int taskId, int attemptId), which is available in yarn only:
Yarn: http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/TaskAttemptID.html
1.2.1: http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/TaskAttemptID.html

I was about to move to yarn anyway, but everyone who tries to build an ozone version running on the 4-node cluster (with the old hadoop) will run into this problem.
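A possible compatibility shim (a sketch, not code from the repository) would pick the constructor reflectively, so pact-hbase compiles and runs against both Hadoop lines:

import java.lang.reflect.Constructor;

import org.apache.hadoop.mapreduce.TaskAttemptID;

public final class TaskAttemptIds {

    private TaskAttemptIds() {}

    public static TaskAttemptID create(String jtIdentifier, int jobId,
            boolean isMap, int taskId, int attemptId) {
        try {
            // YARN / Hadoop 2.x: TaskAttemptID(String, int, TaskType, int, int)
            Class<?> taskType = Class.forName("org.apache.hadoop.mapreduce.TaskType");
            Object type = taskType.getMethod("valueOf", String.class)
                    .invoke(null, isMap ? "MAP" : "REDUCE");
            Constructor<TaskAttemptID> ctor = TaskAttemptID.class
                    .getConstructor(String.class, int.class, taskType, int.class, int.class);
            return ctor.newInstance(jtIdentifier, jobId, type, taskId, attemptId);
        } catch (Exception e) {
            // Hadoop 1.x: fall back to TaskAttemptID(String, int, boolean, int, int)
            return new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId);
        }
    }
}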

Here is the complete log:

[INFO] -------------------------------------------------------------
[ERROR] COMPILATION ERROR : 
[INFO] -------------------------------------------------------------
[ERROR] /home/andre/dev/ozone-repo/pact/pact-hbase/src/main/java/eu/stratosphere/pact/common/io/GenericTableOutputFormat.java:[69,49] cannot find symbol
symbol  : constructor TaskAttemptID(java.lang.String,int,org.apache.hadoop.mapreduce.TaskType,int,int)
location: class org.apache.hadoop.mapreduce.TaskAttemptID
[ERROR] /home/andre/dev/ozone-repo/pact/pact-hbase/src/main/java/eu/stratosphere/pact/common/io/GenericTableOutputFormat.java:[71,68] package org.apache.hadoop.mapreduce.task does not exist
[INFO] 2 errors 
[INFO] -------------------------------------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO] 
[INFO] ozone ............................................. SUCCESS [0.289s]
[INFO] nephele ........................................... SUCCESS [0.013s]
[INFO] nephele-common .................................... SUCCESS [6.103s]
[INFO] nephele-management ................................ SUCCESS [0.496s]
[INFO] nephele-server .................................... SUCCESS [2.112s]
[INFO] nephele-profiling ................................. SUCCESS [0.267s]
[INFO] nephele-queuescheduler ............................ SUCCESS [0.114s]
[INFO] nephele-clustermanager ............................ SUCCESS [0.198s]
[INFO] nephele-hdfs ...................................... SUCCESS [0.502s]
[INFO] nephele-s3 ........................................ SUCCESS [0.655s]
[INFO] nephele-visualization ............................. SUCCESS [0.582s]
[INFO] nephele-examples .................................. SUCCESS [0.202s]
[INFO] pact .............................................. SUCCESS [0.006s]
[INFO] pact-common ....................................... SUCCESS [1.082s]
[INFO] pact-array-datamodel .............................. SUCCESS [0.199s]
[INFO] pact-runtime ...................................... SUCCESS [1.636s]
[INFO] pact-compiler ..................................... SUCCESS [1.171s]
[INFO] pact-hbase ........................................ FAILURE [0.374s]
[INFO] pact-examples ..................................... SKIPPED
[INFO] pact-compiler-tests ............................... SKIPPED
[INFO] pact-clients ...................................... SKIPPED
[INFO] pact-tests ........................................ SKIPPED
[INFO] stratosphere-dist ................................. SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 16.642s
[INFO] Finished at: Fri Aug 30 16:06:37 CEST 2013
[INFO] Final Memory: 28M/260M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project pact-hbase: Compilation failure: Compilation failure:
[ERROR] /home/andre/dev/ozone-repo/pact/pact-hbase/src/main/java/eu/stratosphere/pact/common/io/GenericTableOutputFormat.java:[69,49] cannot find symbol
[ERROR] symbol  : constructor TaskAttemptID(java.lang.String,int,org.apache.hadoop.mapreduce.TaskType,int,int)
[ERROR] location: class org.apache.hadoop.mapreduce.TaskAttemptID
[ERROR] /home/andre/dev/ozone-repo/pact/pact-hbase/src/main/java/eu/stratosphere/pact/common/io/GenericTableOutputFormat.java:[71,68] package org.apache.hadoop.mapreduce.task does not exist
[ERROR] -> [Help 1]
[ERROR] 


Cloud connector

We need a cloud connector for ozone.

Why? This is already needed by one partner (Deutsche Telekom), and I expect more companies to organize their clusters as private clouds.

We should create the minimum possible API glue without fancy stuff for now and leave the smart stuff to the outside. We can probably reuse and refactor the ec2-cloudmanager from Stratosphere v0.2.

Ideally, one implementation should cover both Amazon EC2 and OpenStack (DT controls their cluster with Vagrant).

Rework KMeans Sample Data Generator

The KMeans Sample Data Generator produces rather nonsensical data. By default, it samples from 100 normal distributions whose means lie between 0 and 1, each with a variance of one. The resulting data is not nicely clustered.

Rewrite the generator to choose the distribution means from a wider (or configurable) range, with a variance that is relative to (and quite a bit smaller than) that range, to ensure nicely clustered data.
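A minimal sketch of the intended behavior (parameter values and the output format are illustrative):

import java.util.Random;

public class ClusteredKMeansDataGenerator {

    public static void main(String[] args) {
        final int numCenters = 100;
        final int pointsPerCenter = 1000;
        final double range = 100.0;         // centers drawn uniformly from [-range, range]
        final double relativeStdDev = 0.01; // point spread is 1% of the range
        final Random rnd = new Random(42);

        final double stdDev = range * relativeStdDev;
        for (int c = 0; c < numCenters; c++) {
            double centerX = (rnd.nextDouble() * 2 - 1) * range;
            double centerY = (rnd.nextDouble() * 2 - 1) * range;
            for (int i = 0; i < pointsPerCenter; i++) {
                // points scatter tightly around their center, so clusters separate nicely
                double x = centerX + rnd.nextGaussian() * stdDev;
                double y = centerY + rnd.nextGaussian() * stdDev;
                System.out.println(x + "|" + y);
            }
        }
    }
}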

pact-compiler depends on pact-examples

In a layered architecture, a module must only depend on layers below it. This wrong dependency causes problems when modifying the examples code.

Implement Web Frontend Display of Workset Iterations

Implement the drawing of Workset Iteration Optimizer plans in the Pact Webfrontend.

What is missing is the JavaScript code that takes the JSON objects sent by the server and creates the DOM representation of the visual graph.

Fix Multiplexing Deadlock

Change assignment from logical channels to physical connections to prevent starvation effects that cause distributed deadlocks in the network stack.

Configurable Iteration Aggregators

Aggregators are currently hard-wired into the bulk iteration runtime code. Make them configurable on the Bulk-Iteration plan construct.

Aggregators must be hosted by the iteration head task, communicated to the convergence criterion, and communicated back, so that the tasks can access the previous superstep's aggregation result.

It is open to discussion whether to keep the entire history of aggregation results, only the previous iteration's aggregate result, or whether this should be made configurable.
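One possible shape for such a pluggable aggregator (a sketch, not a final API): the head task hosts the instances, the step function tasks feed them, and the convergence check sees the combined value of the superstep.

public interface IterationAggregator<T> {

    /** Called by the step function's tasks for each local contribution. */
    void aggregate(T value);

    /** The combined value of the current superstep. */
    T getAggregate();

    /** Reset before the next superstep begins. */
    void reset();
}

interface ConvergenceCriterion<T> {

    /** Checked against the previous superstep's aggregate. */
    boolean isConverged(int superstep, T aggregate);
}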

Support custom and efficient (in-memory) pre-aggregations (without Combiner)

I use and evaluate Stratosphere in the course of my thesis and want to give feedback on what I found missing or worth improving. This is written from a user perspective.

Requirement

We have partitioned input that we want to group and aggregate (by a grouping-key) into a single record or multiple records. The input is either streamed from HDFS or pipelined from other contracts, and is not (necessarily) partitioned by the grouping-key. The input we want to aggregate is the input of any tuple-at-a-time contract such as Map, Cross or Match. In terms of SQL, the requirement is: “SELECT key, aggregate(value) FROM records GROUP BY key”. Common cases range from a key cardinality of 1 or 2 up to any other small, medium or high cardinality.

Problem

Currently only the Combine+Reduce/CoGroup strategy is supported: forward all data to a Combiner, repartition its output by the grouping-key, and do the final aggregation in Reduce. For N records this always involves emitting N intermediate records to the Combiner. Even though the values are just forwarded to the local Combiner, there is a lot of copying and (de)serialization, and probably some networking overhead (job manager status) involved. Also, the user has to manually write the code to serialize the input to a record and include the Combiner. If we want to join and aggregate, this can mean that a udf does nothing but forward the joined tuples to a Combiner/Reducer.

Goal

Enable efficient in-memory aggregation (for the cases where efficiency matters; otherwise I can use the Combiner/Reducer approach), reduce the (de)serialization overhead, and reduce the “stupid” code the user has to write (e.g. forwarding all tuples to a combiner).

Use cases

Some use cases that would benefit from efficient pre-aggregation:

  • Group and aggregate with low group-key cardinality: Let’s assume the cardinality is 1 and we want to do a simple aggregation like a sum. It is obviously much more efficient (and easier to code) to do the pre-aggregation (sum) in the udf and send a single record to the Reducer.
  • Wordcount: “SELECT word, sum(occurence) FROM word-occurences”. In this case occurence is a column with constant value 1. Very simple to aggregate in-memory.
  • Machine Learning, accuracy computation: “SELECT correctly-classified, sum(correctly-classified) GROUP BY correctly-classified“. E.g. if we trained a model (say a numeric weight vector for logistic regression), the accuracy is defined as #correctly-classified/#total. I do this in one of my jobs and currently have to emit N records with constant value true or false to the Combiner.

Possible solutions

  1. Give close() the option to write output in all tuple-at-a-time contracts (the Hadoop way).
  2. Give the UDF knowledge about whether the current element is the last one (hasNext). Almost the same as 1.
  3. Add an iterator option to the tuple-at-a-time contracts (Map, Cross, Match). The contracts can be configured to pass the udf an iterator over all records that are pipelined/streamed to it, e.g. via CrossContract.builder(MyCross.class).asIterator(true)....build(). I assume this is easy to implement, because the code that calls the udf probably looks like “while (it.hasNext()) { udf(it.next()); }”; I am not sure if this is true. The user would then implement a separate stub, e.g. MatchIteratorStub instead of MatchStub.
  4. Keep it as it is (Combiner is the only way to pre-aggregate)

Discussion

  • The current way, using a Combiner, is very explicit and gives the system more knowledge about what happens. In an ideal world, the optimizer would choose how to do the pre-aggregation, and we would just define the aggregation function in the Combiner. Currently, however, we have to hardcode the serialization code that forwards everything to the Combiner, and the system would have to understand and modify the udf to get rid of the serialization and do a direct pre-aggregation.
  • Solutions 1-3 do more or less the same. We can write our own pre-aggregation/combiner code: if the cardinality is 1, this is just a counter; if it is medium, we can use a hash table. After the udf has processed all records, it can send one or more pre-aggregated records to the Reducer. This is less explicit, but more powerful and enables high efficiency. The system still knows that we do a grouping, because we still have to use Reduce.
  • To make it easy for Hadoop users to switch over, solution 1 or 2 would be fine. Solution 3 is basically the same, but looks a bit different (see the sketch below).
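A minimal sketch of what solution 3 could look like for Map (MapIteratorStub and the asIterator() option are hypothetical; the field accesses assume a string key in field 0 and a long value in field 1):

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import eu.stratosphere.pact.common.stubs.Collector;
import eu.stratosphere.pact.common.type.PactRecord;
import eu.stratosphere.pact.common.type.base.PactLong;
import eu.stratosphere.pact.common.type.base.PactString;

public class PreAggregatingMap /* extends MapIteratorStub (hypothetical) */ {

    public void map(Iterator<PactRecord> records, Collector<PactRecord> out) {
        // pre-aggregate the whole pipelined input in memory
        Map<String, Long> sums = new HashMap<String, Long>();
        while (records.hasNext()) {
            PactRecord rec = records.next();
            String key = rec.getField(0, PactString.class).getValue();
            long value = rec.getField(1, PactLong.class).getValue();
            Long current = sums.get(key);
            sums.put(key, current == null ? value : current + value);
        }
        // emit one record per group key instead of forwarding all N inputs
        PactRecord result = new PactRecord();
        for (Map.Entry<String, Long> entry : sums.entrySet()) {
            result.setField(0, new PactString(entry.getKey()));
            result.setField(1, new PactLong(entry.getValue()));
            out.collect(result);
        }
    }
}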

Looking forward to hearing your opinion. I hope I didn't miss anything and the feature already exists. ;)

Rework optimizer cost functions.

The optimizer cost functions and cost heuristics need to be faithfully implemented to produce good plans, and to fall back to robust plans when estimates are missing.

Enable Range Partitioner

The range partitioner is currently disabled. We need to implement the following aspects:

  1. Distribution information, if available, must be propagated back together with the ordering property.

  2. A generic bucket lookup structure is needed; the current one is specific to PactRecord (see the sketch below).
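A minimal sketch of such a generic bucket lookup, assuming sorted split points and a pluggable comparator (all names illustrative):

import java.util.Comparator;

public class RangeBucketLookup<T> {

    private final T[] boundaries;          // sorted, length = numPartitions - 1
    private final Comparator<T> comparator;

    public RangeBucketLookup(T[] boundaries, Comparator<T> comparator) {
        this.boundaries = boundaries;
        this.comparator = comparator;
    }

    /** Returns the partition index for the record, in [0, boundaries.length]. */
    public int bucket(T record) {
        // binary search for the first boundary strictly greater than the record
        int low = 0, high = boundaries.length - 1;
        while (low <= high) {
            int mid = (low + high) >>> 1;
            if (comparator.compare(record, boundaries[mid]) < 0) {
                high = mid - 1;
            } else {
                low = mid + 1;
            }
        }
        return low;
    }
}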

Tests to re-enable after fixing this issue:

  • TeraSortITCase
  • GlobalSortingITCase
  • GlobalSortingMixedOrderITCase

Implement deferred index updates in Workset Iterations

Currently, the solution set must be updated for a certain key directly after it was accessed for that key. This is the fastest case.

While this covers a large portion of the cases, deferred updates (i.e., updates affecting other partitions) may be required and would increase the expressiveness of the model.

nephele-ec2cloudmanager dependency still in stratosphere-dist pom.xml

EC2 is no longer used, but eu.stratosphere:nephele-ec2cloudmanager:jar:0.2 is still referenced in the pom of stratosphere-dist.

Trying to build produces this failure:

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-assembly-plugin:2.2-beta-5:single (generate-package) on project stratosphere-dist: Failed to create assembly: Failed to resolve dependencies for project: eu.stratosphere:stratosphere-dist:pom:0.2: Missing:
[ERROR] ----------
[ERROR] 1) eu.stratosphere:nephele-ec2cloudmanager:jar:0.2
[ERROR]

Restructure pact-tests

The abstract classes for running in-JVM tests should be available to the outside (e.g. to frameworks built on top of ozone).

Job compilation fails due to missing commons packages

The commons-lang and commons-configuration libraries are missing from the classpath.

In particular, commons-configuration is required by hadoop-core and thereby by pact-runtime, while commons-lang is required by commons-configuration.

In order to solve the issue, the appropriate lines for including commons-configuration and commons-lang in the classpath should be added to pact-client.sh, nephele-jobmanager.sh and nephele-taskmanager.sh (not sure about the last two).

Add HBase Tests

The HBase Table Input- and OutputFormats currently lack tests.
We need to add some tests that mock the HBase infrastructure and verify correct setup and correct conversion of the returned data.

Unify Pact and Nephele Memory Buffers

Currently, Pact and Nephele use two different types of memory buffers, which results in unnecessary data movement between memory pools and prevents trading / balancing of these pools.

Attach output of a contract as additional input for another contract (Broadcast variables / Closures)

More PACT API feedback from a user perspective.

Requirement

  • Make the output (all records) of one or more contracts available in every udf call of another contract.
  • This should work in an efficient way. Using the Cross workaround, we receive the same PactRecord in every udf call, and the first line is usually a deserialization (we cache the value, but then we need an additional check whether we already cached it).
  • It should be as explicit as possible to use this functionality (broadcast/attach). In my opinion it is a very common pattern. This gives the system more knowledge about our intention and more options for optimization.

Problem

  • The often proposed workaround, to use Cross, only works for the special case where 1) the input to broadcast consists of a single record, 2) we want to broadcast the output of a single contract only, and 3) the receiving contract is a Map, not a Reduce, CoGroup or Match.
  • Using Cross to broadcast a single output record feels more like a workaround than a natural fit (especially because it only works for a very special case). Broadcasting, distributing or attaching feels much more natural.

Solutions

  1. Implement a distributed cache the Hadoop way, with functionality to read and write files.
  2. Provide generic workaround contracts (like "flatten" multiple records to a single one).
  3. Add explicit support for broadcasting and attaching the output of a previous contract to another contract as an additional input. Before the udf is called, another method should be called with all the inputs, so that the udf can deserialize the records, optionally customize them, and store them in memory.
    I didn't think too long about this, but an API (here for Map) could look like this:
MapContract myMap = MapContract.builder(MyMap.class)
  .input(myDataSource)
  .attachAsAdditionalInput(contract1ToAdd)
  .attachAsAdditionalInput(contract2ToAdd)
  .build();

And a new method in MapStub that is called before the first map() call:

public void additionalInputs(Iterator<PactRecord>[] iterators) {
  // read records from additional inputs, process and cache in-memory 
}

Discussion

  • In Hadoop, the usual way to solve this is the distributed cache. This is very powerful and flexible, but probably not a good fit for this requirement: the output is not stored as a file (as it is in Hadoop), and we want to pipeline it.
  • I currently use workaround contracts, but this is just a (slow) workaround and not a good long-term solution in my opinion.
  • I would vote for adding explicit support; probably there are many more options than the one I described.

Use Cases / Examples

  • In my current machine learning job I basically want to scan/map over the training data (tf-idf vectors), and during the processing I need the output of two prior contracts. The first output to broadcast is a single record (a model = vector). The second output to broadcast consists of multiple records. I had to write two workaround contracts. First experiments showed that the pact implementation is slower than Hadoop, and I guess the many workarounds are one reason.
    • The first (Reduce) workaround "flattens" all output records into a single record. I had to emit a constant key (because the functionality to reduce without a key didn't work and I didn't know about it).
    • The second (Cross) workaround makes one record out of two, to overcome the two-input limitation of Cross.

Bug in Nephele Scheduler

For cyclic data flows, the Nephele scheduler may deadlock, causing the job to freeze. This problem is reproducible locally using the PageRankITCase.

A workaround for now is to always use network channels. However, this does increase scheduling latency, as network-connected tasks are scheduled lazily.

Rework instance configuration.

Right now, Nephele still uses the EC2-inspired instance configuration model. The Pact compiler connects to obtain information about these instances, such as how many are available and how much memory they have. This is error-prone to configure and also a bit buggy; it frequently leads to wrong memory bookkeeping if different instance types are configured.

Do we need support for heterogeneous setups where different nodes have different capabilities and should be assigned a different amount of work? If we defer this to later, we can greatly simplify the logic and configuration:

  1. No configuration for the instance type. The internal instance manager has a default profile which is okay for all cluster instances.

  2. An explicit value for how many parallel operator slots we have on each node (such as 8 on an eight-core machine). There should be a default value in the config which can be overridden via query-specific parameters.

  3. An explicit config entry that defines how much memory is used for networking and how much for query processing. The query processing memory is used to initialize the MemoryManager and is also used by the pact-compiler to parameterize the memory available to the operators. That way we can also get rid of the communication between the compiler and the job manager during plan compilation. Eventually it would be good to run the compiler as a child process of the job manager anyway.

In the long run we want to make query processing memory and network memory one value (the overall system memory; the rest is the UDF Java heap) that is shared between materialization in the network stack and the runtime operators.

Fix iteration head merging logic

The iteration head task is chained/merged with the first step function operator, if possible, to reduce the number of channels and threads. This is, however, only possible in certain (yet common) situations.

Currently, the head is erroneously merged with the operator even when the initial partial solution (or workset) and the later partial solution (or workset) have different local strategies at that operator. Add a condition in the job graph generation to prevent this situation.

Note: This fixes the CoGroupConnectedComponents test.

Rework Configuration Objects

Currently, the configuration classes are implemented in a hacky way. Everything is represented as a serialized string and there is no clean interface, so the different flavors of configurations (global, delegating, default) are inconsistent.

I propose to rework the configuration as a map of objects, which are serialized on demand with either a serialization library or the default serialization mechanisms. Factoring out the interface of a Configuration allows keeping all flavors consistent.
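A rough sketch of the proposed direction (names illustrative): back the configuration by a map of objects and serialize entries only when the configuration is shipped, behind a common interface that all flavors implement.

import java.util.HashMap;
import java.util.Map;

public class ObjectConfiguration {

    private final Map<String, Object> entries = new HashMap<String, Object>();

    public <T> void set(String key, T value) {
        entries.put(key, value);
    }

    @SuppressWarnings("unchecked")
    public <T> T get(String key, T defaultValue) {
        Object value = entries.get(key);
        return value == null ? defaultValue : (T) value;
    }

    // Serialization would happen on demand, e.g. via java.io serialization or a
    // pluggable serialization library, not on every set() as with string-encoding.
}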

Add flag for visualization of pact plan with the nephele SWT visualization

The visualization of pact iterations contains unwanted items like "Fake Tail" and "Bulk-Iteration Sync".
To visualize only the wanted parts, a visualization flag should be added, accessible from the classes:
eu.stratosphere.nephele.managementgraph.GroupVertex
eu.stratosphere.nephele.managementgraph.ManagementGroupVertex

Rework Nephele Discovery Service

Currently, the nephele discovery service adds extra startup time and causes port conflicts, but is never used. It was originally designed for certain cloud setups and is obsolete in this version.

Implement Pact Tuple API

Offer an additional API based on tuples, to avoid the costly, lazy, schema-free abstraction of the PactRecord.
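One possible flavor of such a tuple (a sketch, not a finished design): fields are typed and positional, so accessing them needs no lazy PactRecord (de)serialization.

public class Tuple2<T0, T1> {

    public T0 f0;
    public T1 f1;

    public Tuple2(T0 f0, T1 f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

A map udf could then simply return new Tuple2<String, Integer>(in.f0, in.f1 + 1) instead of mutating record fields.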

Change Maven Artifact Names

Currently, ozone uses the same artifact names as Stratosphere 0.2. That means it is not possible to work with both in parallel (as in code migration, cross testing, ...).

Ozone should get new artifact names and a new version. I propose version 0.5 (a leap from 0.2).

Port HBase access to ozone

During the IMR Hackathon, an HBase access implementation for Stratosphere 0.2.1 was created.

We need to port this to ozone and the latest version of HBase.

This is needed by several projects, including IMR and Deutsche Telekom.

Marcus pointed out some issues with the current prototype that could be resolved:

  1. Currently, we only support fetching a single row version at a time. If we want to support multiple versions per record, that needs to be added, and we need to specify how we map them to pact records.
  2. All columns of a row are currently squeezed into a single pact record field (basically, we serialize the result object returned by HBase). This was fine for the Hackathon and didn't really matter for the Meteor integration (there, we convert this to a JSON structure and leave the mapping to Arvid's schema code).
    But I guess it's not the PACT way of handling this, since you'd rather have columns mapped to record fields for fast access/use as a key, right?
    So we need to write some code so people can specify which columns go where (see the sketch below).
    This may actually be more tricky than it sounds, because you might not know the column qualifier in advance, only the column family.
    The question here is: how do we handle nesting? There may be a different/unknown number of qualifier/value pairs for a certain column family. Do we just unnest them and emit multiple records for a single row, or do we want to provide a way to nest all these values into a single record?
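A minimal sketch of an explicit column-to-field mapping (a hypothetical helper, not the Hackathon code; it assumes the user can name (family, qualifier) pairs up front and that values map to string fields):

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

import eu.stratosphere.pact.common.type.PactRecord;
import eu.stratosphere.pact.common.type.base.PactString;

public class HBaseColumnMapper {

    private final byte[][] families;
    private final byte[][] qualifiers; // parallel to families

    /** columns is an array of {family, qualifier} pairs chosen by the user. */
    public HBaseColumnMapper(String[][] columns) {
        this.families = new byte[columns.length][];
        this.qualifiers = new byte[columns.length][];
        for (int i = 0; i < columns.length; i++) {
            this.families[i] = Bytes.toBytes(columns[i][0]);
            this.qualifiers[i] = Bytes.toBytes(columns[i][1]);
        }
    }

    /** Fills one record field per configured column instead of one serialized blob field. */
    public void mapRow(Result row, PactRecord target) {
        for (int i = 0; i < families.length; i++) {
            byte[] value = row.getValue(families[i], qualifiers[i]);
            target.setField(i, new PactString(value == null ? "" : Bytes.toString(value)));
        }
    }
}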
