Sample Stream Pipeline with Pulsar Functions

A sample app that uses the built-in Pulsar Functions support in Spring for Apache Pulsar to create a streaming pipeline that handles user signups.

The app consists of a Rabbit source, an SCSt (Spring Cloud Stream) function, and an HTTP sink with the following details:

  • Messages sent to the RabbitMQ user_signup_queue queue are sourced into the Pulsar user-signup-topic topic

  • Messages from the user-signup-topic topic are fed into the SCSt signup function, in which:

    • ENTERPRISE tier signups result in a customer message in the customer-signup-topic Pulsar topic

  • Messages from the customer-signup-topic topic are sent to a Slack webhook via the Pulsar HTTP sink connector.

Pre-requisites

  • Ability to run Docker containers locally, with Docker Compose installed

Steps

Note
All commands are expected to be executed from the directory this document lives in (/guides/pulsar-functions).

Build app and function

Build the application and function libs with the following command:

../../gradlew clean build

Download connectors

The connector libs are not included in the Docker image. Download them once by executing the following script:

./download-connectors.sh

Start services

Start the Pulsar, RabbitMQ, and Cassandra services using Docker Compose with the following command:

docker-compose up -d

Verify rabbitmq is ready by executing the following command:

docker logs rabbitmq | grep "Server startup complete"

which should produce:

2023-01-04 21:06:39.692935+00:00 [info] <0.721.0> Server startup complete; 4 plugins started.

Verify pulsar is ready by executing the following command:

docker logs pulsar 2>&1 | grep "messaging service is ready"

which should produce:

2023-01-04T21:39:57 [main] INFO  org.apache.pulsar.broker.PulsarService - messaging service is ready, bootstrap_seconds=4
2023-01-04T21:39:57 [main] INFO  org.apache.pulsar.broker.PulsarService - messaging service is ready, bootstrap service port = 8080, broker url= pulsar://localhost:6650, cluster=standalone, configs=org.apache.pulsar.broker....
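
When the startup is scripted, the readiness checks above can be wrapped in a small polling helper. The following is a minimal sketch and not part of the sample: the function name wait_for_log and the 60 second default timeout are assumptions chosen for illustration. It re-runs docker logs once per second until the pattern appears or the timeout elapses.

```shell
# Hypothetical helper (not part of the sample): poll a container's logs
# until a pattern appears or a timeout (in seconds) elapses.
wait_for_log() {
  wfl_container=$1
  wfl_pattern=$2
  wfl_timeout=${3:-60}   # assumed default; raise it on slower machines
  wfl_elapsed=0
  while [ "$wfl_elapsed" -lt "$wfl_timeout" ]; do
    # 2>&1 because some containers log to stderr, as in the pulsar check
    if docker logs "$wfl_container" 2>&1 | grep -q "$wfl_pattern"; then
      return 0
    fi
    sleep 1
    wfl_elapsed=$((wfl_elapsed + 1))
  done
  echo "timed out waiting for '$wfl_pattern' in $wfl_container logs" >&2
  return 1
}
```

A possible invocation, using the same patterns as the manual checks: wait_for_log rabbitmq "Server startup complete" && wait_for_log pulsar "messaging service is ready"
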

Verify pulsar has the connectors installed by executing the following command:

curl -s http://localhost:8080/admin/v2/functions/connectors

which should produce:

[
  {
    "name": "http",
    "description": "Writes data to an HTTP server (Webhook)",
    "sinkClass": "org.apache.pulsar.io.http.HttpSink",
    "sinkConfigClass": "org.apache.pulsar.io.http.HttpSinkConfig"
  },
  {
    "name": "rabbitmq",
    "description": "RabbitMQ source and sink connector",
    "sourceClass": "org.apache.pulsar.io.rabbitmq.RabbitMQSource",
    "sinkClass": "org.apache.pulsar.io.rabbitmq.RabbitMQSink",
    "sourceConfigClass": "org.apache.pulsar.io.rabbitmq.RabbitMQSourceConfig",
    "sinkConfigClass": "org.apache.pulsar.io.rabbitmq.RabbitMQSinkConfig"
  }
]
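
The connector check can also be automated instead of being read by eye. The sketch below is an illustration only: require_connectors is a hypothetical name, and the matching is a plain grep on the "name" fields of the response shown above rather than real JSON parsing.

```shell
# Hypothetical helper: succeed only if every named connector appears in the
# JSON returned by the connectors endpoint used in the manual check.
require_connectors() {
  rc_json=$(curl -s http://localhost:8080/admin/v2/functions/connectors)
  for rc_name in "$@"; do
    # Tolerate both compact and pretty-printed JSON around the colon
    if ! printf '%s\n' "$rc_json" |
        grep -Eq "\"name\"[[:space:]]*:[[:space:]]*\"$rc_name\""; then
      echo "connector not installed: $rc_name" >&2
      return 1
    fi
  done
}
```

For example, require_connectors http rabbitmq returns a non-zero status and names the first missing connector if download-connectors.sh has not been run.
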

At this point the following services are up and running:

  • rabbitmq

    • management UI

  • pulsar standalone

    • function support enabled

    • http and rabbit connectors installed

Start application

The sample app registers the Pulsar Functions which effectively create the streaming pipeline.

Run the sample app within your IDE or by executing the following command:

cd sample-signup-app && ../../../gradlew bootRun

which should produce:

[main] PulsarFunctionAdministration : Creating 'UserSignupFunction' function (using local archive: /Users/cbono/repos/spring-pulsar/spring-pulsar-sample-apps/sample-pulsar-functions/signup-function/target/signup-function-0.0.1-SNAPSHOT.jar)
[main] PulsarFunctionAdministration : Creating 'CustomerSignupCassandraSink' sink (using local archive: builtin://cassandra)
[main] PulsarFunctionAdministration : Creating 'UserSignupRabbitSource' source (using local archive: builtin://rabbitmq)
[main] SignupApplication            : Started SignupApplication in 6.485 seconds (process running for 6.839)

Verify the functions are actually registered by executing the following Pulsar commands:

docker exec -it pulsar bin/pulsar-admin sources list
docker exec -it pulsar bin/pulsar-admin functions list
docker exec -it pulsar bin/pulsar-admin sinks list

which should produce:

[
"UserSignupRabbitSource"
]
UserSignupFunction
[
"CustomerSignupHttpSink"
]
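
The three list commands can be folded into one scripted check. This is a minimal sketch under stated assumptions: is_registered is a hypothetical helper, and it does a simple substring match on the list output. The -it flags are dropped on purpose, because allocating a TTY fails when the command runs from a non-interactive script.

```shell
# Hypothetical helper: check that a source, function, or sink is registered.
# $1 is the pulsar-admin sub-command (sources, functions, or sinks),
# $2 is the expected name.
is_registered() {
  # No -it: a TTY is not available when run from a script or CI job
  docker exec pulsar bin/pulsar-admin "$1" list 2>/dev/null | grep -qF -- "$2"
}
```

For example: is_registered sources UserSignupRabbitSource && is_registered functions UserSignupFunction. For the sink, pass whichever sink name the app logged when it started.
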

Verify pipeline

The app produces a random user signup record to the RabbitMQ user_signup_queue queue every 5 seconds. It also logs all messages on the user-signup-topic and customer-signup-topic Pulsar topics as well as the last 5 emails sent to the customer_signup Cassandra table.

To verify the pipeline is working, watch the console log as the app runs. The output should look similar to the following:

TO RABBIT user_signup_queue => Signup[signupTier=ENTERPRISE, firstName=Samuel, lastName=Weiss, [email protected], signupTimestamp=1673236049021]
FROM PULSAR user-signup => Signup[signupTier=ENTERPRISE, firstName=Samuel, lastName=Weiss, [email protected], signupTimestamp=1673236049021]
FROM PULSAR customer-signup => Customer[firstName=Samuel, lastName=Weiss, [email protected], signupTimestamp=1673236049021]
FROM CASSANDRA => latest (5/18) emails: [email protected], [email protected], [email protected], [email protected], [email protected]...

TO RABBIT user_signup_queue => Signup[signupTier=BASIC, firstName=Arianna, lastName=Edwards, [email protected], signupTimestamp=1673236054031]
FROM PULSAR user-signup => Signup[signupTier=BASIC, firstName=Arianna, lastName=Edwards, [email protected], signupTimestamp=1673236054031]

TO RABBIT user_signup_queue => Signup[signupTier=STANDARD, firstName=Kylie, lastName=Raymond, [email protected], signupTimestamp=1673236059038]
FROM PULSAR user-signup => Signup[signupTier=STANDARD, firstName=Kylie, lastName=Raymond, [email protected], signupTimestamp=1673236059038]

TO RABBIT user_signup_queue => Signup[signupTier=ENTERPRISE, firstName=Nolan, lastName=Floyd, [email protected], signupTimestamp=1673236064045]
FROM PULSAR user-signup => Signup[signupTier=ENTERPRISE, firstName=Nolan, lastName=Floyd, [email protected], signupTimestamp=1673236064045]
FROM PULSAR customer-signup => Customer[firstName=Nolan, lastName=Floyd, [email protected], signupTimestamp=1673236064045]
FROM CASSANDRA => latest (5/19) emails: [email protected], [email protected], [email protected], [email protected], [email protected]...
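
The routing rule visible in this output (only ENTERPRISE signups produce a customer-signup message) can be checked mechanically on a captured log. The sketch below is illustrative: check_routing is a hypothetical name, and it assumes the console output was saved to a file first. Because messages may still be in flight while the app runs, the two counts can briefly differ by one.

```shell
# Hypothetical check: read the app's console log on stdin and compare the
# number of ENTERPRISE signups seen on user-signup with the number of
# messages seen on customer-signup.
check_routing() {
  cr_log=$(cat)
  # grep -c exits non-zero on a zero count, so "|| true" keeps set -e happy
  cr_enterprise=$(printf '%s\n' "$cr_log" |
    grep -c 'FROM PULSAR user-signup => Signup\[signupTier=ENTERPRISE' || true)
  cr_customers=$(printf '%s\n' "$cr_log" |
    grep -c 'FROM PULSAR customer-signup => Customer\[' || true)
  echo "enterprise=$cr_enterprise customers=$cr_customers"
  [ "$cr_enterprise" -eq "$cr_customers" ]
}
```

One way to use it, assuming the log file name app.log: run ../../../gradlew bootRun | tee app.log in one terminal, then check_routing < app.log in another.
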

View Pulsar function logs by executing the following command:

docker logs pulsar

which should contain signup logs such as:

Processing Signup(signupTier=ENTERPRISE, firstName=Gavin, lastName=Wilson, [email protected], signupTimestamp=1673196872351)
ENTERPRISE signup count: 1
Converting to Signup(signupTier=ENTERPRISE, firstName=Gavin, lastName=Wilson, [email protected], signupTimestamp=1673196872351)
Processing Signup(signupTier=FREE, firstName=Nevaeh, lastName=Sexton, [email protected], signupTimestamp=1673196877357)
FREE signup count: 1
Processing Signup(signupTier=ENTERPRISE, firstName=Charlotte, lastName=Beach, [email protected], signupTimestamp=1673196882364)
ENTERPRISE signup count: 2
Converting to Signup(signupTier=ENTERPRISE, firstName=Charlotte, lastName=Beach, [email protected], signupTimestamp=1673196882364)

Select from Cassandra

Each ENTERPRISE signup should result in a record in the Cassandra table. To inspect all customer signup records you can query the Cassandra table.

Invoke the CQLSH utility on the cassandra container with the following command:

docker exec -it cassandra cqlsh cassandra

From the cqlsh> prompt execute the following:

use sample_pulsar_functions_keyspace;
select * from customer_signup;
exit;

which should produce output similar to:

customer_email                       | customer_details
--------------------------------------+-----------------------------------------------------------------------------------------------------------------------------
[email protected] |                   {"firstName":"Molly","lastName":"Mckay","email":"[email protected]","signupTimestamp":1673196862339}
[email protected] |                   {"firstName":"Gavin","lastName":"Wilson","email":"[email protected]","signupTimestamp":1673196872351}
[email protected] |                   {"firstName":"Ryan","lastName":"Ramsey","email":"[email protected]","signupTimestamp":1673196892373}
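
The same table can be queried without an interactive prompt by using the cqlsh -e option, which runs one statement and exits. The sketch below is an assumption-laden illustration: count_customer_signups is a hypothetical helper, and the awk filter assumes the count is the first output line that contains only digits.

```shell
# Hypothetical helper: print the number of rows in the customer_signup table.
count_customer_signups() {
  # -e executes a single CQL statement; no -it, so this also works in scripts
  docker exec cassandra cqlsh cassandra \
    -e 'select count(*) from sample_pulsar_functions_keyspace.customer_signup;' |
    awk '/^ *[0-9]+ *$/ { gsub(/ /, ""); print; exit }'
}
```

Running count_customer_signups a few seconds apart should show the number growing as new ENTERPRISE signups arrive.
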

Stop app and services

Stop the sample app by pressing CTRL-C in the terminal it is running in.

Stop all running services using Docker Compose with the following command:

docker-compose down -v

Useful commands

Details about source

docker exec -ti pulsar bin/pulsar-admin sources get --name UserSignupRabbitSource

Details about sink

docker exec -ti pulsar bin/pulsar-admin sinks get --name CustomerSignupCassandraSink

Details about function

docker exec -ti pulsar bin/pulsar-admin functions get --name UserSignupFunction

Consume messages from output topic of Signup function

docker exec -ti pulsar bin/pulsar-client consume customer_signup -s "co-sub1" -p "Earliest" -n 100
