Code Monkey home page Code Monkey logo

scalding's Introduction

Scalding

Build status Coverage Status Latest version Chat

Scalding is a Scala library that makes it easy to specify Hadoop MapReduce jobs. Scalding is built on top of Cascading, a Java library that abstracts away low-level Hadoop details. Scalding is comparable to Pig, but offers tight integration with Scala, bringing advantages of Scala to your MapReduce jobs.

Scalding Logo

Word Count

Hadoop is a distributed system for counting words. Here is how it's done in Scalding.

package com.twitter.scalding.examples

import com.twitter.scalding._
import com.twitter.scalding.source.TypedText

class WordCountJob(args: Args) extends Job(args) {
  TypedPipe.from(TextLine(args("input")))
    .flatMap { line => tokenize(line) }
    .groupBy { word => word } // use each word for a key
    .size // in each group, get the size
    .write(TypedText.tsv[(String, Long)](args("output")))

  // Split a piece of text into individual words.
  def tokenize(text: String): Array[String] = {
    // Lowercase each word and remove punctuation.
    text.toLowerCase.replaceAll("[^a-zA-Z0-9\\s]", "").split("\\s+")
  }
}

Notice that the tokenize function, which is standard Scala, integrates naturally with the rest of the MapReduce job. This is a very powerful feature of Scalding. (Compare it to the use of UDFs in Pig.)

You can find more example code under examples/. If you're interested in comparing Scalding to other languages, see our Rosetta Code page, which has several MapReduce tasks in Scalding and other frameworks (e.g., Pig and Hadoop Streaming).

Documentation and Getting Started

Please feel free to use the beautiful Scalding logo artwork anywhere.

Contact

For user questions or scalding development (internals, extending, release planning): https://groups.google.com/forum/#!forum/scalding-dev (Google search also works as a first step)

In the remote possibility that there exist bugs in this code, please report them to: https://github.com/twitter/scalding/issues

Follow @Scalding on Twitter for updates.

Chat: Gitter

Get Involved + Code of Conduct

Pull requests and bug reports are always welcome!

We use a lightweight form of project governence inspired by the one used by Apache projects. Please see Contributing and Committership for our code of conduct and our pull request review process. The TL;DR is send us a pull request, iterate on the feedback + discussion, and get a +1 from a Committer in order to get your PR accepted.

The current list of active committers (who can +1 a pull request) can be found here: Committers

A list of contributors to the project can be found here: Contributors

Building

There is a script (called sbt) in the root that loads the correct sbt version to build:

  1. ./sbt update (takes 2 minutes or more)
  2. ./sbt test
  3. ./sbt assembly (needed to make the jar used by the scald.rb script)

The test suite takes a while to run. When you're in sbt, here's a shortcut to run just one test:

> test-only com.twitter.scalding.FileSourceTest

Please refer to FAQ page if you encounter problems when using sbt.

We use Github Actions to verify the build: Build Status

We use Coveralls for code coverage results: Coverage Status

Scalding modules are available from maven central.

The current groupid and version for all modules is, respectively, "com.twitter" and 0.17.2.

Current published artifacts are

  • scalding-core_2.11, scalding-core_2.12
  • scalding-args_2.11, scalding-args_2.12
  • scalding-date_2.11, scalding-date_2.12
  • scalding-commons_2.11, scalding-commons_2.12
  • scalding-avro_2.11, scalding-avro_2.12
  • scalding-parquet_2.11, scalding-parquet_2.12
  • scalding-repl_2.11, scalding-repl_2.12

The suffix denotes the scala version.

Adopters

  • Ebay
  • Etsy
  • Sharethrough
  • Snowplow Analytics
  • Soundcloud
  • Twitter

To see a full list of users or to add yourself, see the wiki

Authors:

Thanks for assistance and contributions:

A full list of contributors can be found on GitHub.

License

Copyright 2016 Twitter, Inc.

Licensed under the Apache License, Version 2.0

scalding's People

Contributors

adampingel avatar alexanderdean avatar antwnis avatar arkajit avatar avi-stripe avatar avibryant avatar azymnis avatar benpence avatar bgreenlee avatar bholt avatar daniel-sudz avatar dieu avatar fs111 avatar gerashegalov avatar ianoc avatar ianoc-stripe avatar isnotinvain avatar jcoveney avatar johnynek avatar koertkuipers avatar ljrittle avatar mansurashraf avatar oeddyo avatar oscar-stripe avatar reconditesea avatar rubanm avatar sid-kap avatar sriramkrishnan avatar sritchie avatar ttim avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

scalding's Issues

Cannot run sbt update because sbt-assembly is not found

Hi,

I'm trying to install scalding in Mac OS X from Leopard 10.6.8. I have installed sbt using macports and got version 0.11 and checked out scalding version using 'git clone https://github.com/twitter/scalding.git'. After that I enter to my scalding directory doing 'cd scalding/' and run 'sbt update'.

This is the output I get:

$ sudo sbt update
[info] Loading project definition from /Users/raimonbosch/Desktop/scalding/project
[info] Updating {file:/Users/raimonbosch/Desktop/scalding/project/}default-3e56f1...
[warn] module not found: com.eed3si9n#sbt-assembly;0.7.3
[warn] ==== typesafe-ivy-releases: tried
[warn] http://repo.typesafe.com/typesafe/ivy-releases/com.eed3si9n/sbt-assembly/scala_2.9.1/sbt_0.11.0/0.7.3/ivys/ivy.xml
[warn] -- artifact com.eed3si9n#sbt-assembly;0.7.3!sbt-assembly.jar:
[warn] http://repo.typesafe.com/typesafe/ivy-releases/com.eed3si9n/sbt-assembly/scala_2.9.1/sbt_0.11.0/0.7.3/jars/sbt-assembly.jar
[warn] ==== local: tried
[warn] /Users/raimonbosch/.ivy2/local/com.eed3si9n/sbt-assembly/scala_2.9.1/sbt_0.11.0/0.7.3/ivys/ivy.xml
[warn] -- artifact com.eed3si9n#sbt-assembly;0.7.3!sbt-assembly.jar:
[warn] /Users/raimonbosch/.ivy2/local/com.eed3si9n/sbt-assembly/scala_2.9.1/sbt_0.11.0/0.7.3/jars/sbt-assembly.jar
[warn] ==== sbt-plugin-releases: tried
[warn] http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases/com.eed3si9n/sbt-assembly/scala_2.9.1/sbt_0.11.0/0.7.3/ivys/ivy.xml
[warn] -- artifact com.eed3si9n#sbt-assembly;0.7.3!sbt-assembly.jar:
[warn] http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases/com.eed3si9n/sbt-assembly/scala_2.9.1/sbt_0.11.0/0.7.3/jars/sbt-assembly.jar
[warn] ==== public: tried
[warn] http://repo1.maven.org/maven2/com/eed3si9n/sbt-assembly_2.9.1_0.11.0/0.7.3/sbt-assembly-0.7.3.pom
[warn] -- artifact com.eed3si9n#sbt-assembly;0.7.3!sbt-assembly.jar:
[warn] http://repo1.maven.org/maven2/com/eed3si9n/sbt-assembly_2.9.1_0.11.0/0.7.3/sbt-assembly-0.7.3.jar
[warn] ==== Scala-Tools Maven2 Repository: tried
[warn] http://scala-tools.org/repo-releases/com/eed3si9n/sbt-assembly_2.9.1_0.11.0/0.7.3/sbt-assembly-0.7.3.pom
[warn] -- artifact com.eed3si9n#sbt-assembly;0.7.3!sbt-assembly.jar:
[warn] http://scala-tools.org/repo-releases/com/eed3si9n/sbt-assembly_2.9.1_0.11.0/0.7.3/sbt-assembly-0.7.3.jar
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn] :: UNRESOLVED DEPENDENCIES ::
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn] :: com.eed3si9n#sbt-assembly;0.7.3: not found
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn]
[warn] Note: Some unresolved dependencies have extra attributes. Check that these dependencies exist with the requested attributes.
[warn] com.eed3si9n:sbt-assembly:0.7.3 (sbtVersion=0.11.0, scalaVersion=2.9.1)
[warn]
[error] {file:$HOME/scalding/project/}default-3e56f1/*:update: sbt.ResolveException: unresolved dependency: com.eed3si9n#sbt-assembly;0.7.3: not found

What more do I need to do to install sbt-assembly?

joinWithLarger is broken when used with Left/Right/Custom joiner

https://github.com/twitter/scalding/blob/develop/src/main/scala/com/twitter/scalding/JoinAlgorithms.scala#L162

You need to at least reverse. I tend to think the solution here is to throw if you get something other than Inner/Outer/Left/Right, and to reverse on Left/Right.

Something like:

joiner match {
  case outer: OuterJoin => joiner
  case inner: InnerJoin => joiner
  case left: LeftJoin => new RightJoin
  case right: RightJoin => new LeftJoin
  case _ => sys.error("Join with Larger cannot handle custom joins")
}

unable to run sbt assembly

When I run sbt assembly, after passing all tests it throws the following error. It was earlier throwing similar error which I resolved by adding the jasper jar files to excluded jar files in the build.sbt file. Surely I am missing something basic here.

[error] {file:/home/workshop/scalding/}default-cc29ad/*:assembly: deduplicate: different file contents found in the following:
[error] /home/.ivy2/cache/cascading.kryo/cascading.kryo/jars/cascading.kryo-0.3.1.jar:project.clj
[error] /home/.ivy2/cache/com.twitter/kryo/jars/kryo-2.04.jar:project.clj
[error] /home/.ivy2/cache/com.esotericsoftware.reflectasm/reflectasm/jars/reflectasm-1.02.jar:project.clj
[error] /home/.ivy2/cache/com.twitter/meat-locker/jars/meat-locker-0.2.1.jar:project.clj
[error] /home/.ivy2/cache/com.twitter/maple/jars/maple-0.1.7.jar:project.clj
[error] Total time: 328 s, completed 14-May-2012 15:01:41

Also sbt assembly does not work with version 0.7.3, it cannot find the dependencies. I had to change it to 0.8.1 in plugins.sbt file.

Thanks in advance.

Use chill.ClosureCleaner on all functions

Scala captures a reference to the enclosing object always, even if the function doesn't reference that object.

When the job is serialized, this can dramatically increase the size, and cause some things to need to be serializable that don't really need to be.

A fix for this is to go into the few Cascading Operations we have, and call ClosureCleaner(fn) on all the functions passed into constructors.

This could potentially dramatically reduce this size of jobConfs and speed up submission times. Also, it could make it easier for users by reducing the number of cases where they need to hack around weird serialization errors.

.groupAll/.group may return one row per group or all rows

I realize this may be a feature.

if you do

foo.groupAll { _.sortBy('bar)}

you get all the rows but if you do

foo.groupAll { _.size}

you get 1 row

So I'm guessing there is some magic that looks to see if a field is added or not and decides what to do. But then you end up with surprising behavior where

foo.group('bar) { identity }
is just all of the rows of foo even though it looks like you want just the possible values of bar in foo.

Perhaps .group and .groupAll should have an optional parameter rows=('all | 'groups | 'magic)? If rows='all and you add a field you get the original Pipe with the grouped Pipe joined to it.

Add API for custom joiners

We should be able have an API that takes (Iterator[T], Iterator[U], ...) => Iterator[V]

This allows us to do mapping/reducing and joining all at the same time.

scalding + maven project + cdh3u2

Hi,

This is not an "issue" actually, so excuse me if this is not the place to suggest this ...

I have just succeeded running a scalding job on my CDH3U2 cluster.
I really couldn't find a 'walk through' describing clearly what needs to be done to facilitate this, just a few helpful responses on cascading-user mailing list.

Anyway, I have created a few gist's that include my project's pom, maven-assebly-plugin xml and a modified build.sbt .

If the kind owners of Scalding wish, I would be more than happy to contribute them and the protocol to the general public in the form of a wiki or something.

The setup protocol is https://gist.github.com/3cc8399ebdfb0c7ca2bb

I am not really sure if that protocol is any good

All the best,
Amit

Diagnosing/tuning mapper memory blowout

Using a groupBy / sortWithTake, it seems like the map-side partial aggregation is running of of memory. Not sure what the easiest way is to diagnose or tune this with scalding.

Make flatMap functions accept TraversableOnce

TraversableOnce is a superclass of both Iterable and Iterator. Since we only iterate through once, we could accept that and deal with some objects that can't easily restart (such as iterators from disk etc...)

Shouldn't be a source breaking change.

Kryo doesn't handle variable length arguments (for Strings and Symbols)

diff --git a/src/test/scala/com/twitter/scalding/KryoTest.scala b/src/test/scala/com/twitter/scalding/KryoTest.
index 7a9f3b2..b52a736 100644
--- a/src/test/scala/com/twitter/scalding/KryoTest.scala
+++ b/src/test/scala/com/twitter/scalding/KryoTest.scala
@@ -20,6 +20,8 @@ case class TestCaseClassForSerialization(x : String, y : Int)
case class TestValMap(val map : Map[String,Double])
case class TestValHashMap(val map : HashMap[String,Double])

+case class TestVarArgs(val strs : String*)
+
class KryoTest extends Specification {

noDetailedDiffs() //Fixes issue for scala 2.9
@@ -76,6 +78,7 @@ class KryoTest extends Specification {
Vector(1,2,3,4,5),
TestValMap(null),
Some("junk"),

  •                  TestVarArgs("abc"),
                   'hai)
     .asInstanceOf[List[AnyRef]]
    
    serializationRT(test) must be_==(test)

test-only KryoTest
[info] Compiling 1 Scala source to /Users/mikeg/workspace/scalding-mng/scalding/target/scala-2.9.2/test-classes...
[error] x KryoSerializers and KryoDeserializers should
[error] x round trip any non-array object
[error] Encountered unregistered class ID: 95
[error] Serialization trace:
[error] array (scala.collection.mutable.WrappedArray$ofRef)
[error] strs (com.twitter.scalding.TestVarArgs) (DefaultClassResolver.java:113)
[error] com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:113)
[error] com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:596)
[error] com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:707)
[error] com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
[error] com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
[error] com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:692)
[error] com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:521)
[error] com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
[error] com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:637)
[error] com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:515)
[error] com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
[error] com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:615)
[error] cascading.kryo.KryoDeserializer.deserialize(KryoDeserializer.java:37)

Support automatic switching from tiny to smaller based on filesize.

A hash-join on M mappers of a larger file sized L with smaller file sized S, costs M*S bytes (the smaller is replicated to all the mapper).

By contrast, a CoGroup sends all the data over to new nodes, so it takes, L + S bytes. Clearly, we only want to do a hashjoin if M*S < L + S which is to say, S < L/(M-1). It would be really nice to have cascading or scalding automatically switch in this case.

For instance, if you have 10,000 mappers (as is sometimes the case), using a hashJoin seems like a loss in all but the very smallest of cases. Using a hashjoin after a few reduce phases, where the number of tasks has been constrained to < 100 or so, then probably it is more likely a win.

It seems like adding support for this in cascading would be ideal. In this case, you specify an "AutoJoin" on two pipes with either left or inner join mode, then Cascading should pick which is better to use based on the sizes just before it schedules the actual join (i.e. once L and S are already materialized, or accurately estimated).

Fix broken build

Looks like the latest changes to algebird broke the scalding build, since there is no HLLInstance anymore.

The weird thing is that sbt test works for me locally on the master branch, which suggests that something weird is going on with the github sbt plugin.

Skew Join (someone please implement)

Here is what we do:

  1. sample the left and right side way down (like 1/1000, but probably a parameter of the skewJoin).
  2. do a join of the left and right (maybe hashjoin if inner join is okay).
  3. esimate the number of items on the left and right for each key.
  4. set a replication factor for each key for each side, such that every (key, replication) bucket has about the same number of items.
  5. leftJoinWithTiny this replication factor for each key, if there is no replication factor choose 1.
  6. adapt blockJoin to read the key replication from the pipe, so that we can use different replications for each key.
  7. update Matrix API to have skew hinting, and if it is set, use skewJoin in matrix product.
  8. Profit.

Error compiling source with scald.rb with 2.9.1

Hi, near the top of scald.rb you have the code

COMPILE_CMD="java -cp project/boot/scala-2.8.1/lib/scala-library.jar:project/boot/scala-2.8.1/lib/scala-compiler.jar -Dscala.home=project/boot/scala-2.8.1/lib/ scala.tools.nsc.Main"

But this does not work, at least when the project was built with 2.9.1.

$ scripts/scald.rb --local tutorial/Tutorial1.scala
compiling tutorial/Tutorial1.scala
Exception in thread "main" java.lang.NoClassDefFoundError: scala/tools/nsc/Main
Caused by: java.lang.ClassNotFoundException: scala.tools.nsc.Main
at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
Could not find the main class: scala.tools.nsc.Main. Program will exit.

I have scala 2.9.1 installed so I changed it to

COMPILE_CMD="scalac"

Voila!

$ scripts/scald.rb --local tutorial/Tutorial1.scala
compiling tutorial/Tutorial1.scala
12/01/12 14:29:49 INFO util.Version: Concurrent, Inc - Cascading 2.0.0 [hadoop-0.20.2+]
12/01/12 14:29:49 INFO flow.Flow: [] starting
12/01/12 14:29:49 INFO flow.Flow: [] source: FileTap["TextLine[['num', 'line']->[ALL]]"]["tutorial/data/hello.txt"]"]
12/01/12 14:29:49 INFO flow.Flow: [] sink: FileTap["TextLine[['num', 'line']->[ALL]]"]["tutorial/data/output1.txt"]"]
12/01/12 14:29:49 INFO flow.Flow: [] parallel execution is enabled: true
12/01/12 14:29:49 INFO flow.Flow: [] starting jobs: 1
12/01/12 14:29:49 INFO flow.Flow: [] allocating threads: 1
12/01/12 14:29:49 INFO planner.FlowStep: [] starting step: (1/1) local

sbt test fails with 7 failures

I am running scalding on a windows machine. After installing scala, sbt (version 0.11.3) and cloning scalding I am not able to run "sbt test" successfully. Out of 145 tests, 7 of them fail. Below is part of what I am seeing in command prompt from where I am running sbt-test command. I tried searching if anyone else faced this problem but could not get any threads. Any help will be greatly appreciated.bt

[error] scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized
.scala:61)
[error] scala.collection.immutable.List.foreach(List.scala:45)
[error] scala.collection.TraversableLike$class.flatMap(TraversableLike.scala
:227)
[error] scala.collection.immutable.List.flatMap(List.scala:45)
[error] org.specs.specification.ExampleStructure$class.failures(ExampleStruc
ture.scala:64)
[error] org.specs.specification.Examples.failures(Examples.scala:52)
[error] org.specs.specification.Examples.failures(Examples.scala:52)
[error] org.specs.execute.HasResults$class.issues(HasResults.scala:63)
[error] org.specs.specification.Examples.issues(Examples.scala:52)
[error] org.specs.execute.HasResults$class.isOk(HasResults.scala:69)
[error] org.specs.specification.Examples.isOk(Examples.scala:52)
[error] org.specs.runner.NotifierRunner.reportSystem(NotifierRunner.scala:79
)
[error] org.specs.runner.NotifierRunner$$anonfun$reportASpecification$3.appl
y(NotifierRunner.scala:70)
[error] org.specs.runner.NotifierRunner$$anonfun$reportASpecification$3.appl
y(NotifierRunner.scala:66)
[error] scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized
.scala:61)
[error] scala.collection.immutable.List.foreach(List.scala:45)
[error] org.specs.runner.NotifierRunner.reportASpecification(NotifierRunner.
scala:66)
[error] org.specs.runner.NotifierRunner.report(NotifierRunner.scala:59)
[error] org.specs.runner.NotifierRunner.report(NotifierRunner.scala:45)
[error] org.specs.runner.Reporter$class.reportSpecs(Reporter.scala:195)
[error] org.specs.runner.NotifierRunner.reportSpecs(NotifierRunner.scala:45)

[error] org.specs.runner.TestInterfaceRunner$$anonfun$run$3.apply(TestInterf
aceRunner.scala:70)
[error] org.specs.runner.TestInterfaceRunner$$anonfun$run$3.apply(TestInterf
aceRunner.scala:70)
[error] scala.Option.map(Option.scala:129)
[error] org.specs.runner.TestInterfaceRunner.run(TestInterfaceRunner.scala:7
0)
[error] org.specs.runner.TestInterfaceRunner.run(TestInterfaceRunner.scala:6
5)
[error] sbt.TestRunner.delegateRun(TestFramework.scala:62)
[error] sbt.TestRunner.run(TestFramework.scala:56)
[error] sbt.TestRunner.runTest$1(TestFramework.scala:76)
[error] sbt.TestRunner.run(TestFramework.scala:85)
[error] sbt.TestFramework$$anonfun$6$$anonfun$apply$8$$anonfun$7$$anonfun$ap
ply$9.apply(TestFramework.scala:184)
[error] sbt.TestFramework$$anonfun$6$$anonfun$apply$8$$anonfun$7$$anonfun$ap
ply$9.apply(TestFramework.scala:184)
[error] sbt.TestFramework$.sbt$TestFramework$$withContextLoader(TestFramewor
k.scala:196)
[error] sbt.TestFramework$$anonfun$6$$anonfun$apply$8$$anonfun$7.apply(TestF
ramework.scala:184)
[error] sbt.TestFramework$$anonfun$6$$anonfun$apply$8$$anonfun$7.apply(TestF
ramework.scala:184)
[error] sbt.Tests$$anonfun$makeSerial$1$$anonfun$apply$8.apply(Tests.scala:1
15)
[error] sbt.Tests$$anonfun$makeSerial$1$$anonfun$apply$8.apply(Tests.scala:1
15)
[error] scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLik
e.scala:194)
[error] scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLik
e.scala:194)
[error] scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized
.scala:59)
[error] scala.collection.immutable.List.foreach(List.scala:45)
[error] scala.collection.TraversableLike$class.map(TraversableLike.scala:194
)
[error] scala.collection.immutable.List.map(List.scala:45)
[error] sbt.Tests$$anonfun$makeSerial$1.apply(Tests.scala:115)
[error] sbt.Tests$$anonfun$makeSerial$1.apply(Tests.scala:115)
[error] sbt.std.Transform$$anon$3$$anonfun$apply$2.apply(System.scala:47)
[error] sbt.std.Transform$$anon$3$$anonfun$apply$2.apply(System.scala:47)
[error] sbt.std.Transform$$anon$5.work(System.scala:67)
[error] sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:2
21)
[error] sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:2
21)
[error] sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
[error] sbt.Execute.work(Execute.scala:227)
[error] sbt.Execute$$anonfun$submit$1.apply(Execute.scala:221)
[error] sbt.Execute$$anonfun$submit$1.apply(Execute.scala:221)
[error] sbt.CompletionService$$anon$1$$anon$2.call(CompletionService.scala:2
6)
[error] java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)
[error] java.util.concurrent.FutureTask.run(Unknown Source)
[error] java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
[error] java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)
[error] java.util.concurrent.FutureTask.run(Unknown Source)
[error] java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Sourc
e)
[error] java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
[error] java.lang.Thread.run(Unknown Source)
[info] + KryoSerializers and KryoDeserializers should
[info] + round trip any non-array object
[info] + handle arrays
[info] + handle scala singletons
[info] + handle Date, RichDate and DateRange
[info] + Serialize a giant list
12/05/20 12:19:09 INFO flow.Flow: [com.twitter.scalding.T...] starting
12/05/20 12:19:09 INFO flow.Flow: [com.twitter.scalding.T...] source: com.twitt
er.scalding.MemoryTap@97b936
12/05/20 12:19:09 INFO flow.Flow: [com.twitter.scalding.T...] source: com.twitt
er.scalding.MemoryTap@e2d51a
12/05/20 12:19:09 INFO flow.Flow: [com.twitter.scalding.T...] sink: com.twitter
.scalding.MemoryTap@e5455c
12/05/20 12:19:09 INFO flow.Flow: [com.twitter.scalding.T...] parallel executio
n is enabled: true
12/05/20 12:19:09 INFO flow.Flow: [com.twitter.scalding.T...] starting jobs: 1
12/05/20 12:19:09 INFO flow.Flow: [com.twitter.scalding.T...] allocating thread
s: 1
12/05/20 12:19:09 INFO flow.FlowStep: [com.twitter.scalding.T...] starting step:
(1/1) local
12/05/20 12:19:09 INFO util.HadoopUtil: using default application jar, may cause
class not found exceptions on the cluster
12/05/20 12:19:09 INFO planner.HadoopPlanner: using application jar: /C:/Documen
ts and Settings/vjain/.ivy2/cache/cascading/cascading-hadoop/jars/cascading-hado
op-2.0.0-wip-291.jar
12/05/20 12:19:09 INFO hadoop.TupleSerialization: using default comparator: com.
twitter.scalding.IntegralComparator
12/05/20 12:19:09 INFO hadoop.TupleSerialization: using default comparator: com.
twitter.scalding.IntegralComparator
12/05/20 12:19:09 INFO flow.Flow: [com.twitter.scalding.T...] starting
12/05/20 12:19:09 INFO flow.Flow: [com.twitter.scalding.T...] source: com.twitt
er.maple.tap.MemorySourceTap@3e0
12/05/20 12:19:09 INFO flow.Flow: [com.twitter.scalding.T...] source: com.twitt
er.maple.tap.MemorySourceTap@3e0
12/05/20 12:19:09 INFO flow.Flow: [com.twitter.scalding.T...] sink: Hfs["TextDe
limited[[UNKNOWN]->[ALL]]"]["/tmp/scalding/com.twitter.scalding.Tsv0"]"]
12/05/20 12:19:09 INFO flow.Flow: [com.twitter.scalding.T...] parallel executio
n is enabled: false
12/05/20 12:19:09 INFO flow.Flow: [com.twitter.scalding.T...] starting jobs: 1
12/05/20 12:19:09 INFO flow.Flow: [com.twitter.scalding.T...] allocating thread
s: 1
12/05/20 12:19:09 INFO flow.FlowStep: [com.twitter.scalding.T...] starting step:
(1/1) ...tDelimited[[UNKNOWN]->[ALL]]"]["/tmp/scalding/com.twitter.scalding.Tsv
0"]"]
12/05/20 12:19:09 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with proces
sName=JobTracker, sessionId= - already initialized
12/05/20 12:19:09 INFO flow.Flow: [com.twitter.scalding.T...] stopping all jobs
12/05/20 12:19:09 INFO flow.FlowStep: [com.twitter.scalding.T...] stopping: (1/1
) ...tDelimited[[UNKNOWN]->[ALL]]"]["/tmp/scalding/com.twitter.scalding.Tsv0"]"]

12/05/20 12:19:09 INFO flow.Flow: [com.twitter.scalding.T...] stopped all jobs
[error] x A TinyJoinJob should
[error] x system error
[error] unhandled exception (BaseFlow.java:796)
[error] cascading.flow.BaseFlow.complete(BaseFlow.java:796)
[error] com.twitter.scalding.JobTest.runJob(JobTest.scala:103)
[error] com.twitter.scalding.JobTest.runHadoop(JobTest.scala:75)
[error] com.twitter.scalding.TinyJoinTest$$anonfun$5.apply$mcV$sp(CoreTest.sca
la:238)
[error] com.twitter.scalding.TinyJoinTest$$anonfun$5.apply(CoreTest.scala:217)

[error] com.twitter.scalding.TinyJoinTest$$anonfun$5.apply(CoreTest.scala:217)

[error] org.specs.specification.LifeCycle$class.withCurrent(ExampleLifeCycle.s
cala:60)
[error] org.specs.specification.Examples.withCurrent(Examples.scala:52)
[error] org.specs.specification.Examples$$anonfun$specifyExample$1.apply(Examp
les.scala:111)
[error] org.specs.specification.Examples$$anonfun$specifyExample$1.apply(Examp
les.scala:111)
[error] org.specs.specification.ExampleExecution$$anonfun$3$$anonfun$apply$5.a
pply(ExampleLifeCycle.scala:213)
[error] scala.Option.getOrElse(Option.scala:104)
[error] org.specs.specification.LifeCycle$class.executeExpectations(ExampleLif
eCycle.scala:84)
[error] org.specs.specification.BaseSpecification.executeExpectations(BaseSpec
ification.scala:58)
[error] org.specs.specification.ExampleContext$$anonfun$executeExpectations$1.
apply(ExampleContext.scala:78)
[error] org.specs.specification.ExampleContext$$anonfun$executeExpectations$1.
apply(ExampleContext.scala:78)
[error] scala.Option.map(Option.scala:129)
[error] org.specs.specification.ExampleContext$class.executeExpectations(Examp
leContext.scala:78)
[error] org.specs.specification.Examples.executeExpectations(Examples.scala:52
)
[error] org.specs.specification.ExampleExecution$$anonfun$3.apply(ExampleLifeC
ycle.scala:213)
[error] org.specs.specification.ExampleExecution$$anonfun$3.apply(ExampleLifeC
ycle.scala:192)
[error] org.specs.specification.ExampleExecution$$anonfun$2.apply(ExampleLifeC
ycle.scala:175)
[error] org.specs.specification.ExampleExecution.execute(ExampleLifeCycle.scal
a:246)
[error] org.specs.specification.SpecificationExecutor$$anonfun$executeExample$
3.apply(SpecificationExecutor.scala:70)
[error] org.specs.specification.SpecificationExecutor$$anonfun$executeExample$
3.apply(SpecificationExecutor.scala:70)
[error] scala.Option.map(Option.scala:129)
[error] org.specs.specification.SpecificationExecutor$class.executeExample(Spe
cificationExecutor.scala:70)
[error] org.specs.specification.BaseSpecification.executeExample(BaseSpecifica
tion.scala:58)
[error] org.specs.specification.BaseSpecification.executeExample(BaseSpecifica
tion.scala:58)
[error] org.specs.specification.Examples$$anonfun$executeExamples$1.apply(Exam
ples.scala:80)
[error] org.specs.specification.Examples$$anonfun$executeExamples$1.apply(Exam
ples.scala:80)
[error] scala.Option.map(Option.scala:129)
[error] org.specs.specification.Examples.executeExamples(Examples.scala:80)
[error] org.specs.specification.ExampleStructure$class.examples(ExampleStructu
re.scala:77)
[error] org.specs.specification.Examples.examples(Examples.scala:52)
[error] org.specs.specification.BaseSpecification$$anonfun$firstLevelExamplesN
b$2.apply(BaseSpecification.scala:316)
[error] org.specs.specification.BaseSpecification$$anonfun$firstLevelExamplesN
b$2.apply(BaseSpecification.scala:316)
[error] scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.
scala:123)
[error] scala.collection.immutable.List.foldLeft(List.scala:45)
[error] org.specs.specification.BaseSpecification.firstLevelExamplesNb(BaseSpe
cification.scala:316)
[error] org.specs.runner.NotifierRunner$$anonfun$report$1.apply(NotifierRunner
.scala:58)
[error] org.specs.runner.NotifierRunner$$anonfun$report$1.apply(NotifierRunner
.scala:58)
[error] scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized
.scala:34)
[error] scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:35)
[error] org.specs.runner.NotifierRunner.report(NotifierRunner.scala:58)
[error] org.specs.runner.NotifierRunner.report(NotifierRunner.scala:45)
[error] org.specs.runner.Reporter$class.reportSpecs(Reporter.scala:195)
[error] org.specs.runner.NotifierRunner.reportSpecs(NotifierRunner.scala:45)
[error] org.specs.runner.TestInterfaceRunner$$anonfun$run$3.apply(TestInterfac
eRunner.scala:70)
[error] org.specs.runner.TestInterfaceRunner$$anonfun$run$3.apply(TestInterfac
eRunner.scala:70)
[error] scala.Option.map(Option.scala:129)
[error] org.specs.runner.TestInterfaceRunner.run(TestInterfaceRunner.scala:70)

[error] org.specs.runner.TestInterfaceRunner.run(TestInterfaceRunner.scala:65)

[error] sbt.TestRunner.delegateRun(TestFramework.scala:62)
[error] sbt.TestRunner.run(TestFramework.scala:56)
[error] sbt.TestRunner.runTest$1(TestFramework.scala:76)
[error] sbt.TestRunner.run(TestFramework.scala:85)
[error] sbt.TestFramework$$anonfun$6$$anonfun$apply$8$$anonfun$7$$anonfun$appl
y$9.apply(TestFramework.scala:184)
[error] sbt.TestFramework$$anonfun$6$$anonfun$apply$8$$anonfun$7$$anonfun$appl
y$9.apply(TestFramework.scala:184)
[error] sbt.TestFramework$.sbt$TestFramework$$withContextLoader(TestFramework.
scala:196)
[error] sbt.TestFramework$$anonfun$6$$anonfun$apply$8$$anonfun$7.apply(TestFra
mework.scala:184)
[error] sbt.TestFramework$$anonfun$6$$anonfun$apply$8$$anonfun$7.apply(TestFra
mework.scala:184)
[error] sbt.Tests$$anonfun$makeSerial$1$$anonfun$apply$8.apply(Tests.scala:115
)
[error] sbt.Tests$$anonfun$makeSerial$1$$anonfun$apply$8.apply(Tests.scala:115
)
[error] scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.
scala:194)
[error] scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.
scala:194)
[error] scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.s
cala:59)
[error] scala.collection.immutable.List.foreach(List.scala:45)
[error] scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
[error] scala.collection.immutable.List.map(List.scala:45)
[error] sbt.Tests$$anonfun$makeSerial$1.apply(Tests.scala:115)
[error] sbt.Tests$$anonfun$makeSerial$1.apply(Tests.scala:115)
[error] sbt.std.Transform$$anon$3$$anonfun$apply$2.apply(System.scala:47)
[error] sbt.std.Transform$$anon$3$$anonfun$apply$2.apply(System.scala:47)
[error] sbt.std.Transform$$anon$5.work(System.scala:67)
[error] sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:221
)
[error] sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:221
)
[error] sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
[error] sbt.Execute.work(Execute.scala:227)
[error] sbt.Execute$$anonfun$submit$1.apply(Execute.scala:221)
[error] sbt.Execute$$anonfun$submit$1.apply(Execute.scala:221)
[error] sbt.CompletionService$$anon$1$$anon$2.call(CompletionService.scala:26)

[error] java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)
[error] java.util.concurrent.FutureTask.run(Unknown Source)
[error] java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
[error] java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)
[error] java.util.concurrent.FutureTask.run(Unknown Source)
[error] java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)

[error] java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
[error] java.lang.Thread.run(Unknown Source)
[info] + join tuples with the same key
[error] Error: Total 145, Failed 7, Errors 7, Passed 131, Skipped 0
[error] Error during tests:
[error] com.twitter.scalding.WeightedPageRankSpec
[error] com.twitter.scalding.CrossTest
[error] com.twitter.scalding.LeftJoinTest
[error] com.twitter.scalding.TinyThenSmallJoinTest
[error] com.twitter.scalding.NumberJoinTest
[error] com.twitter.scalding.TinyJoinTest
[error] com.twitter.scalding.IterableSourceTest
[error] {file:/C:/Vikalp/Personal/LearnScala/scalding/}default-0e9944/test:test:
Tests unsuccessful
[error] Total time: 139 s, completed May 20, 2012 12:19:10 PM

Check for null flowDef

If someone does a write/read in a map/reduce operation, the flowDef will be null (because it is marked as transient).

We should check in Source#read/write to see if the flowDef is null, and if so, give a better error than NPE. It can help someone debug their situation faster.

Dependencies problem for algebird/chill behind a firewall.

Build for 0.8.1 fails behind a firewall which blocks git:// uri scheme due to dependencies on algebird & chill.

Changing the schemes to http:// didn't work either, since sbt 0.11.3 tries to download and extract, instead of git retrieval.

HBase support, anyone interested ?

Hi Scalding,

I was wondering if something like:


class TestHBaseTable(args: Args) extends Job(args) {
  val table = new HBaseTable("hostname", "tablename", "columnForKey", Array("familyName"), Array("columnForData"))
  table.read
  .map[Array[Byte], Double]('columnForData -> 'times3) { _ * 3 }
  .project('times3)
  .write(Tsv("hdfs://localhost:8020/user/hdfs/data/out/debug/HbaseTable")) 
}

... is of interest.
I made a very naive implementation of Source I called HBaseSource (with just createTap that uses Maple's HBaseTap) and trait HBaseSchemeSource that inits the Maple HBaseScheme. Just a few lines of code and 2 implicits i added in FieldConversions for the Array[Byte] Hbase uses.

Let me know if you want me to add my implementation to Scalding through the fork-pull thing.

Amit

Tests fail with OOM error

On Mac OS X 10.7.3, with scala 2.9.1.final. Clean checkout of scalding master. sbt update works fine, and scalding appears to build fine, but fails with an OutOfMemoryError when running the tests. Tail of output as follows:

[info] + A ToListJob should
[info] + must have the right number of lines
[info] + must get the result right
[info] + A NullListJob should
[info] + must have the right number of lines
[info] + must return an empty list for null key
sbt appears to be exiting abnormally.
The log file for this session is at /var/folders/zl/3gmx0mcj3vx55l99bczq9c640000gn/T/sbt3794837046744593738.log
java.lang.OutOfMemoryError: PermGen space
Error during sbt execution: java.lang.OutOfMemoryError: PermGen space

Stateful operation is probably not safe, and should be marked as such

http://docs.cascading.org/cascading/1.2/javadoc/cascading/operation/BaseOperation.html#isSafe()

By default, cascading (and hence scalding) assumes you can rerun the same map operation over and over (that is, idempotency is assumed). For a stateful operation, you might be talking to a server and doing some non-idempotent operation.

We should either make it clearer in the docs that idempotency is assumed and/or allow the programmer to signal to cascading that it is not (and we set isSafe == false).

I prefer to tell everyone that scalding addresses the idempotent case, and if you want to go beyond, write your own operation and use .each (and you take responsibility). I don't like encouraging non-idempotent operations.

Error when outputting a local job to a filename without a slash

I can run scripts/scald.rb --local tutorial/Tutorial0.scala fine as is (from the root of the scalding/ repo). However, if I change

val output = TextLine("tutorial/data/output0.txt")

to

val output = TextLine("output0.txt")

I get this error:

compiling tutorial/Tutorial0.scala
12/01/13 10:49:38 INFO util.Version: Concurrent, Inc - Cascading 2.0.0 [hadoop-0.20.2+]
12/01/13 10:49:38 INFO flow.Flow: [] starting
12/01/13 10:49:38 INFO flow.Flow: [] source: FileTap["TextLine[['num', 'line']->[ALL]]"]["tutorial/data/hello.txt"]"]
12/01/13 10:49:38 INFO flow.Flow: [] sink: FileTap["TextLine[['num', 'line']->[ALL]]"]["output0.txt"]"]
12/01/13 10:49:38 INFO flow.Flow: [] parallel execution is enabled: true
12/01/13 10:49:38 INFO flow.Flow: [] starting jobs: 1
12/01/13 10:49:38 INFO flow.Flow: [] allocating threads: 1
12/01/13 10:49:38 INFO planner.FlowStep: [] starting step: (1/1) local
12/01/13 10:49:38 ERROR local.LocalStepRunner: unable to prepare operation graph
java.lang.NullPointerException
at cascading.tap.local.FileTap.openForWrite(FileTap.java:108)
at cascading.tap.local.FileTap.openForWrite(FileTap.java:48)
at cascading.flow.stream.SinkStage.prepare(SinkStage.java:57)
at cascading.flow.stream.StreamGraph.prepare(StreamGraph.java:147)
at cascading.flow.local.LocalStepRunner.call(LocalStepRunner.java:84)
at cascading.flow.local.LocalStepRunner.call(LocalStepRunner.java:41)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:680)
12/01/13 10:49:39 WARN flow.Flow: stopping jobs
12/01/13 10:49:39 INFO planner.FlowStep: [] stopping: (1/1) local
12/01/13 10:49:39 WARN flow.Flow: stopped jobs
12/01/13 10:49:39 WARN flow.Flow: shutting down job executor
12/01/13 10:49:39 WARN flow.Flow: shutdown complete
Exception in thread "main" cascading.flow.FlowException: local step failed
at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:161)
at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:125)
at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:112)
at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:39)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:680)
Caused by: java.lang.NullPointerException
at cascading.tap.local.FileTap.openForWrite(FileTap.java:108)
at cascading.tap.local.FileTap.openForWrite(FileTap.java:48)
at cascading.flow.stream.SinkStage.prepare(SinkStage.java:57)
at cascading.flow.stream.StreamGraph.prepare(StreamGraph.java:147)
at cascading.flow.local.LocalStepRunner.call(LocalStepRunner.java:84)
at cascading.flow.local.LocalStepRunner.call(LocalStepRunner.java:41)
... 5 more

I get this error in my own jobs as well (it's not just Tutorial0.scala).

There's no problem if I change the output to

val output = TextLine("foo/output0.txt")

(If the foo directory doesn't exist, the job simply creates it and dumps the output into output0.txt there.)

There also no problem if I use the absolute path to my current directory, or a relative path "./output0.txt".

So it seems like the exception depends on whether the output filename contains "/" or not.

Problem in compiling on Ubuntu 11.10

Firstly, I'm not familiar with ruby, sbt or scalding and only starting scala... so I might be doing something obviously wrong. So, here is what I just tried to do. This is Ubuntu 11.10 - and system fully up to date with apt-get. So, check the scala version:

me@ulap:~/dev/scala$ scalac -version
Scala compiler version 2.9.0.1 -- Copyright 2002-2011, LAMP/EPFL

Then get SBT

# https://github.com/harrah/xsbt/wiki/Getting-Started-Setup
me@ulap:~/bin$ wget http://typesafe.artifactoryonline.com/typesafe/ivy-releases/org.scala-tools.sbt/sbt-launch/0.11.2/sbt-launch.jar
# echo java -Xmx512M -jar `dirname $0`/sbt-launch.jar "$@" > sbt

Then get scalding:

me@ulap:~/dev/scala$ git clone http://github.com/twitter/scalding.git
Cloning into scalding...
remote: Counting objects: 134, done.
remote: Compressing objects: 100% (97/97), done.
remote: Total 134 (delta 40), reused 110 (delta 16)
Receiving objects: 100% (134/134), 78.49 KiB, done.
Resolving deltas: 100% (40/40), done.

Next change the scala version & compiler

# edit built.sbt
# scalaVersion := "2.9.0.1"

# edit scripts/scald.rb
# COMPILE_CMD="scalac"

However running sbt update with this version fails:

me@ulap:~/dev/scala/scalding$ ~/bin/sbt update
[info] Loading project definition from /home/me/dev/scala/scalding/project
[info] Updating {file:/home/me/dev/scala/scalding/project/}default-b80ea6...
[info] Resolving com.eed3si9n#sbt-assembly;0.7.3 ...
[info] Resolving org.scala-tools.sbt#sbt_2.9.1;0.11.2 ...
[info] Resolving org.scala-tools.sbt#main_2.9.1;0.11.2 ...
[info] Resolving org.scala-tools.sbt#actions_2.9.1;0.11.2 ...
[info] Resolving org.scala-tools.sbt#classfile_2.9.1;0.11.2 ...
[info] Resolving org.scala-tools.sbt#io_2.9.1;0.11.2 ...
[info] Resolving org.scala-tools.sbt#control_2.9.1;0.11.2 ...
[info] Resolving org.scala-lang#scala-library;2.9.1 ...
[info] Resolving org.scala-tools.sbt#interface;0.11.2 ...
[info] Resolving org.scala-tools.sbt#logging_2.9.1;0.11.2 ...
[info] Resolving org.scala-tools.sbt#process_2.9.1;0.11.2 ...
[info] Resolving org.scala-tools.sbt#classpath_2.9.1;0.11.2 ...
[info] Resolving org.scala-tools.sbt#launcher-interface_2.9.1;0.11.2 ...
[info] Resolving org.scala-lang#scala-compiler;2.9.1 ...
[info] Resolving org.scala-tools.sbt#incremental-compiler_2.9.1;0.11.2 ...
[info] Resolving org.scala-tools.sbt#collections_2.9.1;0.11.2 ...
[info] Resolving org.scala-tools.sbt#api_2.9.1;0.11.2 ...
[info] Resolving org.scala-tools.sbt#persist_2.9.1;0.11.2 ...
[info] Resolving org.scala-tools.sbinary#sbinary_2.9.0;0.4.0 ...
[info] Resolving org.scala-tools.sbt#compile_2.9.1;0.11.2 ...
[info] Resolving org.scala-tools.sbt#ivy_2.9.1;0.11.2 ...
[info] Resolving org.apache.ivy#ivy;2.2.0 ...
[info] Resolving com.jcraft#jsch;0.1.31 ...
[info] Resolving commons-httpclient#commons-httpclient;3.1 ...
[info] Resolving commons-logging#commons-logging;1.0.4 ...
[info] Resolving commons-codec#commons-codec;1.2 ...
[info] Resolving org.scala-tools.sbt#completion_2.9.1;0.11.2 ...
[info] Resolving jline#jline;0.9.94 ...
[info] Resolving org.scala-tools.sbt#run_2.9.1;0.11.2 ...
[info] Resolving org.scala-tools.sbt#task-system_2.9.1;0.11.2 ...
[info] Resolving org.scala-tools.sbt#tasks_2.9.1;0.11.2 ...
[info] Resolving org.scala-tools.sbt#tracking_2.9.1;0.11.2 ...
[info] Resolving org.scala-tools.sbt#cache_2.9.1;0.11.2 ...
[info] Resolving org.scala-tools.sbt#testing_2.9.1;0.11.2 ...
[info] Resolving org.scala-tools.testing#test-interface;0.5 ...
[info] Resolving org.scala-tools.sbt#compiler-interface;0.11.2 ...
[info] Resolving org.scala-tools.sbt#precompiled-2_8_1;0.11.2 ...
[info] Resolving org.scala-tools.sbt#precompiled-2_8_0;0.11.2 ...
[info] Resolving org.scala-tools.sbt#precompiled-2_9_0;0.11.2 ...
[info] Done updating.
[info] Set current project to scalding (in build file:/home/me/dev/scala/scalding/)
Getting Scala 2.9.0.1 ...

:: problems summary ::
:::: WARNINGS
                module not found: org.scala-lang#scala-compiler;2.9.0.1

        ==== local: tried

          /home/me/.ivy2/local/org.scala-lang/scala-compiler/2.9.0.1/ivys/ivy.xml

        ==== typesafe-ivy-releases: tried

          http://repo.typesafe.com/typesafe/ivy-releases/org.scala-lang/scala-compiler/2.9.0.1/ivys/ivy.xml

        ==== Maven Central: tried

          http://repo1.maven.org/maven2/org/scala-lang/scala-compiler/2.9.0.1/scala-compiler-2.9.0.1.pom

        ==== Scala-Tools Maven2 Repository: tried

          http://scala-tools.org/repo-releases/org/scala-lang/scala-compiler/2.9.0.1/scala-compiler-2.9.0.1.pom

        ==== Scala-Tools Maven2 Snapshots Repository: tried

          http://scala-tools.org/repo-snapshots/org/scala-lang/scala-compiler/2.9.0.1/scala-compiler-2.9.0.1.pom

                module not found: org.scala-lang#scala-library;2.9.0.1

        ==== local: tried

          /home/me/.ivy2/local/org.scala-lang/scala-library/2.9.0.1/ivys/ivy.xml

        ==== typesafe-ivy-releases: tried

          http://repo.typesafe.com/typesafe/ivy-releases/org.scala-lang/scala-library/2.9.0.1/ivys/ivy.xml

        ==== Maven Central: tried

          http://repo1.maven.org/maven2/org/scala-lang/scala-library/2.9.0.1/scala-library-2.9.0.1.pom

        ==== Scala-Tools Maven2 Repository: tried

          http://scala-tools.org/repo-releases/org/scala-lang/scala-library/2.9.0.1/scala-library-2.9.0.1.pom

        ==== Scala-Tools Maven2 Snapshots Repository: tried

          http://scala-tools.org/repo-snapshots/org/scala-lang/scala-library/2.9.0.1/scala-library-2.9.0.1.pom

                ::::::::::::::::::::::::::::::::::::::::::::::

                ::          UNRESOLVED DEPENDENCIES         ::

                ::::::::::::::::::::::::::::::::::::::::::::::

                :: org.scala-lang#scala-compiler;2.9.0.1: not found

                :: org.scala-lang#scala-library;2.9.0.1: not found

                ::::::::::::::::::::::::::::::::::::::::::::::



:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
unresolved dependency: org.scala-lang#scala-compiler;2.9.0.1: not found
unresolved dependency: org.scala-lang#scala-library;2.9.0.1: not found
Error during sbt execution: Error retrieving required libraries
  (see /home/me/.sbt/boot/update.log for complete log)
[error] {file:/home/me/dev/scala/scalding/}default-bc2517/*:scala-instance: xsbti.RetrieveException: Could not retrieve Scala 2.9.0.1
[error] Total time: 3 s, completed Feb 9, 2012 9:22:34 AM
me@ulap:~/dev/scala/scalding$

So, try with 2.9.0

# so, chance the version to 2.9.0
# edit built.sbt
# scalaVersion := "2.9.0"

sbt update works:

me@ulap:~/dev/scala/scalding$ ~/bin/sbt update
...
[success] Total time: 3 s, completed Feb 9, 2012 9:27:08 AM

Then sbt test - which fails like this:

me@ulap:~/dev/scala/scalding$ ~/bin/sbt test
[info] Loading project definition from /home/me/dev/scala/scalding/project
[info] Set current project to scalding (in build file:/home/me/dev/scala/scalding/)
[info] Compiling 18 Scala sources to /home/me/dev/scala/scalding/target/scala-2.9.0/classes...
[error] /home/me/dev/scala/scalding/src/main/scala/com/twitter/scalding/Source.scala:145: type mismatch;
[error]  found   : Seq[cascading.tap.Tap[_ <: cascading.flow.FlowProcess[_], _, _, _]]
[error]  required: Seq[cascading.tap.Tap[?>: Nothing <: cascading.flow.FlowProcess, ?>: Nothing <: Any, ?>: Nothing <: Any, ?>: Nothing <: Any]]
[error]           case _ => new MultiSourceTap(taps.toSeq : _*)
[error]                                             ^
[error] /home/me/dev/scala/scalding/src/main/scala/com/twitter/scalding/Source.scala:173: type mismatch;
[error]  found   : Seq[cascading.tap.Tap[_ <: cascading.flow.FlowProcess[_], _, _, _]]
[error]  required: Seq[cascading.tap.Tap[?>: Nothing <: cascading.flow.FlowProcess, ?>: Nothing <: Any, ?>: Nothing <: Any, ?>: Nothing <: Any]]
[error]           JobConf, RecordReader[_,_], OutputCollector[_,_]](taps.toSeq : _*)
[error]                                                                  ^
[error] two errors found
[error] {file:/home/me/dev/scala/scalding/}default-bc2517/compile:compile: Compilation failed
[error] Total time: 31 s, completed Feb 9, 2012 9:29:53 AM

It would be really helpful to have good documentation on the installation parameters. What to change for different versions of scala or hadoop (I didn't even get that far, but I know it will come) as well as host name settings and whatever you have there.

Tutorials 2-5 fail with InvocationTargetException

$ scripts/scald.rb --local tutorial/Tutorial2.scala

/Users/dyross/src/scalding/target/scalding-assembly-0.2.0.jar
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
at com.twitter.scalding.Tool.run(Tool.scala:48)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at com.twitter.scalding.Tool$.main(Tool.scala:94)
at com.twitter.scalding.Tool.main(Tool.scala)
Caused by: java.lang.NoClassDefFoundError: scala/Serializable
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
at java.lang.ClassLoader.defineClass(ClassLoader.java:615)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:283)
at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
at Tutorial2.(Tutorial2.scala:53)
... 8 more
Caused by: java.lang.ClassNotFoundException: scala.Serializable
at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
... 21 more

scald.rb assumes scala 2.8.1

you set the scala version in build.sbt, but scald.rb assumes 2.8.1. This led to some unexpected errors, and either
a) how to make it work with various versions of scala should be documented (or if it has to be 2.8.1 should be documented)
b) scald.rb can pull the version in from some other source (be it SCALA_HOME, the scala command, something written to the assembly jar manifest, I don't know)

sbt assembly not a valid key

I installed scala (2.9.2) and sbt on Mac OSX Lion. When I run sbt assembly I get an error message saying that assembly is not a valid command.

Any suggestions?

better serialization for type-erased types

The typed API, in particular, assumes scala's type system, which is richer than Java.

Now, we serialize List[Int] the same way we do List[String]: we look up the .getClass, and then pass this to Kryo (via Chill), and there is exactly one serializer for List[_] as a result.

An alternative approach would be to have a set of marker classes (that could be generated at runtime via ASM or something, but they are all trivial):
Marker0(x) extends Marker
Marker1(x) extends Marker
Marker2(x) extends Marker

etc...

Then we keep a mapping of Int => Bijection.Bufferable[_]. Now, the typed-API, before a write, wraps each item with a marker so that for each type we can find the correct Bufferable.

The win to an approach like this is that you can use a different serializer for (Int, String, Long) than (String, String, String), and you can avoid writing the type information of the internal types (since the marker gives you enough to reconstruct the EXACT scala type). The serialized data should be smaller, and there is no risk what-so-ever of recovering a different type than what you put in (as there is with Kryo because of subclassing).

This would be a non-negligible change. I can see a few approaches here:

  1. Don't do this in the fields-API.
  2. Do it optionally throughout scalding (some setting you turn on). This is harder because we are going to need implicit parameters methods which can change the type of the tuples.

I'd love to find a better way, but by the time we are past the compiler, the JVM has erased the inner types, and I'm not sure we can cheaply recover them.

ToolJob: support for Jobs with integrated Tool.

The normal use case of building a scalding job involves writing a class that subclasses Job. Then, this class is rendered as a cascading flow by the scalding.Tool. There are two issues with this: 1) reflection is normally used to launch the job, and any error in the job that throws at constructor time, is generally hidden from the user as it is a reflection failure. 2) For the use case of people building stand-alone jobs, this needlessly complicates their build, as they have to launch with a special redundant string.

My idea is the ability to do something like:

object MyNewJob extends App with ToolJob {
  // args is the raw input passed in provided by App
  TypedTsv[String](parsedArgs("input"))
    .mapTo('words) { _.split("\\s+") }
    .groupBy('words) { _.size }
    .write(Tsv(parsedArgs("out")))
}

And then be able to run that with ```hadoop jar MyJar.jar --input infile --out wordcount.tsv" and have it bake in the default main method correctly.

Externalized configuration options

There are a number of places in the Scalding source with default configuration values set and comments like

//tune this for ...

This is easy to do if you are using a local build of Scalding, but when integrating Scalding into a project as a library dependency adjusting these parameters requires overriding method definitions and other acrobatics.

It would be nice if there was an external config file that could be exposed.

The configuration system built by Typesafe is one option and has no dependencies: https://github.com/typesafehub/config/blob/master/README.md

Improve GC pressure dealing with cascading Tuples.

Look here:

https://github.com/twitter/scalding/blob/develop/src/main/scala/com/twitter/scalding/TupleBase.scala#L55

We should add a method to set into an existing Tuple and not necessarily reallocate.

The reason is cascading will not keep a reference to your tuple after you put it into the collector, so it is safe to reuse just one Tuple inside the operations, which should reduce GC pressure.

Calling TupleSetter.set(t : T, ctup: CTuple)

is probably what we want, which will reset ctup to contain exactly t, and then just reuse one instance of ctup.

An argument cannot take negative number as it's value

Negative numbers are treated as individual arguments, not values :
e.g. -minThreshold -3 is translated into two arguments (--min, --3), instead of min argument with 3 as its value.

If this is not an intended behavior, I think negative number should be treated differently.

In order to support multiple numbers as a value, e.g. --select -7,3,-2,5, I think something like following will do:

       if(arg == noDashes || arg.matches("-\\d+.*") )
          (acc.head._1 -> (arg :: acc.head._2)) :: acc.tail

Better support for Iterative Jobs / Temporary outputs

If you want to do an iterative job, the pattern is:

Read some previous state, compute some new state, check if you should stop, if you should, next returns None else next returns Some(job) where job is probably a copy of the current job.

The problem is the programmer has to manage all the temporary sources that are created to check convergence.

I imagine something like a TempSourceFactory that can keep track of temporary sources, which are probably cascading sequence files, which you can pass between jobs.

After the last job is run, the TempSourceFactory cleans up all the allocated data on the disk (which is to say, all of this data is ephemeral).

My design thinking around this is to have a map inside the TempSourceFactory object that maps a UUID onto a Map[String,Source]. For each iterative job, there is one UUID, and this can be accessed from any job.

This should probably be plumbed through with an API on TempSourceFactory, something like:

object TempSourceFactory {
  def apply[T](name: String)(implicit mf: Manifest[T], args: Args): Mappable[T]
  def cleanup: Unit
}

job stuck when submitted to cluster

It runs good at in local mode. But if submitted using the following script, the job stuck forever. The jobtracker web interface showed that the map phase was 100% completed, reduce 0% with no progress.

update: I am running in pseudo distributed mode, and I noticed that the default REDUCERS=100 in scripts/scald.rb, change REDUCERS from 100 to 2 solved this issue.

Add mapi to GeneratedTupleAdders

In the typed API, you often want to do a mapping one of the items in a tuple.

I propose we add to the TupleNAdders enrichment classes the methods:

class Tuple2Adder[A,B](tup: (A,B)) {
  def map1[Z](fn: A => Z): (Z,B) = (fn(tup._1), tup._2)
  def map2[Z](fn: B => Z): (A,Z) = (tup._1, fn(tup._2))
}

and so fourth.

Fix size SizeHintTest.scala

Flaky test that can break like this:

[info] ! SizeHint.transpose law is obeyed in total: Falsified after 72 passed tests.
[info] > ARG_0: SparseHint(2.530047600584062E-4,16488,28717)
[info] > ARG_1: SparseHint(0.04780439232218836,390418,89753)

CompositeInputFormat

Hadoop's CompositeInputFormat allows very efficient joins if you have identically-partitioned data (ie, joining the output of multiple groupBys that had the same keys, same sort, and the same number of reducers).

I don't know exactly what an API to use this from scalding would look like, but it would be great to come up with one.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.