snowplow / stream-collector

Collector for cloud-native web, mobile and event analytics, running on AWS and GCP

Home Page: http://snowplowanalytics.com
License: Other
Perhaps we should think of IP anonymization as a modifier, not an enrichment, and we should make it available in the collector.
We have historically avoided doing any kind of transformation in the collector, but this can cause issues with PII.
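For illustration, a minimal sketch of what such a modifier could look like (the object and method names are hypothetical, not existing collector code):

```scala
// Hypothetical sketch of an IP-anonymization modifier: zero the host part
// of the address before the payload is sunk. Names are illustrative.
object IpAnonymizer {
  def anonymize(ip: String): String =
    if (ip.contains(":"))
      // IPv6: keep the first three groups, zero the rest
      ip.split(":").take(3).mkString(":") + "::"
    else
      // IPv4: zero the last octet, e.g. 203.0.113.42 -> 203.0.113.0
      ip.split("\\.").toList match {
        case a :: b :: c :: _ :: Nil => s"$a.$b.$c.0"
        case _                       => ip // leave malformed addresses untouched
      }
}
```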
Placeholder for now, but logging every failed record seems too fine-grained.
We could group errors by failure type.
We should review the whole logging story anyway.
Project: Scala Stream Collector (Kinesis)
Version: 0.14.0
Expected behavior: when the Scala Stream Collector runs in a Kubernetes environment and an instance/container is signalled to terminate, it should stop accepting requests and flush outstanding tasks to the sink as soon as possible before fully terminating (a teardown-ordering sketch follows the steps to reproduce below).
Actual behavior: it terminates with an error:
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@51d2451 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@4e8777ab[Shutting down, pool size = 8, active threads = 1, queued tasks = 2, completed tasks = 5]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at scala.concurrent.Promise$class.complete(Promise.scala:55)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:157)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:23)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Steps to reproduce:
Run the Scala Stream Collector in a Kubernetes environment and signal the pod to terminate.

In case it is changed again from 1MB, see snowplow/snowplow#1736.
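Regarding the Kubernetes termination error above: the RejectedExecutionException indicates tasks are still being submitted while the executor pool is already shutting down. A minimal sketch of ordering the teardown explicitly, assuming an akka-http server binding and a flushable sink (both names are hypothetical stand-ins, not the collector's actual fields):

```scala
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.http.scaladsl.Http

// Hypothetical sketch; `binding` and `flushSink` are illustrative stand-ins
// for the collector's server binding and sink buffer.
object ShutdownHook {
  def register(system: ActorSystem, binding: Http.ServerBinding, flushSink: () => Unit): Unit =
    sys.addShutdownHook {
      Await.result(binding.unbind(), 10.seconds)   // stop accepting new requests
      flushSink()                                  // drain buffered events to the sink
      Await.result(system.terminate(), 10.seconds) // only then stop the scheduler pools
    }
}
```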
At the moment, if the collector cannot write to Kinesis, the events will back-up in an in-memory buffer.
Should we change this to an on-disk KV store, like RocksDB?
Pros:
Cons:
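For context, a rough sketch of what an on-disk buffer backed by RocksDB might look like (assuming the org.rocksdb Java bindings are on the classpath; class and method names are illustrative):

```scala
import org.rocksdb.{Options, RocksDB}

// Hypothetical sketch of an on-disk event buffer; not existing collector code.
class DiskBuffer(path: String) {
  RocksDB.loadLibrary()
  private val db  = RocksDB.open(new Options().setCreateIfMissing(true), path)
  private var seq = 0L

  // Zero-padded keys keep RocksDB's lexicographic order == insertion order.
  def append(event: Array[Byte]): Unit = synchronized {
    seq += 1
    db.put(f"$seq%019d".getBytes("UTF-8"), event)
  }

  // Drain events back out (e.g. once Kinesis recovers), deleting as we go.
  def drain(f: Array[Byte] => Unit): Unit = {
    val it = db.newIterator()
    it.seekToFirst()
    while (it.isValid) {
      f(it.value())
      db.delete(it.key())
      it.next()
    }
    it.close()
  }
}
```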
Three options:
I think I am leaning towards option 1, as it is the most modular (it doesn't require the JavaScript Tracker to be used; it doesn't require changes to the trackers).
When posting to the Kafka Stream Collector, the browser blocks the request and says:

has been blocked by CORS policy: No 'Access-Control-Allow-Origin' header is present on the requested resource.

Does the POST method not send out the Access-Control-Allow-Origin header, or am I missing something? The POST comes from domain.com and goes to info.domain.com:8300, where the collector is listening.
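akka-http (which the collector is built on, per the stack traces elsewhere in this tracker) can attach the header via a wrapping directive. A minimal, hypothetical sketch of a permissive CORS wrapper (this is not the collector's built-in configuration):

```scala
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model.headers._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

// Hypothetical sketch only. A real deployment would echo the specific Origin
// if cookies must be sent, since browsers reject credentials under "*".
object Cors {
  def withCors(inner: Route): Route =
    respondWithHeaders(List(
      `Access-Control-Allow-Origin`.*,
      `Access-Control-Allow-Methods`(GET, POST, OPTIONS),
      `Access-Control-Allow-Headers`("Content-Type")
    )) {
      // Answer CORS preflight requests directly so the browser allows the POST.
      options(complete("OK")) ~ inner
    }
}
```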
DOCKER_USERNAME
DOCKER_PASSWORD
Project: Scala Stream Collector - stdout
Version: 1.0.0
Expected behavior: the jar should be created with the name snowplow-stream-collector-stdout-{version-no}.jar
Actual behavior: the project just cleans and compiles; no jar is created.
Steps to reproduce:
sbt "project stdout" assembly
A PR has already been created to resolve it:
snowplow/snowplow#4278
Project: Scala Stream Collector
Version: 0.15.0
Expected behavior:
Actual behavior:
Steps to reproduce:
I’m implementing ad impression tracking and ad click tracking in Google Campaign Manager. I can’t do the click tracker as an encoded unstructured event schema because the generated URL goes over 500 characters. So unfortunately, as a workaround, I need to use a structured event with a made-up structure. E.g.:
It would be great to have an update to the generic Iglu adapter so that it supports redirects. E.g.:
This would make implementing click tracking much simpler and more powerful at the same time, not just in Google DCM but in any other context.
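For illustration only, a redirect-capable Iglu adapter call might look something like this (hypothetical: the schema is made up, and the u redirect parameter on the Iglu adapter path is exactly the feature this issue asks for):

```
http://{{collector_url}}/com.snowplowanalytics.iglu/v1?schema=iglu%3Acom.acme%2Fad_click%2Fjsonschema%2F1-0-0&u=https%3A%2F%2Fexample.com%2Flanding
```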
If the streaming technology used (e.g. PubSub or Kinesis) is not available, the collector will keep on accumulating raw events in memory.
Those raw events should instead be flushed to disk, into a write-ahead log, for later recovery.
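A minimal sketch of such a write-ahead log, assuming length-prefixed records appended to a local file (class and method names are illustrative, not existing collector code):

```scala
import java.io.{DataInputStream, DataOutputStream, EOFException, FileInputStream, FileOutputStream}

// Hypothetical sketch: raw events survive a crash while the stream is down.
class WriteAheadLog(path: String) {
  private val out = new DataOutputStream(new FileOutputStream(path, true)) // append mode

  def write(event: Array[Byte]): Unit = synchronized {
    out.writeInt(event.length) // length prefix makes recovery framing trivial
    out.write(event)
    out.flush()                // flush so the record survives a process crash
  }

  def close(): Unit = out.close()
}

object WriteAheadLog {
  // Replay the log on recovery, handing each record to `f` (e.g. a sink retry).
  def recover(path: String)(f: Array[Byte] => Unit): Unit = {
    val in = new DataInputStream(new FileInputStream(path))
    try {
      while (true) {
        val len = in.readInt()
        val buf = new Array[Byte](len)
        in.readFully(buf)
        f(buf)
      }
    } catch { case _: EOFException => () } // reached end of log
    finally in.close()
  }
}
```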
Project: scala-stream-collector
Version: snowplow_scala_stream_collector_nsq_0.15.0.zip
Expected behavior:
Actual behavior:
[ERROR] [07/05/2019 09:07:05.449] [scala-stream-collector-akka.actor.default-dispatcher-10] [akka.actor.ActorSystemImpl(scala-stream-collector)] Error during processing of request: 'command: PUB adxpixelgood timedout'. Completing with 500 Internal Server Error response. To change default exception handling behavior, provide a custom ExceptionHandler. java.util.concurrent.TimeoutException: command: PUB adxpixelgood timedout at com.snowplowanalytics.client.nsq.Connection.commandAndWait(Connection.java:167) at com.snowplowanalytics.client.nsq.NSQProducer.produce(NSQProducer.java:113) at com.snowplowanalytics.client.nsq.NSQProducer.produceMulti(NSQProducer.java:82) at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.NsqSink.storeRawEvents(NsqSink.scala:55) at com.snowplowanalytics.snowplow.collectors.scalastream.CollectorService.sinkEvent(CollectorService.scala:218) at com.snowplowanalytics.snowplow.collectors.scalastream.CollectorService.cookie(CollectorService.scala:113) at com.snowplowanalytics.snowplow.collectors.scalastream.CollectorRoute$$anonfun$collectorRoute$1$$anonfun$apply$1$$anonfun$apply$2$$anonfun$apply$3$$anonfun$apply$11$$anonfun$apply$12.apply(CollectorRoute.scala:87) at com.snowplowanalytics.snowplow.collectors.scalastream.CollectorRoute$$anonfun$collectorRoute$1$$anonfun$apply$1$$anonfun$apply$2$$anonfun$apply$3$$anonfun$apply$11$$anonfun$apply$12.apply(CollectorRoute.scala:86) at akka.http.scaladsl.server.Directive$$anonfun$addByNameNullaryApply$1$$anonfun$apply$13.apply(Directive.scala:134) at akka.http.scaladsl.server.Directive$$anonfun$addByNameNullaryApply$1$$anonfun$apply$13.apply(Directive.scala:134) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRouteResult$1$$anonfun$apply$3.apply(BasicDirectives.scala:61) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRouteResult$1$$anonfun$apply$3.apply(BasicDirectives.scala:61) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:93) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:90) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRequestContext$1$$anonfun$apply$1.apply(BasicDirectives.scala:43) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRequestContext$1$$anonfun$apply$1.apply(BasicDirectives.scala:43) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1$$anonfun$apply$1.apply(RouteConcatenation.scala:47) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1$$anonfun$apply$1.apply(RouteConcatenation.scala:44) at 
akka.http.scaladsl.util.FastFuture$.akka$http$scaladsl$util$FastFuture$$strictTransform$1(FastFuture.scala:41) at akka.http.scaladsl.util.FastFuture$.transformWith$extension1(FastFuture.scala:45) at akka.http.scaladsl.util.FastFuture$.flatMap$extension(FastFuture.scala:26) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:93) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:90) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:93) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:90) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:93) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:90) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at 
akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$11.apply(Directive.scala:94) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$11.apply(Directive.scala:93) at akka.http.scaladsl.util.FastFuture$.akka$http$scaladsl$util$FastFuture$$strictTransform$1(FastFuture.scala:41) at akka.http.scaladsl.util.FastFuture$.transformWith$extension1(FastFuture.scala:45) at akka.http.scaladsl.util.FastFuture$.flatMap$extension(FastFuture.scala:26) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:93) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:90) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$11.apply(Directive.scala:94) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$11.apply(Directive.scala:93) at akka.http.scaladsl.util.FastFuture$.akka$http$scaladsl$util$FastFuture$$strictTransform$1(FastFuture.scala:41) at akka.http.scaladsl.util.FastFuture$.transformWith$extension1(FastFuture.scala:45) at akka.http.scaladsl.util.FastFuture$.flatMap$extension(FastFuture.scala:26) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:93) at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:90) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44) at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRouteResultWith$1$$anonfun$apply$4.apply(BasicDirectives.scala:67) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRouteResultWith$1$$anonfun$apply$4.apply(BasicDirectives.scala:67) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154) at akka.http.scaladsl.server.directives.ExecutionDirectives$$anonfun$handleExceptions$1$$anonfun$apply$1.apply(ExecutionDirectives.scala:32) at akka.http.scaladsl.server.directives.ExecutionDirectives$$anonfun$handleExceptions$1$$anonfun$apply$1.apply(ExecutionDirectives.scala:28) at 
akka.http.scaladsl.server.Route$$anonfun$asyncHandler$1.apply(Route.scala:79) at akka.http.scaladsl.server.Route$$anonfun$asyncHandler$1.apply(Route.scala:78) at akka.stream.impl.fusing.MapAsync$$anon$24.onPush(Ops.scala:1169) at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:747) at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:710) at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:616) at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:471) at akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:423) at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:603) at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:618) at akka.actor.Actor$class.aroundReceive(Actor.scala:502) at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:529) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) at akka.actor.ActorCell.invoke(ActorCell.scala:495) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Steps to reproduce:
This code could be placed in a library, so that collectors with extra features can be created, as is the case for Snowplow Micro, for example.
See https://www.eff.org/dnt-policy#faq-What-does-the-dnt-policy.txt-promise-mean?
Basically, if the collector is respecting DNT, then we should serve an endpoint which clarifies that, available at:
https://$collector/.well-known/dnt-policy.txt
This is what Metamarkets do... The idea is that somebody would add the Snowplow collector as a bidless seat, i.e. a "bidder", to their RTB, and then Snowplow would see all of the request stream (it obviously wouldn't bid, however).
Quite a neat way of collecting RTB data.
/cc @yalisassoon
It would be really useful if the Scala collector had an endpoint that returned the network user id held in the third-party cookie. This would allow me to enrich API requests so that my server can make Snowplow calls with the corresponding network user id, allowing for easy tracking. I accept that some users may have third-party cookies disabled, but in the main this would be really useful and would save me trying to join using a mixture of IP/footprint/datetime.
This would work as follows:
The endpoint would respond with an Access-Control-Allow-Origin header set to the requesting domain.
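A minimal sketch of what such an endpoint might look like in akka-http (hypothetical: this route does not exist in the collector today, and "sp" is the default cookie name but it is configurable):

```scala
import akka.http.scaladsl.model.headers.`Access-Control-Allow-Origin`
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

// Hypothetical sketch of a "return the nuid" endpoint.
object NuidEndpoint {
  val route: Route =
    path("nuid") {
      optionalCookie("sp") { maybeCookie =>
        // NB: a real version would echo the caller's Origin and allow
        // credentials, since browsers do not send cookies under a "*" origin.
        respondWithHeader(`Access-Control-Allow-Origin`.*) {
          complete(maybeCookie.map(_.value).getOrElse(""))
        }
      }
    }
}
```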
Companion ticket to snowplow/snowplow#2082.
Might be easier as not constrained by Elastic Beanstalk / Tomcat
In case of IP address hotspotting
The above bug exists only in the Clojure Collector. In SSC (through Snowplow Mini) we were able to get the Iglu Adapter and a custom-set nuid to work, which at first made me believe that there's no nuid in the SSC payload at all, but it seems its logic is just different. Instead of always adding the content of the cookie (as the Clojure Collector does), it first checks whether nuid is present in the tracker's payload: if so, it uses the tracker's value; otherwise, it uses the cookie's value. So I'm not sure which collector's behavior is more correct, but they're clearly inconsistent.
At the moment it is very hard to see how many events hit the collector as opposed to how many get loaded into storage. If we could publish a custom metric that counted not only GET requests but also POST requests per 5-minute interval, we could match these counts together to better understand how that traffic is being processed downstream.
cc/ @alexanderdean
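A rough sketch of the counting side, assuming a scheduled flush every 5 minutes and leaving the actual metric publication (e.g. a CloudWatch put-metric call) abstract (all names are illustrative):

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.duration._
import akka.actor.ActorSystem

// Hypothetical sketch: hand per-interval GET/POST counts to `publish`.
class RequestCounter(system: ActorSystem, publish: (Long, Long) => Unit) {
  val gets  = new AtomicLong(0)  // incremented from the GET route
  val posts = new AtomicLong(0)  // incremented from the POST route

  import system.dispatcher
  system.scheduler.schedule(5.minutes, 5.minutes) {
    // getAndSet(0) yields counts per interval rather than running totals.
    publish(gets.getAndSet(0), posts.getAndSet(0))
  }
}
```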
No reason why we can't use the imposter.json to verify collector behavior.
Blocked by akka/akka-meta#27
There are a few issues here.
val bounce = config.cookieBounce.enabled && nuidOpt.isEmpty && !bouncing && pixelExpected && !redirect
application/x-www-form-urlencoded. So let's assume that I've modified the collector to return a 307 and modified the JS tracker to send data as a urlencoded form; the next thing that needs to be done is to allow that content type to be pushed to the sink.
So far it looks like a lot of things need to be changed, and still there are no guarantees that it would work for esoteric browsers such as IE 8.
Do you guys have any thoughts on this problem and the right way to solve it?
During startup of the Kafka Stream Collector it shows this:
(snowplow_scala_stream_collector_kafka_0.16.0)
[main] WARN org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=snowplow
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:423)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:297)
at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KafkaSink.createProducer(KafkaSink.scala:60)
at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KafkaSink.<init>(KafkaSink.scala:35)
at com.snowplowanalytics.snowplow.collectors.scalastream.KafkaCollector$.main(KafkaCollector.scala:33)
at com.snowplowanalytics.snowplow.collectors.scalastream.KafkaCollector.main(KafkaCollector.scala)
However, the entries are still pushed to Kafka successfully. As far as my knowledge goes, I think this has to do with JMX metrics registration, but I'm not sure.
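For what it's worth, this warning usually appears when two Kafka clients in the same JVM register the same JMX client.id (here both producers register as id=snowplow); it is benign, and giving each producer a unique client.id should silence it. A hypothetical sketch, not the collector's actual configuration code:

```scala
import java.util.{Properties, UUID}

// Hypothetical sketch: a unique client.id avoids two producers both
// registering the JMX MBean kafka.producer:type=app-info,id=snowplow.
object KafkaProducerProps {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")         // assumed brokers
  props.put("client.id", s"snowplow-${UUID.randomUUID()}") // unique per producer
}
```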
When I call the collector endpoint to register an ad_click event and then redirect the user to the actual content, I would like to use placeholders within that redirect URL that get filled with actual data from the event, like referer, network_id, or any other information which is not part of the query parameters. Like this:
http://{{collector_url}}/r/tp2?u=http%3A%2F%2Fexample.com%26nuid={{NUID}}%26duid={{DUID}}%26ref={{REF}}
And this would result in a redirect to
http://example.com?nuid=07a79f8a-b24d-44f2-8292-8043653374ea&duid=0e1eeb13-ba55-4eb7-8344-00fb949f964b&ref=google.com
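A minimal sketch of the substitution step, assuming placeholders of the form {{NAME}} and a map of values resolved from the event (names are illustrative, not an existing collector API):

```scala
import scala.util.matching.Regex

// Hypothetical sketch of filling {{...}} macros in the redirect URL.
object RedirectMacros {
  private val Placeholder: Regex = "\\{\\{([A-Z_]+)\\}\\}".r

  def fill(url: String, values: Map[String, String]): String =
    Placeholder.replaceAllIn(url, m =>
      // quoteReplacement guards against "$" in substituted values
      Regex.quoteReplacement(values.getOrElse(m.group(1), m.matched)))
}

// e.g. RedirectMacros.fill("...u=http%3A%2F%2Fexample.com%26nuid={{NUID}}",
//        Map("NUID" -> "07a79f8a-b24d-44f2-8292-8043653374ea"))
```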
With the current architecture, if we have a malfunctioning Kinesis stream, then we will:
In other words - we don't implement back-pressure, and we take a significant risk that we will end up with at-most-once processing rather than at-least-once processing.
A better approach would be to set a failure threshold (e.g. Kinesis out of action for X minutes, or N events in the in-memory buffer), and if that is reached, then we start returning 503s (or similar) and the trackers keep the events on their side...
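A sketch of what that threshold could look like as a wrapping directive, with `bufferedEvents` standing in for the sink's real queue-size accessor (hypothetical names; the threshold would come from configuration):

```scala
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

// Hypothetical sketch of the failure threshold.
object Backpressure {
  def withThreshold(bufferedEvents: () => Long, maxBuffered: Long)(inner: Route): Route =
    extractRequest { _ =>
      if (bufferedEvents() > maxBuffered)
        // Tell trackers to keep the event and retry, instead of losing it here.
        complete(StatusCodes.ServiceUnavailable -> "collector overloaded, retry later")
      else
        inner
    }
}
```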
Only a tracker knows if it has a chance to re-send an event on failure.
After #32, we could consider some kind of hinting so the tracker can tell the collector to really, really try to send the event - i.e. the tracker is handing off the event to the collector, and the collector has to make best efforts to send it.
[13:41] Ed Lewis: Are all devices good at caching data before sending to a tracker or if a tracker fails?
[13:41] Alex Dean: not all of them Ed
sometimes that's a tracker limitation
i.e. we just haven't implemented that caching yet
but I can imagine situations where it would be impossible for a client device to cache
e.g. very locked down envs, or IoT
[13:42] Ed Lewis: oh ok - I'm just wondering if the tracker could ever absolutely guarantee that data will get processed
so the backpressure stops there as it were
[13:43] Alex Dean: yeah it's quite nuanced
for example, if a pixel is loaded in an email
[13:43] Ed Lewis: though I agree in the vast majority of situations it makes sense for the trackers to hold onto information if the collector stops returning OK's
[13:43] Alex Dean: there is no place to back that up
it's only the tracker that knows if it has a capability to back up
the collector has no clue
[13:47] Ed Lewis: yeah I see what you mean
[13:47] Joshua Beemster: Should we add something to let the collector know we can't back up the event if it is removed
Would require significant new functionality in the tracking SDKs
Not sure this is necessary.
For third-party load tests or monitoring services, domain ownership usually has to be proven. This could be done either by DNS TXT records or by uploading a file to a publicly available URL. Can the Scala collector be configured to serve a local file? Like
http://collector.mydomain/myfile.txt
In fact the collector is a spray server, so I guess it is possible.
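It should be; newer collector releases are built on akka-http rather than spray, where a route like the following hypothetical sketch would serve a local verification file (path and file location are illustrative):

```scala
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

// Hypothetical sketch: serve a local verification file on a fixed path.
object VerificationFile {
  val route: Route =
    path("myfile.txt") {
      getFromFile("/etc/collector/myfile.txt") // hypothetical local location
    }
}
```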
So that we can match the functionality in snowplow/snowplow#652
Is there a hack where the cs-object recorded in the CloudFront access log would show us com.snowplowanalytics etc.?
Lower priority because a lot of webhook providers are POST only.
Right now it's possible only for the root path.
This would be particularly helpful when people are trying to do batch loads via a collector, or have a poorly configured collector.
Return a response something like:
{ "schema": "iglu:com.snowplowanalytics.sdl/stream_app/jsonschema/1-0-0",
"data": {
"name": "xxx",
"description": "xxx",
"cliArgs": "xxx",
"version": "1.2.2",
"tags": [ ],
"in": [ { "protocol": "http" }, { "protocol": "https" } ],
"outSuccess": { "protocol": "kinesis", "name": "raw", "schema": "iglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0" },
"outFailure": {}
}
}
This PR introduces gzip content-encoding for the android tracker:
snowplow/snowplow-android-tracker#295
The collector needs to support that encoding before we introduce the feature.
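On the collector side this may be as small as wrapping the route in akka-http's decodeRequest directive, which transparently inflates gzip (and deflate) request bodies based on the Content-Encoding header. A hypothetical sketch (the wrapper name is illustrative):

```scala
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

// Sketch only: decodeRequest inflates gzipped POST bodies before they
// reach the inner route, so the sink sees the plain payload.
object GzipSupport {
  def withGzip(inner: Route): Route =
    decodeRequest {
      inner
    }
}
```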
Like in snowplow/snowplow#2412.
It should be possible to write good events to a stream and bad events to stderr.