pangool's People

Contributors

amuniz, dongpf, epalace, ivanprado, pereferrera

pangool's Issues

TestSolrOutputFormat test can't run successfully

Hi, @pereferrera
The Solr index files were written to the /src/test/resources/solr-es/tmp/hadoop-$user/mapred/local/solr_attempt_local_0001_m_000000_0.1/ folder, not to the expected out-com.datasalt.pangool.solr.TestSolrOutputFormat/part-00000.
As a result, every assertTrue(new File(OUTPUT + "/part-00000/data/index").exists()); assertion fails.

It looks strange to create files in HDFS using a path from disk

I ran the TopicalWordCount example and hit the problem below. Apparently I didn't have sufficient privileges to create files at a location that the code (InstancesDistributor.java) takes from "hadoop.tmp.dir" if nothing else is set. However, the default value of "hadoop.tmp.dir" is "/tmp/hadoop-${user.name}", which is a path on local disk.
It works well after I set this parameter, but.....


Exception in thread "main" org.apache.hadoop.security.AccessControlException: org.apache.hadoop.security.AccessControlException: Permission denied: user=bjdata, access=WRITE, inode="home":hadoop:supergroup:rwxr-xr-x
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:96)
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:58)
at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.<init>(DFSClient.java:2836)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:500)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:206)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:484)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:465)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:372)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:364)
at com.datasalt.pangool.utils.InstancesDistributor.distribute(InstancesDistributor.java:77)
at com.datasalt.pangool.tuplemr.TupleMRBuilder.createJob(TupleMRBuilder.java:240)
at com.mediav.pangoolCase.TopicalWordCount.run(TopicalWordCount.java:102)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at com.mediav.pangoolCase.TopicalWordCount.main(TopicalWordCount.java:113)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:156)

Does it work with Hadoop 0.23?

I tried to run Pangool on Hadoop 0.23, but it does not work well. The unit test fails with an error.

The logs:

java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
at com.datasalt.pangool.tuplemr.mapred.lib.output.ProxyOutputFormat.createOutputFormatIfNeeded(ProxyOutputFormat.java:89)
at com.datasalt.pangool.tuplemr.mapred.lib.output.ProxyOutputFormat.checkOutputSpecs(ProxyOutputFormat.java:79)
at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:417)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:332)
at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1218)
at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1215)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1212)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1215)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1236)
at com.shicheng.yarn.WordCountPg.run(WordCountPg.java:108)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:69)
at com.shicheng.yarn.WordCountPgTest.test(WordCountPgTest.java:36)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
at org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)

Improve MapOnlyJobBuilder

Currently it lacks some functionality that TupleMRBuilder has.

Ideally we shouldn't be replicating code. MapOnlyJobBuilder is a nice minimal API, but maybe it could be built on top of TupleMRBuilder. TupleMRBuilder would then need to be adapted to support map-only Jobs.

Seamless integration with SOLR indexing

Provide better integration with SOLR indexing than that of Hadoop's SOLR-1301. The idea is that we can leverage Tuples and index them directly, without DocumentConverters.

Additionally, test that with Pangool's Multiple Outputs we can build more than one kind of index in the same Job (which is not possible in Hadoop).

"Streams" or different group-by's

As pointed out by Alexei: https://groups.google.com/forum/#!topic/pangool-user/iWf3BODBI9o

It would be nice to have the possibility of specifying different group bys for the same Pangool Job. Pig seems to have this possibility built-in in its optimizer. Even though I'm not sure if this is always the most efficient way of executing several groupBy's over the same input, it makes sense that Pangool supports such advanced usages because the philosophy of Pangool is to make "low-level MapReduce easier". If there are low-level use cases that we don't natively support I think it is worth taking a look and seeing how reasonable that is to implement.

Alexei proposes an API based on declaring "Streams", which would be quite different to the current API where you have to specify intermediate schemas. At first sight I wouldn't recommend such an API revamp, because this is still a particular use case and it doesn't make sense to me to make it so "first class".

Here we can follow up the discussion on this. I have been thinking about it and I have a few ideas on the matter:

  • First of all it seems the concept of "stream id" != "intermediate schema id" is needed as one would be able to "group by" in different ways the same intermediate schema. So we have a 1 - N relation between intermediate schema and stream id, and for N intermediate schemas we may have M stream ids where M >= N.
  • To keep with the same IO efficiency, Pangool shouldn't add any extra integer for the stream id. What is now used as schemaId would be called "stream id" internally, and by default every intermediate schema is assigned one "stream id", like now. So now we have the particular case that N = M, but we would make it more general.
  • The API could remain exactly as it is, to be backwards compatible. The groupBy would be the "common group by". If specified, nothing else can be done: the Job is only grouped by this criterion. But we could add a second option with the following new method in TupleMRBuilder:
addGroupBy(groupBy)
  .withSchema(schema1);

So the user can either act as now (use a common group by) or add K particular groupBys to the same Job, each associated with one or more Schemas:

addGroupBy(groupBy)
  .withSchema(schema1)
  .withSchema(schema2);

The Pangool Mapper would check this and emit as many Tuples as needed internally if one schema has more than one groupBy, assigning different stream ids.

  • Serialization / Deserialization: It seems to me (even though I haven't looked that far) that Pangool always assumes the presence of a "common schema". After reading the common schema, it knows the schema id (to be called stream id), which lets it deserialize the particular part. The code would need to change to remove the need for a common schema, so that the stream id is sometimes the first field serialized and deserialized.
  • Comparator: It looks like the SortComparator wouldn't need to be touched that much, just to change it so that the schemaId used to compare would be the stream id.
  • Partitioner: The Partitioner should be able to partition more than one stream id into the same partition, for instance by taking the minimum stream id within a group. In the above example all tuples emitted with schema1 and schema2 in the particular groupBy would be partitioned in the same partition, even though they would be serialized with different "stream ids".
  • Reducer: By default, all groupings would be delegated to the reducer specified in setTupleReducer(). This is fine, since Pangool reducers have a Tuple as key, so every key may be different even in the same Reducer. But it might be a bit challenging to "detect" with some if / else which groupings we are receiving, so it makes sense to have per-groupBy reducers:
addGroupBy(groupBy)
  .withSchema(schema1)
  .withSchema(schema2)
  .withReducer(reducer);

So the Pangool Reducer would instantiate several reducers in this case and delegate to one or another depending on the "stream id". If these methods are used, then the TupleMRBuilder won't complain if setTupleReducer() hasn't been called. If both are called, the most specific one wins: for schemas that have a particular reducer associated, that reducer is called; for the rest, the "general" reducer is called.

  • Order by: Each schema within a groupBy could be sorted in different ways. Therefore the withSchema() method should allow specifying an order by:
addGroupBy(groupBy)
  .withSchema(schema1, orderBy1)
  .withSchema(schema2, orderBy2)
  .withReducer(reducer);

This all seems a bit complex, but it is actually challenging to be as backwards compatible as possible, and remain coherent with the whole idea of the API and Pangool.

That would be my two cents as to how this could be implemented.
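
The bookkeeping behind this proposal can be sketched in plain Java. This is only an illustration under stated assumptions: `StreamPlan`, `GroupBySpec`, `streamIdsFor` and `reducerFor` are hypothetical names, schemas and group-bys are reduced to strings, and nothing here is existing Pangool API. It shows one stream id per (groupBy, schema) pair and the "most specific reducer wins" rule.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Pangool code: schemas and group-bys are plain strings.
final class StreamPlan {
  // One entry per (groupBy, schema) pair; the list index is the stream id.
  private final List<String[]> streams = new ArrayList<String[]>();
  // Optional per-stream reducer name, keyed by stream id.
  private final Map<Integer, String> reducers = new LinkedHashMap<Integer, String>();

  final class GroupBySpec {
    private final String groupBy;
    private final List<Integer> ids = new ArrayList<Integer>();

    GroupBySpec(String groupBy) {
      this.groupBy = groupBy;
    }

    // Registers a new stream id for this (groupBy, schema) pair.
    GroupBySpec withSchema(String schema) {
      streams.add(new String[] {groupBy, schema});
      ids.add(streams.size() - 1);
      return this;
    }

    // Associates a reducer with every stream registered so far in this group-by.
    GroupBySpec withReducer(String reducer) {
      for (int id : ids) {
        reducers.put(id, reducer);
      }
      return this;
    }
  }

  GroupBySpec addGroupBy(String groupBy) {
    return new GroupBySpec(groupBy);
  }

  // Stream ids a mapper would have to emit for one tuple of the given schema.
  List<Integer> streamIdsFor(String schema) {
    List<Integer> out = new ArrayList<Integer>();
    for (int i = 0; i < streams.size(); i++) {
      if (streams.get(i)[1].equals(schema)) {
        out.add(i);
      }
    }
    return out;
  }

  // The per-groupBy reducer wins; otherwise fall back to the general one.
  String reducerFor(int streamId, String generalReducer) {
    String specific = reducers.get(streamId);
    return specific != null ? specific : generalReducer;
  }

  int streamCount() {
    return streams.size();
  }
}
```

With two group-bys over `schema1`, a tuple of that schema maps to two stream ids, which is the case where the mapper would emit the tuple twice.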

Closing the loop of serialization (lists, sets, maps)

Something that remains on my mind is the possibility of closing the loop and giving Pangool all the convenient serialization features. The ones still missing are Lists and Maps (a Set being a particular case of a Map).

Currently it is possible to serialize them using Avro but the integration code required doesn't look very nice. Pangool could add a wrapper to make this a little nicer - delegating the serialization to Avro - but then it wouldn't be possible to serialize Lists of arbitrary Objects.

While it is true that full built-in serialization was never the main idea of Pangool, there is no reason not to implement new features that pay off, are easy to implement and fit the whole codebase.

What's more, taking a look at the current code, it doesn't seem difficult to add proper built-in serialization support for (typed) Lists or Maps. A custom FieldSerialization could be implemented, which writes the list length first and calls the delegate code in SimpleTupleSerializer for serializing the list typed values.

This would allow for arbitrarily typed lists, with the element type defined by a Pangool Field, so the method in Field would be something like:

public static Field createListField(String name, Field type)

Therefore it would be possible to serialize lists of lists of lists. Or lists of Tuples. Or anything which is possible due to this recursion.

Open questions would then be:

  • How to deal efficiently with null values.
  • Whether heterogeneous lists should be considered at all or discarded (serialization would be then inefficient and complex).
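
A minimal sketch of the length-prefixed, recursive list serialization described above, using only java.io. `ListFieldSketch` is a hypothetical stand-in for a Field type: it is not the real Field or FieldSerialization API, it only supports ints and homogeneous lists, and it does not address null values.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Pangool code: a type is either INT or LIST of another type.
final class ListFieldSketch {
  final boolean isList;
  final ListFieldSketch element; // null for INT

  private ListFieldSketch(boolean isList, ListFieldSketch element) {
    this.isList = isList;
    this.element = element;
  }

  static ListFieldSketch intField() {
    return new ListFieldSketch(false, null);
  }

  static ListFieldSketch listOf(ListFieldSketch element) {
    return new ListFieldSketch(true, element);
  }

  // Writes the list length first, then delegates each item to the element type.
  void write(Object value, DataOutputStream out) throws IOException {
    if (!isList) {
      out.writeInt((Integer) value);
      return;
    }
    List<?> list = (List<?>) value;
    out.writeInt(list.size());
    for (Object item : list) {
      element.write(item, out);
    }
  }

  // Mirrors write(): reads the length, then that many elements.
  Object read(DataInputStream in) throws IOException {
    if (!isList) {
      return Integer.valueOf(in.readInt());
    }
    int size = in.readInt();
    List<Object> list = new ArrayList<Object>(size);
    for (int i = 0; i < size; i++) {
      list.add(element.read(in));
    }
    return list;
  }

  // Serializes to an in-memory buffer and reads the value back.
  static Object roundTrip(ListFieldSketch type, Object value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      type.write(value, new DataOutputStream(bytes));
      return type.read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

Because the element type is itself a `ListFieldSketch`, lists of lists fall out of the recursion without extra code.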

Add getString() method to Tuple

Like in Storm:

String word = tuple.getString(0);

Such a method could hide the need for converting from UTF8 to Java String.
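
A rough sketch of what such an accessor could look like. `TupleSketch` is a hypothetical class, and raw UTF-8 bytes stand in for whatever UTF8 representation a tuple holds internally; this is an assumption for illustration, not the real Tuple implementation.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch, not the real Pangool Tuple.
final class TupleSketch {
  private final Object[] values;

  TupleSketch(Object... values) {
    this.values = values;
  }

  Object get(int pos) {
    return values[pos];
  }

  // Hides the conversion: callers always get a java.lang.String (or null).
  String getString(int pos) {
    Object value = values[pos];
    if (value == null) {
      return null;
    }
    if (value instanceof byte[]) {
      // Stand-in for a UTF-8 encoded field value.
      return new String((byte[]) value, StandardCharsets.UTF_8);
    }
    return value.toString();
  }
}
```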

TupleOutputFormat throws NullPointer when there's a missing field in the tuple

java.lang.NullPointerException
at com.datasalt.pangool.utils.TupleToAvroRecordConverter.toRecord(TupleToAvroRecordConverter.java:95)
at com.datasalt.pangool.tuplemr.mapred.lib.output.TupleOutputFormat$TupleRecordWriter.write(TupleOutputFormat.java:84)
at com.datasalt.pangool.tuplemr.mapred.lib.output.TupleOutputFormat$TupleRecordWriter.write(TupleOutputFormat.java:63)
at com.datasalt.pangool.tuplemr.mapred.lib.output.PangoolMultipleOutputs.write(PangoolMultipleOutputs.java:364)
at com.datasalt.pangool.tuplemr.mapred.lib.output.PangoolMultipleOutputs.write(PangoolMultipleOutputs.java:340)
at com.datasalt.pangool.tuplemr.MultipleOutputsCollector.write(MultipleOutputsCollector.java:46)

AbstractHadoopTestLibrary.withTupleOutput bug

AbstractHadoopTestLibrary.withTupleOutput keeps a map with all the tuples read from the TupleOutputFormat output, but the same Tuple instance is reused, so every entry ends up holding the content of the last tuple. We need to modify TupleInputReader so that it accepts a new Tuple instance in every nextKeyValue() call, or add a separate method like nextTuple(ITuple tuple).
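
The effect is easy to reproduce outside Hadoop. The sketch below uses hypothetical stand-ins (`MutableTuple`, `Reader`), not the real TupleInputReader: the first collector stores the reused instance, the second passes a fresh instance to a `nextTuple(...)`-style method.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the instance-reuse problem; none of these classes exist in Pangool.
final class ReusedInstanceSketch {
  // Mutable record standing in for a tuple.
  static final class MutableTuple {
    int value;
  }

  static final class Reader {
    private final int[] data;
    private int next = 0;
    private final MutableTuple shared = new MutableTuple();

    Reader(int... data) {
      this.data = data;
    }

    boolean hasNext() {
      return next < data.length;
    }

    // Reuses one instance on every call, as the current reader does.
    MutableTuple nextReused() {
      shared.value = data[next++];
      return shared;
    }

    // Proposed variant: fill a tuple supplied by the caller.
    void nextTuple(MutableTuple target) {
      target.value = data[next++];
    }
  }

  // Keeps references to the shared instance, so all entries alias one object.
  static List<Integer> collectBuggy(int... data) {
    Reader reader = new Reader(data);
    List<MutableTuple> kept = new ArrayList<MutableTuple>();
    while (reader.hasNext()) {
      kept.add(reader.nextReused());
    }
    return values(kept);
  }

  // Allocates a new tuple per record, so each entry keeps its own content.
  static List<Integer> collectFixed(int... data) {
    Reader reader = new Reader(data);
    List<MutableTuple> kept = new ArrayList<MutableTuple>();
    while (reader.hasNext()) {
      MutableTuple fresh = new MutableTuple();
      reader.nextTuple(fresh);
      kept.add(fresh);
    }
    return values(kept);
  }

  private static List<Integer> values(List<MutableTuple> tuples) {
    List<Integer> out = new ArrayList<Integer>();
    for (MutableTuple t : tuples) {
      out.add(t.value);
    }
    return out;
  }
}
```

For the input 1, 2, 3 the first collector ends up with three copies of the last value, while the second keeps all three.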

The TestTopicFingerprint.test() can't run successfully!

I ran the test and got errors. I debugged it and found that it fails in FileOutputCommitter.moveTaskOutputs, but I still don't know the root cause.

Here are my test case logs:

13/01/24 01:06:05 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
13/01/24 01:06:05 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
13/01/24 01:06:05 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
13/01/24 01:06:05 INFO input.FileInputFormat: Total input paths to process : 1
13/01/24 01:06:06 INFO mapred.JobClient: Running job: job_local_0001
13/01/24 01:06:06 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
13/01/24 01:06:06 INFO input.FileInputFormat: Total input paths to process : 1
13/01/24 01:06:06 INFO mapred.MapTask: io.sort.mb = 100
13/01/24 01:06:06 INFO mapred.MapTask: data buffer = 79691776/99614720
13/01/24 01:06:06 INFO mapred.MapTask: record buffer = 262144/327680
13/01/24 01:06:06 INFO input.DelegatingMapper: [profile] Got input split. Going to look at DC.
13/01/24 01:06:06 INFO input.DelegatingMapper: [profile] Finished. Calling run() on delegate.
13/01/24 01:06:06 INFO mapred.MapTask: Starting flush of map output
13/01/24 01:06:06 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
13/01/24 01:06:06 INFO mapred.MapTask: Finished spill 0
13/01/24 01:06:06 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
13/01/24 01:06:06 INFO mapred.LocalJobRunner:
13/01/24 01:06:06 INFO mapred.TaskRunner: Task attempt_local_0001_m_000000_0 is allowed to commit now
13/01/24 01:06:06 WARN mapred.TaskRunner: Failure committing: java.io.IOException: Failed to save output of task: attempt_local_0001_m_000000_0
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:154)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:165)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:165)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:165)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:165)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:118)
at com.datasalt.pangool.tuplemr.mapred.lib.output.ProxyOutputFormat$ProxyOutputCommitter.commitTask(ProxyOutputFormat.java:157)
at org.apache.hadoop.mapred.Task.commit(Task.java:779)
at org.apache.hadoop.mapred.Task.done(Task.java:691)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:309)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:177)

13/01/24 01:06:06 WARN mapred.LocalJobRunner: job_local_0001
java.io.IOException: Failed to save output of task: attempt_local_0001_m_000000_0
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:154)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:165)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:165)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:165)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:165)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:118)
at com.datasalt.pangool.tuplemr.mapred.lib.output.ProxyOutputFormat$ProxyOutputCommitter.commitTask(ProxyOutputFormat.java:157)
at org.apache.hadoop.mapred.Task.commit(Task.java:779)
at org.apache.hadoop.mapred.Task.done(Task.java:691)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:309)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:177)
13/01/24 01:06:07 INFO mapred.JobClient: map 0% reduce 0%
13/01/24 01:06:07 INFO mapred.JobClient: Job complete: job_local_0001
13/01/24 01:06:07 INFO mapred.JobClient: Counters: 8
13/01/24 01:06:07 INFO mapred.JobClient: FileSystemCounters
13/01/24 01:06:07 INFO mapred.JobClient: FILE_BYTES_READ=25861
13/01/24 01:06:07 INFO mapred.JobClient: FILE_BYTES_WRITTEN=41068
13/01/24 01:06:07 INFO mapred.JobClient: Map-Reduce Framework
13/01/24 01:06:07 INFO mapred.JobClient: Combine output records=6
13/01/24 01:06:07 INFO mapred.JobClient: Map input records=6
13/01/24 01:06:07 INFO mapred.JobClient: Spilled Records=6
13/01/24 01:06:07 INFO mapred.JobClient: Map output bytes=24
13/01/24 01:06:07 INFO mapred.JobClient: Map output records=6
13/01/24 01:06:07 INFO mapred.JobClient: Combine input records=6

HCatalog failing with non string partition columns

HCatalogInputFormat always returns strings for partition columns, even when these partitions are declared as other types (e.g. INT). In this case Pangool fails, as it does not automatically convert from String to the target type.

Is that an HCatalog bug? Or is it a problem in our integration?

Update pangool.net and javadoc

Documentation (tutorial, etc.) seems to be a bit outdated by now. Also, as we don't automate uploading new javadocs, they seem to be outdated too.

We should specifically add a new section for illustrating TupleTextInputFormat which is a very powerful tool in Pangool.

Anything else that comes to mind should be added to this ticket.

NullPointerException in RollupReducer when reduce size is 0

The following exception is thrown when an empty group reaches a RollupReducer:

java.lang.NullPointerException
at com.datasalt.pangool.tuplemr.mapred.RollupReducer.run(RollupReducer.java:140)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:662)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:425)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
at org.apache.hadoop.mapred.Child.main(Child.java:249)

That only affects the Rollup functionality.

Possible bug in rollup with Hadoop 0.20.2

I suspect there is some strange behavior when using Rollup with Hadoop 0.20.2.
I have a Job that uses Rollup and behaves oddly only when run on Hadoop 0.20.2. These counters show it:

Map output records=15088
Reduce input records=12378

There shouldn't be a difference between reduce input records and map output records in this case. When using Hadoop 0.20.205.0, everything is fine:

Map output records=15038
Reduce input records=15038

MOUT not properly handled

Currently MOUT does not act as a full OutputFormat. The MOUT life cycle is managed by calls in the setup(), write() and cleanup() methods of mappers and reducers, and the commit is handled in the cleanup() method. That is not right. It should be handled by an OutputCommitter.

On the other hand, the component responsible for committing the results, including the MOUT results, is the principal OutputFormat. That means that MOUT only works if the OutputFormat extends FileOutputFormat, which is not right. MOUT should work independently of the principal OutputFormat.

The solution should be to capture the principal OutputFormat signals with some kind of ProxyOutputFormat and forward them to the different OutputFormats, the principal one and the MOUT ones. We should also have some kind of ProxyOutputCommitter that sends signals properly to both output systems.

Deserializer with buffer size

Currently we are using the Hadoop Deserializer class for the custom Pangool serialization: http://pangool.net/userguide/custom_serialization.html

But some serialization mechanisms need the buffer size in order to deserialize properly (e.g. ProtocolBuffers). With the current API, the serialization needs to write the buffer size to disk. But that is something Pangool already does, so we would be writing the buffer size twice.

That can be avoided by passing the buffer size to the deserialize() method. It would mean that we stop using the Hadoop Deserializer class and use our own new interface instead.
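
One possible shape for such an interface, as a sketch only. `SizedDeserializer`, `frame` and `readAll` are hypothetical names and not part of Hadoop or Pangool; the point is that the framework writes each length once and hands it to the deserializer, so the payload needs no length prefix of its own.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface, not part of Hadoop or Pangool.
interface SizedDeserializer<T> {
  T deserialize(byte[] buffer, int offset, int length);
}

final class SizedDeserializerSketch {
  // The framework writes each payload's length exactly once, in front of the payload.
  static byte[] frame(byte[]... payloads) {
    int total = 0;
    for (byte[] payload : payloads) {
      total += 4 + payload.length;
    }
    ByteBuffer buf = ByteBuffer.allocate(total);
    for (byte[] payload : payloads) {
      buf.putInt(payload.length);
      buf.put(payload);
    }
    return buf.array();
  }

  // Reads the framework's length and passes it on instead of making the payload carry it.
  static <T> List<T> readAll(byte[] framed, SizedDeserializer<T> deserializer) {
    ByteBuffer buf = ByteBuffer.wrap(framed);
    List<T> out = new ArrayList<T>();
    while (buf.hasRemaining()) {
      int length = buf.getInt();
      int offset = buf.position();
      out.add(deserializer.deserialize(framed, offset, length));
      buf.position(offset + length);
    }
    return out;
  }

  // Example payload format with no embedded size: raw UTF-8 bytes.
  static final SizedDeserializer<String> UTF8_STRING = new SizedDeserializer<String>() {
    @Override
    public String deserialize(byte[] buffer, int offset, int length) {
      return new String(buffer, offset, length, StandardCharsets.UTF_8);
    }
  };
}
```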

Backwards compatibility for TupleReader and TupleMRBuilder

Currently one can read any TupleFile and process any output from a TupleMRBuilder without needing to care about the Schema, which is good. But if you have a set of Schemas and evolve them, for example by adding one or two fields, and you want to use these instead of the ones that are saved in the TupleFiles, you have to write custom business logic for copying the input Tuple to another Tuple (with the new Schema).

The idea would be to allow the user to provide an optional input Schema in TupleReader or TupleMRBuilder, so that if a Schema is provided then the Tuple is instantiated with the user's Schema instead of the file schema, so that maybe not all the fields are read, or some new fields are not written (in that case we have to think of a reasonable API for providing default values) - also note that since #18 it is now possible to serialize nulls in a Tuple.
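
The copy logic a user has to write by hand today looks roughly like the sketch below. Tuples are modelled as maps from field name to value and the target Schema as a list of field names; `SchemaProjectionSketch.project` and the `defaults` map are hypothetical, since the API for default values is still an open question.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Pangool code: tuples are maps, schemas are lists of field names.
final class SchemaProjectionSketch {
  // Builds a tuple shaped like the user's schema from a tuple read with the file schema.
  static Map<String, Object> project(
      Map<String, Object> fileTuple, List<String> targetFields, Map<String, Object> defaults) {
    Map<String, Object> out = new LinkedHashMap<String, Object>();
    for (String field : targetFields) {
      if (fileTuple.containsKey(field)) {
        out.put(field, fileTuple.get(field)); // field present in the file
      } else {
        out.put(field, defaults.get(field)); // new field: default value, or null if none
      }
    }
    return out; // file fields missing from targetFields are simply not copied
  }
}
```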

Custom serialization for Tuple OBJECT fields

This should allow using stateful serializers/deserializers for OBJECT fields.
Currently the serialization used for OBJECT fields is defined statically in Hadoop's "io.serializations" property, so all the fields with the same class share the same stateless serializer.

This new feature must allow, for instance, multiple Avro fields within a tuple to be serialized and deserialized with their own schemas.

Mutator class for evolving Schemas conveniently

For the next Pangool release I'm working on a utility called "Mutator" that allows evolving Schemas conveniently. Currently it has these methods:

minusFields(Schema schema, String... minusFields) - returns a Schema like the argument one minus the fields passed as second parameter.

subSetOf(Schema schema, String... subSetFields) - returns a Schema like the argument one containing only the fields passed as second parameter.

superSetOf(Schema schema, Field... newFields) - returns a Schema like the argument one but adding new Fields to it.

jointSchema(Schema leftSchema, Schema rightSchema) - returns a joint Schema of two Schemas, where the left one has priority over the right (first all fields from the left are added, then the ones in the right that are not in left).

Shout if you have more suggestions or feedback.
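
To make the semantics of the four methods concrete, here is a sketch that models a Schema as an ordered list of field names. That simplification is an assumption: the real methods take Schema and Field objects, and the field order chosen for subSetOf here (the order of the original schema) is a guess.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the Mutator semantics; a schema is just a list of field names.
final class MutatorSketch {
  // The schema without the given fields.
  static List<String> minusFields(List<String> schema, String... minusFields) {
    List<String> out = new ArrayList<String>(schema);
    out.removeAll(Arrays.asList(minusFields));
    return out;
  }

  // Only the given fields, kept in the order of the original schema (assumption).
  static List<String> subSetOf(List<String> schema, String... subSetFields) {
    List<String> out = new ArrayList<String>(schema);
    out.retainAll(Arrays.asList(subSetFields));
    return out;
  }

  // The schema plus the new fields appended at the end.
  static List<String> superSetOf(List<String> schema, String... newFields) {
    List<String> out = new ArrayList<String>(schema);
    out.addAll(Arrays.asList(newFields));
    return out;
  }

  // All left fields first, then the right fields that are not already present.
  static List<String> jointSchema(List<String> left, List<String> right) {
    List<String> out = new ArrayList<String>(left);
    for (String field : right) {
      if (!out.contains(field)) {
        out.add(field);
      }
    }
    return out;
  }
}
```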

Instance files should be deleted after Job completion

Pangool serializes everything (InputFormat, Combiner, Mapper, Reducer, OutputFormat...) so that we can use instances instead of classes. We try to use the DistributedCache when possible (it is not possible for InputFormat and OutputFormat). The current approach works fine; however, instance files may accumulate in "hadoop.tmp.dir" and, if they are not deleted, the cost of loading one increases. There are three things that must be revised:

  1. Deleting instance files after Job finalization, if it's possible
  2. DCUtils: if(path.toString().endsWith(filePostFix)) -> Doesn't look like we need to do a postfix checking here. Getting the file straight should be possible and so the cost would be constant (we don't need to iterate over all instance files).
  3. Using the local files from DistributedCache when possible, through a boolean parameter. So InputFormat and OutputFormat will pass "false" and Combiner, Mapper and Reducer "true".

Currently the main bottleneck happens in the Combiner, if there are a lot of files. The cost of loading the instance for everything else is negligible. Which makes me think of another idea:

  • We could implement a JVM-local cache for Combiner instances, a static HashMap for instance. Because Combiners are executed in the same JVM as Mappers, caching the object instances would let us avoid looking up the instance file on every Combiner call. Does it make sense?

In any case, the other things should be done as well.
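
The JVM-local cache idea could look roughly like this. `InstanceCacheSketch` and its `Loader` callback are hypothetical; a ConcurrentHashMap is used here instead of a plain HashMap so the sketch stays safe if several threads share the JVM. It also assumes that a cached Combiner instance can be reused safely across calls.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of a JVM-local instance cache, not Pangool code.
final class InstanceCacheSketch {
  // Stands in for the expensive step: locating and deserializing an instance file.
  interface Loader {
    Object load(String instanceFile);
  }

  private static final ConcurrentMap<String, Object> CACHE =
      new ConcurrentHashMap<String, Object>();

  // Returns the cached instance, loading it only the first time it is requested.
  static Object get(String instanceFile, Loader loader) {
    Object cached = CACHE.get(instanceFile);
    if (cached != null) {
      return cached;
    }
    Object loaded = loader.load(instanceFile);
    // If another thread won the race, keep its instance so callers share one object.
    Object raced = CACHE.putIfAbsent(instanceFile, loaded);
    return raced != null ? raced : loaded;
  }
}
```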

MultipleOutputs is very slow when trying to use a nonexistent multiple output

If the user tries to write to a multiple output that hasn't been configured, and the job has no default multiple output defined, successive calls become very slow. The reason is that, before checking whether the multiple output name is valid, a series of expensive actions are performed (reflection, output format instantiation, ...). This can be seen in class PangoolMultipleOutputs, method getRecordWriter().
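
A sketch of the obvious fix, checking the name before doing any expensive work. `OutputLookupSketch` is a hypothetical stand-in and does not reproduce the real PangoolMultipleOutputs.getRecordWriter() signature; the counter only stands in for reflection and output format instantiation.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch, not the real PangoolMultipleOutputs.
final class OutputLookupSketch {
  private final Set<String> configured = new HashSet<String>();
  int expensiveSetups = 0;

  OutputLookupSketch(String... names) {
    for (String name : names) {
      configured.add(name);
    }
  }

  String getRecordWriter(String name) {
    // Cheap validity check first, so an unknown name fails fast on every call.
    if (!configured.contains(name)) {
      throw new IllegalArgumentException("Undefined named output: " + name);
    }
    expensiveSetups++; // stands in for reflection and output format instantiation
    return "writer:" + name;
  }
}
```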

Thrift using different serialization protocols

Right now Pangool serializes Thrift using TBinaryProtocol. It could be interesting to use TCompactProtocol, which uses less space. The idea is to make the protocol selection configurable.

Enhance multipleInputs to allow more than one mapping for the same input path

An example use case is a Parquet file where we select different sets of columns and assign each set to a different Mapper. Right now, Pangool overrides the input assignment for the same Path, silently keeping only the last one. A small change is needed for Pangool to be able to delegate to multiple mappers and input formats for the same Path.

Serialization refactoring

A small refactoring in serialization code should allow the possibility of adding stateful serialization for Tuples inside Tuples.

Create a generic scalable CrossProduct primitive

As a first attempt, create an example of CrossProduct with two sources.
Then build a generic N-CrossProduct task that is scalable and assumes that not all of the dataset fits in memory, so multiple Job phases will be required.

Support for nulls in fields

Add support for nulls. Some schema fields could be marked as nullable; those fields accept null values. The idea is to implement efficient serialization and comparison that:

  • Efficiently serializes null information using a small number of bits
  • Does not affect current Pangool efficiency when none of the fields is nullable.
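
One way to meet both goals on the serialization side is a bitmask with one bit per nullable field, written only when the schema has nullable fields. The sketch below assumes a schema of int fields described by a `boolean[] nullable` array; `NullableSerializationSketch` is hypothetical and does not cover comparison.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch, not Pangool code: int-only schema, one null bit per nullable field.
final class NullableSerializationSketch {
  private static int maskBytes(boolean[] nullable) {
    int count = 0;
    for (boolean n : nullable) {
      if (n) count++;
    }
    return (count + 7) / 8; // zero bytes when no field is nullable
  }

  static byte[] write(Integer[] values, boolean[] nullable) {
    byte[] mask = new byte[maskBytes(nullable)];
    int bit = 0;
    for (int i = 0; i < values.length; i++) {
      if (!nullable[i]) {
        if (values[i] == null) {
          throw new IllegalArgumentException("field " + i + " is not nullable");
        }
        continue;
      }
      if (values[i] == null) {
        mask[bit / 8] |= (byte) (1 << (bit % 8));
      }
      bit++;
    }
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.write(mask);
      for (Integer value : values) {
        if (value != null) out.writeInt(value); // null fields take no payload bytes
      }
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
  }

  static Integer[] read(byte[] data, boolean[] nullable) {
    int skip = maskBytes(nullable);
    DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(data, skip, data.length - skip));
    Integer[] values = new Integer[nullable.length];
    int bit = 0;
    try {
      for (int i = 0; i < nullable.length; i++) {
        boolean isNull = false;
        if (nullable[i]) {
          isNull = (data[bit / 8] & (1 << (bit % 8))) != 0;
          bit++;
        }
        values[i] = isNull ? null : Integer.valueOf(in.readInt());
      }
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
    return values;
  }
}
```

With no nullable fields the mask is empty, so the encoded size is the same as without null support.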
