Kafka Source
This is a WIP exploring how to create a riff event-source connected to an external Kafka broker.
Integration Testing
Install a separate Kafka broker using the riff helm chart for kafka:
$ cd ~/go/src/github.com/projectriff/riff
$ helm install helm-charts/kafka --name external
Get the kafka host IP:
$ docker ps -f name=k8s_kafka_external-kafka* -q | xargs docker inspect | grep ADVERTISED
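The `grep` above prints raw JSON lines from `docker inspect`, quotes and commas included. A minimal sketch of a filter that reduces them to plain `NAME=value` pairs follows. `advertised_env` is a hypothetical helper name, and the exact variable names depend on the Kafka image in use, which this document does not state.

```shell
# Hypothetical helper: read `docker inspect` output on stdin and print
# each entry containing ADVERTISED as a bare NAME=value pair.
advertised_env() {
  # Strip the leading indentation and quote, then the trailing quote and comma.
  grep ADVERTISED | sed -E 's/^[[:space:]]*"//; s/",?[[:space:]]*$//'
}

# Usage against the broker container from the command above:
#   docker ps -f name=k8s_kafka_external-kafka* -q | xargs docker inspect | advertised_env
```

Piping the result through `cut -d= -f2-` leaves only the value, which can then be used wherever the kafka IP is needed below.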
Install riff:
$ helm install projectriff/riff --name projectriff --devel --namespace riff-system --set kafka.create=true --set rbac.create=false --set httpGateway.service.type=NodePort
Update spring.kafka.bootstrap-servers in application.properties to use the kafka IP, then build the project:
$ ./mvnw clean package
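The property update before the build can be scripted so the kafka IP does not have to be edited by hand. This is a minimal sketch: `set_bootstrap_servers` is a hypothetical helper, the path `src/main/resources/application.properties` is assumed from the usual Maven layout rather than confirmed for this project, and port 9092 is taken from the minio broker configuration further down.

```shell
# Hypothetical helper: rewrite spring.kafka.bootstrap-servers in a properties file.
# $1 = path to the properties file, $2 = broker address such as <kafka-ip>:9092
set_bootstrap_servers() {
  props_file="$1"
  brokers="$2"
  # Return non-zero if the property is absent, so a wrong path is noticed.
  grep -q '^spring\.kafka\.bootstrap-servers=' "$props_file" || return 1
  tmp="$(mktemp)"
  # Write to a temp file and move it into place (portable alternative to sed -i).
  sed "s|^spring\.kafka\.bootstrap-servers=.*|spring.kafka.bootstrap-servers=${brokers}|" \
    "$props_file" > "$tmp" && mv "$tmp" "$props_file"
}

# Usage (path is an assumption):
#   set_bootstrap_servers src/main/resources/application.properties "<kafka-ip>:9092"
```

Run it before `./mvnw clean package` so the packaged jar picks up the new broker address.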
Run a docker build and deploy resources. This creates the riff topics and a deployment that is similar to what the function controller would build, including the sidecar container, etc.
$ cd <this project root>
$ riff build
$ kubectl apply -f kafka-source-topics.yaml
$ kubectl apply -f deploy.yaml
Using minio to produce messages
Install the minio client
Create local mount directories, e.g., `/mnt/data` and `/mnt/config`, then run the minio server:
$ docker run --rm -p 9000:9000 --name minio1 -v /mnt/data:/data -v /mnt/config:/root/.minio minio/minio:latest server /data
The initial run will create credentials and a config.json in /mnt/config. It will also display a message like:
mc config host add myminio http://172.17.0.2:9000 6225PS5P2A3OVKPA6H0J W9TdOPYjeAEmdxLlK88BExK0u8FqxxDUi1YleWaw
Use localhost:9000 instead if you installed mc locally. Edit /mnt/config/config.json to enable Kafka notifications: update the generated "kafka" entry, using the kafka host IP.
"kafka": {
  "1": {
    "enable": true,
    "brokers": [
      "<kafka-ip>:9092"
    ],
    "topic": "bucketevents"
  }
},
Restart the minio docker container. You should see this message:
SQS ARNs: arn:minio:sqs::1:kafka
Create a bucket, then register an event notification for it.
$ mc mb myminio/foobar
Bucket created successfully `myminio/foobar`.
$ mc events add myminio/foobar arn:minio:sqs::1:kafka
Successfully added arn:minio:sqs::1:kafka
$ mc events list myminio/foobar
arn:minio:sqs::1:kafka s3:ObjectCreated:*,s3:ObjectRemoved:*,s3:ObjectAccessed:* Filter:
The subscription is for any object event in the bucket. Generate messages to the bucketevents topic:
$ mc cp pom.xml myminio/foobar
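Testing usually needs more than one message. A small loop can copy the same file under several object names, and each copy should trigger the subscription registered above. This is a sketch: `generate_events` and the `MC` override variable are hypothetical names introduced here, and the `pom-<n>.xml` object names are arbitrary.

```shell
# Hypothetical helper: upload pom.xml N times under distinct object names.
# Set MC to substitute another command for mc (for example, echo for a dry run).
generate_events() {
  count="$1"
  i=1
  while [ "$i" -le "$count" ]; do
    "${MC:-mc}" cp pom.xml "myminio/foobar/pom-${i}.xml"
    i=$((i + 1))
  done
}

# Usage:
#   generate_events 5
```

Running it with `MC=echo` prints the `cp` commands that would be executed without contacting minio.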
Current issues
$ kubectl logs -l function=kafka-source -c sidecar
2018/03/31 19:09:10 Sidecar for function 'kafka-source' (kafka-source->events) using grpc dispatcher starting
2018/03/31 19:09:11 Rebalanced: &{Type:rebalance start Claimed:map[] Released:map[] Current:map[]}
2018/03/31 19:09:11 Rebalanced: &{Type:rebalance OK Claimed:map[kafka-source:[0]] Released:map[] Current:map[kafka-source:[0]]}
2018/03/31 19:09:15 <<< Message{{"prefetch":256,"autoCancel":true,"subscribers":[],"wip":0,"sourceMode":0,"done":false}, map[timestamp:[1522523355889]]}
2018/03/31 19:09:15 Stream to function has closed
Meanwhile, the log of the main container indicates the function is receiving messages:
$ kubectl logs -l function=kafka-source -c main
The message {"prefetch":256,"autoCancel":true,"subscribers":[],"wip":0,"sourceMode":0,"done":false} is emitted to the events topic whenever the pod is restarted, e.g.:
$ kubectl delete pod -l function=kafka-source
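To confirm whether a given restart emitted this message, the sidecar log can be filtered for it. This is a minimal sketch: `count_spurious` is a hypothetical helper, and the match string is copied from the sidecar log excerpt above, so it assumes the log format stays the same.

```shell
# Hypothetical helper: count sidecar log lines on stdin that carry the
# unexpected {"prefetch":...} payload.
count_spurious() {
  # -F matches the string literally; grep -c prints 0 but exits non-zero
  # when nothing matches, so the exit status is neutralized.
  grep -cF '<<< Message{{"prefetch":' || true
}

# Usage:
#   kubectl logs -l function=kafka-source -c sidecar | count_spurious
```

A count above zero after a pod restart with no new uploads reproduces the issue.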
If I create new messages before the pod is running, e.g.,
$ kubectl delete deploy -l function=kafka-source
$ mc cp pom.xml myminio/foobar
$ mc cp pom.xml myminio/foobar
$ kubectl apply -f deploy.yaml
I get:
Caused by: java.lang.IllegalStateException: Cannot convert from class org.apache.kafka.clients.consumer.ConsumerRecord
at io.projectriff.invoker.JavaFunctionInvokerServer.toBytes(JavaFunctionInvokerServer.java:170) ~[app/:na]
at io.projectriff.invoker.JavaFunctionInvokerServer.payloadToBytes(JavaFunctionInvokerServer.java:155) ~[app/:na]
at io.projectriff.invoker.JavaFunctionInvokerServer.lambda$call$12(JavaFunctionInvokerServer.java:117) ~[app/:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130) ~[reactor-core-3.2.0.M1.jar:3.2.0.M1]