- Concepts
- Prerequisites
- Objectives
- Costs
- Before you begin
- Setting up your environment
- Creating resources
- Deploy the Entry Tags recording pipeline
- Manual Test
- Limitations
- Cleaning up
- License
Historical metadata of your data warehouse is a treasure trove for discovering insights about changing data patterns, data quality, and user behavior. The challenge is that Data Catalog keeps only a single version of metadata for fast searchability.
This solution is intended for technical practitioners responsible for metadata management, data governance, and related analytics.
This tutorial shows how to create a historical record of Data Catalog metadata tags. It captures and parses Data Catalog audit logs from Cloud Logging, processes them in real time with Pub/Sub and Dataflow, and appends change records to a BigQuery table for historical analysis.
Warning: Apply restrictive access controls to the Tag history BigQuery table. Data Catalog search results are ACLed based on the requestor's permissions, but the Tag history BigQuery table is not.
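One possible way to restrict access (a sketch, not part of the original tutorial) is to grant table-level access only to the principals who need it. The group below is a placeholder, and the variables are the ones defined in env.sh later in this tutorial:

```bash
# Example only: grant read access on the Tag history table to a specific group.
# Replace the group with your own principal; ${PROJECT_ID}, ${DATASET_ID}, and
# ${TABLE_ID} are defined in env.sh later in this tutorial.
bq add-iam-policy-binding \
  --member="group:data-governance-team@example.com" \
  --role="roles/bigquery.dataViewer" \
  "${PROJECT_ID}:${DATASET_ID}.${TABLE_ID}"
```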
- Data Catalog provides a single place for searching and managing both technical and business metadata of your data warehouse or data lake in Google Cloud and beyond. Data Catalog uses Tags to organize metadata and make it discoverable.
- BigQuery is Google Cloud's serverless, highly scalable, and cost-effective multi-cloud data warehouse designed for business agility. You can use it to run petabyte-scale queries, and it provides APIs for reading table schemas.
- Dataflow is Google Cloud's serverless data processing service for both stream and batch workloads.
- Pub/Sub is Google Cloud's flexible, reliable, real-time messaging service that lets independent applications publish and subscribe to asynchronous events.
This tutorial assumes some familiarity with shell scripting and basic knowledge of Google Cloud Platform.
- Set up a log sink to export Data Catalog audit logs to Pub/Sub
- Deploy a streaming Dataflow pipeline to parse the logs
- Enrich the logs with Entry Tag information from Data Catalog
- Store the metadata tags attached to the modified Entry in a BigQuery table for historical reference
This tutorial uses billable components of Google Cloud, including:
- Data Catalog
- Cloud Dataflow
- Cloud PubSub
- Cloud Logging
- Cloud Storage
- BigQuery
  - Streaming API
  - Storage
Use the Pricing Calculator to generate a cost estimate based on your projected usage.
For this tutorial, you need a Google Cloud project. You can create a new one, or select a project you already created.
- Select or create a Google Cloud project from the project selector page.
- Make sure that billing is enabled for your Google Cloud project; if necessary, enable billing for your project.
- Enable the APIs for the Data Catalog, BigQuery, Pub/Sub, Dataflow, and Cloud Storage services.

You can use Cloud Shell to enable the required Google Cloud services:

```bash
gcloud services enable \
  bigquery.googleapis.com \
  storage_component \
  datacatalog.googleapis.com \
  dataflow.googleapis.com \
  pubsub.googleapis.com
```
- In the Google Cloud Platform Console, go to Cloud Shell.
At the bottom of the GCP Console, a Cloud Shell session opens and displays a command-line prompt. Cloud Shell is a shell environment with the Cloud SDK already installed, including the gcloud command-line tool, and with values already set for your current project. It can take a few seconds for the session to initialize.
- In Cloud Shell, clone the source repository:

```bash
git clone https://github.com/GoogleCloudPlatform/datacatalog-tag-history.git
cd datacatalog-tag-history/
```
Use your favourite editor to modify the env.sh file to set the following variables:

```bash
# The GCP project to use for this tutorial
export PROJECT_ID="your-project-id"

# The BigQuery region to use for the Tags table
export BIGQUERY_REGION=""

# The name of the BigQuery Dataset to create the Tag records table
export DATASET_ID=""

# The name of the BigQuery table for Tag records
export TABLE_ID="EntityTagOperationRecords"

# The Compute region to use for running Dataflow jobs and create a temporary storage bucket
export REGION_ID=""

# define the bucket id
export TEMP_GCS_BUCKET=""

# define the name of the Pub/Sub log sink in Cloud Logging
export LOGS_SINK_NAME="datacatalog-audit-pubsub"

# define Pub/Sub topic for receiving AuditLog events
export LOGS_SINK_TOPIC_ID="catalog-audit-log-sink"

# define the subscription id
export LOGS_SUBSCRIPTION_ID="catalog-tags-dumper"

# name of the service account to use (not the email address)
export TAG_HISTORY_SERVICE_ACCOUNT="tag-history-collector"
export TAG_HISTORY_SERVICE_ACCOUNT_EMAIL="${TAG_HISTORY_SERVICE_ACCOUNT}@$(echo $PROJECT_ID | awk -F':' '{print $2"."$1}' | sed 's/^\.//').iam.gserviceaccount.com"
```
- Set the variables in your environment:

```bash
source env.sh
```
- Set up a BigQuery dataset to store an Entry's tags when a change event occurs. Create a new BigQuery dataset in the region of your choice:

```bash
bq --location ${BIGQUERY_REGION} \
  --project_id=${PROJECT_ID} \
  mk --dataset ${DATASET_ID}
```
- Create a BigQuery table for storing Tags using the provided schema. This creates a BigQuery table with lowerCamelCase column names:

```bash
bq mk --table \
  --project_id=${PROJECT_ID} \
  --description "Catalog Tag snapshots" \
  --time_partitioning_field "reconcileTime" \
  "${DATASET_ID}.${TABLE_ID}" camelEntityTagOperationRecords.schema
```
Use the following command if you want snake_case column names:

```bash
bq mk --table \
  --project_id=${PROJECT_ID} \
  --description "Catalog Tag snapshots" \
  --time_partitioning_field "reconcile_time" \
  "${DATASET_ID}.${TABLE_ID}" snakeEntityTagOperationRecords.schema
```
Pub/Sub is Google Cloud's global messaging bus for decoupling processing modules.
- Create a Pub/Sub topic to receive Audit log events:

```bash
gcloud pubsub topics create ${LOGS_SINK_TOPIC_ID} \
  --project ${PROJECT_ID}
```
- Create a new Pub/Sub subscription. Using a subscription (instead of reading directly from the topic) with a Dataflow pipeline ensures that all messages are processed even when the pipeline may be temporarily down for updates or maintenance.

```bash
gcloud pubsub subscriptions create ${LOGS_SUBSCRIPTION_ID} \
  --topic=${LOGS_SINK_TOPIC_ID} \
  --topic-project=${PROJECT_ID}
```
Cloud Logging is GCP's powerful log management control plane.
Create a log sink to send Data Catalog audit events to the Pub/Sub topic. Cloud Logging will push new Data Catalog AuditLog entries to the Pub/Sub topic for processing in real time.

```bash
gcloud logging sinks create ${LOGS_SINK_NAME} \
  pubsub.googleapis.com/projects/${PROJECT_ID}/topics/${LOGS_SINK_TOPIC_ID} \
  --log-filter="protoPayload.serviceName=\"datacatalog.googleapis.com\" \
  AND protoPayload.\"@type\"=\"type.googleapis.com/google.cloud.audit.AuditLog\""
```
Grant the Pub/Sub Publisher role to the Logging service account so that it can push log entries into the configured Pub/Sub topic.

```bash
# Identify the Logs writer service account
export LOGGING_WRITER_IDENTITY="$(gcloud logging sinks describe ${LOGS_SINK_NAME} --format="get(writerIdentity)" --project ${PROJECT_ID})"

# Grant Publish permission to the Logging writer
gcloud pubsub topics add-iam-policy-binding ${LOGS_SINK_TOPIC_ID} \
  --member=${LOGGING_WRITER_IDENTITY} \
  --role='roles/pubsub.publisher' \
  --project ${PROJECT_ID}
```
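As an optional check (not part of the original tutorial), you can verify that Data Catalog operations produce audit log entries matching the sink filter. After performing any Data Catalog operation, run:

```bash
# Read recent Data Catalog audit log entries using the same filter as the log sink.
gcloud logging read \
  'protoPayload.serviceName="datacatalog.googleapis.com" AND protoPayload."@type"="type.googleapis.com/google.cloud.audit.AuditLog"' \
  --limit=5 \
  --project ${PROJECT_ID}
```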
It is recommended to run pipelines with fine-grained access control to improve access partitioning. If your project does not have a user-created service account, create one using the following instructions. You can also use your browser by navigating to IAM & Admin > Service accounts in the Google Cloud console.
- Create a service account to use as the user-managed controller service account for Dataflow:

```bash
gcloud iam service-accounts create ${TAG_HISTORY_SERVICE_ACCOUNT} \
  --description="Service Account to run the DataCatalog tag history collection and recording pipeline." \
  --display-name="Data Catalog History collection account"
```
- Create a custom role with the required permissions for accessing BigQuery, Pub/Sub, Dataflow, and Data Catalog:

```bash
export TAG_HISTORY_COLLECTOR_ROLE="tag_history_collector"

gcloud iam roles create ${TAG_HISTORY_COLLECTOR_ROLE} --project=${PROJECT_ID} --file=tag_history_collector.yaml
```
- Apply the custom role to the service account:

```bash
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
  --member="serviceAccount:${TAG_HISTORY_SERVICE_ACCOUNT_EMAIL}" \
  --role=projects/${PROJECT_ID}/roles/${TAG_HISTORY_COLLECTOR_ROLE}
```
- Assign the dataflow.worker role to allow the service account to run as a Dataflow worker:

```bash
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
  --member="serviceAccount:${TAG_HISTORY_SERVICE_ACCOUNT_EMAIL}" \
  --role=roles/dataflow.worker
```
- Create a Cloud Storage bucket to use as a temporary and staging bucket for Dataflow:

```bash
gsutil mb -l ${REGION_ID} \
  -p ${PROJECT_ID} \
  gs://${TEMP_GCS_BUCKET}
```
- Launch the Dataflow pipeline using the following Maven command:

```bash
mvn clean generate-sources compile package exec:java \
  -Dexec.mainClass=com.google.cloud.solutions.catalogtagrecording.PipelineLauncher \
  -Dexec.cleanupDaemonThreads=false \
  -Dmaven.test.skip=true \
  -Dexec.args=" \
  --streaming=true \
  --project=${PROJECT_ID} \
  --serviceAccount=${TAG_HISTORY_SERVICE_ACCOUNT_EMAIL} \
  --runner=DataflowRunner \
  --gcpTempLocation=gs://${TEMP_GCS_BUCKET}/temp/ \
  --stagingLocation=gs://${TEMP_GCS_BUCKET}/staging/ \
  --workerMachineType=n1-standard-1 \
  --region=${REGION_ID} \
  --tagsBigqueryTable=${PROJECT_ID}:${DATASET_ID}.${TABLE_ID} \
  --catalogAuditLogsSubscription=projects/${PROJECT_ID}/subscriptions/${LOGS_SUBSCRIPTION_ID}"
```
Add the --snakeCaseColumnNames flag when using the snake_case column name schema from the Setup BigQuery step.
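As an optional check (not part of the original tutorial), you can confirm that the streaming pipeline is up and running:

```bash
# List active Dataflow jobs in the region used for the recording pipeline.
gcloud dataflow jobs list \
  --region=${REGION_ID} \
  --status=active \
  --project=${PROJECT_ID}
```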
Follow the guide to attach a Tag to a Data Catalog Entry to verify that the tool captures all the Tags attached to the modified Entry.
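After attaching or updating a Tag, you can inspect the history table to confirm that change records arrive. This is a minimal sketch that assumes the lowerCamelCase schema (use reconcile_time instead if you created the snake_case schema):

```bash
# Show the most recent Tag history records in the table created earlier.
bq query \
  --project_id=${PROJECT_ID} \
  --use_legacy_sql=false \
  "SELECT * FROM \`${PROJECT_ID}.${DATASET_ID}.${TABLE_ID}\`
   ORDER BY reconcileTime DESC
   LIMIT 10"
```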
- This implementation handles only the following operations:
  - CreateTag
  - UpdateTag
  - DeleteTag
- A single Data Catalog operation can create multiple tag record entries because it emits multiple AuditLog events (see the deduplication sketch after this list).
- The tool polls the Data Catalog service for Entry/Tag information, because the audit logs don't contain change information. This can result in some changes to an Entry's Tags getting missed.
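Because of the duplicate-record limitation above, analyses often want only the latest snapshot per entry. The following query is a hedged sketch: entryName is a hypothetical placeholder for whichever column in the provided schema identifies the Entry (adjust it to the actual column name), while reconcileTime is the table's partitioning field from the lowerCamelCase schema:

```bash
# Sketch: keep only the newest record per entry.
# NOTE: entryName is a hypothetical column name; replace it with the real identifier
# column from the schema you used to create the table.
bq query \
  --project_id=${PROJECT_ID} \
  --use_legacy_sql=false \
  "SELECT * EXCEPT(rn) FROM (
     SELECT *, ROW_NUMBER() OVER (PARTITION BY entryName ORDER BY reconcileTime DESC) AS rn
     FROM \`${PROJECT_ID}.${DATASET_ID}.${TABLE_ID}\`
   ) WHERE rn = 1"
```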
To avoid incurring charges to your Google Cloud account for the resources used in this tutorial:
The easiest way to eliminate billing is to delete the project you created for the tutorial.
Caution: Deleting a project has the following effects: everything in the project is deleted, and the custom project ID cannot be used again.
- In the Cloud Console, go to the Manage resources page.
- In the project list, select the project that you want to delete and then click Delete.
- In the dialog, type the project ID and then click Shut down to delete the project.
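Alternatively (not part of the original steps), you can delete the project from Cloud Shell, assuming PROJECT_ID is still set from env.sh:

```bash
# Deletes the project and everything in it; this cannot easily be undone.
gcloud projects delete ${PROJECT_ID}
```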
Apache 2.0
This is not an official Google product.