Code Monkey home page Code Monkey logo

emma's Introduction

Emma

A quotation-based Scala DSL for scalable data analysis.

Build Status

Goals

Our goal is to improve developer productivity by hiding parallelism aspects behind a high-level, declarative API which maximises reuse of native Scala syntax and constructs.

Emma supports state-of-the-art dataflow engines such as Apache Flink and Apache Spark as backend co-processors.

Features

DSLs for scalable data analysis are embedded through types. In contrast, Emma is based on quotations (similar to Quill). This approach has two benefits.

First, it allows to reuse Scala-native, declarative constructs in the DSL. Quoted Scala syntax such as for-comprehensions, case-classes, and pattern matching are thereby lifted to an intermediate representation called Emma Core.

Second, it allows to analyze and optimize Emma Core terms holistically. Subterms of type DataBag[A] are thereby transformed and off-loaded to a parallel dataflow engine such as Apache Flink or Apache Spark.

Examples

The emma-examples module contains examples from various fields.

Learn More

Check emma-language.org for further information.

Build

  • JDK 7+ (preferably JDK 8)
  • Maven 3

Run

mvn clean package -DskipTests

to build Emma without running any tests.

For more advanced build options including integration tests for the target runtimes please see the "Building Emma" section in the Wiki.

emma's People

Contributors

aalexandrov avatar akunft avatar asteriosk avatar dbehrouz avatar felixneutatz avatar fschueler avatar ggevay avatar harrygav avatar jorokr21 avatar lmadaitahy avatar parkl avatar skunert avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

emma's Issues

Parallelize `for {a <- b; c <- DataBag(f(a))} yield Foo(c, a)`

Parallelize currently fails for this strange flatMap:

for {
  a <- b
  c <- DataBag(f(a)) // f returns some Seq
} yield Foo(c, a) // Note that we refer to `a` here as well

At first glance, rewriting this to combinators looks like this:

b.flatMap(a => DataBag(f(a))).map(c => Foo(c, a))

But this doesn't work, since a is not in scope in the UDF of the map.

Implement TPC-H test suite.

This entails

  1. Adding more TPC-H examples in the tpch package.
  2. Adding integration tests for the implemented queries in TPCHTest.

You can use the already available implementations as a reference point.

The issue will not be assigned to a single person. I'll use the following list to track progress and volunteers:

  • Query01 (added in 5f12540)
  • Query02 (added in 14b7ec3)
  • Query03 (added in 5f12540)
  • Query04
  • Query05 (added in abae186)
  • Query06 (added with #118)
  • Query07 (added with #118)
  • Query08 (added with #118)
  • Query09 (added with #118)
  • Query10 (added with #124)
  • Query11
  • Query12 (added with #157)
  • Query13
  • Query14
  • Query15
  • Query16
  • Query17
  • Query18
  • Query19
  • Query20
  • Query21
  • Query22

Running Emma Jobs on a cluster

Running Emma Jobs on a Flink cluster:

Works fine :)

Example:

mvn clean package -Pflink 

./flink run --classpath /tmp/emma/codegen/ \
-c eu.stratosphere.emma.examples.CommandLineInterface \
/home/felix/emma/emma/emma-sketchbook/target/emma-sketchbook-1.0-SNAPSHOT.jar \
tc /home/felix/emma/emma/emma-sketchbook/src/test/resources/graphs/triangle-cnt/edges.tsv

Running Emma Jobs on a Spark cluster:

works in general, e.g. for this case:

val algorithm = emma.parallelize {    
    val len = 10
    val inp = fromPath(input)
    val size = DataBag(inp).withFilter(_.length > len).fetch().size
    println("size: " + size)
    size
}

with command:

mvn clean package -Pspark

cd spark-1.4.1
mvn -Dscala-2.11 -DskipTests clean package

./bin/spark-submit \
--verbose \
--class eu.stratosphere.emma.examples.CommandLineInterface \
--master spark://Tardis:7077 \
--conf spark.executor.extraClassPath=/tmp/emma/codegen/ \
--conf spark.driver.extraClassPath=/tmp/emma/codegen/ \
/home/felix/emma/emma/emma-sketchbook/target/emma-sketchbook-1.0-SNAPSHOT.jar \
bt \
spark-remote spark://Tardis 7077 \
/home/felix/emma/emma/emma-common/src/test/resources/lyrics/Jabberwocky.txt

Doesn't work if you want to use CSVInputFormat.
Error: macro implementation not found: materializeCSVConvertors

When I write a simple Spark Job and call the macro from the Jar e.g. in a mapper, this works perfectly fine.

So the next thing I tried was to integrate the Macro code into the Spark DataFlowGenerator. Now the code is executed but it doesn't find the class by which it is typed. E.g. in case of the triangle count it doesn't find the edge class:

java.lang.NoClassDefFoundError: eu/stratosphere/emma/examples/graphs/TriangleCount$Schema$Edge

which is really strange because this is in the jar of the executed program.

I will further investigate ...

Realease an Emma-compatible Flink fork

Unfortunately several critical PRs didn't make it in the 0.9 release, so we need to prepare and offer a fork which works with Emma out of the box.

The concrete steps are

  • sync up the apache/flink master with the stratosphere/flink master
  • remove all branches which were created by forking from stratosphere/flink
  • base the emma branch on the release-0.9
  • change the groupID to 'eu.stratosphere' (cherry pick this commit)
  • change the default profile to Scala 2.11 (cherry pick this commit)
  • merge FLINK-1789
  • merge FLINK-1297 (not yet)
  • push to the Stratosphere Maven Central

The TODO list up to the FLINK-1789 merge will be handled by @FelixNeutatz. The actual release and Maven push will be handled by @twalthr.

Cannot apply more than one fold-group fusion in the same comprehension

While trying to get BeliefPropagation to work with parallelize, I encountered a situation, where one of the com.comprehension.substitute(foldTree, replTree) calls in fold-group fusion introduces a type error: the expression to be replaced is used as an argument to a generic function, where the type argument is explicitly given, and this type argument should be changed together with the substitution. Concretely, the following happens:

There is the ArrowAssoc class, for creating Tuple2s with ->, which looks like this: def -> [B](y: B): Tuple2[A, B], and there is a call to it, that looks like this:

foo.->[Double](g.values.fold[Double](/*some complicated thing here*/))

And after substitution, this looks like this:

foo.->[Double](g.values)

The problem with this, is that the type of g.values is not Double, but DataBag[Double], so after the substitution, this should look like this:

foo.->[DataBag[Double]](g.values)

@joroKr21, will this problem be solved by solving #69 ?

If you would like to observe the problem "in action", then here is my debugging branch: https://github.com/ggevay/emma/tree/fold-group-fusion-subst-bug . Put breakpoints to ComprehensionAnalysis.scala lines 360 and 364.
(And you can see the resulting code wreaking havoc, if you put a breakpoint to ComprehensionModel.scala line 244.)

empty() semantics

The semantics of DataBag.empty() differ between the sequential and parallel backends. The former is equivalent to Scala's isEmpty, whereas the latter is the same as nonEmpty. We should choose one and maybe rename the method.

TicTacToe is not working on Spark

Specifically, this is the part that fails:

    val inDegrees = for {
      g <- allEdgeTargets.groupBy {x => x}
    } yield (g.key, g.values.count())

allEdgeTargets looks OK, but then inDegrees is empty. The generated code looks like this:

object inDegrees$comp$2 {
  import _root_.org.apache.spark.SparkContext._;
  def run(sc$spark$1: org.apache.spark.SparkContext, allEdgeTargets: _root_.eu.stratosphere.emma.api.DataBag[eu.stratosphere.emma.examples.graphs.TicTacToe.GameState]) = {
    val result$spark$5 = allEdgeTargets.asInstanceOf[_root_.eu.stratosphere.emma.api.ParallelizedDataBag[eu.stratosphere.emma.examples.graphs.TicTacToe.GameState, _root_.org.apache.spark.rdd.RDD[eu.stratosphere.emma.examples.graphs.TicTacToe.GameState]]].repr.map(((x: eu.stratosphere.emma.examples.graphs.TicTacToe.GameState) => scala.Tuple2(x, {
      val x$6 = x;
      1L
    }))).reduceByKey(((x$7: Long, x$5: Long) => x$7.+(x$5))).map({
      case scala.Tuple2((x$spark$1 @ _), (y$spark$1 @ _)) => _root_.eu.stratosphere.emma.api.Group(x$spark$1, y$spark$1)
    }).map(((g: eu.stratosphere.emma.api.Group[eu.stratosphere.emma.examples.graphs.TicTacToe.GameState,Long]) => scala.Tuple2.apply[eu.stratosphere.emma.examples.graphs.TicTacToe.GameState, scala.Long](g.key, ((((g.values): scala.Long)): scala.Long)))).cache();
    result$spark$5.foreach(((x$5) => ()));
    result$spark$5
  }
}

I inserted prints into the UDFs of the generated code, and the UDF of the first map as well as the UDF of the reduceByKey gets called (lots of times), but the UDF of the second map doesn't get called. (And this is the case even if I make the UDF of the second map a non-partial function, so the problem is not with the pattern matching.) So I guess Spark is messing up something during the execution of the reduceByKey, but I have no idea what.

Scalac typecheck and untypecheck are not idempotent

As we have seen in this issue with scalac, typecheck and untypecheck are currently not idempotent. There is no theoretical limitation behind this, it's just a buggy implementation. This means that the UDF code we pass on to the generated dataflows for parallel execution might get broken in subtle ways, especially when it contains any of the following:

  • case classes or case objects
  • extractors (pattern matching statements)
  • lazy vals

The compiler should eventually be fixed (maybe in Scala 2.12), but in the mean time it might be interesting to explore how other libraries that transform UDF code with macros (e.g. scala/async) solve this particular problem.

Planning: 0.1.0 Release

Backlog

The backlog for the 0.1.0 release can be roughly divided into the following stories.

Compiler Pipeline

Cluster Experiments

Documentation & Tutorials

Shadowing of package names

The macros are not properly sanitized. E.g. if we define one of the following values inside emma.parallelize: eu, org, java, scala or some other top level package, compilation will fail. The solution is to prepend _root_ to package accesses that are nested in UDF code. This issue is related to #22, but has a different cause.

Fold-Group fusion might lead to suble errors when reusing names

When group names are reused in for comprehensions (see below), fold-group fusion might lead to subtle and hard to debug errors, usually type-check errors or non-existing member errors.

for (g <- bag1 groupBy key1) yield g.values.fold(...)
...
for (g <- bag2 groupBy key2) yield g.values.fold(...)

Initially I thought that this is due to #22, but it turns out to be a separate issue. Specifically, the helper method generatorFor finds the first group with the same name, which will in both cases return the group generated by bag1.

Ideally we could extract a list of group definitions and a list of their respective generators and zip them together. But as far as I can tell, Trees are traversed top-down, whereas Expressions are traversed bottom-up, so there is no way to guarantee that they are in the correct order. This leaves the other option of using TermSymbols instead of TermNames for the lhs of generators.

The expression problem

Like any more involved AST, the comprehension model is suffering from the infamous expression problem. It's easy to add operations, but hard to add combinators (would involve modifying all operations). I can't think of a reason to add more combinators right now, but still...

There is however, a neat solution by Oliviera and Cook (2012), utilizing object algebras. I think it's a worthwhile read at this point.

Planning: Website

This is the meta-issue tracking the progress on the Emma Website and pointing to sub-issues.

Technologies

We the following choice of technologies.

Backend Frameworks

I am very familiar with the YII framework (PHP), but I also worked at some projects with the Play (JAVA) and spring boot (JAVA) frameworks. Are there any preferences?? I think a java framework would be more suitable for our purpose (invoking the emma plans with java).

Frontend CSS frameworks

Palettes

  1. Variant A

Fonts

  1. Try using one of these 8 fonts for the logo
<link href='http://fonts.googleapis.com/css?family=Norican|Pacifico|Courgette|Cookie|Niconne|Satisfy|Damion|Berkshire+Swash' rel='stylesheet' type='text/css'>

Tasks

  • @aalexandrov invite @andi3 to the Emma resources Dropbox folder
  • create tutorial out of ipython (begin emma project setup)
  • check for typesetting layout for text flow & Scala code, look at Symfony Doc

Serialization of combinators

While reading about quasiquotes it occurred to me that we could implement the serialization of combinators by making them liftable like this. Then we would just write q"$combinator" and it would be automatically lifted into a Tree. Does that make any sense?

Fold type inference

The way DataBag.fold is defined currently prevents type inference, i.e. one has to explicitly provide the result type of the fold, like this:

val count = fold[Int](0, _ => 1, (x, y) => x + y)

although theoretically it can be inferred from the zero element z. This is a deficiency of Scala's typechecker. The workaround is to use 2 parameter lists like this:

def fold[B](z: B)(s: A => B, p: (B, B) => B): B
val count = fold(0)(_ => 1, (x, y) => x + y)

For reference see foldLeft and foldRight in the standard library. The question is what makes more sense from a user's perspective? I would argue that emulating the Scala collection API is less surprising for the programmer.

Collect cost formulas for basic operators.

For the upcoming cost-based optimizer, we have to rely on cost formulas for the basic operators. As a starting point, I suggest do dig up the formulas from the Flink optimizer and write them down in this issue.

Constructor of Stateful.Bag should do a deep copy

This is the ctor of Stateful.Bag:

private[api] def this(bag: DataBag[S]) =
      this(bag.fold(mutable.Map.empty[K, S])(s => mutable.Map(s.identity -> s), _ ++ _))

The problem here is that we are making a shallow copy of s, and then changed state in the stateful can be seen in the original DataBag as well.

Normalize filter predicates

Complex boolean predicates should be normalized to CNF and treated as a chain of filters in the comprehend method time.

For example, the following comprehension

val res = for (
  x <- X; 
  y <- Y; 
  z <- Z; 
  if x.a = "a" && x.b = "b" && x.a = y.a && x.b = z.b) yield (x,y,z)

should be executed effectively rewritten as

val res = for (
  x <- X; 
  y <- Y; 
  z <- Z; 
  if x.a = "a"; if x.b = "b"; if x.a = y.a; if x.b = z.b) yield (x,y,z)

The CombinatorRewriteEngine state machine will then handle predicate push-down and generate a plan with early filters and joins instead of a crosses followed by a filter.

Add StatefulBackend for Spark

An implementation of AbstractStatefulBackend should be made for Spark, that is similar to the StatefulBackend we have for Flink.

Planning: Tutorials

For the tutorials the first step is to test and make a comprehensive guide for the following:

  • Quick-start with downloaded binaries / maven / sbt
  • Run Emma on Spark/Flink single-node
  • Run Emma on Spark/Flink YARN
  • Minimal running example (e.g. word count)

From there we can continue with a programming guide to showcase the potential of Emma.
Any suggestion on what to include in the programming guide are welcome.

Of course this should be integrated seamlessly on the website.

Interactive shell

Reuse the Scala REPL to make an interactive Emma shell. There are two main points to this:

  • CLI-based shell
  • Zeppelin interpreter (no binaries available, however)

We can add include those in the release with almost no work at all.

Add zipWithIndex() to DataBag API

I was wondering if this would work right now how it should (create unique indices):

var index = 0

val indexedWords = for (word <- words) yield (word, index += 1)

Flink and Spark offer zipWithIndex which does that - (how) can we have this in Emma?
We could probably recover it somehow in the macro but it might be easier to include it into the API and directly delegate it to the corresponding Spark/Flink implementations.

Re-assign enclosing object properties to local vals.

Code bracketed with parallelize is normalized as a first step of the compiler chain.

As part of the normalization

  • read references to properties of the enclosing class should be assigned to local vars (resolved via 04032bd);
  • write references to properties of the enclosing class (e.g. u) should be prohibited (resolved via #141)
  • references to methods of the enclosing class (e.g. g) should be prohibited (resolved via #141)

For example, in the following code example the "parallelized" code accesses k.

class KMeans(val k) {

  def g(x: Int): Int = x + 42

  var u = 0

  def run() = {
    val alg = emma.parallelize {
      val z = f(k) // OK as f is not a local method
      val w = g(k) // not OK as g is local method
      // ...
      xs.map(x => { /* ... */; u = u + 1; /* ... */ } // not OK as u is local state
    }
  } 
}

As part of the normalization, the code should be rewritten as:

      val __this$k = k
      val z = f(__this$k)
      // ...

Migrate to symbol based-substitution

ะขhe lookup in the substitute methods is based on a TermName parameter. With the changes introduced by #68, it makes sense to do the following:

  • add substitute variants with lookup based on a TermSymbol parameter,
  • mark the TermName based substitution variants as deprecated and remove them after a while once (we can verify that they are really not needed anymore)
  • rework the code to use the new substitute methods.

Add linear algebra primitives to the API

The DataBag type currently offers only an abstraction for parallel collections (DataBag[A]). While this allows expression of basic dataflows, certain classes of algorithms (for example, machine learning) might benefit from a direct linear algebra abstraction.

As a first part of this effort, I suggest to define two core API traits:

  • Matrix[A : Numeric],
  • Vector[A : Numeric],

with empty methods (def op(...): T = ???).

We can use the approach suggested by @joroKr21 in #50 and implement the operators as macros.

@fschueler, @FelixNeutatz: do you want to take a stab on this with me next week?

[emma-sketchbook] Multiple jetty dependencies with Spark & Flink in maven

Flink and Spark both depend on jetty but with different versions.
The sketchbook depends on both emma-flink and emma-spark. Maven picks the jetty dependencies from both modules, leading to incompatibilities with spark which needs 8.1.14 vs flink with 8.0.

If we want tests for different systems without multiple profiles and test-files we have to find a way around this. An error occurs for example when running the TPCHTests for spark and flink:

java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaVanillaMethodMirror2.jinvokeraw(JavaMirrors.scala:384)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaMethodMirror.jinvoke(JavaMirrors.scala:339)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaVanillaMethodMirror.apply(JavaMirrors.scala:355)
    at eu.stratosphere.emma.runtime.package$.factory(package.scala:99)
    at eu.stratosphere.emma.examples.tpch.TPCHTest.testQuery03Spark(TPCHTest.scala:90)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: java.lang.NoClassDefFoundError: org/eclipse/jetty/util/SingletonList
    at org.eclipse.jetty.http.PathMap.put(PathMap.java:185)
    at org.eclipse.jetty.servlet.ServletHandler.updateMappings(ServletHandler.java:1074)
    at org.eclipse.jetty.servlet.ServletHandler.setServletMappings(ServletHandler.java:1159)
    at org.eclipse.jetty.servlet.ServletHandler.addServletWithMapping(ServletHandler.java:798)
    at org.eclipse.jetty.servlet.ServletContextHandler.addServlet(ServletContextHandler.java:338)
    at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:99)
    at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:87)
    at org.apache.spark.ui.WebUI.attachPage(WebUI.scala:67)
    at org.apache.spark.ui.WebUI$$anonfun$attachTab$1.apply(WebUI.scala:60)
    at org.apache.spark.ui.WebUI$$anonfun$attachTab$1.apply(WebUI.scala:60)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.ui.WebUI.attachTab(WebUI.scala:60)
    at org.apache.spark.ui.SparkUI.initialize(SparkUI.scala:50)
    at org.apache.spark.ui.SparkUI.<init>(SparkUI.scala:61)
    at org.apache.spark.ui.SparkUI$.create(SparkUI.scala:151)
    at org.apache.spark.ui.SparkUI$.createLiveUI(SparkUI.scala:106)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:268)
    at eu.stratosphere.emma.runtime.SparkLocal.<init>(Spark.scala:77)
    ... 37 more

Here is the result of mvn dependency:tree -Dverbose -Dincludes=org.eclipse.jetty:

[INFO] Scanning for projects...
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Build Order:
[INFO] 
[INFO] emma
[INFO] emma-common-macros
[INFO] emma-common
[INFO] emma-backend
[INFO] emma-language
[INFO] emma-flink
[INFO] emma-sketchbook
[INFO] emma-spark
[INFO] 
[INFO] Using the builder org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder with a thread count of 1
[INFO]                                                                         
[INFO] ------------------------------------------------------------------------
[INFO] Building emma 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ emma ---
[INFO]                                                                         
[INFO] ------------------------------------------------------------------------
[INFO] Building emma-common-macros 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ emma-common-macros ---
[INFO]                                                                         
[INFO] ------------------------------------------------------------------------
[INFO] Building emma-common 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ emma-common ---
[INFO]                                                                         
[INFO] ------------------------------------------------------------------------
[INFO] Building emma-backend 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ emma-backend ---
[INFO]                                                                         
[INFO] ------------------------------------------------------------------------
[INFO] Building emma-language 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ emma-language ---
[INFO]                                                                         
[INFO] ------------------------------------------------------------------------
[INFO] Building emma-flink 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ emma-flink ---
[INFO] eu.stratosphere:emma-flink:jar:1.0-SNAPSHOT
[INFO] \- eu.stratosphere:flink-clients:jar:0.9-SNAPSHOT:provided
[INFO]    +- eu.stratosphere:flink-runtime:jar:0.9-SNAPSHOT:provided
[INFO]    |  +- (org.eclipse.jetty:jetty-server:jar:8.0.0.M1:provided - omitted for duplicate)
[INFO]    |  +- (org.eclipse.jetty:jetty-security:jar:8.0.0.M1:provided - omitted for duplicate)
[INFO]    |  \- (org.eclipse.jetty:jetty-servlet:jar:8.0.0.M1:provided - omitted for duplicate)
[INFO]    +- org.eclipse.jetty:jetty-server:jar:8.0.0.M1:provided
[INFO]    |  +- org.eclipse.jetty:jetty-continuation:jar:8.0.0.M1:provided
[INFO]    |  \- org.eclipse.jetty:jetty-http:jar:8.0.0.M1:provided
[INFO]    |     \- org.eclipse.jetty:jetty-io:jar:8.0.0.M1:provided
[INFO]    |        \- org.eclipse.jetty:jetty-util:jar:8.0.0.M1:provided
[INFO]    +- org.eclipse.jetty:jetty-security:jar:8.0.0.M1:provided
[INFO]    |  \- (org.eclipse.jetty:jetty-server:jar:8.0.0.M1:provided - omitted for duplicate)
[INFO]    \- org.eclipse.jetty:jetty-servlet:jar:8.0.0.M1:provided
[INFO]       \- (org.eclipse.jetty:jetty-security:jar:8.0.0.M1:provided - omitted for duplicate)
[INFO]                                                                         
[INFO] ------------------------------------------------------------------------
[INFO] Building emma-sketchbook 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ emma-sketchbook ---
[INFO]                                                                         
[INFO] ------------------------------------------------------------------------
[INFO] Building emma-spark 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ emma-spark ---
[INFO] eu.stratosphere:emma-spark:jar:1.0-SNAPSHOT
[INFO] \- org.apache.spark:spark-core_2.11:jar:1.2.1:provided
[INFO]    +- org.eclipse.jetty:jetty-plus:jar:8.1.14.v20131031:provided
[INFO]    |  +- org.eclipse.jetty:jetty-webapp:jar:8.1.14.v20131031:provided
[INFO]    |  |  +- org.eclipse.jetty:jetty-xml:jar:8.1.14.v20131031:provided
[INFO]    |  |  |  \- (org.eclipse.jetty:jetty-util:jar:8.1.14.v20131031:provided - omitted for duplicate)
[INFO]    |  |  \- org.eclipse.jetty:jetty-servlet:jar:8.1.14.v20131031:provided
[INFO]    |  |     \- (org.eclipse.jetty:jetty-security:jar:8.1.14.v20131031:provided - omitted for duplicate)
[INFO]    |  \- org.eclipse.jetty:jetty-jndi:jar:8.1.14.v20131031:provided
[INFO]    |     \- (org.eclipse.jetty:jetty-server:jar:8.1.14.v20131031:provided - omitted for duplicate)
[INFO]    +- org.eclipse.jetty:jetty-security:jar:8.1.14.v20131031:provided
[INFO]    |  \- (org.eclipse.jetty:jetty-server:jar:8.1.14.v20131031:provided - omitted for duplicate)
[INFO]    +- org.eclipse.jetty:jetty-util:jar:8.1.14.v20131031:provided
[INFO]    \- org.eclipse.jetty:jetty-server:jar:8.1.14.v20131031:provided
[INFO]       +- org.eclipse.jetty:jetty-continuation:jar:8.1.14.v20131031:provided
[INFO]       \- org.eclipse.jetty:jetty-http:jar:8.1.14.v20131031:provided
[INFO]          \- org.eclipse.jetty:jetty-io:jar:8.1.14.v20131031:provided
[INFO]             \- (org.eclipse.jetty:jetty-util:jar:8.1.14.v20131031:provided - omitted for duplicate)
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO] 
[INFO] emma .............................................. SUCCESS [  2.334 s]
[INFO] emma-common-macros ................................ SUCCESS [  0.285 s]
[INFO] emma-common ....................................... SUCCESS [  1.107 s]
[INFO] emma-backend ...................................... SUCCESS [  0.133 s]
[INFO] emma-language ..................................... SUCCESS [  0.380 s]
[INFO] emma-flink ........................................ SUCCESS [  2.904 s]
[INFO] emma-sketchbook ................................... SUCCESS [  0.163 s]
[INFO] emma-spark ........................................ SUCCESS [  2.890 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 11.606 s
[INFO] Finished at: 2015-03-30T15:54:30+01:00
[INFO] Final Memory: 28M/439M
[INFO] ------------------------------------------------------------------------

Create quickstarts for Spark / Flink

Based on the Flink quickstarts, create an emma-quickstart module with an archetype Maven pom and a quickstart.sh script which can be used to bootstrap an Emma related project.

TODOs for the PR:

  • quickstart.sh works and offers a choice: Flink or Spark (Flink as default);
  • The generated quickstart project builds a fat jar out of the box;
  • The generated fat jar can be submitted against a standalone Flink or Spark version in order to run some standard examples using dummy data;

Semantic checks

This is an umbrella issue for cases when it is possible to write some code where it isn't clear semantically what should it do. These cases should be dealt with by either clearly defining what they do, or disallowing them. The latter means adding a semantic check to the native API and/or parallelize that detects the problem, and prints a helpful error message that explains the situation to the user.

Create initial Spark integration

Create an initial emma-spark integration module. For the initial step, let's focus on source, sink and map support.

As a starting point, use the current dev version of the emma-flink package.

Create a new package emma-spark which compiles against Spark 1.2.1 with Hadoop 2.4+ and Scala 2.11.

Implement the execute(Write) method for Spark using Flink's DataflowGenerator as a basis. Use the corresponding unit tests from CodegenTest to validate the implementation.

Bindings are incorrectly interpreted as closures by the comprehension compiler

The following example:

val algorithm = emma.parallelize {
  val numbers = for (x <- DataBag(Seq(1, 2, 3)))
    yield x match { case y => y + 1 }
}

fails to compile with the following error:

Error:(6, 36) exception during macro expansion: 
scala.reflect.macros.TypecheckException: not found: value y [...]

This is the result from emma.comprehend on the code above:

{
  val numbers = {
    import scala.reflect.runtime.universe._;
    import eu.stratosphere.emma.ir;
    val __root = ir.TempSink("numbers", ir.Map[Int, Int]("((y: Int) => ((x: Int) => x match {\n  case (y @ _) => y.+(1)\n}))", ir.Scatter(scala.collection.Seq.apply[Int](1, 2, 3))));
    engine.executeTempSink(__root, "numbers", y)
  };
  ()
}

We can see that y is interpreted as a closure, which in fact it is not.

The solution is very simple, I will make a PR shortly.

Simple fold expression fails

Trying to run a simple fold expression fails:

emma.parallelize { DataBag(1 to N).fold[Int](0, identity, _ + _) }

with a wild:

Error:(550, 32) exception during macro expansion: 
scala.reflect.macros.ParseException: illegal start of simple expression

The culprit code looks like this:

{
  (((({
    import scala.reflect.runtime.universe._;
    import eu.stratosphere.emma.ir;
    val __root = ir.TempSink("comprehension$macro$123", );
                                                      ^
    engine.executeTempSink(__root, "comprehension$macro$123")
  }): _root_.scala.Int)): _root_.scala.Int)
}

On the other hand, if we look at BaseCodegenTest we see that running a simple sum works just fine:

emma.parallelize { DataBag(1 to N).sum() }

`val (a, b) = g.key` not working when the result is used in a join

The following code fails at runtime:

    compareWithNative(emma.parallelize {
      val imdbTop100 = read(materializeResource("/cinema/imdb.csv"), new CSVInputFormat[IMDBEntry])
      val z = for (g <- imdbTop100.groupBy(x => (x.year / 10, x.rating.toInt))) yield {
        val (year, rating) = g.key
        (year, rating, g.values.count())
      }

      for {
        x <- z
        y <- imdbTop100
        if x == (y.year, y.rating, 1)
      } yield (x, y)
    })

Exception is:

scala.tools.reflect.ToolBoxError: reflective typecheck has failed: object key is not a member of package g

(This issue came from BeliefPropagation, but we can just do a workaround there for now.)

Substitution doesn't avoid capture

Correct me if I'm wrong, but I think the current implementation of the substitute primitive (both in TermSubstituter and SymbolSubstituter) isn't correct, because it substitutes by name and doesn't avoid capture (i.e. it doesn't respect scope). This can lead to problems when reusing the same variable name in UDFs.

Ideally a substitution goes like this:

  1. Extract all free variables from the substitute.
  2. Rename any clashing bindings within the target with fresh names.
  3. Recursively substitute only free matching variables within the target.

For reference see Capture-avoiding substitutions. This is crucial for proper macro sanitization and I would be really surprised if the reflection API doesn't provide any primitives for this (at least fresh name generation). Should I work on a fix or am I missing something?

Readup on basic compiler pipeline

For the next 2 weeks, please take a look at the basic compiler pipeline as implemented in the current master branch. As a starting point, take a look at the following code paths:

As reference reading, take a look at:

  1. Chapters 2-4 of our SIGMOD paper.
  2. Chapters 1-3 from Torsten Grust, Marc H. Scholl: "How to Comprehend Queries Functionally". J. Intell. Inf. Syst. 12(2-3): 191-218 (1999)". This is especially helpful in order to understand the intuition behind the "fold"-based fundamentals, as well as the suggested rewrite-based translations that implement normalization and introduce combinators.

You can play around with the code and observe the Scala macros in action by setting up a dedicated configuration for macro debugging of the "TranslationPrototype" using the following screencast as a reference point. Here is a Screenshot of my debug configuration.

Scala Macros debug configuration

Please keep track of questions with pointers in the code and the papers. We can meet later this week or next week in another session where we can discuss parts which are hard to grasp.

Once you've gained some experience, we can start assigning small tasks that touch the Macros.

Spark backend does not free resources

The ALS2 test fails when run against spark with OutOfMemory error, which can be an indicator that we might not be freeing the cached datasets early enough.

Further investigation of the memory allocation pattern with a profiling tool is needed.

Planning: Demo

Components

  • scratchbook job (empty skeleton where people can enter their own stuff)
  • pickable jobs
  • overlay with rectangles over the identified "comprehendable" terms
  • job execution graph (animated on execution)

Migrate to symbol based-enfironments

With the additions from @joroKr21 substitutions in the Macro pipeline are almost entirely symbol-based.

The remaining part which requires fixing are the usages of ValDefOps#reset method.

To get rid of those, the following needs to be done:

  • Add usedVars and definedVars method support to all IR expressions.
  • Add root to RewriteEngine and implementations.
  • Replace the usages of ScalaExpr#usedVars with Expression#usedVars.
  • Remove the obsolete ScalaExpr#usedVars method.
  • Migrate to symbol-based foldGroupFusion.
  • Replace the usages of ScalaExpr#vars with Expression#definedVars.
  • Remove obsolete 'TreeOps#freeEnv' method.
  • Remove the obsolete ScalaExpr#vars parameter.

Some observations that should probably be solved by other issues:

1. Typed Expressions resulting from Fold Macro Expansions

The expansion of the fold macros produces typed expressions, e.g.

xs.sum()

expands to

(xs.fold[Long](...): Long)

This means that filter predicates of form xs.fold(...) for example are represented in the comprehension IR as Scala nodes (since the top-node cannot be comprehended). Concidentally, this plays an impact in the reworked version of the symbol-based foldGroupFusion which assumes that all nested comprehensions are not directly inlined in the parent.

In long term, we should probably move to 'on-demand' comprehending of a term, since keeping everything in sync otherwise seems to be a huge overhead.

Planning: Nov 2015

  • Migrate to Flink 0.10 vanilla - resolved via #120 (@aalexandrov)
  • Initial cluster experiments
    • Relational (TPC-H, TPC-DS, BigBench, IMDB) on Emma / Table API / Spark SQL
    • TicTacToe
    • ALS (dataset: MovieLens)
  • Documentation & tutorials
    • wrap-up the Zeppelin fork
    • rectangles for the demo
  • Holistic optimizations
    • define algorithms and model for reasoning about fields and interesting properties
  • LA representation
    • partitioned data structures (partitioned matrix, partitioned bag) + natural transformations

Planning: Sep 2015

TODOs:

  • Check for available group IDs (org.emma???, org.peel???)
  • Run on Spark (standalone)
  • Peel Bundle with
    • TPC-H Queries (Emma-Flink, Emma-Spark, Flink Table API)
    • Triangle Enumeration
    • ALS

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.