
kafka-scala-examples

Examples in Scala of Avro, Kafka, Kafka Streams, KSQL, Schema Registry, and Kafka Connect

Local environment

# start locally
# - zookeeper
# - kafka
# - kafka-rest
# - kafka-ui
# - schema-registry
# - schema-registry-ui
# - ksql-server
# - ksql-cli
# - kafka-connect
# - kafka-connect-ui
docker-compose up

# (mac|linux) view kafka ui
[open|xdg-open] http://localhost:8000

# (mac|linux) view schema-registry ui
[open|xdg-open] http://localhost:8001

# (mac|linux) view kafka-connect ui
[open|xdg-open] http://localhost:8002

# cleanup
docker-compose down -v

If containers keep crashing, make sure Docker has enough memory and CPU allocated

# verify memory and cpu usage
docker ps -q | xargs docker stats --no-stream

# verify status
docker inspect <CONTAINER_NAME> | jq '.[].State'

avro

Description

Avro serialization and deserialization examples

Demo

# console
sbt avro/console

# generate avro classes
# avro/target/scala-2.12/classes/com/kafka/demo/User.class
sbt clean avro/compile

# test
sbt clean avro/test
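
For reference, a minimal sketch of the round trip the module exercises, using plain Apache Avro with an illustrative inline User schema (not the repo's exact code, which works with the classes generated from avro/src/main/avro):

import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object AvroRoundTrip extends App {
  // illustrative schema, standing in for the generated User record
  val schema = new Schema.Parser().parse(
    """{"type":"record","name":"User","namespace":"com.kafka.demo",
      |"fields":[{"name":"name","type":"string"}]}""".stripMargin)

  // serialize a GenericRecord to Avro binary
  val record = new GenericData.Record(schema)
  record.put("name", "demo")
  val out = new ByteArrayOutputStream()
  val encoder = EncoderFactory.get().binaryEncoder(out, null)
  new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
  encoder.flush()

  // deserialize the bytes back into a GenericRecord
  val decoder = DecoderFactory.get().binaryDecoder(out.toByteArray, null)
  val decoded = new GenericDatumReader[GenericRecord](schema).read(null, decoder)
  println(decoded.get("name")) // demo
}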

kafka

Description

Kafka producer and consumer API examples, using both the plain Java client (original) and the cakesolutions scala-kafka-client wrapper

Demo

# access kafka
docker exec -it local-kafka bash

# create topic
# convention <MESSAGE_TYPE>.<DATASET_NAME>.<DATA_NAME>
# example [example.no-schema.original|example.no-schema.cakesolutions]
kafka-topics --zookeeper zookeeper:2181 \
  --create --if-not-exists --replication-factor 1 --partitions 1 --topic <TOPIC_NAME>

# delete topic
kafka-topics --zookeeper zookeeper:2181 \
  --delete --topic <TOPIC_NAME>

# view topic
kafka-topics --zookeeper zookeeper:2181 --list 
kafka-topics --zookeeper zookeeper:2181 --describe --topic <TOPIC_NAME>

# view topic offset
kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list kafka:9092 \
  --time -1 \
  --topic <TOPIC_NAME>

# list consumer groups
kafka-consumer-groups --bootstrap-server kafka:9092 --list

# view consumer group offset
kafka-consumer-groups \
  --bootstrap-server kafka:9092 \
  --group <GROUP_NAME> \
  --describe

# reset consumer group offset
kafka-consumer-groups \
  --bootstrap-server kafka:9092 \
  --group <GROUP_NAME> \
  --topic <TOPIC_NAME> \
  --reset-offsets \
  --to-earliest \
  --execute

# console producer
kafka-console-producer --broker-list kafka:9092 --topic <TOPIC_NAME>
kafkacat -P -b 0 -t <TOPIC_NAME>

# console consumer
kafka-console-consumer --bootstrap-server kafka:9092 --topic <TOPIC_NAME> --from-beginning
kafkacat -C -b 0 -t <TOPIC_NAME>

# producer example
sbt "kafka/runMain com.kafka.demo.original.Producer"
sbt "kafka/runMain com.kafka.demo.cakesolutions.Producer"

# consumer example
sbt "kafka/runMain com.kafka.demo.original.Consumer"
sbt "kafka/runMain com.kafka.demo.cakesolutions.Consumer"

# test
sbt clean kafka/test
sbt "test:testOnly *KafkaSpec"

schema-registry

Description

# register schema
# convention <TOPIC_NAME>-key or <TOPIC_NAME>-value
http -v POST :8081/subjects/example.with-schema.simple-value/versions \
  Accept:application/vnd.schemaregistry.v1+json \
  schema='{"type":"string"}'

# import schema from file
http -v POST :8081/subjects/example.with-schema.user-value/versions \
  Accept:application/vnd.schemaregistry.v1+json \
  schema=@avro/src/main/avro/user.avsc

# export schema to file
http :8081/subjects/example.with-schema.user-value/versions/latest \
  | jq -r '.schema|fromjson' \
  | tee avro/src/main/avro/user-latest.avsc

# list subjects
http -v :8081/subjects

# list subject's versions
http -v :8081/subjects/example.with-schema.simple-value/versions

# fetch by version
http -v :8081/subjects/example.with-schema.simple-value/versions/1

# fetch by id
http -v :8081/schemas/ids/1

# test compatibility
http -v POST :8081/compatibility/subjects/example.with-schema.simple-value/versions/latest \
  Accept:application/vnd.schemaregistry.v1+json \
  schema='{"type":"string"}'

# delete version
http -v DELETE :8081/subjects/example.with-schema.simple-value/versions/1

# delete latest version
http -v DELETE :8081/subjects/example.with-schema.simple-value/versions/latest

# delete subject
http -v DELETE :8081/subjects/example.with-schema.simple-value

# stringify
jq tostring avro/src/main/avro/user.avsc
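
The registration above can also be done programmatically; a sketch using Java 11's HttpClient from Scala, with the same subject and payload as the first command (note the schema travels as an escaped JSON string inside the "schema" field):

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object RegisterSchema extends App {
  // the Avro schema must be embedded as an escaped JSON string
  val body = """{"schema":"{\"type\":\"string\"}"}"""

  val request = HttpRequest.newBuilder(URI.create(
      "http://localhost:8081/subjects/example.with-schema.simple-value/versions"))
    .header("Content-Type", "application/vnd.schemaregistry.v1+json")
    .POST(HttpRequest.BodyPublishers.ofString(body))
    .build()

  val response = HttpClient.newHttpClient()
    .send(request, HttpResponse.BodyHandlers.ofString())
  println(response.body()) // e.g. {"id":1}
}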

Demo

# generate SpecificRecord classes under "schema-registry/target/scala-2.12/src_managed/main/compiled_avro"
sbt clean schema-registry/avroScalaGenerateSpecific

# (optional) create schema
http -v POST :8081/subjects/example.with-schema.payment-key/versions \
  Accept:application/vnd.schemaregistry.v1+json \
  schema='{"type":"string"}'
http -v POST :8081/subjects/example.with-schema.payment-value/versions \
  Accept:application/vnd.schemaregistry.v1+json \
  schema=@schema-registry/src/main/avro/Payment.avsc

# access kafka
docker exec -it local-kafka bash

# (optional) create topic
kafka-topics --zookeeper zookeeper:2181 \
  --create --if-not-exists --replication-factor 1 --partitions 1 --topic example.with-schema.payment

# console producer (binary)
kafka-console-producer --broker-list kafka:9092 --topic example.with-schema.payment

# console consumer (binary)
kafka-console-consumer --bootstrap-server kafka:9092 --topic example.with-schema.payment

# access schema-registry
docker exec -it local-schema-registry bash

# avro console producer
# example "MyKey",{"id":"MyId","amount":10}
kafka-avro-console-producer --broker-list kafka:29092 \
  --topic example.with-schema.payment \
  --property schema.registry.url=http://schema-registry:8081 \
  --property parse.key=true \
  --property key.separator=, \
  --property key.schema='{"type":"string"}' \
  --property value.schema='{"namespace":"io.confluent.examples.clients.basicavro","type":"record","name":"Payment","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"}]}'

# avro console consumer
kafka-avro-console-consumer --bootstrap-server kafka:29092 \
  --topic example.with-schema.payment \
  --property schema.registry.url=http://schema-registry:8081 \
  --property schema.id.separator=: \
  --property print.key=true \
  --property print.schema.ids=true \
  --property key.separator=, \
  --from-beginning

# producer example
sbt "schema-registry/runMain com.kafka.demo.specific.Producer"

# consumer example
sbt "schema-registry/runMain com.kafka.demo.specific.Consumer"

# tests
sbt "schema-registry/test:testOnly *KafkaSchemaRegistrySpecificSpec"
# producer example
sbt "schema-registry/runMain com.kafka.demo.generic.Producer"

# consumer example
sbt "schema-registry/runMain com.kafka.demo.generic.Consumer"

# tests
sbt "schema-registry/test:testOnly *KafkaSchemaRegistryGenericSpec"

TODO

  • generic + schema evolution
  • ovotech
  • multi-schema
  • formulation

kafka-streams

Description

Kafka Streams API examples

Demo-1

# access kafka
docker exec -it local-kafka bash

# create topic
# example [example.to-upper-case-app.input|example.to-upper-case-app.output]
kafka-topics --zookeeper zookeeper:2181 \
  --create --if-not-exists --replication-factor 1 --partitions 1 --topic <TOPIC_NAME>

# ToUpperCaseApp example (input topic required)
sbt "streams/runMain com.kafka.demo.streams.ToUpperCaseApp"

# produce
kafka-console-producer --broker-list kafka:9092 \
  --topic example.to-upper-case-app.input

# consume
kafka-console-consumer --bootstrap-server kafka:9092 \
  --topic example.to-upper-case-app.output

# test
sbt clean streams/test
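
For reference, the whole topology amounts to a single mapValues; a minimal sketch using the Java Streams API from Scala (application id and broker address are illustrative, not the repo's exact code):

import java.util.Properties

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{Consumed, Produced, ValueMapper}
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}

object ToUpperCaseSketch extends App {
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "to-upper-case-app")
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

  val builder = new StreamsBuilder()
  builder
    .stream("example.to-upper-case-app.input", Consumed.`with`(Serdes.String(), Serdes.String()))
    // the entire transformation: uppercase each record value
    .mapValues(new ValueMapper[String, String] {
      override def apply(value: String): String = value.toUpperCase
    })
    .to("example.to-upper-case-app.output", Produced.`with`(Serdes.String(), Serdes.String()))

  val streams = new KafkaStreams(builder.build(), props)
  streams.start()
  sys.addShutdownHook(streams.close())
}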

Demo-2

Tested with embedded-kafka and embedded-kafka-schema-registry

# access kafka
docker exec -it local-kafka bash

# create topic
# example [json.streams-json-to-avro-app.input|avro.streams-json-to-avro-app.output]
kafka-topics --zookeeper zookeeper:2181 \
  --create --if-not-exists --replication-factor 1 --partitions 1 --topic <TOPIC_NAME>

# produce (default StringSerializer)
kafka-console-producer \
  --broker-list kafka:9092 \
  --property "parse.key=true" \
  --property "key.separator=:" \
  --property "key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer" \
  --property "value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer" \
  --topic <TOPIC_NAME>

# consume (default StringDeserializer)
kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --from-beginning \
  --property "print.key=true" \
  --property "key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer" \
  --property "value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer" \
  --topic <TOPIC_NAME>

# access schema-registry
docker exec -it local-schema-registry bash

# consume avro
kafka-avro-console-consumer --bootstrap-server kafka:29092 \
  --property schema.registry.url=http://schema-registry:8081 \
  --property schema.id.separator=: \
  --property print.key=true \
  --property print.schema.ids=true \
  --property key.separator=, \
  --from-beginning \
  --topic <TOPIC_NAME>

# JsonToAvroApp example (input topic required)
sbt "streams-json-avro/runMain com.kafka.demo.JsonToAvroApp"

# test
sbt clean streams-json-avro/test

Example

# json
mykey:{"valueInt":42,"valueString":"foo"}

# log
[json.streams-json-to-avro-app.input]: mykey, JsonModel(42,foo)
[avro.streams-json-to-avro-app.output]: KeyAvroModel(mykey), ValueAvroModel(42,FOO)
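
The mapping shown in the log is a single decode-and-transform step; a sketch assuming circe for the JSON side (the case classes mirror the log above and are illustrative, not the repo's models):

import io.circe.generic.auto._
import io.circe.parser.decode

// illustrative stand-ins for the models in the log above
final case class JsonModel(valueInt: Int, valueString: String)
final case class ValueAvroModel(valueInt: Int, valueString: String)

object JsonToAvroSketch extends App {
  val raw = """{"valueInt":42,"valueString":"foo"}"""

  // decode the JSON value, then uppercase the string field as the app does
  val result = decode[JsonModel](raw)
    .map(json => ValueAvroModel(json.valueInt, json.valueString.toUpperCase))

  println(result) // Right(ValueAvroModel(42,FOO))
}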

Demo-3

TODO

  • CatsKafkaStreamsApp [source]

# run app
sbt -jvm-debug 5005 "cats-kafka-streams/runMain com.kafka.demo.CatsKafkaStreamsApp"

Demo-4

TODO

# run app
sbt -jvm-debug 5005 "zio-kafka-streams/runMain com.kafka.demo.ZioKafkaStreamsApp"

ksql

Description

Setup Kafka

# access kafka
docker exec -it local-kafka bash

# create topic
kafka-topics --zookeeper zookeeper:2181 \
  --create --if-not-exists --replication-factor 1 --partitions 1 --topic USER_PROFILE

# produce sample data
kafka-console-producer --broker-list kafka:9092 --topic USER_PROFILE << EOF
{"userid": 1000, "firstname": "Alison", "lastname": "Smith", "countrycode": "GB", "rating": 4.7}
EOF

# consume
kafka-console-consumer --bootstrap-server kafka:9092 --topic USER_PROFILE --from-beginning

Access KSQL CLI

  • using the server

    # access ksql-server
    docker exec -it local-ksql-server bash
    
    # start ksql cli
    ksql http://ksql-server:8088
  • using a local instance

    # connect to local cli
    docker exec -it local-ksql-cli ksql http://ksql-server:8088
  • using a temporary instance

    # connect to remote server
    docker run --rm \
      --network=kafka-scala-examples_local_kafka_network \
      -it confluentinc/cp-ksql-cli http://ksql-server:8088

Execute SQL statements

# create stream
CREATE STREAM user_profile (\
  userid INT, \
  firstname VARCHAR, \
  lastname VARCHAR, \
  countrycode VARCHAR, \
  rating DOUBLE \
  ) WITH (KAFKA_TOPIC = 'USER_PROFILE', VALUE_FORMAT = 'JSON');

# verify stream
list streams;
describe user_profile;

# query stream
SELECT userid, firstname, lastname, countrycode, rating FROM user_profile EMIT CHANGES;

Generate sample data and expect both the console consumer and the KSQL query to show it

# generate data
docker run --rm \
  -v $(pwd)/local/ksql:/datagen \
  --network=kafka-scala-examples_local_kafka_network \
  -it confluentinc/ksql-examples ksql-datagen \
  bootstrap-server=kafka:29092 \
  schemaRegistryUrl=http://schema-registry:8081 \
  schema=datagen/user_profile.avro \
  format=json \
  topic=USER_PROFILE \
  key=userid \
  maxInterval=5000 \
  iterations=100

kafka-connect

Setup PostgreSQL locally

# create shared network
docker-compose up

# start postgres
docker-compose -f docker-compose.postgres.yml up

# (mac|linux) view postgres ui
# [schema=public|database=postgres|username=postgres|password=postgres]
[open|xdg-open] http://localhost:8080

Setup connectors

# list connector
http -v :8083/connectors

# init data to generate schema
cp local/connect/data/resources-0.txt.orig local/connect/data/resources-0.txt

# setup spooldir source connector
http -v --json POST :8083/connectors < local/connect/config/source-spooldir-connector.json

# ingest data
echo "{\"accountId\":\"123\",\"resourceType\":\"XXX\",\"value\":\"X1\"}" > local/connect/data/resources-1.txt

# setup jdbc sink connector
# topic = SCHEMA.DATABASE = "public.postgres"
http -v --json POST :8083/connectors < local/connect/config/sink-jdbc-connector.json

# verify data
docker exec -it local-postgres bash -c "psql -U postgres postgres"
select * from public.postgres;

# cleanup
docker-compose -f docker-compose.postgres.yml down -v


