dlt-meta's People

Contributors

howardwu-db, msdotnetclr, nfx, ravi-databricks, ravi-db, rtdtwo

dlt-meta's Issues

java.lang.RuntimeException: non-nullable field authBytes was serialized as null

With an Event Hubs source configuration like the following:

  "source_format": "eventhub",
  "source_details": {
     "source_schema_path": "{dbfs_path}/integration-tests/resources/eventhub_iot_schema.ddl",
     "eventhub.accessKeyName": "{eventhub_accesskey_name}",
     "eventhub.name": "{eventhub_name}",
     "eventhub.secretsScopeName": "{eventhub_secrets_scope_name}",
     "kafka.sasl.mechanism": "PLAIN",
     "kafka.security.protocol": "SASL_SSL",
     "eventhub.namespace": "{eventhub_nmspace}",
     "eventhub.port": "{eventhub_port}"
  },

The pipeline may fail with the following error:

Connection to node -1 ({eventhub_nmspace}.servicebus.windows.net/xx.xx.xx.xx:9093) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.
...
Unexpected error from {eventhub_nmspace}.servicebus.windows.net/xx.xx.xx.xx (channelId=-1); closing connection
java.lang.RuntimeException: non-nullable field authBytes was serialized as null
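
For context, a minimal sketch (an assumption, not dlt-meta's actual implementation) of how an Event Hubs stream is typically read through its Kafka-compatible endpoint on Databricks; this error can appear when the SASL credential resolves to null or is rejected, so the secret-scope lookup and variable names below are illustrative only:

# Sketch only: placeholders mirror the config above; not dlt-meta's code.
eventhub_namespace = "{eventhub_nmspace}"
eventhub_name = "{eventhub_name}"
access_key_name = "{eventhub_accesskey_name}"
# assumed: the secret scope stores the access key under the access key name
access_key = dbutils.secrets.get("{eventhub_secrets_scope_name}", access_key_name)

connection_string = (
    f"Endpoint=sb://{eventhub_namespace}.servicebus.windows.net/;"
    f"SharedAccessKeyName={access_key_name};SharedAccessKey={access_key}"
)

df = (spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", f"{eventhub_namespace}.servicebus.windows.net:9093")
      .option("kafka.sasl.mechanism", "PLAIN")
      .option("kafka.security.protocol", "SASL_SSL")
      .option("kafka.sasl.jaas.config",
              "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
              f'username="$ConnectionString" password="{connection_string}";')
      .option("subscribe", eventhub_name)
      .load())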

Handling historical silver transformations with Full Refresh

I've built a process to generate our silver_transformations.json file. I have a question (and possibly a future issue) about what a DLT "full refresh" would do if a table's silver transformations change over time. If a full refresh is run, I believe it would only pick up what is currently in the silver dataflowspec table, which could miss historical transformations that were in place at their respective times. Am I missing something architecturally in DLT or DLT-META that can handle this use case? I appreciate any help or discussion on this topic.

Delta source_format key error

When using the delta option for the bronze source_format, the read_dlt_delta method fails with a KeyError:

.{bronze_dataflow_spec.sourceDetails["table"]}
KeyError: 'table'

Switching the key to source_table worked for me locally.
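
For illustration, a hedged sketch (not the project's actual fix) of a tolerant lookup that accepts either key, given that the onboarding file writes source_table while the reader expects table:

# Assumed names mirror the issue above; sourceDetails is treated as a plain dict.
source_details = bronze_dataflow_spec.sourceDetails
source_table = source_details.get("source_table") or source_details.get("table")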

Unity dataflowspec tables

Hello,

If the dataflowspec tables and the bronze/silver tables are stored in a schema managed by UC, what is the reason to use/set paths for these tables?
I'm unable to create the dataflowspec tables when I want them stored in a UC-managed schema. If I store them in hive_metastore, it runs fine.

The main issue is that users don't know the paths of tables managed by UC; they should only reference the table names in a specific schema. Or am I missing something?

"silver_dataflowspec_table": "silver_dataflowspec_table",
"silver_dataflowspec_path": "dbfs:/onboarding_tables_cdc/silver",
"bronze_dataflowspec_table": "bronze_dataflowspec_table",
"bronze_dataflowspec_path": "dbfs:/onboarding_tables_cdc/bronze",

Thanks.

support for adding file metadata to bronze dataframe using autoloader reader (_metadata option support)

Need the ability to add file metadata to the bronze dataframe, e.g.:

import dlt
@dlt.table
def bronze():
  return (spark.readStream.format("cloudFiles")
    # define the schema for the ~6 common columns across files.  All other input fields will be "rescued" into a JSON string column that can be queried via dot notation.
    .schema("Common1 string, Common2 string, _file_path string") # _file_path is a hidden auto-field but shows up in rescueData column JSON with this method.  Spoofing that I have the same column in my input file so i can drop this spoofed column later
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .option("cloudFiles.rescuedDataColumn","extraFields") # override default _rescuedData column name with whatever you want to call this column 
    .option("header","true")
    .load("/Volumes/vol/data/*.txt")
    .select("*","_metadata") # add file metadata information to output
    .drop("_file_path") # discard dummy input column to keep _file_path out of extraFields rescue data column
  )

Getting ValueError while trying to launch DLT-META using interactive notebook

Hello,

Here's the code:
onboarding_params_map = {
    "database": "dlt_demo",
    "onboarding_file_path": "onboarding_files/onboarding.json",
    "bronze_dataflowspec_table": "bronze_dataflowspec_table",
    "bronze_dataflowspec_path": "dbfs:/FileStore/bronze",
    "silver_dataflowspec_table": "silver_dataflowspec_table",
    "silver_dataflowspec_path": "dbfs:/FileStore/silver".format("silver"),
    "overwrite": "True",
    "onboard_layer": "bronze_silver",
    "env": "prod",
    "version": "v1",
    "import_author": "MANTAS"
}

OnboardDataflowspec(spark, onboarding_params_map).onboard_dataflow_specs()

I'm getting "ValueError: missing attributes : set()" while all parameters are specified

expect_or_quarantine

Hi, not an error, but I was looking at this and noticed you defined

"expect_or_quarantine":{
"valid_operation":"operation IN ('APPEND', 'DELETE', 'UPDATE')"
}

in one of the data quality expectations (DQE) files. But when I try it, it throws

AttributeError: module 'dlt' has no attribute 'expect_or_quarantine',Map(),Map(),List(),List(),Map())

Is there something I missed?
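
For reference, dlt itself only exposes expectation decorators such as expect, expect_or_drop, and expect_or_fail, so quarantining is usually implemented by splitting rows on the same expression. A minimal sketch of that pattern, with illustrative table names, and not necessarily how DLT-META wires it up internally:

import dlt
from pyspark.sql import functions as F

rules = {"valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"}
quarantine_expr = " AND ".join(f"({rule})" for rule in rules.values())

@dlt.table(name="bronze_valid")
def bronze_valid():
    # rows that satisfy every rule
    return dlt.read_stream("bronze_raw").where(F.expr(quarantine_expr))

@dlt.table(name="bronze_quarantine")
def bronze_quarantine():
    # rows that violate at least one rule
    return dlt.read_stream("bronze_raw").where(~F.expr(quarantine_expr))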

Merge option for onboarding json into metadata table

Feature request, not a bug. It would be good to be able to merge the onboarding JSON into the metadata tables by key (data_flow_id). That way we wouldn't have to overwrite the table and could maintain central metadata tables for all source entities we're ingesting. This could be done manually, of course, but it would be nice to have it baked into the framework.
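
For illustration, a hedged sketch of what a merge-based onboarding step could look like, assuming the spec DataFrame built from the onboarding JSON and the target dataflowspec table both carry a dataFlowId key (the DataFrame, table, and column names here are assumptions):

from delta.tables import DeltaTable

# new_specs_df: hypothetical DataFrame built from the onboarding JSON
target = DeltaTable.forName(spark, "dlt_meta.bronze_dataflowspec_table")
(target.alias("t")
    .merge(new_specs_df.alias("s"), "t.dataFlowId = s.dataFlowId")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())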

Multiple select expression support

Hello,

I ingest JSON data into the bronze layer and then try to apply some transformations to promote it to the silver layer.

Here's the problem: when I try to explode the ingested nested JSON and then select all columns, I get the following error:


from pyspark.sql.types import StructType, StructField, StringType, ArrayType

data = [("id1", [("a", "111"), ("b", "222")]),
        ("id2", [("c", "333"), ("d", "444")])]

schema = StructType([
    StructField("id", StringType()),
    StructField("payload", ArrayType(
        StructType([
            StructField("c1", StringType()),
            StructField("c2", StringType())
        ])
    ))
])

df = spark.createDataFrame(data, schema)

df2 = df.selectExpr(
    "explode(payload) as temp",
    "temp.*"
)

display(df2.schema)

AnalysisException: Cannot resolve 'temp.*' given input columns ''.; line 1 pos 0

 

However, if I select all columns in a separate selectExpr, all is good:


df2 = df.selectExpr(
    "explode(payload) as temp").selectExpr(
    "temp.*"
)

display(df2.schema)

df2: pyspark.sql.dataframe.DataFrame
c1: string
c2: string

StructType([StructField('c1', StringType(), True), StructField('c2', StringType(), True)])

 

Now suppose I want to drop the unwanted columns from the result:


df2 = df.selectExpr(
    "explode(payload) as temp").selectExpr(
    "temp.*",
    "except(c1)"
)

display(df2.schema)

Which gives the error:

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `c1` cannot be resolved. Did you mean one of the following? [`temp`].; line 1 pos 0

 

However, if I add another selectExpr on top of the previous one, it works!


df2 = df.selectExpr(
    "explode(payload) as temp").selectExpr(
    "temp.*").selectExpr(
    "* except(c1)"
)

display(df2.schema)

StructType([StructField('c2', StringType(), True)])

 

As far as I understand from the DLT-META source code:

    def get_silver_schema(self):
        """Get Silver table Schema."""
        silver_dataflow_spec: SilverDataflowSpec = self.dataflowSpec
        # source_database = silver_dataflow_spec.sourceDetails["database"]
        # source_table = silver_dataflow_spec.sourceDetails["table"]
        select_exp = silver_dataflow_spec.selectExp
        where_clause = silver_dataflow_spec.whereClause
        raw_delta_table_stream = self.spark.read.load(
            path=silver_dataflow_spec.sourceDetails["path"],
            format="delta"
            # #f"{source_database}.{source_table}"
        ).selectExpr(*select_exp)

 

the selectExpr is applied once over the whole array of select expressions.

Could it be applied separately to each select expression, so as to avoid the above errors and make the transformations more flexible?
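
For illustration, a minimal sketch of the chained behaviour being asked for, applying each expression as its own selectExpr call. Note that this changes semantics, since every step only sees the columns produced by the previous expression, so treat it as an assumption about the desired behaviour rather than a drop-in fix:

# df: the DataFrame read from the source path; select_exp: the list of
# expressions from the silver spec (names as in the excerpt above)
for exp in select_exp:
    df = df.selectExpr(exp)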

Thank you

Ability to add columns to bronze tables similar to silver table query

It's not an issue but a feature request. This would be useful if we want to add the source file name or a processing timestamp to bronze tables. Example (Scala):
val df = spark.readStream.format("cloudFiles")
.schema(schema)
.option("cloudFiles.format", "csv")
.option("cloudFiles.region","ap-south-1")
.load("path")
.withColumn("filePath",input_file_name())

Not able to onboard and deploy in interactive python terminal

Hello,

I'm trying to onboard and deploy DLT-META with all the default parameters in the terminal, but I'm getting this error:

Traceback (most recent call last):
  File "/Users/mantasmicikevicius/dlt-meta/src/cli.py", line 500, in <module>
    main(*sys.argv[1:])
  File "/Users/mantasmicikevicius/dlt-meta/src/cli.py", line 496, in main
    MAPPING[command](...)
  File "/Users/mantasmicikevicius/dlt-meta/src/cli.py", line 467, in onboard
    dltmeta.onboard(cmd)
  File "/Users/mantasmicikevicius/dlt-meta/src/cli.py", line 158, in onboard
    self._ws.dbfs.upload(cmd.dbfs_path + f"/dltmeta_conf/{onboarding_filename}", ob_file, overwrite=True)
  File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/mixins/files.py", line 320, in upload
    with self.open(path, write=True, overwrite=overwrite) as dst:
  File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/mixins/files.py", line 316, in open
    return _DbfsIO(self, path, read=read, write=write, overwrite=overwrite)
  File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/mixins/files.py", line 38, in __init__
    elif write: self._created = api.create(path, overwrite=overwrite)
  File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/service/files.py", line 382, in create
    res = self._api.do('POST', '/api/2.0/dbfs/create', body=body, headers=headers)
  File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/core.py", line 128, in do
    return retryable(self._perform)(method,
  File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/retries.py", line 54, in wrapper
    raise err
  File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/retries.py", line 33, in wrapper
    return func(*args, **kwargs)
  File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/core.py", line 221, in _perform
    raise self._make_nicer_error(response=response, **payload) from None
databricks.sdk.errors.platform.ResourceAlreadyExists: A file or directory already exists at the input path dbfs:/dlt-meta_cli_demo/dltmeta_conf/onboarding.json.
Error: exit status 1

Facing an issue in launching the DLT-META pipeline through the command line

Parsing argument complete. args=Namespace(cloud_provider_name='aws', dbr_version='12.2.x-scala2.12', dbfs_path='dbfs:/dais-dlt-meta-demo-automated/')
Traceback (most recent call last):
  File "C:\Users\nehamjain\dlt-meta\dlt-meta-demo\launch_demo.py", line 345, in <module>
    main()
  File "C:\Users\nehamjain\dlt-meta\dlt-meta-demo\launch_demo.py", line 244, in main
    api_client = get_api_client()
  File "C:\Users\nehamjain\dlt-meta\dlt-meta-demo\launch_demo.py", line 17, in get_api_client
    api_client = _get_api_client(config, command_name="labs_dlt-meta")
  File "C:\Users\nehamjain\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\LocalCache\local-packages\Python311\site-packages\databricks_cli\configure\config.py", line 102, in _get_api_client
    verify = config.insecure is None
AttributeError: 'NoneType' object has no attribute 'insecure'

Add non-Delta as Sink

Support non-Delta sinks using the metadata approach.

  • If the sink specified in metadata is non-Delta, use a Structured Streaming approach with foreachBatch (see the sketch after this list).
  • Use DAB to deploy the non-DLT pipelines to the Databricks workspace.
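
A minimal sketch of the first bullet, assuming a foreachBatch writer and a JDBC target (the sink, table, and checkpoint names are illustrative, and this is not dlt-meta's API):

def write_to_external_sink(batch_df, batch_id):
    # any non-Delta sink would go here: jdbc, kafka, cosmos, ...
    (batch_df.write
        .format("jdbc")
        .option("url", "jdbc:postgresql://host:5432/db")
        .option("dbtable", "target_table")
        .mode("append")
        .save())

(spark.readStream.table("bronze_source")
    .writeStream
    .foreachBatch(write_to_external_sink)
    .option("checkpointLocation", "/Volumes/vol/checkpoints/non_delta_sink")
    .start())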

Silver Transformations Became a Dependency

TypeError: __init__() missing 1 required positional argument: 'dataQualityExpectations',None,Map(),Map(),List(),List(),Map())

When a Delta Live Tables pipeline is onboarded through a previous version of DLT-META without silver data quality expectations, the code fails when the launch notebook is invoked.
