
proxima-platform's Introduction


The Proxima platform

The platform is a generic data ingestion, manipulation and retrieval framework. At a high level, it can be described by the following scheme:

high-level scheme

Design document

The high-level design document can be found here.

Incomplete (under construction) documentation

The (not yet complete) documentation can be found here. The documentation should grow over time to cover all aspects of the platform. PRs welcome!

Scheme definition

First, let's introduce some terminology:

  • entity: a named dictionary consisting of a string key and one or more attributes
  • attribute: an atomic field of an entity, with a string name and a scheme defining its data type
  • attribute family: a logical grouping of attributes of the same entity into a named group
  • storage: a physical store for data

Example scheme definition

The scheme definition uses HOCON. As a short example, we will show the definition of data processing for a hypothetical e-commerce site. The site has some goods and some users, and generates events which describe how users interact with the goods. We will use protocol buffers for serialization.

First, let's define our data model. We will model a system that processes events coming from some source in a given format and, based on these events, builds a model of user preferences.

entities {
  # user entity, let's make this really simple
  user {
    attributes {

      # some details of user - e.g. name, email, ...
      details { scheme: "proto:cz.o2.proxima.example.Example.UserDetails" }

      # model of preferences based on events
      preferences { scheme: "proto:cz.o2.proxima.example.Example.UserPreferences" }

      # selected events are stored to user's history
      "event.*" { scheme: "proto:cz.o2.proxima.example.Example.BaseEvent" }

    }
  }
  # entity describing a single good we want to sell
  product {
    # note: everything that we want to be able to update *independently*
    # has to be split into a separate attribute
    attributes {

      # price, with some possible additional information, like VAT and other stuff
      price { scheme: "proto:cz.o2.proxima.example.Example.Price" }

      # some general details of the product
      details { scheme: "proto:cz.o2.proxima.example.Example.ProductDetails" }

      # list of associated categories
      "category.*" { scheme: "proto:cz.o2.proxima.example.Example.ProductCategory" }

    }
  }

  # the events which link users to goods
  event {
    attributes {

      # the event is an atomic entity with just a single attribute
      data { scheme: "proto:cz.o2.proxima.example.Example.BaseEvent" }

    }
  }

}

Next, after defining our data model, we need to specify attribute families for our entities. This definition is highly dependent on the access patterns to the data. Mostly, we have to worry about how we are going to read our data. The relevant questions are:

  • are we going to need random access (get or list requests) to data by entity key and attribute name?
  • are we going to read the data as a continuously updated stream?
  • do we want to be able to read all historical updates, or are we interested only in the last updated value of each attribute?
  • are we going to process the data in a batch fashion to build some sort of model?

Let's describe our intentions as follows:

  • we need to be able to batch reprocess all events (maybe limited by some global time window, say two years back) in order to build a model that will be used to update the user's preferences with incoming events
  • we need random access to data stored per user and per product
  • we need access to the stream of events to be able to do real-time updates of user preferences
  • we want to be able to select some events to be stored in the user's history and then list this history by time from newest to oldest

To fulfill these requirements, we have chosen the following storages (as used in the configuration below): Apache Cassandra for random access to user and product data, Apache Kafka as the commit log for the event stream, and HDFS for batch storage of events.

This yields the following setup for attribute families (some details are omitted for simplicity):

 attributeFamilies {

    # we need this to be able to read user attributes 'details' and 'preferences' by user's key
    user-random-access {
      entity: user
      attributes: [ "details", "preferences" ]
      storage: "cassandra://"${cassandra.seed}/${cassandra.user-table}"?primary=user"
      type: primary
      access: random-access
    }

    # store incoming events to user's history
    user-event-history-store {
      entity: event
      attributes: [ "data" ]
      storage: "cassandra://"${cassandra.seed}/${cassandra.user-event-table}/
      # this class defines how we transform incoming event to CQL
      cqlFactory: cz.o2.proxima.example.EventHistoryCqlFactory
      # this is filtering condition, we want to select only some events
      filter: cz.o2.proxima.example.EventHistoryFilter
      type: replica
      access: write-only
    }

    # this family defines read access to the stored event history
    user-event-history-read {
      entity: user
      attributes: [ "event.*" ]
      storage: "cassandra://"${cassandra.seed}/${cassandra.user-event-table}"?primary=user&secondary=stamp&data=event&reversed=true"
      # ignore this for now
      converter: cz.o2.proxima.core.storage.cassandra.DateToLongConverter
      type: replica
      # we will not explicitly modify this, it will be updated automatically by incoming events
      access: read-only
    }

    # random access to products
    product-random-access {
      entity: product
      attributes: [ "*" ]
      storage: "cassandra://"${cassandra.seed}/${cassandra.product-table}
      type: primary
      access: random-access
    }

    # event stream storage
    event-commit-log {
      entity: event
      attributes: [ "*" ]
      storage: "kafka://"${kafka.brokers}/${kafka.events-topic}
      # this is our commit log
      type: primary
      access: commit-log
    }
    # store events for batch analytics
    event-batch-storage {
      entity: event
      attributes: [ "*" ]
      storage: "hdfs://"${hdfs.authority}/${hdfs.event-path}
      type: replica
      access: batch-updates
    }

  }

  cassandra {
    seed = "cassandra:9042"
    user-table = "user"
    product-table = "product"
    user-event-table = "user_event"
  }

  kafka {
    brokers = "kafka1:9092,kafka2:9092,kafka3:9092"
    events-topic = "events"
  }

  hdfs {
    authority = "hdfs-master"
    event-path = "/events"
  }

With this definition, we have a (somewhat simplified) working description of a Proxima platform scheme for data manipulation, which can be fed into the ingestion/retrieval service and will start working as described above.

Platform's data model

Generally, data are modelled as an unbounded stream of updates to attributes of entities. Each update consists of the following (a minimal sketch of constructing such an update follows the list):

  • name of entity
  • name of attribute
  • value of attribute (or flag representing delete)
  • timestamp of the update
  • UUID of the update
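
The following is a hedged sketch of constructing such an update programmatically. It is an illustration only: the StreamElement.upsert factory and the generated Model accessors are assumed to have roughly this shape, and serializedBaseEvent stands for an already serialized BaseEvent message (byte[]).

   // illustrative sketch (assumed API): constructing an upsert of the event.data attribute
   StreamElement update =
       StreamElement.upsert(
           model.getRepo().getEntity("event"),              // entity
           model.getEvent().getDataDescriptor(),            // attribute descriptor
           UUID.randomUUID().toString(),                    // UUID of the update
           "event-key",                                     // entity key
           model.getEvent().getDataDescriptor().getName(),  // attribute name
           System.currentTimeMillis(),                      // timestamp of the update
           serializedBaseEvent);                            // value as bytes (deletes carry a flag instead)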

Each stream can then be represented as a table (a.k.a. table-stream duality), which is essentially a snapshot of the stream at a certain time (called a batch snapshot in terms of the Proxima platform).

Compiling scheme definition to access classes

The platform contains a Maven plugin that compiles the scheme specification into Java access classes, as follows:

     <plugin>
       <groupId>cz.o2.proxima</groupId>
       <artifactId>proxima-compiler-java-maven-plugin</artifactId>
       <version>0.14.0</version>
       <configuration>
         <outputDir>${project.build.directory}/generated-sources/model</outputDir>
         <javaPackage>cz.o2.proxima.testing.model</javaPackage>
         <className>Model</className>
         <config>${basedir}/src/main/resources/test-readme.conf</config>
       </configuration>
       <executions>
         <execution>
           <phase>generate-sources</phase>
           <goals>
             <goal>compile</goal>
           </goals>
         </execution>
       </executions>
       <dependencies>
         <!--
           Use direct data operator access, see later
         -->
         <dependency>
           <groupId>${project.groupId}</groupId>
           <artifactId>proxima-direct-compiler-plugin</artifactId>
           <version>0.14.0</version>
         </dependency>
         <!--
           The following dependencies define additional
           dependencies for this example
         -->
         <dependency>
           <groupId>${project.groupId}</groupId>
           <artifactId>proxima-core</artifactId>
           <version>${project.version}</version>
           <classifier>tests</classifier>
         </dependency>
         <dependency>
           <groupId>${project.groupId}</groupId>
           <artifactId>proxima-scheme-proto</artifactId>
           <version>0.14.0</version>
         </dependency>
         <dependency>
           <groupId>${project.groupId}</groupId>
           <artifactId>proxima-scheme-proto-testing</artifactId>
           <version>0.14.0</version>
         </dependency>
       </dependencies>
     </plugin>

This plugin generates the class cz.o2.proxima.testing.model.Model into target/generated-sources/model. The class can then be instantiated via

  Model model = Model.of(ConfigFactory.defaultApplication());

or (in case of tests, where some validations and initializations are skipped)

 Model model = Model.ofTest(ConfigFactory.defaultApplication());
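
If the configuration lives in a non-default resource, for example the test-readme.conf referenced by the compiler plugin above, it can be loaded and resolved explicitly using the Typesafe Config API (a minimal sketch):

  // parse the named classpath resource and resolve ${...} substitutions
  Model model = Model.of(
      ConfigFactory.parseResources("test-readme.conf").resolve());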

Platform's DataOperators

The platform offers various modes of access to data. As of version 0.14.0, these types are:

  • direct
  • Apache Beam
  • Apache Flink

Direct access to data

This operator is used when accessing data from inside a single JVM (or potentially multiple JVMs, e.g. coordinated via distributed consumption of a commit log). The operator is constructed as follows:

   private DirectDataOperator createDataOperator(Model model) {
     Repository repo = model.getRepo();
     return repo.getOrCreateOperator(DirectDataOperator.class);
   }

Next, we can use the operator to create instances of data accessors, namely:

  • CommitLogReader
  • BatchLogReader
  • RandomAccessReader

For instance, observing the commit log can be done by

   DirectDataOperator operator = model.getRepo().getOrCreateOperator(DirectDataOperator.class);
   CommitLogReader commitLog = operator.getCommitLogReader(
       model.getEvent().getDataDescriptor())
       .orElseThrow(() -> new IllegalArgumentException("Missing commit log for "
           + model.getEvent().getDataDescriptor()));
   commitLog.observe("MyObservationProcess", new LogObserver() {

     @Override
     public boolean onError(Throwable error) {
       throw new RuntimeException(error);
     }

     @Override
     public boolean onNext(StreamElement elem, OnNextContext context) {
       log.info("Consumed element {}", elem);
       // commit processing, so that it is not redelivered
       context.confirm();
       // continue processing
       return true;
     }

   });

Creating BatchLogReader or RandomAccessReader is analogous.
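
For illustration, a random-access read of a user's details attribute might look roughly like the following sketch (the getRandomAccess method and the generated getUser().getDetailsDescriptor() accessor are assumed here and may differ slightly in the actual API):

   RandomAccessReader reader = operator
       .getRandomAccess(model.getUser().getDetailsDescriptor())
       .orElseThrow(() -> new IllegalArgumentException(
           "Missing random access for user.details"));
   // fetch the 'details' attribute of a single user by its key
   Optional<KeyValue<UserDetails>> details =
       reader.get("user-key", model.getUser().getDetailsDescriptor());
   details.ifPresent(kv -> log.info("Fetched {}", kv));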

Apache Beam access to data

First, create BeamDataOperator as follows:

  BeamDataOperator operator = model.getRepo().getOrCreateOperator(BeamDataOperator.class);

Next, use this operator to create a PCollection from the Model.

  // most imports omitted; these are shown for clarity
  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.transforms.Count;
  import org.apache.beam.sdk.transforms.WithKeys;
  import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
  import org.apache.beam.sdk.transforms.windowing.FixedWindows;
  import org.apache.beam.sdk.transforms.windowing.Window;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;
  import org.joda.time.Duration;

  Pipeline pipeline = Pipeline.create();
  PCollection<StreamElement> input = operator.getStream(
      pipeline, Position.OLDEST, false, true,
      model.getEvent().getDataDescriptor());
  PCollection<KV<String, Long>> counted =
      input
          .apply(
              Window.<StreamElement>into(FixedWindows.of(Duration.standardMinutes(1)))
                  .triggering(AfterWatermark.pastEndOfWindow())
                  .discardingFiredPanes())
          .apply(
              WithKeys.of(
                  el ->
                      model
                          .getEvent()
                          .getDataDescriptor()
                          .valueOf(el)
                          .map(BaseEvent::getProductId)
                          .orElse("")))
          .apply(Count.perKey());

  // do something with the output
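
Finally, the constructed pipeline can be executed on whichever Beam runner is configured for it (runner selection is out of scope of this example):

  // execute the pipeline and block until it finishes
  pipeline.run().waitUntilFinish();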

Online Java docs

Build notes

CI is run only against changed modules (and their dependents) in pull requests. To completely rebuild the whole project in a PR, push a commit with the commit message 'rebuild'. After the build, you can squash and remove the commit.

proxima-platform's People

Contributors

dependabot[bot], dmvk, je-ik, jozovilcek, lestr, o2-be-build, tisonet


proxima-platform's Issues

Create consistently partitioned stream views

The current abstraction of observers is insufficient; we need to be able to consistently partition the elements in a commit log. In order to achieve this, we need to introduce a partitioned view of a commit-log attribute family. This view can then be observed via a RemoteLogObserver, with the following features (an illustrative sketch follows the list):

  • the onNext method will take a Collector, which will collect results
  • these results will be available as euphoria's Dataset
  • the PartitionedView's observe method will return the Dataset
  • the flow will be deployed in a standard euphoria way, by calling Flow.submit()
  • RemoteLogObserver needs to be Serializable.
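
A purely hypothetical sketch of such an observer interface, for illustration of the proposal only (names and signatures are not an existing API):

   // hypothetical interface illustrating the proposal
   interface RemoteLogObserver<T> extends Serializable {

     /** Collector for emitting results that become part of the resulting Dataset. */
     interface Collector<T> extends Serializable {
       void collect(T element);
     }

     /** Process a single element; return true to continue observing. */
     boolean onNext(StreamElement element, Collector<T> collector);
   }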

Error on close of bulk to gcloud storage

java.io.IOException: write beyond end of stream
	at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:201)
	at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
	at cz.o2.proxima.gcloud.storage.BinaryBlob$Writer.writeBytes(BinaryBlob.java:69)
	at cz.o2.proxima.gcloud.storage.BinaryBlob$Writer.write(BinaryBlob.java:86)
	at cz.o2.proxima.gcloud.storage.BulkGCloudStorageWriter.write(BulkGCloudStorageWriter.java:120)
	at cz.o2.proxima.server.IngestServer$3.lambda$writeInternal$1(IngestServer.java:870)
	at net.jodah.failsafe.Functions$10.call(Functions.java:252)
	at net.jodah.failsafe.SyncFailsafe.call(SyncFailsafe.java:145)
	at net.jodah.failsafe.SyncFailsafe.run(SyncFailsafe.java:81)
	at cz.o2.proxima.server.IngestServer$3.writeInternal(IngestServer.java:868)
	at cz.o2.proxima.server.IngestServer$3.onNextInternal(IngestServer.java:859)
	at cz.o2.proxima.storage.commitlog.RetryableBulkObserver.onNextInternal(RetryableBulkObserver.java:85)
	at cz.o2.proxima.storage.commitlog.RetryableBulkObserver.onNext(RetryableBulkObserver.java:46)
	at cz.o2.proxima.storage.pubsub.PubSubReader.lambda$observeBulk$1(PubSubReader.java:146)
	at cz.o2.proxima.storage.pubsub.PubSubReader.lambda$consume$2(PubSubReader.java:232)
	at cz.o2.proxima.pubsub.shaded.com.google.cloud.pubsub.v1.MessageDispatcher$4.run(MessageDispatcher.java:405)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Add data processing API to generated code

The code generated by the config compiler should include access methods for Datasets. Transformations applied to these datasets should then be executable on different executors using the Euphoria API. Basically, this means moving parts of the code from tools to core.

Query language

In order to be able to perform at least basic analytical queries against the data stored in the platform, we need to extend our query language to support batch queries.

Currently the query language is really basic and not flexible enough, e.g.

env.gateway.device.streamFromOldest().filter({ it.attribute != "device.ZWAVE_1" && it.value != null }).windowAll().reduce({ new Tuple(it.key, it.attribute) }, null, { a, b -> b }).filter({ it.second != null }).map({ it.second }).filter({ !it.value.payload.contains("Smoke Sensor") && !it.value.payload.contains("Door/Window Sensor") && !it.value.payload.contains("Power Meter") }).forEach({ println it.key + ": " + it.value.payload })

We need to develop a query language that is easy to use and expressive enough for such analytical queries.

Cleanup: remove Serializable from writers and readers

Only DataAccessor should be Serializable; all implementations should work as factories returning non-serializable readers and writers. This is because the clients are rarely Serializable, which makes implementing the Serializable interface complicated (requiring transient fields and lazy initialization).
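
A hypothetical illustration of the intended split (the factory methods shown are illustrative only, not the actual DataAccessor API):

   // only the factory itself stays Serializable
   interface SerializableReaderFactory extends Serializable {
     // readers and writers are created lazily and are NOT Serializable, so they
     // can hold non-serializable clients without transient fields or lazy init
     CommitLogReader newCommitLogReader();
     AttributeWriterBase newWriter();
   }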

Support seamless replication

We need support for multi-master, multi-slave replication schemes across multiple Proxima deployments with eventual consistency. Required features:

  • the replication will work on commit logs only - all other consistency and replication will occur inside a given Proxima deployment
  • easy configuration
  • ability to observe only the non-replicated items in a commit log (defaults to the whole commit log in case there is no replication)

Draft of configuration:

 entities {
   original {
     attrA: { scheme: "bytes" }
     attrB: { scheme: "bytes" }
   }
 }
 replication {
   replicate-original-attrA {
     entity: original
     attributes: [ "attrA" ] # can be "*"
     # declare attribute families that will be written when update happens
     targets {
       first {
         // attribute family specifier
       }
       second {
         // attribute family specifier
       }
     }
     # declare single source family that will be read and written to local commit log
     source {
       // attribute family specifier
     }
   }
   replicate-original-all {
     entity: original
     attributes: [ "*" ] # replicate all attributes with some other settings
     targets { ... }
     source { ... }
   }
 }

In order to accomplish this we need to:

  • create an implicit attribute for each replicated one, prefixed with _<replication_name>_s_ for the source attribute family, _<replication_name>_t_<target_name>_ for target attributes, _<replication_name>_ for direct writes and _<replication_name>_r_ for the replicated attribute containing all replicated data (which will be read instead of the original attribute, which becomes virtual)
  • relax AttributeProxy so that it can be used for writes only (i.e. the attribute is modified only on direct write, reads happen on the actual attribute) - this will be used to transform writes to attribute X into _<replication_name>_X
  • start replication of _<replication_name>_X into X
  • start replication of _<replication_name>_source_X into X
  • start replication of _<replication_name>_X into each of _<replication_name>_target_<target_name>_X

Add generic entity transformation

Each attribute family must have an option to specify a generic transformation function that will convert a given entity to some other entity and/or attribute. This is to facilitate automatic entity transformation and the migration of such transformation pipelines.

Add health check metric

Add a health check that will signal to the Ansible playbook that the instance is operating normally and that it can proceed to the next instance, to prevent the possibility of a rolling install bringing the whole application down.

Add websocket source

Add a source that reads streaming data from a websocket (single partition, no commits or checkpoints).

Create multi-family random access reader

It is necessary to allow reading from multiple random-access families by a single (wrapper) random access reader. The reader might be constructed using a builder:

 RandomAccessReader multi = MultiRandomReader.newBuilder()
     .addReader(reader1, attr1, attr2, attr3)
     .addReader(reader2, attr4, attr5)
     ...
     .build()

Use thread pool for observers

Currently, observers start their own threads outside of any thread pool or executor. We should use an Executor for that, configured on the Repository and used platform-wide.

Enable attribute family to be persisted by stateful operation(s)

Currently, each attribute family is persisted via a stateless element-wise (map) operation. For some use cases (e.g. HFile bulk importing) this is not sufficient. We will relax this restriction as follows:

  • annotate the attribute family with a pipeline field declaring a class name that will take a PCollection<StreamElement> as input and persist it into the desired storage
  • when any attribute family is annotated with the pipeline keyword, loading such a config into the replication (ingest) server must fail
  • create a replication-pipeline module, which will create a single Beam Pipeline from the config that, upon submission, will perform all replications

Create partitioned cached view

The platform's inputs have to have an option to consume a partitioned view of updates to some attribute(s). We will achieve this by specifying an (optional) implementation of this at the level of the attribute family. The view will be updatable and will have callbacks invoked when the internal data are updated.

Some tests are flaky

These tests fail from time to time.

testWrite[0](cz.o2.proxima.gcloud.storage.BulkGCloudStorageWriterTest)  Time elapsed: 6.134 sec  <<< ERROR!
org.junit.runners.model.TestTimedOutException: test timed out after 2000 milliseconds
	at cz.o2.proxima.gcloud.storage.BulkGCloudStorageWriterTest.testWrite(BulkGCloudStorageWriterTest.java:137)

testWrite[1](cz.o2.proxima.gcloud.storage.BulkGCloudStorageWriterTest)  Time elapsed: 6.117 sec  <<< ERROR!
org.junit.runners.model.TestTimedOutException: test timed out after 2000 milliseconds
	at cz.o2.proxima.gcloud.storage.BulkGCloudStorageWriterTest.testWrite(BulkGCloudStorageWriterTest.java:137)

Multidimensional wildcard attributes

It seems unavoidable to support multidimensional wildcard attributes. Currently, the platform supports only a single dimension (i.e. attribute.<suffix>); we need to enable multiple dimensions as attribute.<first>.<second>..., with appropriate semantics including prefix listing, etc.

Outage of tests running on Stage env.

We have encountered an outage of tests running on the stage environment. Logs are enclosed below.

Nov 9 08:38:41 mujsignal-205 pexeso-ingest[1144]: java.lang.RuntimeException: java.lang.ArrayIndexOutOfBoundsException: 1782419
Nov 9 08:38:41 mujsignal-205 pexeso-ingest[1144]: #011at cz.o2.proxima.storage.kafka.KafkaCommitLog$2.onPartitionsAssigned(KafkaCommitLog.java:417)
Nov 9 08:38:41 mujsignal-205 pexeso-ingest[1144]: #011at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
Nov 9 08:38:41 mujsignal-205 pexeso-ingest[1144]: #011at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
Nov 9 08:38:41 mujsignal-205 pexeso-ingest[1144]: #011at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
Nov 9 08:38:41 mujsignal-205 pexeso-ingest[1144]: #011at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
Nov 9 08:38:41 mujsignal-205 pexeso-ingest[1144]: #011at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
Nov 9 08:38:41 mujsignal-205 pexeso-ingest[1144]: #011at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
Nov 9 08:38:41 mujsignal-205 pexeso-ingest[1144]: #011at cz.o2.proxima.storage.kafka.KafkaCommitLog.processConsumerWithObserver(KafkaCommitLog.java:563)
Nov 9 08:38:41 mujsignal-205 pexeso-ingest[1144]: #011at cz.o2.proxima.storage.kafka.KafkaCommitLog.processConsumer(KafkaCommitLog.java:525)
Nov 9 08:38:41 mujsignal-205 pexeso-ingest[1144]: #011at cz.o2.proxima.storage.kafka.KafkaCommitLog.lambda$observePartitionsBulk$7(KafkaCommitLog.java:432)
Nov 9 08:38:41 mujsignal-205 pexeso-ingest[1144]: #011at java.lang.Thread.run(Thread.java:748)
Nov 9 08:38:41 mujsignal-205 pexeso-ingest[1144]: Caused by: java.lang.ArrayIndexOutOfBoundsException: 1782419
Nov 9 08:38:41 mujsignal-205 pexeso-ingest[1144]: #011at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:76)
Nov 9 08:38:41 mujsignal-205 pexeso-ingest[1144]: #011at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:50)
Nov 9 08:38:41 mujsignal-205 pexeso-ingest[1144]: #011at java.io.DataOutputStream.writeInt(DataOutputStream.java:197)
Nov 9 08:38:41 mujsignal-205 pexeso-ingest[1144]: #011at org.apache.hadoop.io.SequenceFile$Writer.sync(SequenceFile.java:1299)
Nov 9 08:38:41 mujsignal-205 pexeso-ingest[1144]: #011at org.apache.hadoop.io.SequenceFile$BlockCompressWriter.sync(SequenceFile.java:1539)
Nov 9 08:38:41 mujsignal-205 pexeso-ingest[1144]: #011at org.apache.hadoop.io.SequenceFile$BlockCompressWriter.close(SequenceFile.java:1569)
Nov 9 08:38:41 mujsignal-205 pexeso-ingest[1144]: #011at cz.o2.proxima.storage.hdfs.HdfsDataAccessor.rollback(HdfsDataAccessor.java:125)
Nov 9 08:38:41 mujsignal-205 pexeso-ingest[1144]: #011at cz.o2.proxima.server.IngestServer$3.onRestart(IngestServer.java:885)
Nov 9 08:38:41 mujsignal-205 pexeso-ingest[1144]: #011at cz.o2.proxima.storage.kafka.KafkaCommitLog$2.onPartitionsAssigned(KafkaCommitLog.java:397)
Nov 9 08:38:41 mujsignal-205 pexeso-ingest[1144]: #11... 10 more

Add support for marking StreamElement as replicated

In order to be able to consistently replicate data between two environments in a master-master fashion, we need to be able to distinguish inputs that have already been replicated from those that have not, because otherwise we might end up in replication loops.

Handle offset commit and reset in sources

We need to be able to correctly handle offset committing and rewinding in places where sources are converted to Euphoria. This currently applies to tools and partitionedview.

Terminated thread pool in cassandra io

Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: 2018-02-20 10:17:14 ERROR CassandraWriter:53 - Failed to ingest record StreamElement(uuid=9ed40715-15e8-4c68-b21b-8e6a04747c76, entityDesc=EntityDescriptor(comm_message), attributeDesc=AttributeDescriptor(entity=comm_message, name=audit), key=d6a75626-161e-11e8-aba4-00505602142b, attribute=audit, stamp=1519118234096, value.length=672) into cassandra
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: com.datastax.driver.core.exceptions.DriverInternalError: Unexpected exception thrown
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:39)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:104)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at cz.o2.proxima.storage.cassandra.CacheableCQLFactory.prepare(CacheableCQLFactory.java:340)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at cz.o2.proxima.storage.cassandra.CacheableCQLFactory.getPreparedStatement(CacheableCQLFactory.java:150)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at cz.o2.proxima.storage.cassandra.DefaultCQLFactory.elementInsert(DefaultCQLFactory.java:164)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at cz.o2.proxima.storage.cassandra.DefaultCQLFactory.getWriteStatement(DefaultCQLFactory.java:110)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at cz.o2.proxima.storage.cassandra.CassandraWriter.write(CassandraWriter.java:47)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at cz.o2.proxima.server.IngestServer.ingestRequest(IngestServer.java:589)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at cz.o2.proxima.server.IngestServer.writeRequest(IngestServer.java:515)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at cz.o2.proxima.server.IngestServer.processSingleIngest(IngestServer.java:476)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at cz.o2.proxima.server.IngestServer.access$000(IngestServer.java:82)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at cz.o2.proxima.server.IngestServer$IngestService.ingest(IngestServer.java:110)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at cz.o2.pexeso.server.IngestServer$BackwardsCompatibleIngestService.ingest(IngestServer.java:50)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at cz.o2.pexeso.proto.service.IngestServiceGrpc$MethodHandlers.invoke(IngestServiceGrpc.java:330)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:174)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:271)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:616)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:107)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at java.lang.Thread.run(Thread.java:748)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: Caused by: java.util.concurrent.RejectedExecutionException: Task cz.o2.proxima.cassandra.shaded.com.google.common.util.concurrent.Futures$2$1@7e0ed157 rejected from java.util.concurrent.ThreadPoolExecutor@2c8fb64f[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at cz.o2.proxima.cassandra.shaded.com.google.common.util.concurrent.MoreExecutors$ListeningDecorator.execute(MoreExecutors.java:556)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at cz.o2.proxima.cassandra.shaded.com.google.common.util.concurrent.Futures$2.execute(Futures.java:1174)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at cz.o2.proxima.cassandra.shaded.com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:817)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at cz.o2.proxima.cassandra.shaded.com.google.common.util.concurrent.AbstractFuture.addListener(AbstractFuture.java:595)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at cz.o2.proxima.cassandra.shaded.com.google.common.util.concurrent.Futures.transformAsync(Futures.java:1153)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at com.datastax.driver.core.GuavaCompatibility$Version19OrHigher.transformAsync(GuavaCompatibility.java:210)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at com.datastax.driver.core.SessionManager.toPreparedStatement(SessionManager.java:196)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at com.datastax.driver.core.SessionManager.prepareAsync(SessionManager.java:157)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at com.datastax.driver.core.AbstractSession.prepareAsync(AbstractSession.java:126)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:102)
Feb 20 10:17:14 sb-ingest3 pexeso-ingest[1110]: #011... 20 more

Validate exceptions thrown on misconfiguration

When a misconfiguration occurs, we need to provide the user with a descriptive exception explaining the problem. Currently, when using type: primary for a storage without online writing capability, we get

Exception in thread "main" java.lang.ClassCastException: cz.o2.proxima.storage.hdfs.HdfsDataAccessor cannot be cast to cz.o2.proxima.storage.OnlineAttributeWriter
   at cz.o2.proxima.storage.AttributeWriterBase.online(AttributeWriterBase.java:47)
   at cz.o2.proxima.repository.Repository.lambda$readAttributeFamilies$17(Repository.java:553)
   at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1691)
   at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
   at cz.o2.proxima.repository.Repository.readAttributeFamilies(Repository.java:543)
   at cz.o2.proxima.repository.Repository.<init>(Repository.java:242)
   at cz.o2.proxima.repository.Repository.<init>(Repository.java:58)
   at cz.o2.proxima.repository.Repository$Builder.build(Repository.java:120)
   at cz.o2.proxima.repository.Repository.of(Repository.java:67)
   at cz.o2.proxima.server.IngestServer.<init>(IngestServer.java:440)
   at cz.o2.proxima.server.IngestServer.main(IngestServer.java:84)

The correct explanation would be more like "Do not use bulk storages for primary storage".

PubSubReader: IndexOutOfBoundsException

Under certain conditions the PubSubReader might throw the following exception:

java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
	at java.util.ArrayList.rangeCheck(ArrayList.java:657)
	at java.util.ArrayList.get(ArrayList.java:433)
	at cz.o2.proxima.storage.pubsub.PubSubReader.lambda$null$2(PubSubReader.java:197)
	at java.util.concurrent.atomic.AtomicReference.updateAndGet(AtomicReference.java:179)
	at cz.o2.proxima.storage.pubsub.PubSubReader.lambda$null$3cafaa0b$1(PubSubReader.java:181)
	at cz.o2.proxima.storage.commitlog.BulkLogObserver$OffsetCommitter.confirm(BulkLogObserver.java:57)
	at cz.o2.proxima.source.UnboundedStreamSource$1.lambda$commitOffset$0(UnboundedStreamSource.java:143)
	at java.util.ArrayList.forEach(ArrayList.java:1255)
	at cz.o2.proxima.source.UnboundedStreamSource$1.commitOffset(UnboundedStreamSource.java:143)
	at cz.o2.proxima.source.UnboundedStreamSource$1.commitOffset(UnboundedStreamSource.java:103)
	at cz.seznam.euphoria.executor.local.LocalExecutor$UnboundedPartitionSupplierStream.get(LocalExecutor.java:119)
	at cz.seznam.euphoria.executor.local.LocalExecutor.lambda$execMap$7(LocalExecutor.java:628)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Fix dependencies

Some artifacts include conflicting dependencies (e.g. netty, commons, ...). This issue is about fixing these dependencies (by exclusions).

Create IO for PubSub

Implement Google PubSub commitlog and partitionedview. The partitionedview will be implemented by Apache Beam Repartition.

Add test coverage to build

We need to improve overall test coverage by incorporating an automatic test coverage report and failing the build on low coverage.

Create ProximaIO for Apache Beam

Create PoC of ProximaIO which will create PCollection from given attributes and/or will be able to persist PCollection into given attribute(s).
