Code Monkey home page Code Monkey logo

rhadoop's People

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

rhadoop's Issues

advanced equijoin

The equjoin currently in dev is the basic one. Doesn't scale well when one key is predominant, doesn't exploit special cases like one side having a small number of records. There is lot of work that could go into having a better join feature.

Kill job when killing mapreduce

for maximum integration it would be nice to have that a mapreduce call, on exit, clean up the related hadoop job. This would happen only if mapreduce terminates abnormally, for instance gets interrupted. This should work exactly the same at the prompt or in a script. It's not clear what the implementation approach could be, other than a) the kill command is included in the job output so capturing that (capture.otuput?) and calling system is one way of doing it b)the on.exit function is the way to add this clean up action
This is still not a plan because the information on how to kill a job is available after on exit can be called

distribute files over hdfs

to.dfs doesn't partition the files that it creates, which prevents the map phase of a job on the file to use multiple cores. That's all right for I/O bound jobs as to.dfs never creates big data, but for more CPU intensive can be a problem. A simple job with random keys and pass through mapper and reducer can solve this.

Can't install rhdfs

Hi,

I am running R 2.13.1 on CENTS 6.0 x64 with Oracle JDK 1.6.23 64bit and rJava0.9.2, when executing R CMD INSTALL R CMD INSTALL rhdfs_1.0.1.tar.tar

  • installing to library ‘/opt/r2131/lib64/R/library’
  • installing source package ‘rhdfs’ ...
    ** R
    ** inst
    ** preparing package for lazy loading
    ** help
    *** installing help indices
    ** building package indices ...
    ** testing if installed package can be loaded
    Error : .onLoad failed in loadNamespace() for 'rhdfs', details:
    call: .jcall("RJavaTools", "Ljava/lang/Object;", "invokeMethod", cl,
    error: java.lang.NoClassDefFoundError: com.sun.security.auth.UnixPrincipal
    Error: loading failed
    Execution halted
    ERROR: loading failed
  • removing ‘/opt/r2131/lib64/R/library/rhdfs’
    [gpadmin@hdp1 pkgs]$ java -version
    java version "1.6.0_23"
    Java(TM) SE Runtime Environment (build 1.6.0_23-b05)
    Java HotSpot(TM) 64-Bit Server VM (build 19.0-b09, mixed mode)

binary streaming formats

binary streaming formats. Almost 2x the speed for python/dumbo on a IO heavy example. see also wiki efficient rmr document.

Hadoop streaming issue (apparently) with mapreduce

Hi all,

Firstly, I'd like to apologize for not sticking to gfm conventions: it's almost midnight, my brain is nowhere near the functional level required to figure something new out and I'm posting here for the first time. Now:

I am somewhat familiar with R (~1.5 years), fairly new to Hadoop and very new to RHadoop. I've recently been trying to run a couple of simple linear regression codes written using RHadoop; however, I seem to be running repeatedly into what seems to be a hadoop streaming issue. Before I jump into it, I shall explain the codes and where I'm running them a little.

Both of the codes mentioned above expect an input csv on the HDFS with no headers and only containing values for the predictor variables and the dependent variable, which must be the last variable (counting left to right)

Since I'm not entirely sure yet of the way RHadoop works, I wrote both codes with different assumptions on RHadoop's working. One code is based on the assumption that text files will be processed line-by-line by the mapper and the second one is a logical rip-off from an equivalent Rhipe code, where the assumption is that for each mapper there will exist (magically) a list of values (a la map.values in Rhipe).

First code (assuming mapper processes line-by-line):

### Script: RHadoopReg.R ###
# load required libraries
library(rhdfs)
library(rmr)

# define function rHdp.lm
rHdp.lm <- function(input,output = NULL){

    # define mapper as function
    map <- function(k,v){
        # split input line on commas
        val <- strsplit(v, ',')[[1]] 

        # extract predictors and bind
        #1 to the right
        x <- c(1,as.numeric(val[-length(val)]))

        # extract dependent
        y <- as.numeric(val[length(val)])

        # compute outer of x and x
        # for m predictors, this should
        # yield a (m + 1)x(m + 1) matrix
        val1 <- outer(x,x)

        # multiply x with dependent
        # this should yield a vector
        # of length (m + 1)
        val2 <- x*y

        # cbind val1 and val2 and write
        # out with NULL key
        keyval('dummy_key',cbind(val1,val2))
    }

    # define reducer as function
    reduce <- function(k,vv){

        # initialize val as zero
        val <- 0

        # loop through each (matrix)
        # element of vv and take matrix
        # sums
        for(i in 1:length(vv)){
            val = val + as.numeric(vv[[i]])
        }

        # extract t(x)*x
        x <- val[,-dim(val)[2]]

        # extract t(x)*y
        y <- val[,dim(val)[2]]

        # use solve to get betas
        betas <- solve(as.matrix(x),as.matrix(y))

        # write out with key betas
        keyval('betas',betas)
    }

    # define as a map-reduce job
    mapreduce(input = input, 
            output = output, 
            map = map,
            reduce = reduce)
}

rHdp.lm('/user/hadoop/regData','/user/hadoop/rTestRegOut')

Second code (assuming mappers process lists of values):

### Script: RHadoopReg1.R ###
# load required libraries
library(rhdfs)
library(rmr)

# define function rHdp.lm1
rHdp.lm1 <- function(input,output = NULL){

    # define mapper as function
    map <- function(k,v){
        # split input line on commas
        datMat <- do.call('rbind',lapply(v, function(r){
            as.numeric(strsplit(r,',')[[1]])
        }))

        # extract dependent
        y <- as.matrix(datMat[,dim(datMat)[2]])

        # extract predictors and bind
        #1 to the right
        x <- cbind(rep(1,dim(datMat)[1]),as.matrix(datMat[,-dim(datMat)[2]]))

        # compute crossprod of x and x
        # for m predictors, this should
        # yield a (m + 1)x(m + 1) matrix
        val1 <- crossprod(as.matrix(x),as.matrix(x))

        # multiply x with dependent
        # this should yield a vector
        # of length (m + 1)
        val2 <- crossprod(as.matrix(x),as.matrix(y))

        # cbind val1 and val2 and write
        # out with NULL key
        keyval('temp_out',cbind(val1,val2))
    }

    # define reducer as function
    reduce <- function(k,vv){

        # initialize val as zero
        val <- 0

        # loop through each (matrix)
        # element of vv and take matrix
        # sums
        for(i in 1:length(vv)){
            val = val + as.numeric(vv[[i]])
        }

        # extract t(x)*x
        x <- val[,-dim(val)[2]]

        # extract t(x)*y
        y <- val[,dim(val)[2]]

        # use solve to get betas
        betas <- solve(as.matrix(x),as.matrix(y))

        # write out with key betas
        keyval('betas',betas)
    }

    # define as a map-reduce job
    mapreduce(input = input, 
            output = output, 
            map = map,
            reduce = reduce)
}

rHdp.lm1('/user/hadoop/regData/','/user/hadoop/rTestRegOut1')

I've tried running these codes both on a 14-node hadoop cluster, with nodes running hadoop-0.20.2.append on either Ubuntu Lucid or Maverick, and a single node installation, running on Ubuntu Lucid which itself runs on VMPlayer on Windows 7.

Now, upon sourcing either of these two codes in R, I repeatedly keep running into the following error:

> source("RHadoopReg.R")
packageJobJar: [/tmp/RtmpIKd3Fr/rhstr.map287d3049, /tmp/RtmpIKd3Fr/rhstr.reduce20ae0aad, /tmp/RtmpIKd3Fr/rmrParentEnv, /tmp/RtmpIKd3Fr/rmrLocalEnv, /usr/local/hadoop/hadoop-datastore/hadoop-hadoop/hadoop-unjar7948228214583466863/] [] /tmp/streamjob2980539178006618836.jar tmpDir=null
11/10/20 10:51:33 INFO mapred.FileInputFormat: Total input paths to process : 1
11/10/20 10:51:34 INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop/hadoop-datastore/hadoop-hadoop/mapred/local]
11/10/20 10:51:34 INFO streaming.StreamJob: Running job: job_201110200316_0004
11/10/20 10:51:34 INFO streaming.StreamJob: To kill this job, run:
11/10/20 10:51:34 INFO streaming.StreamJob: /usr/local/hadoop/hadoop/bin/../bin/hadoop job  -Dmapred.job.tracker=localhost:54311 -kill job_201110200316_0004
11/10/20 10:51:35 INFO streaming.StreamJob: Tracking URL: http://localhost.localdomain:50030/jobdetails.jsp?jobid=job_201110200316_0004
11/10/20 10:51:36 INFO streaming.StreamJob:  map 0%  reduce 0%
11/10/20 10:52:38 INFO streaming.StreamJob:  map 100%  reduce 0%
11/10/20 10:52:46 INFO streaming.StreamJob:  map 0%  reduce 0%
11/10/20 10:53:17 INFO streaming.StreamJob:  map 100%  reduce 100%
11/10/20 10:53:17 INFO streaming.StreamJob: To kill this job, run:
11/10/20 10:53:17 INFO streaming.StreamJob: /usr/local/hadoop/hadoop/bin/../bin/hadoop job  -Dmapred.job.tracker=localhost:54311 -kill job_201110200316_0004
11/10/20 10:53:17 INFO streaming.StreamJob: Tracking URL: http://localhost.localdomain:50030/jobdetails.jsp?jobid=job_201110200316_0004
11/10/20 10:53:17 ERROR streaming.StreamJob: Job not Successful!
11/10/20 10:53:17 INFO streaming.StreamJob: killJob...
Streaming Job Failed!
Error in rhstream(map = map, reduce = reduce, reduceondataframe = reduceondataframe,  : 
  hadoop streaming failed with error code 1

Further exploring the trace on the jobtracker url, this is what I find (for each attempt):

java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1 at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:311) at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:545) at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:132) at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57) at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307) at org.apache.hadoop.mapred.Child.main(Child.java:170) 

While this trace seems somewhat common on Google, I cannot seem to find anything specifically about it in an RHadoop context.

Additionally, I've also been observing some strange behavior from rhdfs.

This set of commands run fine:

> to.dfs(lapply(1:100, function(i){eps = rnorm(1,sd=10);keyval(i, list(x=c(i,i+eps),y=2*(eps > 0) - 1))}),"/user/hadoop/logreg")
> a <- from.dfs("/user/hadoop/logreg")

However, when I try the same trick with a file already on the HDFS (put there previously using hadoop fs -copyFromLocal), I seem to run into a strange error:

> b <- from.dfs("/user/hadoop/regData")
Error in file(con, "r") : cannot open the connection

I initially suspected that this might have something to do with permissions and I was partially right; when I tried the first set of commands as root, I got the following error:

> to.dfs(lapply(1:100, function(i){eps = rnorm(1,sd=10);keyval(i, list(x=c(i,i+eps),y=2*(eps > 0) - 1))}),"/user/hadoop/logreg")
put: org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=WRITE, inode="hadoop":hadoop:supergroup:rwxr-xr-x
[1] "/user/hadoop/logreg"

The other set of commands also gave the same error as the first when run as root:

> a <- from.dfs("/user/hadoop/regData/regTestData.csv")
get: org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=EXECUTE, inode="regData":hadoop:supergroup:rw-r--r--
Error in if (file.info(tmp)[1, "isdir"]) { : 
  missing value where TRUE/FALSE needed

Still clinging to the idea that the issue may be due to permissions, I checked the permissions on the files involved in both sets of commands. However, much to my chagrin, they were both the same (discovered using hadoop fs -ls):

-rw-r--r--   1 hadoop supergroup       4698 2011-10-20 11:16 /user/hadoop/logreg
-rw-r--r--   1 hadoop supergroup      15226 2011-10-20 11:13 /user/hadoop/regData

I have now come to one of the following four conclusions:

  1. This is due to a very stupid error on my part
  2. This is due to some incompatibility issues between hadoop and RHadoop
  3. I should be specifying some parameters that I am not (a la the mapred list in Rhipe)
  4. None of the above, this is just plain beyond me

Therefore, I now find myself posting here what's ultimately quite a long post, in the hope that someone will know exactly (or at least vaguely) what's going wrong and point me in the right direction.

Thanks,

Krish.

binary native format

Now that the support for binary formats is becoming available (in branch binary-io) it is possible to change the native R format from a text format to a binary format. This would be more compact end efficient and avoid quoting issues and hopefully make #36 go away.
This is the plan: serialize R objects with serialize, then used the typed bytes writer with type code 0 or one alias for that (type codes above 50 are reserved for application specific serialization). Then there is the question of whether to write the typedbytes directly to disk or to write them in a sequence file. Sequence files are the HDFS standard, so that seems like a good reason to use it. It has a header with version information and a key value structure plus some sync information. One drawback is that we don't have a sequence file reader in R and the only way to access them is through streaming, so that would leave from.dfs and to.dfs out in the cold. A workaround for that is to replace their implementation with a streaming job (at least for the binary case). It looks like the new dumptb command could be exactly what is needed (so one would have to do a hadoop streaming dumptb instead of a hadoop dfs -get, but it would be the same from there.

prevent map and reduce functions from using stdin and out

stdin and stdout have special meaning for streaming, in that they are used to communicate between streaming and map and reduce executables. Even stderr can have a special meaning (it is used to implement counters and statuses) but writing other things to stderr doesn't cause damage. The problem is not when map and reduce are little function of which we know the ins and outs and do not use stdin and stdout, but when we use libraries of which we may not know whether they do IO under some circumstance and we may not be able to change or fix it if they do. So taking a page from hadoopy, it would be nice to divert or redirect all output from within the mapper (say with sink) so that only the blessed streaming stream can be produced (from the map.driver and record.writer specifically). Same for input even if I think that is a less solvable or common occurrence.

kmeans not reproducible as test case

To test the implementation of a backend it would be nice to see that different backends return the same results when running the same program on the same input. In fact I was planning to make that a requirement for new backends. The problem with randomized algorithms is that it isn't trivial to get reproducible results. The normal approach of re-seeding the RNG doesn't extend easily to a parallel framework where the degree of parallelism and the exact input to each of the sequential processes is not controlled by the programmer. So I am excluding kmeans from the cross-backend test cases for now. Ideas welcome.

rmr - Add ability to modify per-job config parameters

As documented on the wiki, a conscious design decision was made to not provide an interface to set per-job config parameters. However, there are scenarios where developers need to set job specific parameters on the jobconf, such as mapred.reduce.tasks. Functionality should be added to provide this ability.

CASSANDRA Support

According to CASSANDRA-3134, the preferred mechanism for binary I/O with streaming is Typed bytes. Two streaming based libraries (Hadoopy and dumbo, both for python) use typedbytes for other types of binary input, creating some support. So if we just implemented an R reader/writer for typed bytes and CASSANDRA-3134 is fixed, then we should have cassandra support and also some other binary i/O options should open up. See also HADOOP-1722, the original issue about typedbytes.

dfs.size

it would be nice to have a way to test the size of a file (or directory) to be able to set parameters for a job (imagine a sampling rate that's inversely proportional to the size, for instance) or to switch to the local backend when the data sizes don't warrant using hadoop, which happens sometimes in iterative jobs. It would probably be even better to have a record count, but I don't know of any way to get that without a mapreduce job.

HBASE support

The goal here is to run rmr mapreduce jobs over HBASE data, of course. It looks like there is some ancient (by current standards) work that could be helpful to implement this, documented here. There is currently no way to pass all the needed options to mapreduce, but that could be fixed. On the positive side the JSON format option should go into rmr without much if any additional parsing than is already in place. Of course the devil is in the details. Alternatively, we could take advantage of the typedbytes work that's already scheduled for the CASSANDRA issue #20 and use the work done by the guys at last.fm, licensing permitting. Actually, this latter route seems to build on much more recent and alive work.

local backend doesn't convert data to JSON or other representation

blocking #8 backend abstraction
In theory JSON conversion should fail or roundtrip (x== fromJSON(toJSON(x)) for any supported x, fail otherwise). In practice it's not the case (matrices, for instance). So things that work in local may not work on hadoop and vice-versa (we hit the first case so far). Backend should be semantically interchangeable, so this is a bug. We could push the data through the same encoding as if we were using Hadoop, to simulate any shortcomings of the encoding. Or we could switch at least internall (between map and reduce) to a native R encoding that has no roundtrip problems ever.

Examples in wiki are out of date

At least two examples in the rmr tutorial are not running as intended, presumably due to the switch of default from value=NULL to key=NULL in rmr 1.1.

finish call in mapreduce

it could be useful to have mapreduce execute a function after the last input record in map or reduce or combine. Introduced in HADOOP-20, they are branded there as simple cleanup functions, but I think they can be used to implement a number of map-side programming patterns as found in Cascading. The simplest one can think of is a almost map side counter, where the map is an accumulator and the map-finish function would output only the total of the count for that mapper. This way the reducer has only one input record per mapper. This can be achieved also with combiners, but with the current list interface one could run out of memory. Instead with the map-finish approach one can do it in constant memory,

Alternative backends

So far we have hadoop streaming and local (same interpreter). Are there other backends people would like to see? Would they fit the same API? Mesos? Spark? Hadoop Java with rJava. The sky is the upper limit, as long as it makes sense.

backend abstraction

What is in dev now is a start, but it still uses hdfs whereas it should use the local fs with the local backend. Need to propagate the backend sepration across all features.

Chaining mapreduce fails on second job

I'm trying to follow the example in the rmr help files with a simple mapreduce filter. it fails on the second job:

Streaming Command Failed!
Error in mr(map = map, reduce = reduce, reduceondataframe = reduceondataframe,  : 
  hadoop streaming failed with error code 1

The code I'm using is:

## Taken from example in rmr guide on GitHub
library(rmr)

mapReduceFilter = function(input, output = NULL, pred) {
  mapreduce(input = input, output = output, textinputformat = rawtextinputformat,
            map = function(k,v) if(pred(k,v)) keyval(k,v) else NULL)}

pred.col1.odd = function(k,v) {
  return(as.numeric(strsplit(x = v, ",")[[1]][1]) %% 2 != 0)
}
pred.col3.even = function(k,v) {
  return(as.numeric(strsplit(x = v, ",")[[1]][3]) %% 2 == 0)
}

# Test single run with first pred - works
from.dfs(mapReduceFilter(input = "/rdata/test-file3.csv", pred = pred.col1.odd))
# Test single run with second pred - works
from.dfs(mapReduceFilter(input = "/rdata/test-file3.csv", pred = pred.col3.even))
# Test chained - fails on second stream job
from.dfs(mapReduceFilter(input = mapReduceFilter(input = "/rdata/test-file3.csv",
                           pred = pred.col1.odd),
                         pred = pred.col3.even))

The test data is in a simple csv format:
1,22,333
2,33,444

I suspect I'm making a simple error, but clearly missing something important!
Any pointers to the error in my ways appreciated.

Regards Ross

EMR backend

EMR has a streaming-like feature but not API compatible with hadoop streaming. It seems possible to provide another backend that is based on EMR, once the backend refactoring is gone and the local backend has made it into master. It is required that any backend pass R cmd check and that all the tests be extended to cover any new backend.

Vectorized refactor (was: Alternative API)

The question here is whether it is possible to support better the "vectorized" programming style that is necessary in R to get the best speed with reasonable changes to the API. For instance, one could have a vmap be a function(kk, vv) that is processing multiple records at once. To give legs to this option one would need: an efficient way to parse multiple input lines, that is without using *apply or for loops; an equivalent on the reduce side, whose signature is unclear; and use cases that show how useful the alternate API is, So far we have none of these.

implement counters and status messages

They are already implemented but for some reason stopped working some time ago and became abandonware. It's also a question whether we really want and need these, since they are not scalable hadoop features, but they are very useful in debugging

i/o format to support any R objects

the current JSON based I/O format has one drawback, it doesn't support all R types (no models, promises, functions etc.) Even with the ones supported, there are some odd mismatches. To improve on this, one could 1) have a separate input format that is not JSON compatible and is based on the built in serialization (see serialize()) 2) try to squeeze unsupported types into the current scheme by turning them into strings, with an option on the other end to unserialize the string. Pros and cons to both solutions. In any event, one needs to make sure that hadoop streaming will be able to understand keys and values, that is separators, possibly the default ones \t and \n should not be allowed in whatever encoding is used.

csv format

is an obvious candidate to add support for. Use built in flattening function (flatten, to.data.frame) to flatten nested lists and then format.

Not able to install rhdfs package and rhbase package

I am just new to hadoop and want to try mapreduce in R. But i am not able to install rhdfs and rhbase package.

When i install rhbase package i receive following message..

  • installing to library 'C:/Users/cse3502/Documents/R/win-library/2.14'

  • installing source package 'rhbase' ...

    WARNING: this package has a configure script
    It probably needs manual configuration

** libs

*** arch - i386
ERROR: compilation failed for package 'rhbase'

  • removing 'C:/Users/cse3502/Documents/R/win-library/2.14/rhbase'

When i install rhdfs

  • installing to library 'C:/Users/cse3502/Documents/R/win-library/2.14'
  • installing source package 'rhdfs' ...
    ** R
    ** inst
    ** preparing package for lazy loading
    ** help
    *** installing help indices
    ** building package indices ...
    ** testing if installed package can be loaded
    Error : .onLoad failed in loadNamespace() for 'rhdfs', details:
    call: .jnew("org/apache/hadoop/conf/Configuration")
    error: java.lang.ClassNotFoundException
    Error: loading failed
    Execution halted
    ERROR: loading failed
  • removing 'C:/Users/cse3502/Documents/R/win-library/2.14/rhdfs'

i am also not able to run this command R CMD javareconf
i am using jdk1.6.0_30

sequence.typedbytes format issues

testing sequence typedbutes we found two issues. First NULL is encoded as list(). This is a shortcoming of typedbytes has been fixed in HIVE-1029 but not in streaming (not clear why there is a duplicate typedbytes def in hive). The other is that there are two notions of collection, a list and a vector and they both correspond to R lists, so vectors come back as lists. Unless I am missing something about typedbytes, for now this are restrictions we need to live with.

library loading in the map and reduce functions

additional libraries loaded when mapreduce is exectued are not loaded in the nodes when map and reduce are executed. One has the workaround of loading them explicitly in the map and reduce functions, but it's ugly and a deviation from the goal that mapreduce should work as a ?apply family function

deploy tools

To run R on a Hadoop cluster, one approach is to install R and necessary packages on all nodes and a few rsyncs and sshes do the job quite all right, but I am opening this to see if anyone has or want to provide a user friendly solution

matrices not dealt properly by to.dfs

matrices are turned into lists of elements. It seems like keeping the rows together would make sense, in particular since once the dim information is lost there is no way of recovering it.

local backend doesn't convert data to JSON or other representation

related to #8 backend abstraction
In theory JSON conversion should fail or roundtrip (x== fromJSON(toJSON(x)) for any supported x, fail otherwise). In practice it's not the case (matrices, for instance). So things that work in local may not work on hadoop and vice-versa (we hit the first case so far). Backend should be semantically interchangeable, so this is a bug. We could push the data through the same encoding as if we were using Hadoop, to simulate any shortcomings of the encoding. Or we could switch at least internall (between map and reduce) to a native R encoding that has no roundtrip problems ever.

Changes in the API -- naming conventions

What happened is that I took a moment to reflect on what had been done up to 1.1. I looked at a simple things like naming conventions and I couldn't see any. So I thought it was time to adopt one before millions of lines of code need to be refactored. The changes affect long, self documenting names like defaulttextinputformat. I was using a mix of .-separated, camelCase and nonseparated. I decided to switch to .-separated uniformly, internally and in the API. The reasons are

  • The variety of formats was a distraction and affected readability for no gain.
  • non-separated is faster to write but harder to read. Went for readability, IDEs help with writability.
  • camel case reminds of Java, not very common in R outside of the OO subset.
  • .-separated seems a bit more R idiomatic and was already used in many places

The exceptions are (on top of my mind):

  • mapreduce: people write it as one word very often
  • keyval, k,v,vv: used often enough that a shorter form seems warranted (the stand for: create a key, value pair, key, value and list of values resp.)

I know changes in the API should not be taken lightly and are disruptive. In this case though, everything is one-to-one with the previous API. Just a few search and replace and you are done and your programs will be more readable for that. The related commits are: 67169f4, c6968e1, f51356a, 65f0e19, 36ef09c, 17f73da, 0dabf51, which looks like a lot but it is a bunch of very repetitive, search and replace operations. I think the changes are complete as far as the API but more work is needed internally -- you should care only if you were submitting patches though. All this is still in dev only.

Another set of changes that you should be aware of concerns the package options section of the API. There used to be one function per option, it was getting unwieldy as I was preparing to expand the set of options, so now there is one setter and one getter for all options, with an additional argument. The new options are still unused.

Thanks for bearing with me and with the vagaries of a new project and do not hesitate to contact me if anything seems amiss.

support general partitioners and comparators

advanced but important hadoop features, one can't work around their unavailability. Possible approach to do this is to create java classes that start an R server and pass an R expression to it to eval. See JRI.

typed bytes reader efficiency issue

the reader is not efficient when deserializing long lists (lists in typed bytes jargon) There's a comment in the code to mark the exact trouble spot, Use same technique as for the reader and from.dfs to avoid quadratic append bug.

map only jobs

I thought omitting the reducer was enough. Not so. One has to specify -D mapred.reduce.tasks=0, which is possible in dev through the tuning parameters options but not recommended so we should probably provide a different mechanism. But it is not totally clear to me what happens when the reducer is not specified and the above option not set.

partial from.dfs

If a dataset is too big, from.dfs will fill up available RAM. Sometimes having the whole data set is not necessary, one is just interested in a sample. Add an option to do just that.

makefile type behavior

In very large computations, it's important to avoid recomputing what has not changed from the last time. It would be nice to support this in rmr or an add-on package. The steps I envision are: compute a unique filename from the args of a mapreduce job to use as output path; add logic to check for the mod date of inputs and outputs (or use CRCs if dates are unreliable) and trigger re-computation only as needed; define an input type or a package option that would trigger this behavior in a complex job without having to modify the job, e.g. we have linear.least.squares function that triggers multiple jobs written before this makefile-like feature was even discussed. Can we switch on the makefile-like behavior without any internal modification? One possibility is a decorator type solution, see addMemoization in R.cache package for a similar approach.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.