stream-collector's People

Contributors

aldemirenes, alexanderdean, alexbenny, aparra, benfradet, benjben, capcawork, christoph-buente, chuwy, colmsnowplow, dilyand, duncan, fblundun, istreeter, jankoulaga, jatinder85, jbeemster, lmath, lukeindykiewicz, miike, mirkoprescha, mmathias01, oguzhanunlu, peel, pondzix, sambo1972, spenes, stanch, voropaevp, zcei

stream-collector's Issues

Scala Stream Collector: support IP anonymization modifier

Perhaps we should think of IP anonymization as a modifier, not an enrichment, and we should make it available in the collector.

We have historically avoided doing any kind of transformation in the collector, but this can cause issues with PII.
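A minimal sketch of what such a modifier could look like, masking the last IPv4 octet(s) with "x" (similar in spirit to the downstream anon_ip enrichment); the object and function names are hypothetical:

object IpAnonymization {
  // Mask the last octet(s) of an IPv4 address before the event reaches the sink.
  def anonymizeIpv4(ip: String, maskedOctets: Int = 1): String = {
    val parts = ip.split("\\.")
    if (parts.length != 4 || maskedOctets < 1 || maskedOctets > 4) ip // leave unexpected input untouched
    else (parts.dropRight(maskedOctets) ++ Seq.fill(maskedOctets)("x")).mkString(".")
  }
}

// IpAnonymization.anonymizeIpv4("203.0.113.42") == "203.0.113.x"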

Scala Stream Collector: improve graceful shutdown

Project: Scala Stream Collector (Kinesis)

Version: 0.14.0

Expected behavior: When a Scala Stream Collector instance/container running in a Kubernetes environment is signalled to terminate, it should stop accepting requests and flush any outstanding tasks to the sink as soon as possible before fully terminating.

Actual behavior: Terminates with an error:

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@51d2451 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@4e8777ab[Shutting down, pool size = 8, active threads = 1, queued tasks = 2, completed tasks = 5]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
	at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
	at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
	at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
	at scala.concurrent.Promise$class.complete(Promise.scala:55)
	at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:157)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:23)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Steps to reproduce:

  1. Run two or more Scala Stream Collector instances in a Kubernetes environment.
  2. While data is being collected, scale the number of replicas down to 1 and watch the logs of the terminating pods.
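A sketch of the shutdown sequence the issue asks for, assuming binding is the collector's Http.ServerBinding and that the sink exposes some way to drain its in-memory buffer (flushSink is a stand-in, not a real API):

import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.http.scaladsl.Http

object GracefulShutdown {
  // Register a JVM shutdown hook: stop accepting new connections first,
  // then flush the sink's buffer, then stop the actor system.
  def install(system: ActorSystem, binding: Http.ServerBinding, flushSink: () => Unit): Unit = {
    sys.addShutdownHook {
      Await.ready(binding.unbind(), 30.seconds)   // no new requests are accepted after this
      flushSink()                                 // drain buffered events to the stream
      Await.ready(system.terminate(), 30.seconds) // finally stop Akka
    }
    ()
  }
}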

Scala Stream Collector: move in-memory event buffer to RocksDB?

At the moment, if the collector cannot write to Kinesis, the events back up in an in-memory buffer.

Should we change this to an on-disk KV store, like RocksDB?

Pros:

  • Events will survive a box bounce
  • Events will survive a restart (we might want to restart to attempt a config change)
  • Less risk of blowing the JVM Heap?

Cons:

  • More moving parts
  • Performance slowdown
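A rough sketch of what an on-disk buffer could look like using RocksDB (rocksdbjni); the key scheme, path and names are illustrative, and events are assumed to already be serialized byte arrays:

import org.rocksdb.{Options, RocksDB}

object DiskBuffer {
  RocksDB.loadLibrary()
  private val options = new Options().setCreateIfMissing(true)
  private val db = RocksDB.open(options, "/tmp/collector-buffer")
  private var seq = 0L

  // Keys are monotonically increasing so events can be replayed in arrival order after a restart.
  def append(event: Array[Byte]): Unit = synchronized {
    db.put(java.nio.ByteBuffer.allocate(8).putLong(seq).array(), event)
    seq += 1
  }

  def replay(handle: Array[Byte] => Unit): Unit = {
    val it = db.newIterator()
    it.seekToFirst()
    while (it.isValid) { handle(it.value()); it.next() }
    it.close()
  }
}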

Scala Stream Collector: add return path for collector cookie value

Three options:

  1. POST to a new endpoint which returns a self-describing JSON containing the cookie
  2. A special endpoint which redirects to a URL containing the cookie
  3. The response to the existing POST requests (see also snowplow/snowplow#2605)

I think I am leaning towards 1 as it is the most modular (it doesn't require using the JavaScript Tracker, and it doesn't require changes to be made to the trackers).
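A sketch of what option 1 could look like as an akka-http route; the endpoint name, the cookie name ("sp") and the schema URI are illustrative, not an agreed design:

import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

object CookieEndpoint {
  // Return the collector cookie wrapped in a self-describing JSON.
  val route: Route = path("network-user-id") {
    post {
      optionalCookie("sp") { cookie =>
        val nuid = cookie.map(_.value).getOrElse("")
        val json =
          s"""{"schema":"iglu:com.snowplowanalytics.snowplow/network_userid/jsonschema/1-0-0","data":{"networkUserId":"$nuid"}}"""
        complete(HttpEntity(ContentTypes.`application/json`, json))
      }
    }
  }
}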

Post fails: No 'Access-Control-Allow-Origin' header is present

When posting to the Kafka Stream Collector, the browser blocks the request, and says:

has been blocked by CORS policy: No 'Access-Control-Allow-Origin' header is present on the requested resource.

Does the POST endpoint not return the Access-Control-Allow-Origin header, or am I missing something?

The POST comes from domain.com and goes to info.domain.com:8300 where the collector is listening.
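For reference, the browser only accepts the response if it carries an Access-Control-Allow-Origin header covering the calling origin. A minimal akka-http illustration of wrapping a placeholder POST route with that header follows; this is not the collector's actual CORS handling, and a real deployment would whitelist specific origins rather than use "*":

import akka.http.scaladsl.model.headers.`Access-Control-Allow-Origin`
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

object CorsCheck {
  val route: Route = respondWithHeader(`Access-Control-Allow-Origin`.*) {
    path("com.snowplowanalytics.snowplow" / "tp2") {
      post {
        complete("ok") // placeholder for the collector's real POST handling
      }
    }
  }
}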

Stream Collector: add stdout artifact

Project: Scala Stream Collector - stdout

Version: 1.0.0

Expected behavior: The jar should be created with the name snowplow-stream-collector-stdout-{version-no}.jar

Actual behavior: The project just cleans and compiles; no jar is created.

Steps to reproduce:

  1. Try to create an assembly for the Scala Stream Collector module with the output set to stdout, using the command:
     sbt "project stdout" assembly

A PR has already been created to resolve this:
snowplow/snowplow#4278

Scala Stream Collector: Add redirect option to the iglu adapter endpoint

Project: Scala Stream Collector

Version: 0.15.0

Expected behavior:

Actual behavior:

Steps to reproduce:

I’m implementing ad impression tracking and ad click tracking in Google Campaign Manager. I can’t do the click tracker as an encoded unstructured event schema because the generated URL goes over 500 chars. So unfortunately as a workaround I need to use a structured event with a made up structure. Eg:

https://tracker.mydomain.com/r/tp2?aid=my-app&e=se&se_ca=ad_tracker&se_ac=click&se_la=advertiserId:aaa,bannerId:bbb,campaignId:ccc,impressionId:ddd&u=https://redirect.to.this.url

It would be great to have an update to the generic iglu adapter so that it supports redirects. Eg:

https://tracker.mydomain.com/com.snowplowanalytics.iglu/v1?aid=my-app&schema=iglu:com.snowplowanalytics.snowplow/ad_impression/jsonschema/1-0-0&advertiserId=aaa&bannerId=bbb&campaignId=ccc&costModel=cpm&impressionId=ddd&u=https://redirect.to.this.url

This would make implementing click tracking much simpler and more powerful at the same time, not just in Google DCM but in any other context.

Scala Stream Collector: implement write ahead log

If the streaming technology used (e.g. PubSub or Kinesis) is not available, the collector will keep on accumulating raw events in memory.

Those raw events should rather be flushed to disk for later recovery in a write ahead log.
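A minimal sketch of such a write-ahead log, assuming events are already serialized to byte arrays; the length-prefixed file format is just one possible choice:

import java.io.{DataOutputStream, FileOutputStream}

class WriteAheadLog(path: String) {
  private val out = new DataOutputStream(new FileOutputStream(path, true)) // append mode

  def append(event: Array[Byte]): Unit = synchronized {
    out.writeInt(event.length) // length prefix so records can be read back individually
    out.write(event)
    out.flush()
  }

  def close(): Unit = out.close()
}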

scala-stream-collector throws TimeoutException when using nsq

Project: scala-stream-collector

Version: snowplow_scala_stream_collector_nsq_0.15.0.zip

Expected behavior:

Actual behavior:

[ERROR] [07/05/2019 09:07:05.449] [scala-stream-collector-akka.actor.default-dispatcher-10] [akka.actor.ActorSystemImpl(scala-stream-collector)] Error during processing of request: 'command: PUB adxpixelgood timedout'. Completing with 500 Internal Server Error response. To change default exception handling behavior, provide a custom ExceptionHandler. java.util.concurrent.TimeoutException: command: PUB adxpixelgood timedout at com.snowplowanalytics.client.nsq.Connection.commandAndWait(Connection.java:167) at com.snowplowanalytics.client.nsq.NSQProducer.produce(NSQProducer.java:113) at com.snowplowanalytics.client.nsq.NSQProducer.produceMulti(NSQProducer.java:82) at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.NsqSink.storeRawEvents(NsqSink.scala:55) at com.snowplowanalytics.snowplow.collectors.scalastream.CollectorService.sinkEvent(CollectorService.scala:218) at com.snowplowanalytics.snowplow.collectors.scalastream.CollectorService.cookie(CollectorService.scala:113) at com.snowplowanalytics.snowplow.collectors.scalastream.CollectorRoute$$anonfun$collectorRoute$1$$anonfun$apply$1$$anonfun$apply$2$$anonfun$apply$3$$anonfun$apply$11$$anonfun$apply$12.apply(CollectorRoute.scala:87) at com.snowplowanalytics.snowplow.collectors.scalastream.CollectorRoute$$anonfun$collectorRoute$1$$anonfun$apply$1$$anonfun$apply$2$$anonfun$apply$3$$anonfun$apply$11$$anonfun$apply$12.apply(CollectorRoute.scala:86) at akka.http.scaladsl.server.Directive$$anonfun$addByNameNullaryApply$1$$anonfun$apply$13.apply(Directive.scala:134) at akka.http.scaladsl.server.Directive$$anonfun$addByNameNullaryApply$1$$anonfun$apply$13.apply(Directive.scala:134) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRouteResult$1$$anonfun$apply$3.apply(BasicDirectives.scala:61) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRouteResult$1$$anonfun$apply$3.apply(BasicDirectives.scala:61) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:93) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:90) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRequestContext$1$$anonfun$apply$1.apply(BasicDirectives.scala:43) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRequestContext$1$$anonfun$apply$1.apply(BasicDirectives.scala:43) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1$$anonfun$apply$1.apply(RouteConcatenation.scala:47) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1$$anonfun$apply$1.apply(RouteConcatenation.scala:44) at 
akka.http.scaladsl.util.FastFuture$.akka$http$scaladsl$util$FastFuture$$strictTransform$1(FastFuture.scala:41) at akka.http.scaladsl.util.FastFuture$.transformWith$extension1(FastFuture.scala:45) at akka.http.scaladsl.util.FastFuture$.flatMap$extension(FastFuture.scala:26) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:93) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:90) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:93) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:90) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:93) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:90) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at 
akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$11.apply(Directive.scala:94) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$11.apply(Directive.scala:93) at akka.http.scaladsl.util.FastFuture$.akka$http$scaladsl$util$FastFuture$$strictTransform$1(FastFuture.scala:41) at akka.http.scaladsl.util.FastFuture$.transformWith$extension1(FastFuture.scala:45) at akka.http.scaladsl.util.FastFuture$.flatMap$extension(FastFuture.scala:26) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:93) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:90) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$11.apply(Directive.scala:94) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$11.apply(Directive.scala:93) at akka.http.scaladsl.util.FastFuture$.akka$http$scaladsl$util$FastFuture$$strictTransform$1(FastFuture.scala:41) at akka.http.scaladsl.util.FastFuture$.transformWith$extension1(FastFuture.scala:45) at akka.http.scaladsl.util.FastFuture$.flatMap$extension(FastFuture.scala:26) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:93) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:90) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRouteResultWith$1$$anonfun$apply$4.apply(BasicDirectives.scala:67) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRouteResultWith$1$$anonfun$apply$4.apply(BasicDirectives.scala:67) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.directives.ExecutionDirectives$$anonfun$handleExceptions$1$$anonfun$apply$1.apply(ExecutionDirectives.scala:32) at akka.http.scaladsl.server.directives.ExecutionDirectives$$anonfun$handleExceptions$1$$anonfun$apply$1.apply(ExecutionDirectives.scala:28) at 
akka.http.scaladsl.server.Route$$anonfun$asyncHandler$1.apply(Route.scala:79) at akka.http.scaladsl.server.Route$$anonfun$asyncHandler$1.apply(Route.scala:78) at akka.stream.impl.fusing.MapAsync$$anon$24.onPush(Ops.scala:1169) at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:747) at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:710) at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:616) at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:471) at akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:423) at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:603) at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:618) at akka.actor.Actor$class.aroundReceive(Actor.scala:502) at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:529) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) at akka.actor.ActorCell.invoke(ActorCell.scala:495) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Steps to reproduce:

Scala Stream Collector: add endpoint to retrieve network_userid

It would be really useful if the Scala collector had an endpoint that would return the network user id held in the third-party cookie. This would allow me to enrich API requests so that my server can make Snowplow calls with the corresponding network user id, allowing for easy tracking. I accept that some users may have third-party cookies disabled, but in the main this would be really useful and would save me from trying to join on a mixture of IP/footprint/datetime.

Scala Stream Collector: nuid behavior inconsistent with clojure collector

See snowplow/snowplow#3450

The above bug exists only in the Clojure Collector. In SSC (through Snowplow Mini) we were able to get the Iglu Adapter and a custom-set nuid to work, which at first made me believe that there's no nuid handling in SSC at all, but it seems its logic is just different.

Instead of always using the content of the cookie (as the Clojure Collector does), it first checks whether nuid is present in the tracker's payload; if so, it uses that value, otherwise it falls back to the cookie's value.

So I'm not sure which collector's behavior is more correct, but they're clearly inconsistent.
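The described SSC behaviour, condensed into a hypothetical helper purely to illustrate the precedence:

object NetworkUserId {
  // A nuid supplied in the tracker payload wins; otherwise the collector cookie's value is used.
  def resolve(payloadNuid: Option[String], cookieNuid: Option[String]): Option[String] =
    payloadNuid.orElse(cookieNuid)
}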

Stream Collector: Cookie Bounce feature would not work if the tracker sends the payload via POST

There are a few issues here.

  • The cookie bounce will never happen for POST, because in that case the response is not expected to be a pixel, so this branch
    val bounce = config.cookieBounce.enabled && nuidOpt.isEmpty && !bouncing && pixelExpected && !redirect
    will never be true.
  • In the case of POST, the returned status code must be 307, otherwise the client will not re-use the same payload for the redirect, and that's where the bigger problem lies. Some browsers (like Firefox) will not follow a 307 POST redirect unless the xhr content type is set to application/x-www-form-urlencoded.

So let's assume that I've modified the collector to return a 307 and modified the JS tracker to send data as a urlencoded form; the next thing that needs to be done is to allow that content type to be pushed to the sink.

So far it looks like a lot of things need to be changed, and still there are no guarantees that it would work for esoteric browsers such as IE 8.

Do you guys have any thoughts on this problem and the right way to solve it?
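For reference, the 307 part could look roughly like this in akka-http; the route and the n3pc marker parameter are illustrative only, not the collector's actual cookie-bounce implementation:

import akka.http.scaladsl.model.{StatusCodes, Uri}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

object CookieBounce307 {
  // A TemporaryRedirect (307) asks the client to replay the same method and body
  // against the bounce URL, which is what a POSTed payload would need.
  val route: Route = path("com.snowplowanalytics.snowplow" / "tp2") {
    post {
      redirect(Uri("/com.snowplowanalytics.snowplow/tp2?n3pc=true"), StatusCodes.TemporaryRedirect)
    }
  }
}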

Kafka Stream Collector: Error registering AppInfo mbean

During startup of the Kafka Stream Collector (snowplow_scala_stream_collector_kafka_0.16.0), it shows this:

[main] WARN org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=snowplow
	at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
	at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
	at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:423)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:297)
	at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KafkaSink.createProducer(KafkaSink.scala:60)
	at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KafkaSink.<init>(KafkaSink.scala:35)
	at com.snowplowanalytics.snowplow.collectors.scalastream.KafkaCollector$.main(KafkaCollector.scala:33)
	at com.snowplowanalytics.snowplow.collectors.scalastream.KafkaCollector.main(KafkaCollector.scala)

However, the entries are still pushed to Kafka successfully. As far as I know, this has to do with the JMX metrics registration, but I'm not sure.
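For context, the warning appears to come from Kafka registering an app-info MBean keyed on client.id, so two producers created in the same JVM with the same client.id ("snowplow") collide; the collision is harmless, which is why events still reach Kafka. A hypothetical illustration (property values are placeholders) of giving each producer a distinct client.id:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}

object DistinctClientIds {
  def producer(clientId: String): KafkaProducer[String, Array[Byte]] = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId) // e.g. "snowplow-good" and "snowplow-bad"
    new KafkaProducer[String, Array[Byte]](props)
  }
}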

Scala Stream Collector: allow placeholders in redirect url to be filled with event data

When I call the collector endpoint to register an ad_click event and then redirect the user to the actual content, I would like to use placeholders within that redirect URL that are filled with actual data from the event, like the referer, network_id, or any other information which is not part of the query parameters. Like this:

http://{{collector_url}}/r/tp2?u=http%3A%2F%2Fexample.com%26nuid={{NUID}}%26duid={{DUID}}%ref={{REF}}

And this would result in a redirect to

http://example.com?nuid=07a79f8a-b24d-44f2-8292-8043653374ea&duid=0e1eeb13-ba55-4eb7-8344-00fb949f964b&ref=google.com
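A sketch of how such placeholder substitution might work, assuming the collector can look up the relevant event fields before issuing the redirect; the object name and placeholder names are illustrative:

import java.net.URLEncoder

object RedirectPlaceholders {
  // Substitute {{NAME}} tokens in the redirect URL with URL-encoded values taken from the event.
  def fill(redirectUrl: String, values: Map[String, String]): String =
    values.foldLeft(redirectUrl) { case (url, (name, value)) =>
      url.replace(s"{{$name}}", URLEncoder.encode(value, "UTF-8"))
    }
}

// RedirectPlaceholders.fill(
//   "http://example.com?nuid={{NUID}}&ref={{REF}}",
//   Map("NUID" -> "07a79f8a-b24d-44f2-8292-8043653374ea", "REF" -> "google.com"))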

Scala Stream Collector: implement back-pressure

With the current architecture, if we have a malfunctioning Kinesis stream, then we will:

  1. Continue to accept events from a tracker cache and 200 them (so tracker deletes them from cache)
  2. Add all these events to an in-memory buffer in the hope that Kinesis will start functioning again

In other words - we don't implement back-pressure, and we take a significant risk that we will end up with at-most-once processing rather than at-least-once processing.

A better approach would be to set a failure threshold (e.g. Kinesis out of action for X minutes, or N events in the in-memory buffer), and if that is reached, then we start returning 503s (or similar) and the trackers keep the events on their side...
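A sketch of the proposed threshold check, assuming the sink can report how many events it currently holds in memory; the limit value is illustrative:

import akka.http.scaladsl.model.{StatusCode, StatusCodes}

object BackPressure {
  val maxBufferedEvents = 10000

  // Answer 503 once the buffer is too large, so the tracker keeps the events and retries later.
  def responseStatus(bufferedEvents: Int): StatusCode =
    if (bufferedEvents >= maxBufferedEvents) StatusCodes.ServiceUnavailable
    else StatusCodes.OK
}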

Placeholder for tracker telling collector if it has a cache or not

Only a tracker knows if it has a chance to re-send an event on failure.

After #32, we could consider some kind of hinting so the tracker can tell the collector to really, really try to send the event - i.e. the tracker is handing off the event to the collector, and the collector has to make best efforts to send it.

[13:41] Ed Lewis: Are all devices good at caching data before sending to a tracker or if a tracker fails?
[13:41] Alex Dean: not all of them Ed
        sometimes that's a tracker limitation
        i.e. we just haven't implemented that caching yet
        but I can imagine situations where it would be impossible for a client device to cache
        e.g. very locked down envs, or IoT
[13:42] Ed Lewis: oh ok - I'm just wondering if the tracker could ever absolutely guarantee that data will get processed
        so the backpressure stops there as it were
[13:43] Alex Dean: yeah it's quite nuanced
        for example, if a pixel is loaded in an email
[13:43] Ed Lewis: though I agree in the vast majority of situations it makes sense for the trackers to hold onto information if the collector stops returning OK's
[13:43] Alex Dean: there is no place to back that up
        it's only the tracker that knows if it has a capability to back up
        the collector has no clue

[13:47] Ed Lewis: yeah I see what you mean
[13:47] Joshua Beemster: Should we add something to let the collector know we can't backup the event if it is removed

Let scala collector deliver asset files

For third-party load tests or monitoring services, domain ownership usually has to be proven. This can be done either via DNS TXT records or by uploading a file to a publicly available URL. Can the Scala collector be configured to serve a local file? Like:

http://collector.mydomain/myfile.txt

In fact the collector is a spray server, so I guess it is possible.
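Newer collector versions use akka-http rather than spray, but either way serving a single static file is straightforward. A hypothetical sketch (the path and file location are placeholders, not an existing collector feature):

import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

object AssetRoute {
  // Serve one local file for domain-ownership verification.
  val route: Route = path("myfile.txt") {
    get {
      getFromFile("/var/collector/assets/myfile.txt")
    }
  }
}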

Scala Stream Collector: send Snowplow heartbeat

Return a response something like:

{ "schema": "iglu:com.snowplowanalytics.sdl/stream_app/jsonschema/1-0-0",
  "data": {
    "name": "xxx",
    "description": "xxx",
    "cliArgs": "xxx",
    "version": "1.2.2",
    "tags": [ ],
    "in": [ { "protocol": "http" }, { "protocol": "https" } ],
    "outSuccess": { "protocol": "kinesis", "name": "raw", "schema": "iglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0" },
    "outFailure": {}
  }
}
