Code Monkey home page Code Monkey logo

kafka-source's Introduction

Kafka Source

This is a WIP exploring how to create a riff event-source connected to an external Kafka broker

Integration Testing

Install a separate Kafka broker the riff helm chart for kafka:

$cd ~/go/src/github.com/projectriff/riff
$helm install helm-charts/kafka --name external

Get the kafka host IP:

docker ps -f name=k8s_kafka_external-kafka* -q | xargs docker inspect | grep ADVERTISED

Install riff:

$ helm install projectriff/riff --name projectriff --devel --namespace riff-system --set kafka.create=true --set rbac
.create=false --set httpGateway.service.type=NodePort

Update spring.kafka.bootstrap-servers in application.properties to use the kafka IP.

$ ./mvnw clean package

Run a docker build and deploy resources. This creates the riff topics and a deployment that is similar to what the function controller would build, including the sidecar container, etc.

$ cd <this project root>
$ riff build
$ kubectl apply -f kafka-source-topics.yaml
$ kubectl apply -f deploy.yaml

Using minio to produce messages

Install the minio client

$ create local mount directories, e.g., `/mnt/data` and `/mnt/config`
$ docker run --rm -p 9000:9000 --name minio1 -v /mnt/data:/data -v /mnt/config:/root/.minio  minio/minio:latest server /data

The initial run will create credentials a config.json in /mnt/config. It will also display a message like:

mc config host add myminio http://172.17.0.2:9000 6225PS5P2A3OVKPA6H0J W9TdOPYjeAEmdxLlK88BExK0u8FqxxDUi1YleWaw

Use localhost:9000 instead if you installed mc locally. Edit /mnt/data/config.json to enable Kafka notifications. Edit generated "kafka" entry, using the kafka host IP.

"kafka": {
			"1": {
				"enable": true,
				"brokers": [
					"<kafka-ip>:9092"
				],
				"topic": "bucketevents"
			}
		},

Restart the minio docker container. You should see this message:

SQS ARNs:  arn:minio:sqs::1:kafka

Create a bucket and create and register an event for that bucket.

$ mc mb myminio/foobar
Bucket created successfully `myminio/foobar`.

$mc events add myminio/foobar arn:minio:sqs::1:kafka
Successfully added arn:minio:sqs::1:kafka

$ mc events list myminio/foobar
arn:minio:sqs::1:kafka   s3:ObjectCreated:*,s3:ObjectRemoved:*,s3:ObjectAccessed:*   Filter:

The subscription is for any object event in the bucket. Generate messages to the bucketevents topic:

$ mc cp pom.xml myminio/foobar

Current issues

$ kubectl logs -l function=kafka-source -c sidecar
2018/03/31 19:09:10 Sidecar for function 'kafka-source' (kafka-source->events) using grpc dispatcher starting
2018/03/31 19:09:11 Rebalanced: &{Type:rebalance start Claimed:map[] Released:map[] Current:map[]}
2018/03/31 19:09:11 Rebalanced: &{Type:rebalance OK Claimed:map[kafka-source:[0]] Released:map[] Current:map[kafka-source:[0]]}
2018/03/31 19:09:15 <<< Message{{"prefetch":256,"autoCancel":true,"subscribers":[],"wip":0,"sourceMode":0,"done":false}, map[timestamp:[1522523355889]]}
2018/03/31 19:09:15 Stream to function has closed

meanwhile…​

$ kubectl logs -l function=kafka-source -c main indicates the function is receiving messages

The message {"prefetch":256,"autoCancel":true,"subscribers":[],"wip":0,"sourceMode":0,"done":false} is emitted to the events topic whenever the pod is restarted, e.g., $ kubectl delete pod -l function=kafka-source

If I create new messages before the pod is running, e.g.,

$ kubectl delete deploy -l function=kafka-source
$ mc cp pom.xml myminio/foobar
$ mc cp pom.xml myminio/foobar
$ kubectl apply -f deploy.yaml

I get:

Caused by: java.lang.IllegalStateException: Cannot convert from class org.apache.kafka.clients.consumer.ConsumerRecord
	at io.projectriff.invoker.JavaFunctionInvokerServer.toBytes(JavaFunctionInvokerServer.java:170) ~[app/:na]
	at io.projectriff.invoker.JavaFunctionInvokerServer.payloadToBytes(JavaFunctionInvokerServer.java:155) ~[app/:na]
	at io.projectriff.invoker.JavaFunctionInvokerServer.lambda$call$12(JavaFunctionInvokerServer.java:117) ~[app/:na]
	at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130) ~[reactor-core-3.2.0.M1.jar:3.2.0.M1]

kafka-source's People

Watchers

 avatar  avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.