
dataflowjavasdk's Introduction

Google Cloud Dataflow SDK for Java

Google Cloud Dataflow is a service for executing Apache Beam pipelines on Google Cloud Platform.

Getting Started

We moved to Apache Beam!

The Apache Beam Java SDK and its development have moved to the Apache Beam repository.

If you want to contribute to the project (please do!), use the Apache Beam contributor's guide.

Contact Us

We welcome all usage-related questions on Stack Overflow tagged with google-cloud-dataflow.

Please use the issue tracker on Apache JIRA to report any bugs, comments or questions regarding SDK development.

More Information

Apache, Apache Beam and the orange letter B logo are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.


dataflowjavasdk's Issues

CoGbkResultCoder and UnionCoder access

I am attempting to implement a thin Scala wrapper around Cloud Dataflow. So far I've been finding that I need to set Coders explicitly for each of my transformations, and have a workable solution for most operations.

I have just moved on to adding joins, and have come across the following problem:

I believe the appropriate coder for a join result is of type 'CoGbkResultCoder'. Both 'of' functions for creating this coder require a UnionCoder to encapsulate the join value types (the one taking a list of Coders assumes that the list has a single element of type UnionCoder) - but UnionCoder is package private and inaccessible to my app.

In my particular instance, I am encoding the values with Kryo, so I anticipate I would need to supply a coder even if this were implemented in Java. Am I going about this the wrong way? Is there another way to set the coder for a join that doesn't require UnionCoder?
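For context, a minimal sketch of the kind of join being wrapped, using the SDK's CoGroupByKey. The collection names and element types are illustrative; the point is that CoGroupByKey builds its output coder internally, which a wrapper that sets coders explicitly cannot reproduce while UnionCoder is package private.

import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.TupleTag;

// Illustrative element types; only the coder construction is at issue.
PCollection<KV<String, Long>> left = ...;
PCollection<KV<String, String>> right = ...;

final TupleTag<Long> leftTag = new TupleTag<>();
final TupleTag<String> rightTag = new TupleTag<>();

// CoGroupByKey infers a CoGbkResultCoder for its output. A wrapper that must
// set that coder explicitly has no public way to build the required UnionCoder.
PCollection<KV<String, CoGbkResult>> joined =
    KeyedPCollectionTuple.of(leftTag, left)
        .and(rightTag, right)
        .apply(CoGroupByKey.<String>create());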

Expose System Watermark

We would love to alert if certain jobs / transforms fall behind, or to trigger external work based on how far processing has progressed in the system. Ideally this would be exposed via cloud metrics, but I would accept anything that lets me get my hands on the timestamp directly.

If there is a simple enough way to achieve this with triggers I am open to it, but I don't want the data in the pane, I just want to know the progress.

FileBasedSource does not work on Windows

Apologies if this is not the right place to submit issues.

The following code

Path path = Files.createTempFile("test", ".txt");
Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
pipeline.apply(TextIO.Read.from(path.toString()));
pipeline.run();

throws an exception on a Windows client:

java.lang.RuntimeException: Failed to read from source: com.google.cloud.dataflow.sdk.runners.worker.TextSource@644baf4a
...
Caused by: java.io.IOException: No match for file pattern 'C:\Users\jroy\AppData\Local\Temp\test8262931969830113037.txt'
at com.google.cloud.dataflow.sdk.runners.worker.FileBasedSource.iterator(FileBasedSource.java:100)

The file exists, so this exception is unexpected.

The bug is in FileIOChannelFactory::match(), which improperly appends the value returned by File::getAbsolutePath() to a glob expression. On Windows, getAbsolutePath() returns the path with backslash path separators, which are interpreted as escape characters by FileSystem::getPathMatcher(). So no matches are returned.
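A hedged, untested sketch of the kind of escaping fix this suggests inside FileIOChannelFactory::match(); the file variable stands in for the File object the method already constructs.

// Untested sketch: escape the Windows separator so getPathMatcher() sees
// literal backslashes instead of escape characters (in glob syntax, "\\"
// matches a single backslash).
String absolutePath = file.getAbsolutePath();              // e.g. C:\Users\jroy\AppData\...
String globSafePath = absolutePath.replace("\\", "\\\\");  // double each backslash
PathMatcher matcher =
    FileSystems.getDefault().getPathMatcher("glob:" + globSafePath);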

Some pipelines with many sinks may OOM

A simple test pipeline with a Partition followed by assigning a Sink to each output may lead to OutOfMemoryExceptions if the Sinks have large byte buffers. Two examples are BigQueryIO (in batch mode) and TextIO (when writing to GCS).

Unfortunately, using a machine type that has proportionally more memory and cores will not work, because the memory usage scales as the product of (# worker threads / machine) * (# sinks / thread) * (buffer size / sink), and by default the # worker threads equals the number of cores.

We are working on understanding and reducing the buffer space used, but here are some work-arounds; a short sketch of applying options 2 and 4 follows the list.

  1. increasing the amount of memory available per worker harness thread, by using high memory (highmem) instances;
  2. increasing the amount of memory available per worker harness thread, by limiting the number of threads with --numberOfWorkerHarnessThreads pipeline option;
  3. reducing the amount of memory consumed per worker thread, by using fewer partitions and hence fewer sinks, so that they together consume less memory;
  4. using the --gcsUploadBufferSizeBytes pipeline option to make the buffer size used in uploads to GCS be smaller than its default value of 64MiB. (Note that there may be minor performance degradation if you make this buffer much smaller than 8MiB, although any size that is a multiple of 256KiB will work.)
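A minimal sketch of applying work-arounds 2 and 4, assuming the flag names listed above are parsed by PipelineOptionsFactory as usual; the values shown are illustrative and should be tuned for your pipeline.

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

// Fewer worker harness threads and a smaller GCS upload buffer, passed as
// ordinary pipeline options on the command line, e.g.
//   --numberOfWorkerHarnessThreads=4 --gcsUploadBufferSizeBytes=8388608
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);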

Regarding Checkstyle violations

Hi,

I installed Apache Maven 3.1.1, added it to my path, cloned the Google Cloud Dataflow SDK, and then ran "mvn clean install", but it results in Checkstyle violations...

Please suggest.

Regards,
Shubh


Write transform unsuitable for batch pipelines that use windowing functions.

To reproduce, create any batch pipeline that uses a windowing function and any output method that uses Write to write the data out. The write results are output without a timestamp, ultimately causing an exception to be thrown.

Caused by: java.lang.RuntimeException: java.lang.UnsupportedOperationException: WindowFn attemped to access input timestamp when none was available
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.makeWindowedValue(DoFnRunner.java:298)
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:309)
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.output(DoFnRunner.java:351)
    at com.google.cloud.dataflow.sdk.transforms.Write$Bound$2.finishBundle(Write.java:180)

This is unfortunate and prevents a lot of reuse of pipeline components. Fixing it would also have the added benefit of making the transform suitable for streaming (I think).

The difficulty here is, of course, what timestamp the write result should have. I think the current processing time would work just fine, and ultimately I don't think it really matters, as it isn't used anywhere.

Thoughts? Feelings?

Ultimately I can work around this by either a) not using Write for anything b) in batch mode explicitly reifying timestamps into grouping keys and dropping windowing from all batch processing. a) is definitely more palatable than b), but I would prefer a third path.

Native support for conditional iteration

There are a variety of use cases which would benefit from native support for conditional iteration.

For instance, http://stackoverflow.com/questions/31654421/conditional-iterations-in-google-cloud-dataflow/31659923?noredirect=1#comment51264604_31659923 asks about being able to write a loop like the following:

PCollection<T> data = ...;
while (needsMoreWork(data)) {
  data = doAStep(data);
}

If there are specific use cases please let us know the details. In the future we will use this issue to post progress updates.

Document valid options for setWorkerDiskType

Seems like the possible options are

"compute.googleapis.com/projects//zones//diskTypes/pd-ssd"
"compute.googleapis.com/projects//zones//diskTypes/pd-standard"

(and not just "pd-ssd" or "pd-standard"). Perhaps it would be better to make these an enum instead of magic strings?
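A hedged sketch of setting the full resource form, assuming the setter lives on DataflowPipelineWorkerPoolOptions; the project and zone segments are left as placeholders, mirroring the elided values quoted above.

import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

// <project> and <zone> are placeholders; only the pd-ssd / pd-standard suffix varies.
DataflowPipelineWorkerPoolOptions options =
    PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineWorkerPoolOptions.class);
options.setWorkerDiskType(
    "compute.googleapis.com/projects/<project>/zones/<zone>/diskTypes/pd-ssd");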

mvn install tests fail on Mac OS X with non-standard Region Preferences

When running "mvn clean install" to install the DataflowJavaSDK on a MacBook with OS X Mavericks and non-standard settings in the Language and Region preferences, the installation fails with the following test errors:

Failed tests:
CombineValuesFnTest.testCombineValuesFnExtract:321 arrays first differed at element [0]; expected:<[ValueInGlobalWindow: KV(a, 4.5)]> but was:<[ValueInGlobalWindow: KV(a, 4,5)]>
CombineValuesFnTest.testCombineValuesFnAll:240 arrays first differed at element [0]; expected:<[ValueInGlobalWindow: KV(a, 6.0)]> but was:<[ValueInGlobalWindow: KV(a, 6,0)]>
LoggingMediaHttpUploaderProgressListenerTest.testLoggingProgressAfterSixtySeconds:57
Argument(s) are different! Wanted:
mockLogger.debug(
"Uploading: NAME Average Rate: 0.167 MiB/s, Current Rate: 0.167 MiB/s, Total: 10.000 MiB"
);
-> at com.google.cloud.dataflow.sdk.util.gcsio.LoggingMediaHttpUploaderProgressListenerTest.testLoggingProgressAfterSixtySeconds(LoggingMediaHttpUploaderProgressListenerTest.java:57)
Actual invocation has different arguments:
mockLogger.debug(
"Uploading: NAME Average Rate: 0,167 MiB/s, Current Rate: 0,167 MiB/s, Total: 10,000 MiB"
);
-> at com.google.cloud.dataflow.sdk.util.gcsio.LoggingMediaHttpUploaderProgressListener.progressChanged(LoggingMediaHttpUploaderProgressListener.java:78)

Resetting the Language and Region Settings to Default fixes the problem.
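The failures come from locale-sensitive number formatting rather than from the SDK's arithmetic; a minimal, self-contained illustration:

import java.util.Locale;

public class LocaleDemo {
  public static void main(String[] args) {
    // A comma-decimal locale renders 4.5 as "4,5", which is exactly the
    // difference between the expected and actual test output above.
    System.out.println(String.format(Locale.US, "%.1f", 4.5));      // 4.5
    System.out.println(String.format(Locale.GERMANY, "%.1f", 4.5)); // 4,5
  }
}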

Listing directories with GcsUtil

I need to get a list of directories in GCS. GcsUtil.expand skips directories

https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GcsUtil.java#L187

Is there a current solution for listing directories?

If not, I'm happy to make a PR, but I don't know how you'd want it designed -- (a) a Boolean argument for expand called something like omitDirectories, (b) a new method expandDirectories, (c) remove the directory skip check but leave everything else unchanged [which will probably break other users' workflows].
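As a stop-gap, a hedged sketch of listing "directories" (common prefixes) directly with the Storage JSON API client that GcsUtil wraps. It assumes the SDK's Transport.newStorageClient helper is accessible from user code; the bucket and prefix values are illustrative.

import com.google.api.services.storage.Storage;
import com.google.api.services.storage.model.Objects;
import com.google.cloud.dataflow.sdk.options.GcsOptions;
import com.google.cloud.dataflow.sdk.util.Transport;
import java.util.List;

// Assumed helper: Transport.newStorageClient is what GcsUtil uses internally.
GcsOptions gcsOptions = pipelineOptions.as(GcsOptions.class);
Storage storage = Transport.newStorageClient(gcsOptions).build();
// List common prefixes (i.e. "directories") one level below a prefix.
Objects listing = storage.objects().list("my-bucket")
    .setPrefix("path/to/parent/")
    .setDelimiter("/")
    .execute();
List<String> directories = listing.getPrefixes();  // e.g. "path/to/parent/sub1/"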

Enabling projection on columns

As you pay per processed column in BigQuery, projection should be implemented in the BigQuery reader. This might significantly reduce costs when working with large rows, when you only need a small subset of columns.

Is the immutable requirement for ParDo really necessary?

Referring to this commit: is the immutability requirement really necessary? Some data structures, e.g. PriorityQueue, rely on mutation for performance reasons. And since users map their data into such structures and fold in place, there's little concern about incorrectness.

Unbounded file sink

A common request is the ability in streaming pipelines to publish data to a file per window, like already exists for BigQueryIO.

We should add a ParDo-based example for this, and we also need to supply a custom file source that works in streaming mode (TextIO does not).

A few of the subtleties that come from streaming operation:

  • For fault tolerance, each bundle is a different unit of isolation. Unlike in batch, we can't just retry the entire step if any bundle fails. So we can't simply append to the same destination file in different bundles.

    We need to create separate temporary files for each bundle, marking them as permanent/successful in an idempotent way in a subsequent step.

  • If we have many elements, many workers, or many windows concurrently open, this could end up creating lots of bundles.

    GroupByKey and using large windows will enable us to have fewer bundles, but more elements per key-window group. Design needs to scale to large values.

Add utility function for creating temp files, clarify docs

I found out the hard way that /tmp is not mounted in the docker containers. (http://stackoverflow.com/questions/33052578/temp-files-in-google-cloud-dataflow) Would be great if a utility class provided methods like File.createTempFile and Files.createTempDirectory that return a usable location. This would also be good to document on cloud.google.com/dataflow (let me know if I missed this somewhere).

Happy to make a PR if the maintainers provide some input on where the files should be. It seems a bit unusual to save them in a logs directory per the SO answer.
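A minimal sketch of the kind of helper being asked for, assuming the worker supplies (or documents) some writable base directory; the class and parameter names are hypothetical.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public final class WorkerTempFiles {
  private final Path baseDir;

  // writableBaseDir is assumed to be a location known to be writable inside
  // the worker container; there is no documented location today, hence this issue.
  public WorkerTempFiles(String writableBaseDir) throws IOException {
    this.baseDir = Files.createDirectories(Paths.get(writableBaseDir));
  }

  public Path createTempFile(String prefix, String suffix) throws IOException {
    return Files.createTempFile(baseDir, prefix, suffix);
  }

  public Path createTempDirectory(String prefix) throws IOException {
    return Files.createTempDirectory(baseDir, prefix);
  }
}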

Allow for user controlled unique id in BigQuery Write

If we do a streaming write, the write connector will generate unique ids and tag rows transparently to avoid duplication. If the user knows something about their data like "the transaction-id column is unique across the table", it would be nice to be able to use that instead of the generated ids.

NullPointerException in StreamingModeExecutionContext

Hi there,

I've got a streaming pipeline (DataflowPipelineRunner) that polls an HTTP endpoint, performs some manipulation, and stores the results in a BigQuery table. Based on some rudimentary logging I can see the data is coming down the pipeline. It then encounters the following exception:

(b359ca736e4f8d93): Exception: java.lang.NullPointerException
 com.google.cloud.dataflow.sdk.runners.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:246)
 com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:541)
 com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker.access$300(StreamingDataflowWorker.java:83)
 com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker$5.run(StreamingDataflowWorker.java:435)
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 java.lang.Thread.run(Thread.java:745)

I'm using version 1.2.0 of the Dataflow Java SDK.

Any help would be greatly appreciated, please let me know if I can provide more details.

withNamespace() is missing in the DatastoreIO class

I created a sample Cloud Dataflow Java project using the Dataflow Eclipse plugin. I wanted to create a pipeline for DatastoreIO with namespace support, but I couldn't find a withNamespace() method in the DatastoreIO class.

Maven compilation errors caused by out-of-date Javadoc references.

Attempting to compile the SDK currently fails with an error during javadoc generation. The first two errors seemed to be caused by the use of a relative path to WindowingInternals while the last one is because DoFn.KeyedState was removed recently.

Relevant log:
[ERROR] /home/cal/smplbio/DataflowJavaSDK/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Trigger.java:84: error: reference not found
[ERROR] * it may persist book-keeping information to {@link WindowingInternals.KeyedState},
[ERROR] ^
[ERROR] /home/cal/smplbio/DataflowJavaSDK/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Trigger.java:96: error: reference not found
[ERROR] * {@link WindowingInternals.KeyedState} before the callback returns.
[ERROR] ^
[ERROR] /home/cal/smplbio/DataflowJavaSDK/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/CodedTupleTagMap.java:26: error: reference not found
[ERROR] * {@link com.google.cloud.dataflow.sdk.transforms.DoFn.KeyedState#lookup(java.lang.Iterable)}.
[ERROR] ^

Ran out of memory running WordCount on a large file

If I run the WordCount example (using DirectPipelineRunner) with an 800 MB input file, the JVM runs out of memory:

[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.1:java (default-cli) on project google-cloud-dataflow-java-examples-all: An exception occured while executing the Java class. null: InvocationTargetException: Java heap space -> [Help 1]

This seems to imply that we can only process input data that is smaller than the heap available to the VM. Is that correct? If so, how can this process big data?

[question] Bundle relationship to Windows / Grouping keys

I am currently working on a Cassandra connector; at the moment it's just the write side. One thing that is nice / interesting about Cassandra is that it will resolve multiple inserts to the same partition key into a single write. If there were some way to force bundles to share a partition key, it could enable some nice optimisations: for streaming writes you could use CQL batches to nice effect, and in batch mode you could do some of the tricks the DataStax Spark connector does, grabbing the token ring and sending the data to the correct nodes directly, without a coordinator.

Is there any way to reason about these types of things? I have been reading all the impls in the SDK for BigQuery / DataStore but this type of thing doesn't seem to be in there.

PubsubIO deletes context

A pubsub message, when consumed by a custom subscription, has more information than just the payload:
{"message": {"attributes": {}, "data": "some-base64-data", "message_id": "84531668664"}, "subscription": "projects//subscriptions/"}

Specifically, the attributes might be something you'd want to keep. There should be a coder that handles this.

pseudo-code:

class PubsubMessage {
  String subscription;
  String json; // or some other type
  Map<String, String> attributes; // or some other type...
}

Expose WindowFn attributes

Hi,

I'm implementing a PipelineRunner that runs Google Dataflow programs as Apache Flink Streaming jobs.

To translate the windowing functions I need to access the attributes of the specific WindowFn subclasses, e.g. this one.

Is there a reason for not exposing those?

BigQueryIO.Read handles TIMESTAMP fields inconsistently between local and cloud

When using BigQueryIO.Read to read TIMESTAMP-typed fields, the results come back differently when running locally and when running in Google cloud:

  • locally, the data comes back as a string representation of a floating-point number (seconds since the Unix epoch)
  • in the cloud, the data comes back as a human-readable timestamp

For example, I've created a csv file containing following data and loaded that into a table in big query:

$ cat timestamps.csv
1970-01-01 00:00:00.000 UTC
1970-01-01 00:00:00.010 UTC
1970-01-01 00:00:00.100 UTC
2015-04-30 12:34:56.789 UTC

$ bq load dataset.table_name timestamps.csv 'timestamp:timestamp'

Then I created the following trivial dataflow pipeline to read the data from BigQuery and dump it as text file:

public static void main(String[] args) {
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

  Pipeline p = Pipeline.create(options);
  p.apply(BigQueryIO.Read.from(options.getInput()))
      .apply(ParDo.of(new DoFn<TableRow, String>() {
        private static final long serialVersionUID = 0;
        @Override
        public void processElement(ProcessContext c) throws Exception {
          TableRow row = c.element();
          String timestamp = (String) row.get("timestamp");
          c.output(timestamp);
        }
      }))
      .apply(TextIO.Write.named("myout").to(options.getOutput()));
  p.run();
}

When running locally results come back as:
0.0
1.430397296789E9
0.1
0.01

When running in GCP, results come back as:
1970-01-01 00:00:00 UTC
1970-01-01 00:00:00.01 UTC
1970-01-01 00:00:00.1 UTC
2015-04-30 12:34:56.789 UTC

Note also how the millisecond field may or may not be present, and the number of digits varies, all of which makes parsing this data more difficult than it needs to be.
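Until the two runners agree, a hedged parsing helper that accepts both shapes shown above can smooth things over. The human-readable pattern is inferred from the sample output and should be verified against your data; Joda-Time is used here since the SDK already depends on it.

import java.math.BigDecimal;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.DateTimeFormatterBuilder;

public final class BigQueryTimestamps {
  // Matches "1970-01-01 00:00:00 UTC", "1970-01-01 00:00:00.01 UTC", etc.,
  // with an optional fractional part of varying length.
  private static final DateTimeFormatter CLOUD_FORMAT =
      new DateTimeFormatterBuilder()
          .appendPattern("yyyy-MM-dd HH:mm:ss")
          .appendOptional(new DateTimeFormatterBuilder()
              .appendLiteral('.')
              .appendFractionOfSecond(1, 9)
              .toParser())
          .appendLiteral(" UTC")
          .toFormatter()
          .withZoneUTC();

  /** Accepts either "1.430397296789E9" (seconds since epoch) or the cloud format above. */
  public static long toEpochMillis(String value) {
    if (value.endsWith(" UTC")) {
      return CLOUD_FORMAT.parseDateTime(value).getMillis();
    }
    // Locally the field arrives as decimal seconds, possibly in scientific notation.
    return new BigDecimal(value).multiply(new BigDecimal(1000)).longValue();
  }
}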

Write to BigQuery with TABLE_DATE_RANGE

Hi,

I have a BigQuery streaming Write pipeline and I would like to have my writes go to different tables based on the current time/day of the write. This would take advantage of the TABLE_DATE_RANGE operator in BigQuery. It doesn't seem like this is possible with the current API. Is this an extension that can be considered?

edit:

I found some documentation on this in the source https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java#L156-L170. But it is out of date and doesn't seem to work (CalendarWindows.DaysWindows cannot be cast to BoundedWindow for example).

PubSub IO cannot specify custom timestamp

Hi,

I'm defining a pipeline to read logs exported from App Engine to PubSub using the Log Export feature. I wrote a function that parses the LogEntry. Each log entry has its own timestamp, and I would like to use that timestamp for downstream windowing operations.

The problem I'm encountering is that PubsubIO attaches its own timestamp to the incoming data here, and the timestamp I want to use from the LogEntry is earlier in time. Attempting to set the timestamp to that of the LogEntry results in an error:

java.lang.IllegalArgumentException: Cannot output with timestamp 2015-10-09T20:06:21.991Z. Output timestamps must be no earlier than the timestamp of the current input (2015-10-09T20:06:25.669Z) minus the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestmapSkew() Javadoc for details on changing the allowed skew.

So, I can change the allowed skew but what I would really like is to allow PubSub IO to optionally not add a timestamp so that we can always use the timestamp from the LogEntry without having to manually specify the skew.
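A hedged sketch of the skew-based workaround mentioned above. LogEntry and its getTimestamp() accessor stand in for the poster's own parsed type, and the one-hour bound is purely illustrative.

import com.google.cloud.dataflow.sdk.transforms.DoFn;
import org.joda.time.Duration;
import org.joda.time.Instant;

class AssignLogEntryTimestamp extends DoFn<LogEntry, LogEntry> {  // LogEntry is the poster's parsed type
  @Override
  public Duration getAllowedTimestampSkew() {
    // Upper bound on how far back an element may be re-stamped relative to
    // the PubsubIO-assigned timestamp; one hour is illustrative only.
    return Duration.standardHours(1);
  }

  @Override
  public void processElement(ProcessContext c) {
    LogEntry entry = c.element();
    Instant logTime = entry.getTimestamp();  // hypothetical accessor on the parsed LogEntry
    c.outputWithTimestamp(entry, logTime);
  }
}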

Error "Could not obtain credential using gcloud: Cannot run program "gcloud": error=2, No such file or directory" running WordCount with DirectPipelineRunner

I've built DataflowJavaSDK successfully, but attempting to run the WordCount example using the syntax listed in the README:

mvn exec:java -pl examples -Dexec.mainClass=com.google.cloud.dataflow.examples.WordCount -Dexec.args="--runner=DirectPipelineRunner --output=foo"

Yields this error:

[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.1:java (default-cli) on project google-cloud-dataflow-java-examples-all: An exception occured while executing the Java class. null: InvocationTargetException: Could not obtain credential using gcloud: Cannot run program "gcloud": error=2, No such file or directory -> [Help 1]

No table sharding in BigQuery writer.

Given the BigQuery model, this severely limits the utility of streaming into BigQuery. I am going to hack something up over the next couple of days and am looking for some pointers.

I am guessing that the best thing is to offer a template function: people take the window of the element being processed, use it as the basis for a dataset/table name, and output to the table returned by the template function.

Given that there is already a handy static map of schemas, this doesn't seem like it would be too hard to add.

If this is successful and you consider this the right approach, would be happy to offer a PR.

EDIT: clearly just using a static map won't cut it, as over a long-running streaming app this will eventually OOM.

Many Datastore Namespaces

Following up on issue #64; since that issue is closed, I am creating a new one.

I have many namespaces in Cloud Datastore. The DatastoreWordCount example shows using the pipeline to read from and write to a single Datastore namespace. What should be done to read/write data across thousands of namespaces? Should we generate thousands of pipelines in a loop, one per namespace? What is the recommended approach?

Local pubsub runner

Currently it is not possible to run streaming pipelines with the direct runner if they involve a pubsub component. A local translator that creates a simple publisher / pull subscriber would be very helpful.

Ability to specify name for MapElements.via transforms

I've got a pipeline composed entirely of a series of .apply(MapElements.via(new SimpleFunctionX)) calls. It would be nice to be able to set the name property so the pipeline can be monitored with nicer names than "MapElements1", "MapElements2", ... maybe MapElements.via(sf).named("name")?
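Until something like .named(...) exists on MapElements, the two-argument form of apply already gives each step a stable, readable name; the step names and SimpleFunction subclasses below are illustrative.

// Step names appear in the monitoring UI in place of "MapElements1", "MapElements2", ...
// ExtractUserIdFn and FormatOutputFn are hypothetical SimpleFunction subclasses.
input
    .apply("ExtractUserId", MapElements.via(new ExtractUserIdFn()))
    .apply("FormatOutput", MapElements.via(new FormatOutputFn()));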

A bug related to the per-window table names.

I've tried to use the example below from BigQueryIO.java:

PCollection<TableRow> quotes = ...

List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("source").setType("STRING"));
fields.add(new TableFieldSchema().setName("quote").setType("STRING"));
TableSchema schema = new TableSchema().setFields(fields);

quotes.apply(BigQueryIO.Write
  .named("Write")
  .to("my-project:output.output_table")
  .withSchema(schema)
  .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

And the following error appeared:

Exception: java.lang.IllegalArgumentException: Table reference is not in [project_id]:[dataset_id].[table_id] format: my-project:output.output_table-[2015-07-24:00:00.000Z..2015-07-25T00:00:00.000Z)

This is actually incorrect, since it appears to generate strings that are not legal in BigQuery table names.

Transform does not have a stable unique name

After updating to Dataflow 0.4.20150727 we've started getting this exception at runtime. It seems to happen nearly every time we apply any transform without explicitly giving it a name. Is this intended?

This entirely reasonable code now results in an unintuitive runtime crash:

    @Test
    public void exampleFailure() {
        Pipeline p = TestPipeline.create();
        final PCollection<Integer> pInts1 = p.apply(Create.of(Arrays.asList(1, 2, 3)));
        final PCollection<Integer> pInts2 = p.apply(Create.of(Arrays.asList(1, 2, 3)));
        p.run();
    }
java.lang.IllegalStateException: Transform Create.Values2 does not have a stable unique name. This will prevent reloading of pipelines.
    at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:330)
    at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:253)
    at com.google.cloud.dataflow.sdk.values.PBegin.apply(PBegin.java:48)
    at com.google.cloud.dataflow.sdk.Pipeline.apply(Pipeline.java:137)
    at org.broadinstitute.hellbender.engine.dataflow.transforms.GetOverlappingReadsAndVariantsUnitTest.exampleFailure(GetOverlappingReadsAndVariantsUnitTest.java:63)

Adding some boilerplate fixes it

    @Test
    public void exampleFailure(){
        Pipeline p = TestPipeline.create();
        final PCollection<Integer> pInts1 = p.apply("Create.Values1", Create.of(Arrays.asList(1, 2, 3)));
        final PCollection<Integer> pInts2 = p.apply("Create.Values2", Create.of(Arrays.asList(1, 2, 3)));
        p.run();
    }

but this should be unnecessary since I gave it the same name it had already inferred for that transform.

Three questions:

  1. Is this a bug? If it's working as intended then the single parameter version of apply should really be removed since it's highly unsafe.
  2. Is it safe to use a randomly generated name?
    @Test
    public void exampleFailure(){
        Pipeline p = TestPipeline.create();
        final PCollection<Integer> pInts1 = p.apply(UUID.randomUUID().toString(), Create.of(Arrays.asList(1, 2, 3)));
        final PCollection<Integer> pInts2 = p.apply(UUID.randomUUID().toString(), Create.of(Arrays.asList(1, 2, 3)));
        p.run();
    }

I assume it's not, because it says "stable". What's the danger, though? If a randomly generated name isn't OK, what should we do for programmatically generated transforms? Is an incremented counter OK?

  3. Could you better define "stable", "unique", and "reload pipeline"? Is that globally unique? Unique per pipeline run? What does it mean to reload a pipeline? Is that something that happens by magic behind the scenes, or something we'd have to initiate manually?
