This reference architecture shows an end-to-end stream processing pipeline. This type of pipeline has four stages: ingest, process, store, and analysis and reporting. For this reference architecture, the pipeline ingests data from two sources, performs a join on related records from each stream, enriches the result, and calculates an average in real time. The results are stored for further analysis.
Scenario: A taxi company collects data about each taxi trip. For this scenario, we assume there are two separate devices sending data. The taxi has a meter that sends information about each ride: the duration, distance, and pickup and dropoff locations. A separate device accepts payments from customers and sends data about fares. To spot ridership trends, the taxi company wants to calculate the average tip per mile driven, in real time, for each neighborhood.
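Conceptually, the enrichment and aggregation reduce to a grouped ratio. The toy sketch below is plain Python over hypothetical simplified records, not the actual Scala/Spark job, which performs the same grouping per time window:

```python
# Toy illustration of the core aggregation: average tip per mile by
# neighborhood. Field names here are hypothetical simplifications of
# the joined ride + fare stream.
from collections import defaultdict

def average_tip_per_mile(records):
    """records: iterable of dicts with 'neighborhood', 'tip', 'miles'."""
    totals = defaultdict(lambda: [0.0, 0.0])  # neighborhood -> [tip_sum, mile_sum]
    for r in records:
        totals[r["neighborhood"]][0] += r["tip"]
        totals[r["neighborhood"]][1] += r["miles"]
    return {n: tip / miles for n, (tip, miles) in totals.items() if miles > 0}

# One window's worth of joined records:
window = [
    {"neighborhood": "Midtown", "tip": 3.0, "miles": 2.0},
    {"neighborhood": "Midtown", "tip": 1.0, "miles": 2.0},
    {"neighborhood": "Harlem",  "tip": 2.0, "miles": 4.0},
]
print(average_tip_per_mile(window))  # {'Midtown': 1.0, 'Harlem': 0.5}
```

In the real pipeline, Spark Structured Streaming computes this per one-minute window and writes each window's result to Cosmos DB.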
A deployment for this reference architecture is available on GitHub.
- Clone, fork, or download this GitHub repository.
- Install Docker to run the data generator.
- Install the Azure CLI.
- Install the Databricks CLI.
- Install a Java IDE, with the following resources:
  - JDK 1.8
  - Scala SDK 2.11
  - Maven 3.5.4
- Create a directory named `DataFile` in the root of the cloned GitHub repository in your local file system.
- Open a web browser and navigate to https://uofi.app.box.com/v/NYCtaxidata/folder/2332219935.
- Click the Download button on this page to download a zip file of all the taxi data for that year.
- Extract the zip file to the `DataFile` directory.

  > [!NOTE]
  > This zip file contains other zip files. Don't extract the child zip files.

  The directory structure must look like the following:

  ```
  /DataFile
      /FOIL2013
          trip_data_1.zip
          trip_data_2.zip
          trip_data_3.zip
          ...
  ```
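If you want to verify the layout from a script, a small hypothetical helper like the following (plain Python, not part of the repository) confirms the monthly zips are present and still zipped:

```python
# Hypothetical local sanity check: confirm /DataFile/FOIL2013 exists and
# contains the monthly trip_data_*.zip files (unextracted).
from pathlib import Path

def check_layout(root):
    foil = Path(root) / "FOIL2013"
    zips = sorted(foil.glob("trip_data_*.zip"))
    return foil.is_dir() and len(zips) > 0

if __name__ == "__main__":
    print(check_layout("DataFile"))
```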
- From a shell or Windows Command Prompt, run the following command and follow the sign-in prompt:

  ```bash
  az login
  ```

- Navigate to the folder named `azure` in the GitHub repository:

  ```bash
  cd azure
  ```

- Run the following commands to deploy the Azure resources:

  ```bash
  export resourceGroup='[Resource group name]'
  export resourceLocation='[Region]'
  export eventHubNamespace='[Event Hubs namespace name]'
  export databricksWorkspaceName='[Azure Databricks workspace name]'
  export cosmosDatabaseAccount='[Cosmos DB database name]'
  export logAnalyticsWorkspaceName='[Log Analytics workspace name]'
  export logAnalyticsWorkspaceRegion='[Log Analytics region]'

  # Create a resource group
  az group create --name $resourceGroup --location $resourceLocation

  # Deploy resources
  az deployment group create --resource-group $resourceGroup \
      --template-file deployresources.json --parameters \
      eventHubNamespace=$eventHubNamespace \
      databricksWorkspaceName=$databricksWorkspaceName \
      cosmosDatabaseAccount=$cosmosDatabaseAccount \
      logAnalyticsWorkspaceName=$logAnalyticsWorkspaceName \
      logAnalyticsWorkspaceRegion=$logAnalyticsWorkspaceRegion
  ```
- The output of the deployment is written to the console once complete. Search the output for the following JSON:

  ```json
  "outputs": {
      "cosmosDb": {
          "type": "Object",
          "value": {
              "hostName": <value>,
              "secret": <value>,
              "username": <value>
          }
      },
      "eventHubs": {
          "type": "Object",
          "value": {
              "taxi-fare-eh": <value>,
              "taxi-ride-eh": <value>
          }
      },
      "logAnalytics": {
          "type": "Object",
          "value": {
              "secret": <value>,
              "workspaceId": <value>
          }
      }
  },
  ```
These values are the secrets that will be added to Databricks secrets
in upcoming sections. Keep them secure until you add them in those sections.
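If you capture the full deployment result (for example, `az deployment group create ... > deploy.json`), the values can also be pulled out programmatically. A sketch, assuming the standard Azure CLI result shape, where the template outputs sit under `properties.outputs`:

```python
# Extract {output name: value} pairs from an `az deployment group create`
# result. The sample below mirrors the output shown above, with
# placeholder values; in practice, load deploy.json instead.
import json

def extract_outputs(deployment):
    outputs = deployment["properties"]["outputs"]
    return {name: out["value"] for name, out in outputs.items()}

sample = json.loads("""
{"properties": {"outputs": {
  "cosmosDb":  {"type": "Object", "value": {"hostName": "h", "secret": "s", "username": "u"}},
  "eventHubs": {"type": "Object", "value": {"taxi-fare-eh": "f", "taxi-ride-eh": "r"}}
}}}
""")
print(extract_outputs(sample)["eventHubs"]["taxi-ride-eh"])  # r
```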
- In the Azure portal, navigate to the resource group created in the deploy the Azure resources section above. Click on the Azure Cosmos DB account. Create a table with the Cassandra API.
- In the overview blade, click Add Table.
- When the Add Table blade opens, enter `newyorktaxi` in the Keyspace name text box.
- In the enter CQL command to create the table section, enter `neighborhoodstats` in the text box beside `newyorktaxi`.
- In the text box below, enter the following:

  ```
  (neighborhood text, window_end timestamp, number_of_rides bigint, total_fare_amount double, primary key(neighborhood, window_end))
  ```

- In the Throughput (1,000 - 1,000,000 RU/s) text box, enter the value `4000`.
- Click OK.
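Assembled from the keyspace name, table name, and schema entered above, the statement the portal executes is equivalent to the following CQL (a sketch; the portal may add its own table options):

```sql
CREATE TABLE newyorktaxi.neighborhoodstats (
    neighborhood text,
    window_end timestamp,
    number_of_rides bigint,
    total_fare_amount double,
    PRIMARY KEY (neighborhood, window_end)
);
```

The partition key is `neighborhood`, with `window_end` as the clustering column, so each neighborhood's windowed results sort by time.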
Using the Azure Databricks CLI installed in the prerequisites, create the Azure Databricks secret scope:

```bash
databricks secrets create-scope --scope "azure-databricks-job"
```
First, enter the secrets for Event Hubs:

- Add the secret for the taxi ride event hub:

  ```bash
  databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"
  ```

  Once executed, this command opens the vi editor. Enter the taxi-ride-eh value from the eventHubs output section in step 4 of the deploy the Azure resources section. Save and exit vi.

- Add the secret for the taxi fare event hub:

  ```bash
  databricks secrets put --scope "azure-databricks-job" --key "taxi-fare"
  ```

  Once executed, this command opens the vi editor. Enter the taxi-fare-eh value from the eventHubs output section in step 4 of the deploy the Azure resources section. Save and exit vi.
Next, enter the secrets for Cosmos DB:

- Open the Azure portal, and navigate to the resource group specified in step 3 of the deploy the Azure resources section. Click on the Azure Cosmos DB account.
- Using the Azure Databricks CLI, add the secret for the Cosmos DB user name:

  ```bash
  databricks secrets put --scope azure-databricks-job --key "cassandra-username"
  ```

  Once executed, this command opens the vi editor. Enter the username value from the cosmosDb output section in step 4 of the deploy the Azure resources section. Save and exit vi.

- Next, add the secret for the Cosmos DB password:

  ```bash
  databricks secrets put --scope azure-databricks-job --key "cassandra-password"
  ```

  Once executed, this command opens the vi editor. Enter the secret value from the cosmosDb output section in step 4 of the deploy the Azure resources section. Save and exit vi.
> [!NOTE]
> If you use an Azure Key Vault-backed secret scope, the scope must be named azure-databricks-job and the secrets must have exactly the same names as those above.
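As an alternative to pasting each value into vi, the four puts can be scripted. A sketch, assuming the legacy Databricks CLI, whose `secrets put` command also accepts a `--string-value` flag (verify against your CLI version before relying on it):

```python
# Hypothetical convenience script: issue the four `databricks secrets put`
# calls non-interactively. Values come from the deployment output section.
import subprocess

SCOPE = "azure-databricks-job"

def put_secret(key, value, runner=subprocess.run):
    # Build the CLI invocation; --string-value skips the vi editor.
    cmd = ["databricks", "secrets", "put",
           "--scope", SCOPE, "--key", key, "--string-value", value]
    return runner(cmd, check=True)

secrets = {
    "taxi-ride": "<taxi-ride-eh value>",
    "taxi-fare": "<taxi-fare-eh value>",
    "cassandra-username": "<username value>",
    "cassandra-password": "<secret value>",
}
# for key, value in secrets.items():
#     put_secret(key, value)
```

Passing secrets on the command line can leave them in shell history and process listings, so prefer the interactive editor on shared machines.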
- Create a directory in the Databricks file system:

  ```bash
  dbfs mkdirs dbfs:/azure-databricks-jobs
  ```

- Navigate to the `DataFile` directory and enter the following:

  ```bash
  dbfs cp ZillowNeighborhoods-NY.zip dbfs:/azure-databricks-jobs
  ```
For this section, you need the Log Analytics workspace ID and primary key. The workspace ID is the workspaceId value from the logAnalytics output section in step 4 of the deploy the Azure resources section. The primary key is the secret value from the same output section.
- To configure log4j logging, open `\azure\AzureDataBricksJob\src\main\resources\com\microsoft\pnp\azuredatabricksjob\log4j.properties`. Edit the following two values:

  ```
  log4j.appender.A1.workspaceId=<Log Analytics workspace ID>
  log4j.appender.A1.secret=<Log Analytics primary key>
  ```

- To configure custom logging, open `\azure\azure-databricks-monitoring\scripts\metrics.properties`. Edit the following two values:

  ```
  *.sink.loganalytics.workspaceId=<Log Analytics workspace ID>
  *.sink.loganalytics.secret=<Log Analytics primary key>
  ```
- Use your Java IDE to import the Maven project file named pom.xml located in the root directory.
- Perform a clean build. The output of this build is two files named azure-databricks-job-1.0-SNAPSHOT.jar and azure-databricks-monitoring-0.9.jar.
- Copy the azure-databricks-monitoring-0.9.jar file to the Databricks file system by entering the following command in the Databricks CLI:

  ```bash
  databricks fs cp --overwrite azure-databricks-monitoring-0.9.jar dbfs:/azure-databricks-job/azure-databricks-monitoring-0.9.jar
  ```

- Copy the custom logging properties from `\azure\azure-databricks-monitoring\scripts\metrics.properties` to the Databricks file system by entering the following command:

  ```bash
  databricks fs cp --overwrite metrics.properties dbfs:/azure-databricks-job/metrics.properties
  ```

- Decide on a name for your Databricks cluster now; you'll enter it when you create the cluster later. Copy the initialization script from `\azure\azure-databricks-monitoring\scripts\spark.metrics` to the Databricks file system by entering the following command:

  ```bash
  databricks fs cp --overwrite spark-metrics.sh dbfs:/databricks/init/spark-metrics.sh
  ```
- In the Databricks workspace, click Clusters, then click Create Cluster. Enter the cluster name you chose earlier.
- Select a Standard cluster mode.
- Set the Databricks runtime version to 6.4 (includes Apache Spark 2.4.5, Scala 2.11).
- Set the Python version to 3.
- Set Driver Type to Same as worker.
- Set Worker Type to Standard_DS3_v2.
- Set Min Workers to 2.
- Deselect Enable autoscaling.
- Expand Advanced Options and choose the Init Scripts tab.
- Enter dbfs:/databricks/init/spark-metrics.sh.
- Click the Add button.
- Click the Create Cluster button.
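If you'd rather script the cluster than click through the UI, the same settings can be expressed as a JSON spec for `databricks clusters create --json-file cluster.json`. A sketch using Clusters API 2.0 field names; the cluster name is a placeholder:

```json
{
  "cluster_name": "<your cluster name>",
  "spark_version": "6.4.x-scala2.11",
  "node_type_id": "Standard_DS3_v2",
  "driver_node_type_id": "Standard_DS3_v2",
  "num_workers": 2,
  "init_scripts": [
    { "dbfs": { "destination": "dbfs:/databricks/init/spark-metrics.sh" } }
  ]
}
```

A fixed `num_workers` corresponds to autoscaling being disabled, matching the UI steps above.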
- In the Databricks workspace, click Jobs, then Create Job.
- Enter a job name.
- Click set jar. This opens the Upload JAR to Run dialog box.
- Drag the azure-databricks-job-1.0-SNAPSHOT.jar file you created in the build the .jar for the Databricks job section to the Drop JAR here to upload box.
- Enter com.microsoft.pnp.TaxiCabReader in the Main Class field.
- In the arguments field, enter the following:

  ```
  -n jar:file:/dbfs/azure-databricks-jobs/ZillowNeighborhoods-NY.zip!/ZillowNeighborhoods-NY.shp --taxi-ride-consumer-group taxi-ride-eh-cg --taxi-fare-consumer-group taxi-fare-eh-cg --window-interval "1 minute" --cassandra-host <Cosmos DB Cassandra host name from above>
  ```
- Install the dependent libraries by following these steps:

  1. In the Databricks user interface, click on the home button.
  2. In the Users drop-down, click on your user account name to open your account workspace settings.
  3. Click on the drop-down arrow beside your account name, click on Create, and click on Library to open the New Library dialog.
  4. Under Library Source, select Maven.
  5. Under the Coordinates heading, enter `com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.5` in the text box.
  6. Click on Create to open the Artifacts window.
  7. Under Status on running clusters, check the Attach automatically to all clusters checkbox.
  8. Repeat steps 1 - 7 for the `com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0` Maven coordinate.
  9. Repeat steps 1 - 7 for the `com.datastax.spark:spark-cassandra-connector_2.11:2.3.2` Maven coordinate.
  10. Repeat steps 1 - 5 for the `org.geotools:gt-shapefile:23.0` Maven coordinate, then:
      1. Click on Advanced Options.
      2. Enter `https://repo.osgeo.org/repository/release/` in the Repository text box.
      3. Click Create to open the Artifacts window.
      4. Under Status on running clusters, check the Attach automatically to all clusters checkbox.
- Add the dependent libraries installed above to the job created in the create a Databricks job section:

  1. In the Azure Databricks workspace, click on Jobs.
  2. Click on the job name created in step 2 of the create a Databricks job section.
  3. Beside the Dependent Libraries section, click on Add to open the Add Dependent Library dialog.
  4. Under Library From, select Workspace.
  5. Click on users, then your username, then click on `azure-eventhubs-spark_2.11:2.3.5`.
  6. Click OK.
  7. Repeat steps 1 - 6 for `spark-cassandra-connector_2.11:2.3.2`, `gt-shapefile:23.0`, and `azure-cosmos-cassandra-spark-helper:1.0.0`.
- Beside Cluster:, click on Edit. This opens the Configure Cluster dialog. In the Cluster Type drop-down, select Existing Cluster. In the Select Cluster drop-down, select the cluster created in the create a Databricks cluster section. Click Confirm.
- Click Run Now.
- Navigate to the directory named `onprem` in the GitHub repository.
- Update the values in the file main.env as follows:

  ```
  RIDE_EVENT_HUB=[Connection string for the taxi-ride event hub]
  FARE_EVENT_HUB=[Connection string for the taxi-fare event hub]
  RIDE_DATA_FILE_PATH=/DataFile/FOIL2013
  MINUTES_TO_LEAD=0
  PUSH_RIDE_DATA_FIRST=false
  ```

  The connection string for the taxi-ride event hub is the taxi-ride-eh value from the eventHubs output section in step 4 of the deploy the Azure resources section. The connection string for the taxi-fare event hub is the taxi-fare-eh value from the same output section.
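The two values are standard Event Hubs connection strings: semicolon-delimited key=value pairs. A quick local sanity check (a hypothetical helper, not part of the generator) can confirm a pasted value parses and targets the expected hub:

```python
# Parse an Event Hubs connection string of the form
# "Endpoint=sb://<ns>.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=...;EntityPath=..."
def parse_connection_string(cs):
    # split on ';', then split each segment on the first '=' only,
    # since base64 key values can themselves contain '='.
    return dict(p.split("=", 1) for p in cs.strip(";").split(";"))

cs = ("Endpoint=sb://example.servicebus.windows.net/;"
      "SharedAccessKeyName=RootManageSharedAccessKey;"
      "SharedAccessKey=abc123=;EntityPath=taxi-ride")
print(parse_connection_string(cs)["EntityPath"])  # taxi-ride
```

Whether the deployed connection strings include an `EntityPath` segment depends on the template; if it is absent, the hub name is configured separately.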
- Run the following command to build the Docker image:

  ```bash
  docker build --no-cache -t dataloader .
  ```

- Navigate back to the parent directory:

  ```bash
  cd ..
  ```

- Run the following command to run the Docker image:

  ```bash
  docker run -v `pwd`/DataFile:/DataFile --env-file=onprem/main.env dataloader:latest
  ```
The output should look like the following:

```
Created 10000 records for TaxiFare
Created 10000 records for TaxiRide
Created 20000 records for TaxiFare
Created 20000 records for TaxiRide
Created 30000 records for TaxiFare
...
```
To verify the Databricks job is running correctly, open the Azure portal and navigate to the Cosmos DB database. Open the Data Explorer blade and examine the data in the neighborhoodstats table.
[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). University of Illinois at Urbana-Champaign. https://doi.org/10.13012/J8PN93H8