flink-connectors's Introduction

Pravega Flink Connectors

This repository implements connectors to read and write Pravega Streams with the Apache Flink stream processing framework.

The connectors can be used to build end-to-end stream processing pipelines (see Samples) that use Pravega as the stream storage and message bus, and Apache Flink for computation over the streams.

Features & Highlights

  • Exactly-once processing guarantees for both Reader and Writer, supporting end-to-end exactly-once processing pipelines.
  • Seamless integration with Flink's checkpoints and savepoints.
  • Parallel Readers and Writers supporting high throughput and low latency processing.
  • Table API support to access Pravega Streams for both Batch and Streaming use cases.

Compatibility Matrix

The master branch will always have the most recent supported versions of Flink and Pravega.

Git Branch      | Pravega Version | Flink Version | Status            | Artifact Link
master          | 0.14            | 1.18          | Under Development | https://github.com/pravega/flink-connectors/packages/19676441
r0.14-flink1.17 | 0.14            | 1.17          | Under Development | https://github.com/pravega/flink-connectors/packages/1441637
r0.14-flink1.16 | 0.14            | 1.16          | Under Development | https://github.com/pravega/flink-connectors/packages/1704300
r0.13           | 0.13            | 1.16          | Released          | https://repo1.maven.org/maven2/io/pravega/pravega-connectors-flink-1.16_2.12/0.13.0/
r0.13-flink1.15 | 0.13            | 1.15          | Released          | https://repo1.maven.org/maven2/io/pravega/pravega-connectors-flink-1.15_2.12/0.13.0/
r0.13-flink1.14 | 0.13            | 1.14          | Released          | https://repo1.maven.org/maven2/io/pravega/pravega-connectors-flink-1.14_2.12/0.13.0/
r0.12           | 0.12            | 1.15          | Released          | https://repo1.maven.org/maven2/io/pravega/pravega-connectors-flink-1.15_2.12/0.12.0/
r0.12-flink1.14 | 0.12            | 1.14          | Released          | https://repo1.maven.org/maven2/io/pravega/pravega-connectors-flink-1.14_2.12/0.12.0/
r0.12-flink1.13 | 0.12            | 1.13          | Released          | https://repo1.maven.org/maven2/io/pravega/pravega-connectors-flink-1.13_2.12/0.12.0/

How to build

Building the connectors from the source is only necessary when we want to use or contribute to the latest (unreleased) version of the Pravega Flink connectors.

To build the project, Java 11 is required and the repository must be checked out via git clone https://github.com/pravega/flink-connectors.git.

The connector project is linked to a specific version of Pravega, based on the pravegaVersion field in the gradle.properties.

After cloning the repository, the project can be built (excluding tests) by running the following command in the project root directory flink-connectors.

./gradlew clean build -x test

How to use

See the documentation to learn how to build your own applications using the Flink connector for Pravega. Note that running the connector requires Java 8 or 11.

More examples of how to use the connectors with Flink applications can be found in the Pravega Samples repository.

Support

Don't hesitate to ask! Contact the developers and community on Slack (signup) if you need any help. If you find a bug, open an issue on GitHub Issues.

About

Flink connectors for Pravega is 100% open source and community-driven. All components are available under the Apache 2 License on GitHub.

flink-connectors's People

Contributors

alexanderzhao1, andreykoltsov1997, aparnarr, arvindkandhare, charlielchen, chrisdail, crazyzhou, derekm, elizabethbain, empcl, eronwright, fpj, fyang86, gamalhot, guangfeng-xu, hldnova, jdmaguire, jonny-miller, maddisondavid, miomiomiomio, pavanbhat21, raulgracia, skrishnappa, stephanewen, thekingofcity, tzulitai, vijikarthi, welkin-y

flink-connectors's Issues

Merge FlinkExactlyOncePravegaWriter and FlinkPravegaWriter

Problem description
The FlinkExactlyOncePravegaWriter currently always uses transactions to write to Pravega. Hence, if it is used as a sink in a Flink job with checkpointing disabled, the events will never be committed to Pravega.
We could conditionally use transactions in the writer depending on whether checkpointing is enabled.

Problem location
FlinkExactlyOncePravegaWriter

Suggestions for an improvement
Merge the two writer implementations.
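
As a rough illustration of the conditional behaviour described above, the merged writer could pick its write mode from a single flag. This is a minimal sketch only: the class name, the enum and the select method are hypothetical and do not come from the connector's code base, and the flag would have to be supplied by the Flink runtime.

```java
// Hypothetical sketch: none of these names exist in the connector.
final class WriterModeSketch {

    /** How the sink hands events to Pravega. */
    enum WriteMode { TRANSACTIONAL, NON_TRANSACTIONAL }

    /**
     * Transactions are only committed when a checkpoint completes, so they
     * are only selected when checkpointing is enabled.
     */
    static WriteMode select(boolean checkpointingEnabled) {
        return checkpointingEnabled ? WriteMode.TRANSACTIONAL : WriteMode.NON_TRANSACTIONAL;
    }

    public static void main(String[] args) {
        System.out.println("checkpointing on:  " + select(true));
        System.out.println("checkpointing off: " + select(false));
    }
}
```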

Integrate flink end-to-end tests with Travis

This suggestion came up on Slack: integrate testing of the Flink connectors with the Flink end-to-end tests.

From Stephan:

Or we change the build setup of the flink-connectors repository a bit:

  • Travis would not call gradle directly, but instead run a test driver script
  • That script would initially trigger gradle, but also run some end-to-end tests that download Flink, take the compiled connector, start Flink, submit a job, parse results, etc.

Develop a unit test for FlinkPravegaReader

Problem description
The FlinkPravegaReader has a test FlinkPravegaReaderTest that is best understood as an integration test. Coverage is too low.

Suggestions for an improvement

  • Rename FlinkPravegaReaderTest to FlinkPravegaReaderITCase.
  • Develop a unit test that uses a mock Pravega EventStreamReader.

Improve fairness of initial allocation of segments

Problem description
When the Flink job starts, the readers acquire segments as they start up. The first reader to start is being overly greedy by acquiring all segments and starving the other readers. Rebalancing eventually occurs as other readers come online.

This is an optimization issue, not a functional problem.

Problem location
FlinkPravegaReader

Suggestions for an improvement
The problem is basically a race between reader initialization and segment acquisition. Since the number of subtasks (reader instances) is known when the reader group is created, their names could be pre-registered with the reader group during its initialization. This implies that the names be made stable (issue #16).

Here's a log showing that reader 1 (of 4) acquires all segments before readers 2..4 come online. Also shown is subsequent rebalancing.

2017-05-08 16:58:46,501 28409 [Source: Custom Source (2/4)] INFO  i.p.c.flink.FlinkPravegaReader - Starting Pravega reader 'Source: Custom Source (2/4)' for controller URI tcp://localhost:32770
2017-05-08 16:58:46,506 28414 [flink-akka.actor.default-dispatcher-7] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source (1/4) (4d19a3c2c16c6f6086f6b235f4335dbc) switched from DEPLOYING to RUNNING.
2017-05-08 16:58:46,509 28417 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source (2/4) (86099972408624a6918951a22530243d) switched from DEPLOYING to RUNNING.
2017-05-08 16:58:46,511 28419 [Source: Custom Source (2/4)] INFO  i.p.c.s.impl.EventStreamReaderImpl - EventStreamReaderImpl( id=Source: Custom Source (2/4)) acquiring segments {Segment(scope=scope, streamName=ShlxqXWdXvXzjGdpvZKJ, segmentNumber=2)=10236, Segment(scope=scope, streamName=ShlxqXWdXvXzjGdpvZKJ, segmentNumber=3)=9996, Segment(scope=scope, streamName=ShlxqXWdXvXzjGdpvZKJ, segmentNumber=0)=10308, Segment(scope=scope, streamName=ShlxqXWdXvXzjGdpvZKJ, segmentNumber=1)=10308}
2017-05-08 16:58:46,518 28426 [Source: Custom Source (3/4)] INFO  i.p.c.flink.FlinkPravegaReader - Starting Pravega reader 'Source: Custom Source (3/4)' for controller URI tcp://localhost:32770
2017-05-08 16:58:46,547 28455 [flink-akka.actor.default-dispatcher-6] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source (3/4) (30fd38ce163e51e765d3b236b522c811) switched from DEPLOYING to RUNNING.
2017-05-08 16:58:46,556 28464 [Source: Custom Source (1/4)] INFO  i.p.c.flink.FlinkPravegaReader - Starting Pravega reader 'Source: Custom Source (1/4)' for controller URI tcp://localhost:32770
2017-05-08 16:58:46,558 28466 [Source: Custom Source (4/4)] INFO  i.p.c.flink.FlinkPravegaReader - Starting Pravega reader 'Source: Custom Source (4/4)' for controller URI tcp://localhost:32770
2017-05-08 16:58:46,608 28516 [flink-akka.actor.default-dispatcher-6] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source (4/4) (03d62d7c81dae731e685adeca1175177) switched from DEPLOYING to RUNNING.
2017-05-08 16:58:46,610 28518 [flink-akka.actor.default-dispatcher-6] INFO  o.a.f.r.e.ExecutionGraph - Map -> Sink: Unnamed (1/1) (9aff8650dc230d80764275ab6a5e2dc2) switched from DEPLOYING to RUNNING.
...
2017-05-08 16:58:53,142 25485 [Source: Custom Source (4/4)] DEBUG i.p.c.flink.FlinkPravegaReader - Emitting: 3858
2017-05-08 16:58:53,144 25487 [Source: Custom Source (4/4)] DEBUG i.p.c.flink.FlinkPravegaReader - Emitting: 3859
2017-05-08 16:58:54,723 28631 [Map -> Sink: Unnamed (1/1)] DEBUG i.p.c.f.u.IntSequenceExactlyOnceValidator - IntSequenceExactlyOnceValidator - received element: 3858
2017-05-08 16:58:54,723 28631 [Map -> Sink: Unnamed (1/1)] DEBUG i.p.c.f.u.IntSequenceExactlyOnceValidator - IntSequenceExactlyOnceValidator - received element: 3859
...
2017-05-08 16:59:23,314 25657 [Source: Custom Source (4/4)] INFO  i.p.c.s.impl.EventStreamReaderImpl - EventStreamReaderImpl( id=Source: Custom Source (4/4)) releasing segment Segment(scope=scope, streamName=OwKBLcwfDwfhNkhIqXuK, segmentNumber=2)
2017-05-08 16:59:23,332 25675 [Source: Custom Source (1/4)] INFO  i.p.c.s.impl.EventStreamReaderImpl - EventStreamReaderImpl( id=Source: Custom Source (1/4)) acquiring segments {Segment(scope=scope, streamName=OwKBLcwfDwfhNkhIqXuK, segmentNumber=2)=4572}
...
2017-05-08 16:59:56,142 25485 [Source: Custom Source (1/4)] DEBUG i.p.c.flink.FlinkPravegaReader - Emitting: 4002
2017-05-08 16:59:56,144 25487 [Source: Custom Source (4/4)] DEBUG i.p.c.flink.FlinkPravegaReader - Emitting: 4021
...

Treat Pravega as a source dependency

Problem description
Fix the Travis build of the connector to build Pravega automatically, so that the connector may be unit tested by Travis.

Suggestions for an improvement
Consider using a submodule plus a composite build to incorporate the Pravega build into the connector build.

Use a builder for creating Pravega Reader/Writer

Problem description

During discussion of #30 we noticed how many constructor options the Flink connectors require. A builder for creating readers and writers, instead of constructors, may give a nicer experience and may also lead to better code reuse.

Suggestions for an improvement

Look into using a builder for construction of the flink connectors. Potential items to be handled by the builder:

  • controller info
  • stream id (as StreamId or as separate scope/stream name). Note that the reader accepts multiple stream names. Should it be possible to read from numerous scopes?
  • writer modes (best-effort, at-least-once, exactly-once)
  • start time
  • deserialization schema
  • 'reader name' (better understood as operator name)
  • event read timeout
  • checkpoint initiate timeout
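
A builder covering a few of the items above might look roughly like the following. This is a sketch under stated assumptions, not the connector's API: every class, method and field name is hypothetical, only a subset of the listed options is modelled, and the one-second default timeout is an arbitrary placeholder.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: class, method and field names are illustrative only.
final class ReaderBuilderSketch {

    /** Immutable bundle of the options a reader needs. */
    static final class ReaderSettings {
        final String controllerUri;
        final String scope;
        final List<String> streamNames;
        final long startTime;
        final Duration eventReadTimeout;

        private ReaderSettings(Builder b) {
            this.controllerUri = b.controllerUri;
            this.scope = b.scope;
            this.streamNames = Collections.unmodifiableList(new ArrayList<>(b.streamNames));
            this.startTime = b.startTime;
            this.eventReadTimeout = b.eventReadTimeout;
        }
    }

    /** Fluent builder replacing a long constructor argument list. */
    static final class Builder {
        private String controllerUri;
        private String scope;
        private final List<String> streamNames = new ArrayList<>();
        private long startTime = 0L;
        private Duration eventReadTimeout = Duration.ofSeconds(1); // assumed default

        Builder withControllerUri(String uri) { this.controllerUri = uri; return this; }
        Builder withScope(String scope) { this.scope = scope; return this; }
        Builder forStream(String streamName) { this.streamNames.add(streamName); return this; }
        Builder withStartTime(long startTime) { this.startTime = startTime; return this; }
        Builder withEventReadTimeout(Duration timeout) { this.eventReadTimeout = timeout; return this; }

        /** Validates required options in one place before creating the settings. */
        ReaderSettings build() {
            if (controllerUri == null || scope == null || streamNames.isEmpty()) {
                throw new IllegalStateException("controller URI, scope and at least one stream are required");
            }
            return new ReaderSettings(this);
        }
    }

    public static void main(String[] args) {
        ReaderSettings settings = new Builder()
                .withControllerUri("tcp://localhost:9090")
                .withScope("scope")
                .forStream("stream-a")
                .forStream("stream-b")
                .build();
        System.out.println(settings.scope + " " + settings.streamNames);
    }
}
```

Validation lives in build(), so optional settings keep their defaults while missing required ones fail early.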

Implement connector metrics

Problem description
The connector should emit relevant metrics using Flink's metrics API. Note that Flink already provides some basic metrics like records in/out, which needn't be duplicated.

Suggestions for an improvement
Suggested metrics:

  • configuration data (stream names as gauges?)
  • segment assignment
  • event I/O rates
  • position information
  • lag (ReaderGroupMetrics::unreadBytes)

Move the exactly once connectors into flink-connectors

Problem description
Currently the exactly once connectors are being developed out of the pravega/pravega project.
We need to move those into the pravega/flink-connectors project.

Problem location
Flink connectors in pravega

Suggestions for an improvement
Move the Flink connector files out of the pravega project.

Recovered commit should apply to original stream

Problem description
The connector is designed to handle an edge case where the transaction associated with a given savepoint remains uncommitted due to a crash before notifyCheckpointComplete. In that situation, a subsequent job based on that savepoint recovers the transaction during restoreState. The subject of this bug is that the new job might point to a different stream from that of the original job. The code in recoverState should use the old stream name when looking for transactions to recover.

Problem location
FlinkPravegaWriter

Suggestions for an improvement
Record the old name as part of the tuple that is returned in the savepoint.

Remove "pravega-standalone" from shaded JAR

Problem description
The pravega-standalone dependency is erroneously being included in the shaded JAR.

Since the server code isn't in the shaded jar, and the POM doesn't list the server libs as dependencies (nor should it), the emulator classes make little sense to have here.

Problem location
build.gradle

Suggestions for an improvement
Remove it from the 'included dependencies' section of 'shadowJar'. It is fine to continue to have this dependency in the testCompile configuration.

Disable Pravega automatic checkpointing

Problem description
Since the connector explicitly invokes checkpoints, the automatic checkpoint feature is unnecessary and causes an issue due to an assumption within the connector about the checkpoint name.

See related: Pravega/issues/Automatic checkpoints

Problem location
FlinkPravegaReader

Suggestions for an improvement

  • disable automatic checkpointing using the reader group config.

Work on README

Problem description
The project readme file needs more detail. Let's write a good landing readme file.

Problem location
README.

Suggestions for an improvement
Add more information to the main README file.

Add logging support to tests

Problem description
All tests are currently timing out in my setup. I went to build looking for logs and didn't find any. It would be useful to have slf4j logging dumped by default to files so that we can inspect in the case of test failures.

Problem location
Tests, logging.

Suggestions for an improvement
Add slf4j logging and configuration.

Improve watermark handling (per-segment watermark)

Problem description
Time is a first-class aspect of the Flink Streaming programming model. The progress of time is based on watermarks that typically track the timestamps seen in records as they're emitted by the source. The system works best if sources emit records in roughly the order they were received in; in other words, Flink's ability to reorder out-of-order elements is limited. Two problems occur when a source emits records in a highly unordered fashion.

  1. Any record with a timestamp older than the current watermark is considered a 'late' record which requires special handling by the application.
  2. Any record with a timestamp newer than the current watermark is buffered in operator state. Depending on the state backend, this can increase memory and/or disk usage.

The Pravega source already processes segments in order; a successor won't be processed until its predecessors have been. However, segments that meet that criterion are simply processed in the order that the reader group encounters them. The ordering isn't strong enough to avoid emission of late records.

Another cause of out-of-orderness is the fact that some segments are much longer than others, combined with the fact that a segment is processed completely once it is assigned to a reader. For example, imagine that segment A contains a full day's worth of records, while segment B was split into segments C and D at mid-day due to a scale event. A given reader may well emit all of A before tackling C/D.

This problem asserts itself mainly in historical processing scenarios.

Suggestions for an improvement

  1. Consider prioritizing unassigned segments by their start time.
  2. Consider introducing a per-segment watermark assigner, with automatic min-watermark calculation across segments. This may help with problem (1). See also Kafka Per-Partition Watermarks feature docs.
  3. Consider employing a segment selection policy that would discourage a given reader from assigning itself a segment that contains elements older than the reader's current watermark. For example, once a segment has been processed, store the per-segment watermark into reader group state as the effective 'start time' of the successors.
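
The min-watermark calculation from suggestion (2) can be sketched as follows. This is a simplified illustration that uses no Flink or Pravega API: it assumes timestamps within a segment are roughly ascending, tracks the highest timestamp seen per segment, and reports the minimum across segments as the overall watermark. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-segment watermark tracking; not connector code.
final class PerSegmentWatermarkSketch {

    // Highest event timestamp observed so far, per segment name.
    private final Map<String, Long> maxTimestampBySegment = new HashMap<>();

    /** Records an event timestamp for the segment it was read from. */
    void onEvent(String segment, long timestamp) {
        maxTimestampBySegment.merge(segment, timestamp, Long::max);
    }

    /** The overall watermark is held back by the slowest segment. */
    long currentWatermark() {
        if (maxTimestampBySegment.isEmpty()) {
            return Long.MIN_VALUE; // nothing observed yet
        }
        long min = Long.MAX_VALUE;
        for (long segmentWatermark : maxTimestampBySegment.values()) {
            min = Math.min(min, segmentWatermark);
        }
        return min;
    }

    public static void main(String[] args) {
        PerSegmentWatermarkSketch tracker = new PerSegmentWatermarkSketch();
        tracker.onEvent("segment-A", 100L);
        tracker.onEvent("segment-B", 50L);
        System.out.println(tracker.currentWatermark());
    }
}
```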

Use single threaded executor in FlinkPravegaWriter

Problem description
The non-transactional writer uses a larger thread pool than necessary for handling write completion callbacks.

Suggestions for an improvement
Use a single threaded executor to reduce the number of created threads and to reduce contention. The body of the callback synchronizes on the internal writer anyway, so having numerous threads simply increases contention.
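
The effect of the suggestion can be sketched with plain java.util.concurrent types. In this hypothetical example the CompletableFuture stands in for the acknowledgement future of an asynchronous write, and all completion callbacks are routed through one single-threaded executor; the class and method names are illustrative, not taken from the connector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: simulated writes, no Pravega client involved.
final class SingleThreadCallbackSketch {

    /** Runs simulated writes and returns how many distinct threads executed the callbacks. */
    static int distinctCallbackThreads(int writes) {
        ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        List<CompletableFuture<Void>> callbacks = new ArrayList<>();
        try {
            for (int i = 0; i < writes; i++) {
                // Stand-in for the future returned by an asynchronous write.
                CompletableFuture<Void> ack = CompletableFuture.runAsync(() -> { });
                // The callback always runs on the dedicated single thread.
                callbacks.add(ack.whenCompleteAsync(
                        (result, error) -> threadNames.add(Thread.currentThread().getName()),
                        callbackExecutor));
            }
            // Wait until every callback has finished.
            CompletableFuture.allOf(callbacks.toArray(new CompletableFuture[0])).join();
        } finally {
            callbackExecutor.shutdown();
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        System.out.println("callback threads used: " + distinctCallbackThreads(50));
    }
}
```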

Batch connector doesn't rewind on task failover

Problem description
Flink supports a number of failover strategies when a task fails, by default a global restart of the job. None of the restart strategies work correctly with FlinkPravegaInputFormat because it doesn't rewind the reader group to the start position.

Suggestions for an improvement

  • Discard the reader group state upon a global restart.
  • Validate that the job is configured to use the global failover strategy.

Additional Background
The restart procedure for an input format normally works as follows:

  1. InputFormat::new is called by the client, then serialized into a job graph.
  2. InputFormat::configure is called once by the JM when the execution graph is built.
  3. InputFormat::createInputSplits is called once by the JM when the execution graph is built.
  4. InputFormat::getInputSplitAssigner(splits) is called for each execution (incl. resetForNewExecution)
  5. InputSplitAssigner::getNextInputSplit is called by the JM to produce a split for processing by an idle task.

In other words, the 'assigner' is the iterator and is reset by the 'global' failover strategy using resetForNewExecution.

Prefer Flink annotations and preconditions

Problem description
With the rationale that the connector code should sit nicely with that of other Flink connectors, prefer Flink annotations over Guava annotations where possible. Concretely: Preconditions, VisibleForTesting.

Note that the Pravega client itself uses Guava, so it will remain a dependency.

Implement table connector (streaming only)

Problem description
A Flink table connector (TableSource/TableSink) is another type of connector, needed to support Flink's Table/SQL library. Two variants exist, one for integration with the streaming API (e.g. StreamTableSource), the other for the batch API (e.g. BatchTableSource). The implementation typically builds on the classic streaming or batch connector.

Implementation Details
The duty of a table source is to return a DataStream<Row> where Row is a core data type of Flink Table. A typical table source takes a DeserializationSchema<Row> as a parameter, to support conversion from an arbitrary byte message to a Row. Flink provides JsonRowDeserializationSchema and AvroRowDeserializationSchema for convenience.

A table sink is similar to a source but has some interesting implementation details, building on the concept of Streams & Tables. A basic implementation may extend AppendStreamTableSink to indicate that it accepts inserts only. A fancy sink may extend other variants to support deletes, upserts, etc.

More information here: Flink documentation

Implement batch connector (source only)

Problem description
Develop a batch connector to process historical data in a given stream using the Flink Batch API. A good reason is to be able to use Flink Gelly (graph API) and Flink ML which are based on the batch API.

It is probably sufficient to consider a Pravega stream as a source, not as a sink, for batch purposes.

Suggestions for an improvement
There are at least two approaches being considered. One is to treat segments as splits, which seems more likely to leverage data locality and more compatible with fine-grained recovery, but requires client changes. Another is to synthesize a split per subtask and to use the existing reader group API, which would require fewer changes to the Pravega client but would underperform at scale.

Implement an InputFormat, which splits an input file or other resource (e.g. Pravega stream) into processable units for Flink to process across a number of subtask instances. For example, a file-based input format splits the input file(s) into splits representing non-overlapping ranges that correspond to HDFS blocks. For Pravega, a close analogue would be to treat each segment as an input split.

If location hints are available (on a per-split basis), implement LocatableInputSplit and return an instance of LocatableInputSplitAssigner from InputFormat.

A connector is free to read each split in whatever way it chooses. It needn't be a file-based access. For example, if each segment were a split, the connector could use a Pravega Reader API to query a specific segment, e.g. io.pravega.client.segment.impl.SegmentInputStream (which the reader group API uses under the hood).

Handle expiration of transactions between operator checkpoint and global checkpoint commit

Problem description

The FlinkExactlyOncePravegaWriter works similarly to a 2-PC protocol:

  1. A transaction is created per checkpoint
  2. When Flink does a checkpoint on the writer, it flushes the transaction, and stores its UUID in the checkpoint state (vote-to-commit)
  3. When the checkpoint is complete and the sink receives a notification, that checkpoint's transaction is committed (full commit)

That model assumes that transactions can be committed once the checkpoint's completeness notification is received (step 3). If a transaction times out between step (2) and step (3), there will be data loss.

This is an inherent fragility that seems hard to circumvent with the current primitive, and has to do with transaction timeouts. Given sufficiently long timeouts, this may never be a problem in most setups, but it is not nice to have this weak point in the long run.
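
The three steps and the timeout window can be modelled in a few lines. The sketch below is a toy simulation with an explicit clock, not the writer's implementation: the class, the lease parameter and the boolean return value of notifyCheckpointComplete are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical simulation of the checkpoint/transaction protocol; not connector code.
final class CheckpointTxnSketch {

    /** A transaction that can only be committed before its deadline. */
    static final class Txn {
        final UUID id = UUID.randomUUID();
        final long deadlineMillis;

        Txn(long deadlineMillis) {
            this.deadlineMillis = deadlineMillis;
        }
    }

    private final long leaseMillis;
    private final Map<Long, Txn> pendingByCheckpoint = new HashMap<>();
    private Txn current;

    CheckpointTxnSketch(long leaseMillis, long nowMillis) {
        this.leaseMillis = leaseMillis;
        this.current = new Txn(nowMillis + leaseMillis); // step 1: open a transaction
    }

    /** Step 2: flush, remember the transaction id (vote-to-commit), open the next transaction. */
    UUID snapshotState(long checkpointId, long nowMillis) {
        Txn flushed = current;
        pendingByCheckpoint.put(checkpointId, flushed);
        current = new Txn(nowMillis + leaseMillis);
        return flushed.id;
    }

    /** Step 3: commit. Returns false when the transaction already timed out, i.e. data is lost. */
    boolean notifyCheckpointComplete(long checkpointId, long nowMillis) {
        Txn txn = pendingByCheckpoint.remove(checkpointId);
        if (txn == null) {
            throw new IllegalStateException("unknown checkpoint " + checkpointId);
        }
        return nowMillis <= txn.deadlineMillis;
    }

    public static void main(String[] args) {
        CheckpointTxnSketch sink = new CheckpointTxnSketch(1000L, 0L);
        sink.snapshotState(1L, 100L);
        System.out.println("committed in time: " + sink.notifyCheckpointComplete(1L, 500L));
    }
}
```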

Problem location

The problem is in the use of Pravega Transactions in the io.pravega.connectors.flink.FlinkExactlyOncePravegaWriter.

Suggestions for an improvement

Off the top of my head, there are three types of solutions to that issue:

  1. Pravega offers a pre-commit / full-commit distinction in transactions, where a pre-commit means the transaction becomes immutable and the usual timeouts do not apply (except possibly a garbage prevention-timeout which could be very long). The full commit publishes the temporary segment.

  2. Flink makes sure it can recover transactions, for example by persisting the data in the checkpoints as well. If a transaction timed out, it can open a new transaction and re-write the data. The disadvantage is that, effectively, everything is persisted in two distinct systems (each with its own durability/replication).

  3. Flink adds a way that lets tasks complete a checkpoint completely independently of other tasks. Then the transaction could be committed immediately when the checkpoint is triggered. That would require a guarantee that the sinks are never affected by a recovery of other tasks. To keep the current consistency guarantees, this would require persisting the result from the input to the sink operation (similar to how, for example, Samza persists every shuffle), which is in some sense not too different from approach (2).

Use secure token for Travis integration

Problem description
The token for Travis integration has changed and been replaced by secure credentials (see pravega/pravega#1712 for reference). The same credentials should be used for this repo too.

Problem location
.travis.yml

Suggestions for an improvement
Replace token with secure token.

Implement 'fixed' event router

Problem description
To use the Pravega sink, an instance of PravegaEventRouter must be provided. Develop a predefined implementation, similar to FlinkFixedPartitioner, that maps each Flink partition to a routing key.

Consider whether it would be possible to support parallelism changes to the sink.

This approach is one of many that could be imagined to make sense of the ordering of events produced to a stream by a Flink program. A program may ensure that all events for a given key arrive at the same sink instance by leveraging a keyed stream, e.g. stream.keyBy(...).addSink(..). Some event-time reordering logic would also be needed to establish a per-key ordering.

See org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner for background.

Suggestions

  • Evolve the PravegaEventRouter interface with an open(int parallelInstanceId, int parallelInstances) method.
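
One way the evolved interface and a fixed router could fit together is sketched below. The nested EventRouter interface is a hypothetical stand-in for PravegaEventRouter (assumed here to expose a getRoutingKey method), and using the subtask index as the routing key is only one possible mapping.

```java
// Hypothetical sketch: EventRouter stands in for PravegaEventRouter and is not the real interface.
final class FixedRouterSketch {

    /** Router interface extended with an open(...) hook as suggested above. */
    interface EventRouter<T> {
        default void open(int parallelInstanceId, int parallelInstances) { }

        String getRoutingKey(T event);
    }

    /** Sends every event of one sink subtask to the same routing key. */
    static final class FixedEventRouter<T> implements EventRouter<T> {
        private String routingKey;

        @Override
        public void open(int parallelInstanceId, int parallelInstances) {
            if (parallelInstanceId < 0 || parallelInstanceId >= parallelInstances) {
                throw new IllegalArgumentException("instance id out of range: " + parallelInstanceId);
            }
            // The subtask index becomes the routing key for all events of this instance.
            this.routingKey = Integer.toString(parallelInstanceId);
        }

        @Override
        public String getRoutingKey(T event) {
            if (routingKey == null) {
                throw new IllegalStateException("open() must be called before routing events");
            }
            return routingKey;
        }
    }

    public static void main(String[] args) {
        FixedEventRouter<String> router = new FixedEventRouter<>();
        router.open(2, 4);
        System.out.println(router.getRoutingKey("any event"));
    }
}
```

Note that a key derived from the subtask index changes meaning when the sink parallelism changes, which is the open question raised above.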

Remove unneeded dependencies

Problem description
Some of the 'shared-*' dependencies might not be needed, e.g. shared-metrics.

Problem location
build.gradle

Suggestions for an improvement

FlinkPravegaWriter cannot be flushed following a write error

Problem description
When close or flush is called after a write that has failed internally, the method will stall forever.

Problem location
The problem is due to bad accounting of pending writes based on pendingWritesCount. The flush method waits for the count to reach zero, but the count is not properly decremented on error. (ref)

Suggestions for an improvement
Decrement the write count irrespective of success or failure.
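
A minimal sketch of that accounting, assuming write acknowledgements arrive as CompletableFuture instances: the counter is decremented in a completion handler that runs for both outcomes, and the first error is kept for later reporting. The class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of pending-write accounting; not the connector's code.
final class PendingWriteCounterSketch {

    private final AtomicInteger pendingWrites = new AtomicInteger();
    private final AtomicReference<Throwable> firstError = new AtomicReference<>();

    /** Registers an in-flight write; the count drops again on success and on failure. */
    void track(CompletableFuture<?> ack) {
        pendingWrites.incrementAndGet();
        ack.whenComplete((result, error) -> {
            if (error != null) {
                firstError.compareAndSet(null, error); // remember only the first failure
            }
            pendingWrites.decrementAndGet(); // runs for both outcomes
        });
    }

    int pendingWrites() {
        return pendingWrites.get();
    }

    Throwable firstError() {
        return firstError.get();
    }

    public static void main(String[] args) {
        PendingWriteCounterSketch counter = new PendingWriteCounterSketch();
        CompletableFuture<Void> failing = new CompletableFuture<>();
        counter.track(failing);
        failing.completeExceptionally(new RuntimeException("simulated write failure"));
        System.out.println("pending after failure: " + counter.pendingWrites());
    }
}
```

With the count reaching zero in both cases, a flush that waits for zero pending writes can return and then surface the recorded error instead of stalling.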

Implement table source (batch)

Problem description
As a follow-up to #32, extend FlinkPravegaTableSource to support the batch API, based on the underlying FlinkPravegaInputFormat.

Please note that Flink's built-in CsvTableSource implements both StreamTableSource and BatchTableSource. We want the same here.

Develop a unit test for FlinkPravegaInputFormat

Problem description

As with #67 and #68, the current tests for the FlinkPravegaInputFormat are, strictly speaking, integration tests that do not have sufficient coverage. We should develop proper unit tests for the connector.

Suggestions for an improvement

  • Rename FlinkPravegaInputFormatTest to FlinkPravegaInputFormatITCase.
  • Develop a unit test that uses a mock Pravega EventStreamReader.

Use a stable default readerName

Problem description
The readerName is used only as a key into Flink's checkpoint state to disambiguate the state written by our checkpoint hook, from that of other hooks in the same job. It is important for the value to be stable, otherwise the savepoint is unusable. Let's use a more stable identifier by default than a randomly-generated value.

Problem location
FlinkPravegaReader

Suggestions for an improvement
Ordinary state stored by an operator is similarly disambiguated using the operator ID (ref). Some enhancement to Flink's hook facility would be needed to use the operator ID here.

In the meantime, I suggest hashing the inputs to the FlinkPravegaReader (stream name, etc.) to generate a default value for the readerName.
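
A deterministic default could be derived along these lines. This is a sketch under assumptions: it hashes only the scope and the sorted stream names (which inputs to include is a design decision left open above), uses a name-based UUID from the JDK as the hash, and borrows the "flink-reader-" prefix from the log output shown in the reader ID issue. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

// Hypothetical sketch: derives a stable default name from the reader's inputs.
final class StableReaderNameSketch {

    /** Same scope and stream names (in any order) always yield the same name. */
    static String defaultReaderName(String scope, Collection<String> streamNames) {
        List<String> sorted = new ArrayList<>(streamNames);
        Collections.sort(sorted); // make the result independent of argument order
        String key = scope + "/" + String.join(",", sorted);
        // Name-based UUIDs are deterministic for identical input bytes.
        return "flink-reader-" + UUID.nameUUIDFromBytes(key.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(defaultReaderName("scope", Arrays.asList("stream-b", "stream-a")));
    }
}
```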

Reader ID stability

Problem description
Each subtask instance of FlinkPravegaReader creates a separate reader as part of the overall reader group. The reader ID is presently derived from UUID.randomUUID(), but would benefit from simply being derived from the subtask ID. This would improve readability of logs and nicely relate Flink concepts to Pravega concepts. No naming conflict is expected because a separate reader group is created for each Flink job.

Problem location
FlinkPravegaReader

Suggestions for an improvement
In FlinkPravegaReader::run, simply use the following to initialize the reader ID:

final String readerId = getRuntimeContext().getTaskNameWithSubtasks();

This would produce logs like the following:

2017-05-08 16:58:46,501 28409 [Source: Custom Source (2/4)] INFO  i.p.c.flink.FlinkPravegaReader - Starting Pravega reader 'Source: Custom Source (2/4)' for controller URI tcp://localhost:32770
2017-05-08 16:58:46,556 28464 [Source: Custom Source (1/4)] INFO  i.p.c.flink.FlinkPravegaReader - Starting Pravega reader 'Source: Custom Source (1/4)' for controller URI tcp://localhost:32770

Instead of:

2017-05-08 17:54:31,892 19956 [Source: Custom Source (4/4)] INFO  i.p.c.flink.FlinkPravegaReader - Starting Pravega reader 'flink-reader-9ac3a8bd-711c-448d-9e0a-2e3f59b7967e' for controller URI tcp://localhost:32770

Note that the term Custom Source stems from the application-specific source name provided via the DSL, e.g.:

env.addSource(new FlinkPravegaReader(...)).name("Custom Source")

Pravega txn API changes

Problem description
In this commit:

pravega/pravega@fc27917

we changed the txn API to remove the various timeouts in the signature. For now, we have moved them to configuration, but we should expect some further changes.

Problem location
Sink connector.

Suggestions for an improvement
Change the use of txns according to the API changes.

Exception thrown while trying to connect to controller.

Problem description

        at io.pravega.client.stream.impl.ControllerResolverFactory.lambda$newNameResolver$103(ControllerResolverFactory.java:68)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
        at io.pravega.client.stream.impl.ControllerResolverFactory.newNameResolver(ControllerResolverFactory.java:69)
        at io.grpc.internal.ManagedChannelImpl.getNameResolver(ManagedChannelImpl.java:440)
        at io.grpc.internal.ManagedChannelImpl.<init>(ManagedChannelImpl.java:394)
        at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:350)
        at io.pravega.client.stream.impl.ControllerImpl.<init>(ControllerImpl.java:140)
        at io.pravega.client.stream.impl.ControllerImpl.<init>(ControllerImpl.java:114)
        at io.pravega.client.admin.impl.StreamManagerImpl.<init>(StreamManagerImpl.java:35)
        at io.pravega.client.admin.StreamManager.create(StreamManager.java:28)
        at io.pravega.connectors.flink.utils.SetupUtils.createTestStream(SetupUtils.java:126)

Problem location
SetupUtils.java

In SetupUtils getControllerUri method:

        return URI.create("tcp://" + this.inProcPravegaCluster.getControllerURI());
    }

It expects inProcPravegaCluster.getControllerURI to return only an IP:port value.
However, the "tcp://" prefix is already added by InProcPravegaCluster.
This causes the URI to look like "tcp://tcp:// ...".

There is also a bug in InProcPravegaCluster where it adds ports[0] twice, making the return value look like:
"tcp://localhost:ports[0], localhost:ports[0], localhost:ports[1] ..."
So even though the Flink connector tests start only one controller, they get a comma-separated value of
"tcp://localhost:<ports[0]>, localhost:<ports[0]>". This is harmless, though.

Suggestions for an improvement
Fix the issue in SetupUtils and InProcPravegaCluster.
InProcPravegaCluster should not include ports[0] twice.
SetupUtils should not add tcp://, as it is already present in the returned value.
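
One defensive way to apply the SetupUtils part of the fix is sketched below. It is only an illustration: ControllerUris and normalize are hypothetical names, not part of the connector or of Pravega, and the sketch assumes a single host:port endpoint rather than a comma-separated list.

```java
import java.net.URI;

/** Hypothetical helper (not connector code): builds a controller URI with exactly one scheme. */
final class ControllerUris {
    private static final String SCHEME = "tcp://";

    private ControllerUris() { }

    /** Accepts "host:port" or an already-prefixed value and returns a URI with a single "tcp://". */
    static URI normalize(String raw) {
        String rest = raw.trim();
        // Drop every leading "tcp://" so an already-prefixed value is not prefixed again.
        while (rest.startsWith(SCHEME)) {
            rest = rest.substring(SCHEME.length());
        }
        return URI.create(SCHEME + rest);
    }

    public static void main(String[] args) {
        System.out.println(normalize("localhost:9090"));
        System.out.println(normalize("tcp://localhost:9090"));
    }
}
```

With a helper like this, SetupUtils would behave the same whether or not the cluster class has already added the prefix.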

Unit testing for reader and writer

Problem description
The existing tests do not provide adequate coverage because they're unable to simulate a wide variety of edge cases. For example, the writer has logic for write failures and async writes that isn't exercised.

Suggestions for an improvement
Develop a new set of tests that use a mock Pravega client and the Flink-provided operator test harness. See io.pravega.connectors.flink.EventTimeOrderingOperatorTest for an example.

Consider moving those tests that rely on an embedded Pravega server to intTest.

Reevaluate error semantics of FlinkPravegaWriter

Problem description
In the non-transactional write mode, the sink function processes writes asynchronously. Any error is buffered internally and re-thrown later, when a subsequent write, close or snapshot is called. One issue is that the error state is cleared after the error is thrown. It seems odd to clear the error, since it is a fatal error.

Consider the following sequence:

write("A"); // processed asynchronously, resulting in a buffered error
try {
    write("B"); // the buffered error is thrown
} catch (Exception e) {
    write("B"); // should this succeed?
}

Problem location
FlinkPravegaWriter::checkWriteError

Suggestions for an improvement
Don't clear the error; block any further writes. Maybe move to a closed state upon error.
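
A minimal sketch of the "don't clear the error" idea follows. StickyWriteError is a hypothetical stand-in for the writer's error bookkeeping, not the actual FlinkPravegaWriter code; it assumes the async write callback calls record and that every write, snapshot, and close calls check first.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for the writer's error bookkeeping; not the connector's code. */
final class StickyWriteError {
    private final AtomicReference<Throwable> firstError = new AtomicReference<>();

    /** Called from the async write callback; only the first failure is kept. */
    void record(Throwable failure) {
        firstError.compareAndSet(null, failure);
    }

    /** Called before every write, snapshot, and close; never clears the stored error. */
    void check() {
        Throwable failure = firstError.get();
        if (failure != null) {
            throw new IllegalStateException("Writer has failed and accepts no further writes", failure);
        }
    }
}
```

Under this scheme, the second write("B") in the sequence above would fail again instead of succeeding silently after a lost event.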

Validate correctness of ByteBuffer / byte[] interchange

Problem description

The bridge between Pravega and Flink serializers switches between ByteBuffer and byte[]. It makes the assumption that the provided byte buffers are always heap-backed and that the payload of the byte buffer is the complete backing array (no position / mark / limit / slice).

Problem location

In the FlinkPravegaUtils$FlinkDeserializer.deserialize() method.

Suggestions for an improvement

We should have sanity checks that validate that the byte buffer spans the full backing array (position is zero, limit is capacity).
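
Such a check might look like the sketch below. ByteBufferChecks and fullBackingArray are hypothetical names used for illustration, and throwing IllegalArgumentException is an assumption about the desired failure mode.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper (not connector code): validates the assumptions before using array(). */
final class ByteBufferChecks {
    private ByteBufferChecks() { }

    /** Returns the backing array only if the buffer is heap-backed and spans the whole array. */
    static byte[] fullBackingArray(ByteBuffer buffer) {
        // hasArray() is false for direct buffers and for read-only views.
        if (!buffer.hasArray()) {
            throw new IllegalArgumentException("Buffer has no accessible backing array");
        }
        boolean spansWholeArray = buffer.arrayOffset() == 0
                && buffer.position() == 0
                && buffer.limit() == buffer.capacity()
                && buffer.capacity() == buffer.array().length;
        if (!spansWholeArray) {
            throw new IllegalArgumentException("Buffer does not span its full backing array: " + buffer);
        }
        return buffer.array();
    }
}
```

An alternative to rejecting such buffers would be to copy the remaining bytes into a fresh array, at the cost of an extra allocation per event.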

Update connector build to support Flink 1.4.0

Problem description

There are a couple of breaking changes in the Flink 1.4 code base (FLINK-7323, FLINK-7548) that will impact running the Flink/Pravega connector on the earlier version (1.3.1). This requires an update to the connector project to support the Flink 1.4.0 binary.

Build against stable Flink version 1.3.1

Problem description

The current build goes against the 1.3-SNAPSHOT version. That is not a stable release and is only published to the nightly snapshot repositories. It should not be relied upon in a stable release of this connector.

Problem location

The Flink version in gradle.properties.

Suggestions for an improvement

Change the Flink version to 1.3.1.

ReaderCheckpointHook failure (FLINK-6606)

Problem description
The reader fails at job initialization, due to an issue with the classloader as described in FLINK-6606.

The exception is:

2017-05-12 15:07:15,227 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Failed to submit job a7535f3f9bdfc06ec89f57e4683aed68 (anomaly-detection)
io.pravega.shaded.io.grpc.ManagedChannelProvider$ProviderNotFoundException: No functional channel service provider found. Try adding a dependency on the grpc-okhttp or grpc-netty artifact
    at io.pravega.shaded.io.grpc.ManagedChannelProvider.provider(ManagedChannelProvider.java:126)
    at io.pravega.shaded.io.grpc.ManagedChannelBuilder.forTarget(ManagedChannelBuilder.java:77)
    at io.pravega.client.stream.impl.ControllerImpl.<init>(ControllerImpl.java:91)
    at io.pravega.client.admin.stream.impl.ReaderGroupManagerImpl.<init>(ReaderGroupManagerImpl.java:49)
    at io.pravega.client.admin.ReaderGroupManager.withScope(ReaderGroupManager.java:39)
    at io.pravega.connectors.flink.ReaderCheckpointHook.<init>(ReaderCheckpointHook.java:62)
    at io.pravega.connectors.flink.FlinkPravegaReader.createMasterTriggerRestoreHook(FlinkPravegaReader.java:276)
    at org.apache.flink.streaming.api.graph.FunctionMasterCheckpointHookFactory.create(FunctionMasterCheckpointHookFactory.java:43)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:262)

Problem location
The root cause is that grpc uses the thread context classloader (TCCL) to locate a transport plugin (i.e. grpc-netty), but Flink isn't setting the TCCL to facilitate that.

Suggestions for an improvement
A patch has been submitted to Flink, ideally for inclusion in 1.3. As a workaround, the FlinkPravegaReader could set the TCCL by hand, e.g.:

class FlinkPravegaReader {
    public FlinkPravegaReader() {
        // gRPC locates its transport provider (e.g. grpc-netty) through the TCCL,
        // so point the TCCL at the connector's classloader for the duration of the call.
        ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
        try {
            ReaderGroupManager.withScope(scope, controllerURI)
                    .createReaderGroup(this.readerGroupName,
                            ReaderGroupConfig.builder().startingTime(startTime).build(), streamNames);
        } finally {
            // Always restore the caller's TCCL, even if group creation fails.
            Thread.currentThread().setContextClassLoader(originalClassLoader);
        }
    }
}
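
The same save/set/restore pattern can be factored into a small reusable helper, sketched here using only the JDK. ContextClassLoaders and callWith are hypothetical names, not part of Flink, Pravega, or the connector.

```java
import java.util.function.Supplier;

/** Hypothetical helper (not connector code): runs an action under a temporary TCCL. */
final class ContextClassLoaders {
    private ContextClassLoaders() { }

    /** Sets the thread context classloader, runs the action, then restores the previous loader. */
    static <T> T callWith(ClassLoader loader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            // Restore even when the action throws.
            current.setContextClassLoader(original);
        }
    }
}
```

A caller could then wrap the reader group creation in callWith(getClass().getClassLoader(), ...) wherever gRPC needs to discover its transport.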

Develop a unit test for FlinkPravegaWriter

Problem description
The FlinkPravegaWriter has a pair of existing tests, FlinkPravegaWriterTest and FlinkExactlyOncePravegaWriterTest, that are best understood as integration tests. Overall coverage is low.

Suggestions for an improvement

  • Develop a unit test that uses a mock Pravega event writer to fully exercise the FlinkPravegaWriter.
  • Consolidate the existing tests into a test called FlinkPravegaWriterITCase.

Use Pravega batch read API with batch connector

Problem description
We have merged an experimental API for batch reads of a stream:

pravega/pravega@493a9f4

The general idea is that a job has access to all segments in parallel, assuming that for batch jobs, we don't care about the order of events in a stream.

The idea is to incorporate this API in the preliminary batch connector developed in PR #54.

Problem location
Batch connectors.

Suggestions for an improvement
Use the experimental batch read API in the batch connector.

Implement watermark idleness

Problem description
Subtasks that are without assigned segments/shards/partitions for an indefinite period should enter an idle state. Otherwise downstream operators may stall waiting for watermark progression. See FLINK-5017.

From SourceFunction.SourceContext:

/**
 * Marks the source to be temporarily idle. This tells the system that this source will
 * temporarily stop emitting records and watermarks for an indefinite amount of time. This
 * is only relevant when running on {@link TimeCharacteristic#IngestionTime} and
 * {@link TimeCharacteristic#EventTime}, allowing downstream tasks to advance their
 * watermarks without the need to wait for watermarks from this source while it is idle.
 *
 * <p>Source functions should make a best effort to call this method as soon as they
 * acknowledge themselves to be idle. The system will consider the source to resume activity
 * again once {@link SourceContext#collect(T)}, {@link SourceContext#collectWithTimestamp(T, long)},
 * or {@link SourceContext#emitWatermark(Watermark)} is called to emit elements or watermarks from the source.
 */
 @PublicEvolving
 void markAsTemporarilyIdle();

Problem location
FlinkPravegaReader

Suggestions for an improvement
Call markAsTemporarilyIdle whenever the Pravega reader has no segments assigned.
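
A rough sketch of that bookkeeping is shown below. It assumes the reader can learn how many segments it currently holds, which the issue does not specify. IdlenessTracker and IdleSignal are hypothetical; IdleSignal stands in for Flink's SourceContext so that the example compiles without Flink on the classpath.

```java
/** Hypothetical sketch (not connector code): signals idleness once per idle period. */
final class IdlenessTracker {

    /** Stand-in for SourceFunction.SourceContext#markAsTemporarilyIdle(). */
    interface IdleSignal {
        void markAsTemporarilyIdle();
    }

    private final IdleSignal signal;
    private boolean idle;

    IdlenessTracker(IdleSignal signal) {
        this.signal = signal;
    }

    /** Call whenever the number of segments assigned to this reader is (re)observed. */
    void onAssignedSegments(int segmentCount) {
        if (segmentCount == 0) {
            if (!idle) {
                // Signal only on the transition into the idle state.
                signal.markAsTemporarilyIdle();
                idle = true;
            }
        } else {
            // Flink treats the source as active again once it emits records or watermarks.
            idle = false;
        }
    }
}
```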

FlinkPravegaReader fails to achieve exactly-once semantics in some cases

Problem description
In the edge case where a task fails before the first checkpoint is successful, the behavior should be that the group is rewound to the initial state. For example, if the group was configured to start from the beginning of the stream, it should restart from the beginning.

Because the source creates the reader group in its constructor (which is not re-executed in this case), and because the hook isn't invoked when there's no state to restore, the actual behavior is that the group simply continues from where it left off. In fact, when the replacement tasks start up, an error occurs due to dirty state:

java.lang.IllegalStateException: The requested reader: Source: Custom Source -> Sink: Unnamed (1/4) cannot be added to the group because it is already in the group. Perhaps close() was not called?
	at io.pravega.client.stream.impl.ReaderGroupStateManager.initializeReader(ReaderGroupStateManager.java:118)
	at io.pravega.client.stream.impl.ClientFactoryImpl.createReader(ClientFactoryImpl.java:138)
	at io.pravega.client.stream.impl.ClientFactoryImpl.createReader(ClientFactoryImpl.java:124)
	at io.pravega.connectors.flink.util.FlinkPravegaUtils.createPravegaReader(FlinkPravegaUtils.java:110)
	at io.pravega.connectors.flink.FlinkPravegaReader.run(FlinkPravegaReader.java:227)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
	at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:39)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)

Simply removing oneself from the online readers would fix the above symptom but would produce undesirable at-most-once behavior.

Suggestions for an improvement
The obvious fix is to change Flink's hook functionality to invoke the hook in the non-restore case too. In that case, the hook would reinitialize the group. Alternatively, the tasks could catch the above exception and reset the reader group state, with some additional coordination.

As a workaround, the reader could wait for the first Flink checkpoint to arrive before processing any elements. However, there's a catch-22: the Flink checkpoints are communicated to the task via the reader group state!

Remove "TCCL" workaround code

Problem description
For issue #24 we added some code to work around a bug in Flink builds prior to RC2. Once the connector is updated to depend on RC2+, we can remove the workaround code.

Build step throws checkstyle violation error

Problem description
Building the latest connector code throws checkstyle violation errors.

Problem location
Multiple java files

[ant:checkstyle] [ERROR] /flink-connectors/src/main/java/io/pravega/connectors/flink/CheckpointSerializer.java:1: Line does not match expected header line of '/\*\*'. [RegexpHeader]
[ant:checkstyle] [ERROR] /flink-connectors/src/main/java/io/pravega/connectors/flink/FlinkExactlyOncePravegaWriter.java:1: Line does not match expected header line of '/\*\*'. [RegexpHeader]
[ant:checkstyle] [ERROR] /flink-connectors/src/main/java/io/pravega/connectors/flink/FlinkPravegaReader.java:1: Line does not match expected header line of '/\*\*'. [RegexpHeader]
[ant:checkstyle] [ERROR] /flink-connectors/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java:1: Line does not match expected header line of '/\*\*'. [RegexpHeader]
[ant:checkstyle] [ERROR] /flink-connectors/src/main/java/io/pravega/connectors/flink/PravegaEventRouter.java:1: Line does not match expected header line of '/\*\*'. [RegexpHeader]
[ant:checkstyle] [ERROR] /flink-connectors/src/main/java/io/pravega/connectors/flink/PravegaWriterMode.java:1: Line does not match expected header line of '/\*\*'. [RegexpHeader]
[ant:checkstyle] [ERROR] /flink-connectors/src/main/java/io/pravega/connectors/flink/ReaderCheckpointHook.java:1: Line does not match expected header line of '/\*\*'. [RegexpHeader]

Suggestions for an improvement

Publish the effective POM

Problem description
The wrong POM is being published; it should be the shaded/effective POM. The unshaded POM shows dependencies (e.g. on the Pravega client) that shouldn't be there after shading.

Problem location
Gradle build

Suggestions for an improvement
