databrickslabs / dlt-meta
This is a metadata-driven, DLT-based framework for bronze/silver pipelines.
License: Other
With an Event Hubs source configuration like the following:
"source_format": "eventhub",
"source_details": {
    "source_schema_path": "{dbfs_path}/integration-tests/resources/eventhub_iot_schema.ddl",
    "eventhub.accessKeyName": "{eventhub_accesskey_name}",
    "eventhub.name": "{eventhub_name}",
    "eventhub.secretsScopeName": "{eventhub_secrets_scope_name}",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "eventhub.namespace": "{eventhub_nmspace}",
    "eventhub.port": "{eventhub_port}"
},
The pipeline may fail with the following error:
Connection to node -1 ({eventhub_nmspace}.servicebus.windows.net/xx.xx.xx.xx:9093) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.
...
Unexpected error from {eventhub_nmspace}.servicebus.windows.net/xx.xx.xx.xx (channelId=-1); closing connection
java.lang.RuntimeException: non-nullable field authBytes was serialized as null
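An authentication failure like the one above usually comes down to the Kafka options that reach the Event Hubs endpoint. For context only, the options Azure Event Hubs' Kafka-compatible endpoint generally expects look like the sketch below; this is not dlt-meta's actual option-resolution logic, and the placeholder values in braces are illustrative:

```python
# Hedged sketch of the Kafka options Azure Event Hubs' Kafka endpoint expects.
# Placeholder values ({eventhub_nmspace}, {eventhub_accesskey_name}, {key},
# {eventhub_name}) are illustrative only, not values dlt-meta produces.
namespace = "{eventhub_nmspace}"  # e.g. "myhub" -> myhub.servicebus.windows.net

connection_string = (
    f"Endpoint=sb://{namespace}.servicebus.windows.net/;"
    "SharedAccessKeyName={eventhub_accesskey_name};SharedAccessKey={key}"
)

eventhub_kafka_options = {
    # Event Hubs' Kafka endpoint listens on port 9093 over TLS.
    "kafka.bootstrap.servers": f"{namespace}.servicebus.windows.net:9093",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    # Event Hubs authenticates SASL PLAIN with the literal user name
    # "$ConnectionString" and the connection string as the password.
    "kafka.sasl.jaas.config": (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
        f'required username="$ConnectionString" password="{connection_string}";'
    ),
    "subscribe": "{eventhub_name}",
}
```

If any of these (especially the JAAS config or port 9093 being blocked by a firewall) is wrong, the broker closes the connection during authentication, which matches the error shown.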
Create CLI documentation with GIFs or videos showing how to use dlt-meta via the Databricks Labs CLI.
I've built a process to generate our silver_transformations.json file. I have a question (or future issue) about what a DLT "Full refresh" would do if the silver transformations for a table change over time. If a "Full refresh" is done, I believe it would only pick up what is currently in the silver dataflowspec table, which could miss any historical transformations that were in place at their respective times. Am I missing something architecturally in DLT or DLT-META that can handle this use case? I appreciate any help or conversation on this topic.
When using the delta option for the bronze source_format, the read_dlt_delta method fails with a KeyError:
.{bronze_dataflow_spec.sourceDetails["table"]}
KeyError: 'table'
Switching the key to source_table worked for me locally.
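A defensive version of that lookup could be sketched as follows; the helper name is hypothetical and simply falls back between the two key spellings:

```python
# Hypothetical helper: resolve the source table name from sourceDetails,
# accepting either the "source_table" or the "table" key spelling, so specs
# written for either convention keep working.
def resolve_source_table(source_details: dict) -> str:
    for key in ("source_table", "table"):
        if key in source_details:
            return source_details[key]
    raise KeyError("sourceDetails has neither 'source_table' nor 'table'")
```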
Hello,
If the dataflowspec tables and bronze/silver tables are stored in a schema managed by Unity Catalog, what is the reason to set paths for these tables?
I'm unable to create dataflowspec tables when they are stored in a UC-managed schema; if I store them in hive_metastore, it runs fine.
The main issue is that users don't know the paths of tables managed by UC; they should only reference table names in a specific schema. Or am I missing something?
"silver_dataflowspec_table": "silver_dataflowspec_table",
"silver_dataflowspec_path": "dbfs:/onboarding_tables_cdc/silver",
"bronze_dataflowspec_table": "bronze_dataflowspec_table",
"bronze_dataflowspec_path": "dbfs:/onboarding_tables_cdc/bronze",
Thanks.
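The distinction being asked for could be sketched like this: build a three-level catalog.schema.table reference when Unity Catalog is in play, and only require an explicit path for hive_metastore. The function and field names below are illustrative assumptions, not dlt-meta's actual API:

```python
from typing import Optional

# Hypothetical sketch: UC-managed tables need only a three-level name;
# an explicit storage path is only required for hive_metastore tables.
def dataflowspec_target(table: str, schema: str,
                        catalog: Optional[str] = None,
                        path: Optional[str] = None) -> dict:
    if catalog:  # Unity Catalog manages storage, so no path is needed
        return {"table": f"{catalog}.{schema}.{table}"}
    if not path:
        raise ValueError("hive_metastore tables need an explicit path")
    return {"table": f"{schema}.{table}", "path": path}
```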
Following the default docs with default argument values, it fails with the same error regardless of the DBFS folder:
databricks.sdk.errors.platform.ResourceAlreadyExists: A file or directory already exists at the input path dbfs:/dlt-meta_cli_demo/dltmeta_conf/onboarding.json.
Could we add a check to the onboarding process that throws a warning when a table in the onboarding file references a silver transformation that can't be found? It might also make sense to warn when a source_schema_path is listed but the file is missing or blank. That would add some quality-of-life when prototyping and testing new configurations.
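A lightweight validation pass along those lines might look like the sketch below; the function name and the onboarding field names are hypothetical, chosen only to illustrate the idea:

```python
import os
import warnings

# Hypothetical sketch: warn when an onboarding entry references a silver
# transformation file or schema file that is missing or empty on disk.
# Field names ("silver_transformation_json", "source_schema_path") are
# assumptions for illustration.
def validate_onboarding_entry(entry: dict) -> list:
    problems = []
    for field in ("silver_transformation_json", "source_schema_path"):
        path = entry.get(field)
        if not path:
            continue
        if not os.path.isfile(path) or os.path.getsize(path) == 0:
            problems.append(f"{field}: '{path}' is missing or empty")
    for msg in problems:
        warnings.warn(msg)
    return problems
```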
Need to provide the ability to add file metadata to the dataframe, e.g.:
import dlt

@dlt.table
def bronze():
    return (
        spark.readStream.format("cloudFiles")
        # Define the schema for the ~6 common columns across files. All other
        # input fields are "rescued" into a JSON string column that can be
        # queried via dot notation. _file_path is a hidden auto-field, but
        # spoofing it as an input column here lets us drop it later so it
        # stays out of the rescued-data column.
        .schema("Common1 string, Common2 string, _file_path string")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaEvolutionMode", "rescue")
        # Override the default _rescued_data column name.
        .option("cloudFiles.rescuedDataColumn", "extraFields")
        .option("header", "true")
        .load("/Volumes/vol/data/*.txt")
        .select("*", "_metadata")  # add file metadata information to output
        .drop("_file_path")        # discard the dummy input column
    )
Hello,
I am getting this error while initializing the DLT pipeline for the bronze table. Please suggest what might be the cause. My source is Event Hubs.
"no tables are defined by the library of this pipeline. This error usually occurs when there are view definitions"
Thanks
Hello,
Here's the code:

onboarding_params_map = {
    "database": "dlt_demo",
    "onboarding_file_path": "onboarding_files/onboarding.json",
    "bronze_dataflowspec_table": "bronze_dataflowspec_table",
    "bronze_dataflowspec_path": "dbfs:/FileStore/bronze",
    "silver_dataflowspec_table": "silver_dataflowspec_table",
    "silver_dataflowspec_path": "dbfs:/FileStore/silver",
    "overwrite": "True",
    "onboard_layer": "bronze_silver",
    "env": "prod",
    "version": "v1",
    "import_author": "MANTAS"
}

OnboardDataflowspec(spark, onboarding_params_map).onboard_dataflow_specs()

I'm getting "ValueError: missing attributes : set()" even though all parameters are specified.
Hi, not an error, but I was looking at this and noticed you defined

"expect_or_quarantine": {
    "valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"
}

in one of the data quality expectations. But when I tried it, it throws

AttributeError: module 'dlt' has no attribute 'expect_or_quarantine',Map(),Map(),List(),List(),Map())

Is there something I missed?
The create_target_table() and create_streaming_live_table() functions are deprecated; Databricks recommends updating existing code to use the create_streaming_table() function.
Feature request, not a bug. It would be good to be able to merge the onboarding JSON into the metadata tables by key (data_flow_id). That way we wouldn't have to overwrite the table and could maintain central metadata tables for all the source entities we're ingesting. This could of course be done manually, but it would be nice to have it baked into the framework.
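The requested upsert semantics can be sketched in plain Python. In practice this would likely be a Delta MERGE on the dataflowspec table, but the core idea (update rows with a matching data_flow_id, insert new ones, leave the rest untouched) is:

```python
# Sketch of merge-by-key semantics for onboarding specs: update existing
# entries on data_flow_id, insert new ones, and keep unmatched entries.
# This is an illustration of the requested behavior, not dlt-meta code.
def merge_dataflow_specs(existing: list, incoming: list,
                         key: str = "data_flow_id") -> list:
    merged = {spec[key]: spec for spec in existing}
    for spec in incoming:
        merged[spec[key]] = spec  # matched -> update, unmatched -> insert
    return list(merged.values())
```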
Hello,
I ingest JSON data into the bronze layer and then try to apply some transformations to promote it to the silver layer.
Here's the problem: when I try to explode the ingested nested JSON and then select all columns, I get the following error:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

data = [("id1", [("a", "111"), ("b", "222")]),
        ("id2", [("c", "333"), ("d", "444")])]

schema = StructType([
    StructField("id", StringType()),
    StructField("payload", ArrayType(
        StructType([
            StructField("c1", StringType()),
            StructField("c2", StringType())
        ])
    ))
])

df = spark.createDataFrame(data, schema)
df2 = df.selectExpr(
    "explode(payload) as temp",
    "temp.*"
)
display(df2.schema)

AnalysisException: Cannot resolve 'temp.*' given input columns ''.; line 1 pos 0
However, if I select all columns in a separate selectExpr, all is good:

df2 = df.selectExpr(
    "explode(payload) as temp"
).selectExpr(
    "temp.*"
)
display(df2.schema)

df2: pyspark.sql.dataframe.DataFrame
    c1: string
    c2: string
StructType([StructField('c1', StringType(), True), StructField('c2', StringType(), True)])
Now suppose I want to drop the unwanted columns from the result:

df2 = df.selectExpr(
    "explode(payload) as temp"
).selectExpr(
    "temp.*",
    "except(c1)"
)
display(df2.schema)

Which gives the error:

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `c1` cannot be resolved. Did you mean one of the following? [`temp`].; line 1 pos 0
However, if I add another selectExpr on top of the previous one, it works!

df2 = df.selectExpr(
    "explode(payload) as temp"
).selectExpr(
    "temp.*"
).selectExpr(
    "* except(c1)"
)
display(df2.schema)

StructType([StructField('c2', StringType(), True)])
As far as I understood from the DLT Meta source code:
def get_silver_schema(self):
    """Get Silver table Schema."""
    silver_dataflow_spec: SilverDataflowSpec = self.dataflowSpec
    # source_database = silver_dataflow_spec.sourceDetails["database"]
    # source_table = silver_dataflow_spec.sourceDetails["table"]
    select_exp = silver_dataflow_spec.selectExp
    where_clause = silver_dataflow_spec.whereClause
    raw_delta_table_stream = self.spark.read.load(
        path=silver_dataflow_spec.sourceDetails["path"],
        format="delta"
        # f"{source_database}.{source_table}"
    ).selectExpr(*select_exp)
selectExpr is applied once to the whole array of select expressions.
Could we apply it separately to each select expression, to avoid the errors above and make transformations more flexible?
Thank you
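Applying the expressions one at a time can be sketched as a fold over the DataFrame, matching the chained-selectExpr workaround shown above. This is an illustration, not dlt-meta's actual implementation:

```python
from functools import reduce

# Sketch: apply each select expression in its own selectExpr call, so a
# later expression can reference columns produced by an earlier one
# (e.g. "explode(payload) as temp" followed by "temp.*").
def apply_select_exps_sequentially(df, select_exps):
    return reduce(lambda d, exp: d.selectExpr(exp), select_exps, df)
```

One caveat: each single-expression selectExpr drops every column it doesn't mention, so expressions would have to carry forward anything still needed downstream, which differs from today's all-at-once behavior.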
Integrate the append_flow API for the following use cases:
It's not an issue but a feature request. This would be useful if we want to add the source file name or a processing-time column. Example:

val df = spark.readStream.format("cloudFiles")
  .schema(schema)
  .option("cloudFiles.format", "csv")
  .option("cloudFiles.region", "ap-south-1")
  .load("path")
  .withColumn("filePath", input_file_name())
Hello,
I'm trying to onboard and deploy DLT-META with all the default parameters in the terminal, but I'm getting this error:
Traceback (most recent call last):
  File "/Users/mantasmicikevicius/dlt-meta/src/cli.py", line 500, in <module>
    main(*sys.argv[1:])
  File "/Users/mantasmicikevicius/dlt-meta/src/cli.py", line 496, in main
    MAPPINGcommand
  File "/Users/mantasmicikevicius/dlt-meta/src/cli.py", line 467, in onboard
    dltmeta.onboard(cmd)
  File "/Users/mantasmicikevicius/dlt-meta/src/cli.py", line 158, in onboard
    self._ws.dbfs.upload(cmd.dbfs_path + f"/dltmeta_conf/{onboarding_filename}", ob_file, overwrite=True)
  File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/mixins/files.py", line 320, in upload
    with self.open(path, write=True, overwrite=overwrite) as dst:
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/mixins/files.py", line 316, in open
    return _DbfsIO(self, path, read=read, write=write, overwrite=overwrite)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/mixins/files.py", line 38, in __init__
    elif write: self._created = api.create(path, overwrite=overwrite)
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/service/files.py", line 382, in create
    res = self._api.do('POST', '/api/2.0/dbfs/create', body=body, headers=headers)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/core.py", line 128, in do
    return retryable(self._perform)(method,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/retries.py", line 54, in wrapper
    raise err
  File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/retries.py", line 33, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/core.py", line 221, in _perform
    raise self._make_nicer_error(response=response, **payload) from None
databricks.sdk.errors.platform.ResourceAlreadyExists: A file or directory already exists at the input path dbfs:/dlt-meta_cli_demo/dltmeta_conf/onboarding.json.
Error: exit status 1
Parsing argument complete. args=Namespace(cloud_provider_name='aws', dbr_version='12.2.x-scala2.12', dbfs_path='dbfs:/dais-dlt-meta-demo-automated/')
Traceback (most recent call last):
  File "C:\Users\nehamjain\dlt-meta\dlt-meta-demo\launch_demo.py", line 345, in <module>
    main()
  File "C:\Users\nehamjain\dlt-meta\dlt-meta-demo\launch_demo.py", line 244, in main
    api_client = get_api_client()
                 ^^^^^^^^^^^^^^^^
  File "C:\Users\nehamjain\dlt-meta\dlt-meta-demo\launch_demo.py", line 17, in get_api_client
    api_client = _get_api_client(config, command_name="labs_dlt-meta")
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\nehamjain\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\LocalCache\local-packages\Python311\site-packages\databricks_cli\configure\config.py", line 102, in _get_api_client
    verify = config.insecure is None
             ^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'insecure'
Support non-Delta sinks using the metadata approach.
Currently, apply_changes assumes the version column is an Integer type; we need to infer the data type from sequence_by.
Hi team, thank you so much for the UC branch merge. We are now able to work with UC without any issues or workarounds. However, when I tried running the pipeline for a different dataflow group, the new group overwrote whatever existed before it instead of writing alongside the other tables. Could you let me know how to fix this?
Integrate Databricks Labs Blueprint into the code base for the CLI and unit tests.
TypeError: __init__() missing 1 required positional argument: 'dataQualityExpectations',None,Map(),Map(),List(),List(),Map())
When a Delta Live Table is onboarded through a previous version of DLT-META without silver data quality expectations, the code fails when invoking the launch notebook.
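A backward-compatible way to handle spec rows written before the field existed is to give it a default value. The dataclass below is a simplified, hypothetical stand-in for the silver dataflow spec, shown only to illustrate the idea:

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical, simplified stand-in for the silver dataflow spec. Giving
# dataQualityExpectations a default of None lets rows onboarded by older
# DLT-META versions (which never wrote that column) still construct the
# spec instead of raising a missing-positional-argument TypeError.
@dataclass
class SilverSpecCompat:
    dataFlowId: str
    targetDetails: dict
    dataQualityExpectations: Optional[str] = None
```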