While running the lakehouse churn job, the job failed with a notebook exception:
com.databricks.WorkflowException: com.databricks.NotebookExecutionException: FAILED
The full error from the DLT pipeline is pasted below for reference.
Update 0507a1 is FAILED.
java.lang.RuntimeException: Failed to execute python command for notebook '/Users/[email protected]/databricks_demo/lakehouse-retail-c360/01-Data-ingestion/01.2-DLT-churn-Python-UDF' with id RunnableCommandId(4793992086333503899) and error AnsiResult(---------------------------------------------------------------------------
RestException Traceback (most recent call last)
File :6
2 import mlflow
3 # Stage/version
4 # Model name |
5 # | |
----> 6 predict_churn_udf = mlflow.pyfunc.spark_udf(spark, "models:/dbdemos_customer_churn/Production")
7 spark.udf.register("predict_churn", predict_churn_udf)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py:996, in spark_udf(spark, model_uri, result_type, env_manager)
989 if not any(isinstance(elem_type, x) for x in supported_types):
990 raise MlflowException(
991 message="Invalid result_type '{}'. Result type can only be one of or an array of one "
992 "of the following types: {}".format(str(elem_type), str(supported_types)),
993 error_code=INVALID_PARAMETER_VALUE,
994 )
--> 996 local_model_path = _download_artifact_from_uri(
997 artifact_uri=model_uri,
998 output_path=_create_model_downloading_tmp_dir(should_use_nfs),
999 )
1001 if env_manager == _EnvManager.LOCAL:
1002 # Assume spark executor python environment is the same with spark driver side.
1003 _warn_dependency_requirement_mismatches(local_model_path)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/tracking/artifact_utils.py:100, in _download_artifact_from_uri(artifact_uri, output_path)
94 """
95 :param artifact_uri: The absolute URI of the artifact to download.
96 :param output_path: The local filesystem path to which to download the artifact. If unspecified,
97 a local output path will be created.
98 """
99 root_uri, artifact_path = _get_root_uri_and_artifact_path(artifact_uri)
--> 100 return get_artifact_repository(artifact_uri=root_uri).download_artifacts(
101 artifact_path=artifact_path, dst_path=output_path
102 )
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/store/artifact/artifact_repository_registry.py:114, in get_artifact_repository(artifact_uri)
104 def get_artifact_repository(artifact_uri):
105 """Get an artifact repository from the registry based on the scheme of artifact_uri
106
107 :param artifact_uri: The artifact store URI. This URI is used to select which artifact
(...)
112 requirements.
113 """
--> 114 return _artifact_repository_registry.get_artifact_repository(artifact_uri)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/store/artifact/artifact_repository_registry.py:72, in ArtifactRepositoryRegistry.get_artifact_repository(self, artifact_uri)
65 if repository is None:
66 raise MlflowException(
67 "Could not find a registered artifact repository for: {}. "
68 "Currently registered schemes are: {}".format(
69 artifact_uri, list(self._registry.keys())
70 )
71 )
---> 72 return repository(artifact_uri)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/store/artifact/models_artifact_repo.py:42, in ModelsArtifactRepository.__init__(self, artifact_uri)
37 self.repo = UnityCatalogModelsArtifactRepository(
38 artifact_uri=artifact_uri, registry_uri=registry_uri
39 )
40 elif is_using_databricks_registry(artifact_uri):
41 # Use the DatabricksModelsArtifactRepository if a databricks profile is being used.
---> 42 self.repo = DatabricksModelsArtifactRepository(artifact_uri)
43 else:
44 uri = ModelsArtifactRepository.get_underlying_uri(artifact_uri)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/store/artifact/databricks_models_artifact_repo.py:63, in DatabricksModelsArtifactRepository.__init__(self, artifact_uri)
59 self.databricks_profile_uri = (
60 get_databricks_profile_uri_from_artifact_uri(artifact_uri) or mlflow.get_registry_uri()
61 )
62 client = MlflowClient(registry_uri=self.databricks_profile_uri)
---> 63 self.model_name, self.model_version = get_model_name_and_version(client, artifact_uri)
64 # Use an isolated thread pool executor for chunk uploads/downloads to avoid a deadlock
65 # caused by waiting for a chunk-upload/download task within a file-upload/download task.
66 # See https://superfastpython.com/threadpoolexecutor-deadlock/#Deadlock_1_Submit_and_Wait_for_a_Task_Within_a_Task
67 # for more details
68 self.chunk_thread_pool = self._create_thread_pool()
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/store/artifact/utils/models.py:94, in get_model_name_and_version(client, models_uri)
92 if model_alias is not None:
93 return model_name, client.get_model_version_by_alias(model_name, model_alias).version
---> 94 return model_name, str(_get_latest_model_version(client, model_name, model_stage))
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/store/artifact/utils/models.py:32, in _get_latest_model_version(client, name, stage)
27 def _get_latest_model_version(client, name, stage):
28 """
29 Returns the latest version of the stage if stage is not None. Otherwise return the latest of all
30 versions.
31 """
---> 32 latest = client.get_latest_versions(name, None if stage is None else [stage])
33 if len(latest) == 0:
34 stage_str = "" if stage is None else f" and stage '{stage}'"
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/tracking/client.py:2425, in MlflowClient.get_latest_versions(self, name, stages)
2353 def get_latest_versions(self, name: str, stages: List[str] = None) -> List[ModelVersion]:
2354 """
2355 Latest version models for each requests stage. If no stages provided, returns the
2356 latest version for each stage.
(...)
2423 current_stage: None
2424 """
-> 2425 return self._get_registry_client().get_latest_versions(name, stages)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/tracking/_model_registry/client.py:140, in ModelRegistryClient.get_latest_versions(self, name, stages)
130 def get_latest_versions(self, name, stages=None):
131 """
132 Latest version models for each requests stage. If no stages provided, returns the
133 latest version for each stage.
(...)
138 :return: List of :py:class:mlflow.entities.model_registry.ModelVersion objects.
139 """
--> 140 return self.store.get_latest_versions(name, stages)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/store/model_registry/rest_store.py:169, in RestStore.get_latest_versions(self, name, stages)
159 """
160 Latest version models for each requested stage. If no stages argument is provided,
161 returns the latest version for each stage.
(...)
166 :return: List of :py:class:mlflow.entities.model_registry.ModelVersion objects.
167 """
168 req_body = message_to_json(GetLatestVersions(name=name, stages=stages))
--> 169 response_proto = self._call_endpoint(GetLatestVersions, req_body, call_all_endpoints=True)
170 return [
171 ModelVersion.from_proto(model_version)
172 for model_version in response_proto.model_versions
173 ]
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/store/model_registry/base_rest_store.py:39, in BaseRestStore._call_endpoint(self, api, json_body, call_all_endpoints, extra_headers)
37 if call_all_endpoints:
38 endpoints = self._get_all_endpoints_from_method(api)
---> 39 return call_endpoints(
40 self.get_host_creds(), endpoints, json_body, response_proto, extra_headers
41 )
42 else:
43 endpoint, method = self._get_endpoint_from_method(api)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/utils/rest_utils.py:217, in call_endpoints(host_creds, endpoints, json_body, response_proto, extra_headers)
215 except RestException as e:
216 if e.error_code != ErrorCode.Name(ENDPOINT_NOT_FOUND) or i == len(endpoints) - 1:
--> 217 raise e
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/utils/rest_utils.py:212, in call_endpoints(host_creds, endpoints, json_body, response_proto, extra_headers)
210 for i, (endpoint, method) in enumerate(endpoints):
211 try:
--> 212 return call_endpoint(
213 host_creds, endpoint, method, json_body, response_proto, extra_headers
214 )
215 except RestException as e:
216 if e.error_code != ErrorCode.Name(ENDPOINT_NOT_FOUND) or i == len(endpoints) - 1:
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/utils/rest_utils.py:201, in call_endpoint(host_creds, endpoint, method, json_body, response_proto, extra_headers)
199 call_kwargs["json"] = json_body
200 response = http_request(**call_kwargs)
--> 201 response = verify_rest_response(response, endpoint)
202 js_dict = json.loads(response.text)
203 parse_dict(js_dict=js_dict, message=response_proto)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/utils/rest_utils.py:133, in verify_rest_response(response, endpoint)
131 if response.status_code != 200:
132 if _can_parse_as_json_object(response.text):
--> 133 raise RestException(json.loads(response.text))
134 else:
135 base_msg = "API request to endpoint {} failed with error code {} != 200".format(
136 endpoint,
137 response.status_code,
138 )
RestException: RESOURCE_DOES_NOT_EXIST: RegisteredModel 'dbdemos_customer_churn' does not exist. It might have been deleted.,None,Map(),Map(),List(),List(),Map())
at com.databricks.pipelines.execution.core.languages.PythonRepl.runCmd(PythonRepl.scala:335)
at com.databricks.pipelines.execution.service.PipelineRunnable$.$anonfun$loadPythonGraph$8(PipelineGraphLoader.scala:597)
at com.databricks.pipelines.execution.service.PipelineRunnable$.$anonfun$loadPythonGraph$8$adapted(PipelineGraphLoader.scala:595)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at com.databricks.pipelines.execution.service.PipelineRunnable$.$anonfun$loadPythonGraph$7(PipelineGraphLoader.scala:595)
at com.databricks.pipelines.execution.service.PipelineRunnable$.$anonfun$loadPythonGraph$7$adapted(PipelineGraphLoader.scala:572)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:193)
at com.databricks.pipelines.execution.service.PipelineRunnable$.loadPythonGraph(PipelineGraphLoader.scala:572)
at com.databricks.pipelines.execution.service.PipelineGraphLoader.loadGraph(PipelineGraphLoader.scala:324)
at com.databricks.pipelines.execution.service.PipelineGraphLoader.loadGraph(PipelineGraphLoader.scala:205)
at com.databricks.pipelines.execution.service.DLTComputeRunnableContext.loadGraph(DLTComputeRunnableContext.scala:96)
at com.databricks.pipelines.execution.core.UpdateExecution.$anonfun$initializeAndLoadGraph$1(UpdateExecution.scala:364)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.pipelines.execution.core.monitoring.DeltaPipelinesUsageLogging.$anonfun$recordPipelinesOperation$3(DeltaPipelinesUsageLogging.scala:118)
at com.databricks.pipelines.common.monitoring.OperationStatusReporter.executeWithPeriodicReporting(OperationStatusReporter.scala:120)
at com.databricks.pipelines.common.monitoring.OperationStatusReporter$.executeWithPeriodicReporting(OperationStatusReporter.scala:160)
at com.databricks.pipelines.execution.core.monitoring.DeltaPipelinesUsageLogging.$anonfun$recordPipelinesOperation$6(DeltaPipelinesUsageLogging.scala:137)
at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:555)
at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:650)
at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:671)
at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:412)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:158)
at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:410)
at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:407)
at com.databricks.pipelines.execution.core.monitoring.PublicLogging.withAttributionContext(DeltaPipelinesUsageLogging.scala:25)
at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:455)
at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:440)
at com.databricks.pipelines.execution.core.monitoring.PublicLogging.withAttributionTags(DeltaPipelinesUsageLogging.scala:25)
at com.databricks.logging.UsageLogging.recordOperationWithResultTags(UsageLogging.scala:645)
at com.databricks.logging.UsageLogging.recordOperationWithResultTags$(UsageLogging.scala:564)
at com.databricks.pipelines.execution.core.monitoring.PublicLogging.recordOperationWithResultTags(DeltaPipelinesUsageLogging.scala:25)
at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:555)
at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:525)
at com.databricks.pipelines.execution.core.monitoring.PublicLogging.recordOperation(DeltaPipelinesUsageLogging.scala:25)
at com.databricks.pipelines.execution.core.monitoring.PublicLogging.recordOperation0(DeltaPipelinesUsageLogging.scala:60)
at com.databricks.pipelines.execution.core.monitoring.DeltaPipelinesUsageLogging.$anonfun$recordPipelinesOperation$1(DeltaPipelinesUsageLogging.scala:130)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
at com.databricks.pipelines.execution.core.monitoring.DeltaPipelinesUsageLogging.recordPipelinesOperation(DeltaPipelinesUsageLogging.scala:107)
at com.databricks.pipelines.execution.core.monitoring.DeltaPipelinesUsageLogging.recordPipelinesOperation$(DeltaPipelinesUsageLogging.scala:102)
at com.databricks.pipelines.execution.core.UpdateExecution.recordPipelinesOperation(UpdateExecution.scala:55)
at com.databricks.pipelines.execution.core.UpdateExecution.executeStage(UpdateExecution.scala:257)
at com.databricks.pipelines.execution.core.UpdateExecution.initializeAndLoadGraph(UpdateExecution.scala:360)
at com.databricks.pipelines.execution.core.UpdateExecution.executeUpdate(UpdateExecution.scala:344)
at com.databricks.pipelines.execution.core.UpdateExecution.$anonfun$start$3(UpdateExecution.scala:126)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.pipelines.execution.core.WorkloadAttributionContextUtils$.runWithDLTWorkloadTags(WorkloadAttributionContextUtils_DBR_12_Minus.scala:6)
at com.databricks.pipelines.execution.core.UpdateExecution.$anonfun$start$1(UpdateExecution.scala:122)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.pipelines.execution.core.UCContextCompanion$OptionUCContextHelper.runWithNewUCSIfAvailable(BaseUCContext.scala:283)
at com.databricks.pipelines.execution.core.UpdateExecution.start(UpdateExecution.scala:119)
at com.databricks.pipelines.execution.service.ExecutionBackend$$anon$2.$anonfun$run$2(ExecutionBackend.scala:670)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.pipelines.execution.core.CommandContextUtils$.withCommandContext(CommandContextUtils.scala:47)
at com.databricks.pipelines.execution.service.ExecutionBackend$$anon$2.run(ExecutionBackend.scala:670)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.$anonfun$run$1(SparkThreadLocalForwardingThreadPoolExecutor.scala:114)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.IdentityClaim$.withClaim(IdentityClaim.scala:48)
at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.$anonfun$runWithCaptured$4(SparkThreadLocalForwardingThreadPoolExecutor.scala:77)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:41)
at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:76)
at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured$(SparkThreadLocalForwardingThreadPoolExecutor.scala:62)
at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:111)
at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.run(SparkThreadLocalForwardingThreadPoolExecutor.scala:114)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
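The innermost exception says the registered model 'dbdemos_customer_churn' does not exist in the model registry, which is what the DLT notebook tries to load via "models:/dbdemos_customer_churn/Production". For what it's worth, below is a minimal diagnostic sketch (not part of the demo code, and assuming the workspace model registry rather than Unity Catalog) that I would run from a notebook in the same workspace to confirm whether the model and a Production-stage version are actually registered:

import mlflow
from mlflow.tracking import MlflowClient
from mlflow.exceptions import MlflowException

client = MlflowClient()

try:
    # Does the registered model exist at all?
    model = client.get_registered_model("dbdemos_customer_churn")
    print(f"Found registered model: {model.name}")

    # Is any version transitioned to the 'Production' stage?
    prod_versions = client.get_latest_versions("dbdemos_customer_churn", stages=["Production"])
    if prod_versions:
        print("Production versions:", [v.version for v in prod_versions])
    else:
        print("Model exists, but no version is in the 'Production' stage.")
except MlflowException as e:
    # RESOURCE_DOES_NOT_EXIST here would match the error in the pipeline trace.
    print(f"Lookup failed: {e}")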
@QuentinAmbard - Could you please help with this issue? I am happy to share any additional details if needed.