Code Monkey home page Code Monkey logo

blaze's Introduction

BLAZE

TPC-DS master-ce7-builds

The Blaze accelerator for Apache Spark leverages native vectorized execution to accelerate query processing. It combines the power of the Apache Arrow-DataFusion library and the scale of the Spark distributed computing framework.

Blaze takes a fully optimized physical plan from Spark, mapping it into DataFusion's execution plan, and performs native plan computation in Spark executors.

Blaze is composed of the following high-level components:

  • Spark Extension: hooks the whole accelerator into Spark execution lifetime.
  • Spark Shims: specialized codes for different versions of spark.
  • Native Engine: implements the native engine in rust, including:
    • ExecutionPlan protobuf specification
    • JNI gateway
    • Customized operators, expressions, functions

Based on the inherent well-defined extensibility of DataFusion, Blaze can be easily extended to support:

  • Various object stores.
  • Operators.
  • Simple and Aggregate functions.
  • File formats.

We encourage you to extend DataFusion capability directly and add the supports in Blaze with simple modifications in plan-serde and extension translation.

Build from source

To build Blaze, please follow the steps below:

  1. Install Rust

The native execution lib is written in Rust. So you're required to install Rust (nightly) first for compilation. We recommend you to use rustup.

  1. Install JDK+Maven

Blaze has been well tested on jdk8 and maven3.5, should work fine with higher versions.

  1. Check out the source code.
git clone [email protected]:blaze-init/blaze.git
cd blaze
  1. Build the project.

Specify shims package of which spark version that you would like to run on. You could either build Blaze in dev mode for debugging or in release mode to unlock the full potential of Blaze.

SHIM=spark333 # or spark303
MODE=release # or dev
mvn package -P"${SHIM}" -P"${MODE}"

After the build is finished, a fat Jar package that contains all the dependencies will be generated in the target directory.

Build with docker

You can use the following command to build a centos-7 compatible release:

SHIM=spark333 MODE=release ./release-docker.sh

Run Spark Job with Blaze Accelerator

This section describes how to submit and configure a Spark Job with Blaze support.

  1. move blaze jar package to spark client classpath (normally spark-xx.xx.xx/jars/).

  2. add the follow confs to spark configuration in spark-xx.xx.xx/conf/spark-default.conf:

spark.sql.extensions org.apache.spark.sql.blaze.BlazeSparkSessionExtension
spark.shuffle.manager org.apache.spark.sql.execution.blaze.shuffle.BlazeShuffleManager

# other blaze confs defined in spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java
  1. submit a query with spark-sql, or other tools like spark-thriftserver:
spark-sql -f tpcds/q01.sql

Performance

Check Benchmark Results with the latest date for the performance comparison with vanilla Spark on TPC-DS 1TB dataset. The benchmark result shows that Blaze saved ~55% query time and ~60% cluster resources in average. ~6x performance achieved for the best case (q06). Stay tuned and join us for more upcoming thrilling numbers.

Query time: 20240202-query-time

Cluster resources: 20240202-resources

We also encourage you to benchmark Blaze and share the results with us. ๐Ÿค—

Community

We're using Discussions to connect with other members of our community. We hope that you:

  • Ask questions you're wondering about.
  • Share ideas.
  • Engage with other community members.
  • Welcome others who are open-minded. Remember that this is a community we build together ๐Ÿ’ช .

License

Blaze is licensed under the Apache 2.0 License. A copy of the license can be found here.

blaze's People

Contributors

dandandan avatar dependabot[bot] avatar houqp avatar lihao712 avatar liujiayi771 avatar nevi-me avatar richox avatar stepfenshawn avatar ted-jiang avatar yjshen avatar zuston avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

blaze's Issues

Inconsistent casting behavior

Describe the bug
when casting from float/double to int/bigint with overflowed input, spark outputs int::max/min whilst blaze outputs null.

To Reproduce

select cast(1e30 to int);

hive/spark outputs: 2147483647
blaze outputs: NULL

Expected behavior
keeps the same behavior with hive/spark

Additional context

Prune columns in SortExec/SortMergeJoinExec

Is your feature request related to a problem? Please describe.
in spark, Sort/SortMergeJoin do not support changing schema during evaluating. this is acceptable in row-based computation. however, in columnar-based computation, it is possible to prune unused columns in theses operators and reduce the overhead of building rubbish output columns.

Describe the solution you'd like
we could add an optional projection field to Sort/SortMergeJoin to specify the required output columns, and prune unneeded columns as soon as possible.

Describe alternatives you've considered
implement a more generic method like ExecutionPlan::evaluate_with_required_columns()?

Additional context

How to Tune Blaze Performance?

I try to run time dev/run-tpcds-test --data-location file:///data/tpcds-sf1 --query-filter q1
But it seems that Blaze is a little slower than JVM version.

Blaze:
real 0m14.413s user 0m39.440s sys 0m2.240s
Jvm (by remove Blaze extension in run-tpcds-test ):
real 0m11.553s user 0m47.240s sys 0m2.280s
How can I tune blaze?

getFieldNameByExprId returning id or returning name?

I'm modifying blaze to execute DataSourceExec, mainly by just hook into parquet scan exec. running into

Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 2) (10.73.23.39 executor driver): java.lang.RuntimeException: called `Result::unwrap()` on an `Err` value: Plan("cannot create execution plan: DataFusionError(ArrowError(SchemaError(\"Unable to get field named \\\"#12\\\". Valid fields: [\\\"id\\\", \\\"data\\\"]\")))")

  def getFieldNameByExprId(expr: NamedExpression): String =     s"#${expr.exprId.id}"

it seems filter/projection is using this to extract exprId as column references, but ParquetScanExec is providing names ?

Error: renamed_column_names length not matched with input schem

Describe the bug
I built and tried Blaze out on Databricks with Scala 2.12 and Spark 3.0.1 (to match Blaze's dependencies).

I get an error when trying to display results.

df = spark.read.format('delta').load('...') # doesn't look like the columns of the table matter
df.display()

I get the below error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 4 times, most recent failure: Lost task 0.3 in stage 14.0 (TID 48, 10.139.64.28, executor driver): 
java.lang.RuntimeException: called `Result::unwrap()` on an `Err` value: DataFusionError(Plan("renamed_column_names length not matched with input schema"))
	at org.apache.spark.sql.blaze.JniBridge.callNative(Native Method)
	at org.apache.spark.sql.blaze.BlazeCallNativeWrapper.<init>(NativeSupports.scala:157)
	at org.apache.spark.sql.blaze.NativeSupports$.executeNativePlan(NativeSupports.scala:91)
	at org.apache.spark.sql.blaze.NativeRDD.compute(NativeRDD.scala:43)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
	at org.apache.spark.scheduler.Task.run(Task.scala:117)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:655)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:658)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

To Reproduce
Steps to reproduce the behavior:

Load blaze on a Databricks cluster (๐Ÿซค not ideal, I know).

Try to display a dataframe

Expected behavior
A clear and concise description of what you expected to happen.

Spark should be able to print the dataframe

Additional context
Add any other context about the problem here.

I built with Ubuntu 18.04 so I could get the right glibc version. I don't think this is relevant though as it seems that the issue is from DataFusion's side

Supports nested complex types

Is your feature request related to a problem? Please describe.
currently we add an assertion to stop converting plans which schema contains nested complex types since they are not supported in arrow::array::make_builder. but in the latest version only bucket repartitioner depends on it, and bucket repartitioner can be safely fallback to sort repartitioner in that case.

Describe the solution you'd like

  1. remove the assertion.
  2. fallback bucket repartitioner to sort when schema contains complex type.

Describe alternatives you've considered
A clear and concise description of any alternative solutions or features you've considered.

Additional context
Add any other context or screenshots about the feature request here.

Use 4-byte length for storing byte slices

Is your feature request related to a problem? Please describe.
byte slices are widely used in some row-based data structures like row keys and aggregate buffers. in 64-bit systems, size of a slice is 16 bytes in total (8 for pointer, 8 for length). however 4 bytes for length is enough.

Describe the solution you'd like
use slimmer structure for Box<[u8]> with 4-byte length. there is a crate slimmer_box which seems promising.

Describe alternatives you've considered

Additional context

Blaze support for Spark versions

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
We have migrated to Spark version 3.3.2 and we would like to know if this will be supported in recent future.

Describe the solution you'd like
Building blaze against > 3.3.2 and support for Spark major version releases.

Describe alternatives you've considered
We constantly upgrade Spark due to other frameworks we use in data pipelines, may be another option is Ballista.

Additional context
Any suggestions on Arrow / data fusion integrations for Spark or distributed compute is highly appreciated.

#168

Runtime errors on both versions

Hi, I just want to ask how to configure "blaze". Currently my configuration is the same as in the documentation, but I'm just wondering if it only works on standalone clusters or also in cloud. I'd like to run my jobs in Google Dataproc. Is it possible? Any ideas what i should change?

spark configuration

spark.sql.extensions org.apache.spark.sql.blaze.BlazeSparkSessionExtension
spark.shuffle.manager org.apache.spark.sql.execution.blaze.shuffle.BlazeShuffleManager
  • blaze for spark 3.0.3 - error
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/joins/package$BuildSide
	at org.apache.spark.sql.blaze.BlazeConvertStrategy$.$anonfun$apply$3(BlazeConvertStrategy.scala:59)
	at org.apache.spark.sql.blaze.BlazeConvertStrategy$.$anonfun$apply$3$adapted(BlazeConvertStrategy.scala:55)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:184)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:183)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:183)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:183)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:183)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:183)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:183)
	at org.apache.spark.sql.blaze.BlazeConvertStrategy$.apply(BlazeConvertStrategy.scala:55)
	at org.apache.spark.sql.blaze.BlazeColumnarOverrides$$anon$1.apply(BlazeSparkSessionExtension.scala:85)
	at org.apache.spark.sql.blaze.BlazeColumnarOverrides$$anon$1.apply(BlazeSparkSessionExtension.scala:74)
	at org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.$anonfun$apply$1(Columnar.scala:532)
	at org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.$anonfun$apply$1$adapted(Columnar.scala:531)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.apply(Columnar.scala:531)
	at org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.apply(Columnar.scala:495)
	at org.apache.spark.sql.execution.QueryExecution$.$anonfun$prepareForExecution$1(QueryExecution.scala:378)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.execution.QueryExecution$.prepareForExecution(QueryExecution.scala:377)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:118)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:144)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:144)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:118)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:111)
	at org.apache.spark.sql.execution.columnar.InMemoryRelation$.apply(InMemoryRelation.scala:305)
	at org.apache.spark.sql.execution.CacheManager.$anonfun$cacheQuery$2(CacheManager.scala:102)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:97)
	at org.apache.spark.sql.Dataset.persist(Dataset.scala:3178)
	at org.apache.spark.sql.Dataset.cache(Dataset.scala:3188)
	at org.xxxx.etl.Transform$.apply(Transform.scala:13)
	at org.xxxx.run(xxxx.scala:25)
	at org.xxx.App$.delayedEndpoint$org$xxxx$App$1(App.scala:15)
	at org.xxxx.App$delayedInit$body.apply(App.scala:6)
	at scala.Function0.apply$mcV$sp(Function0.scala:39)
	at scala.Function0.apply$mcV$sp$(Function0.scala:39)
	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
	at scala.App.$anonfun$main$1$adapted(App.scala:80)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at scala.App.main(App.scala:80)
	at scala.App.main$(App.scala:78)
	at org.xxxx.App$.main(App.scala:6)
	at org.xxxx.App.main(App.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.joins.package$BuildSide
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 66 more
  • blaze for spark 3.3.3 - error
Exception in thread "main" java.lang.VerifyError: class org.apache.spark.sql.execution.blaze.plan.NativeSortExec overrides final method org.apache.spark.sql.catalyst.trees.TreeNode.withNewChildren(Lscala/collection/Seq;)Lorg/apache/spark/sql/catalyst/trees/TreeNode;
	at java.base/java.lang.ClassLoader.defineClass1(Native Method)
	at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1022)
	at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
	at java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:555)
	at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:458)
	at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:452)
	at java.base/java.security.AccessController.doPrivileged(Native Method)
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:451)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	at java.base/java.lang.Class.getDeclaredConstructors0(Native Method)
	at java.base/java.lang.Class.privateGetDeclaredConstructors(Class.java:3137)
	at java.base/java.lang.Class.getConstructor0(Class.java:3342)
	at java.base/java.lang.Class.newInstance(Class.java:556)
	at org.apache.spark.sql.blaze.Shims$.get$lzycompute(Shims.scala:231)
	at org.apache.spark.sql.blaze.Shims$.get(Shims.scala:229)
	at org.apache.spark.sql.blaze.BlazeSparkSessionExtension.<init>(BlazeSparkSessionExtension.scala:31)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
	at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1(SparkSession.scala:1222)
	at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1$adapted(SparkSession.scala:1219)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$applyExtensions(SparkSession.scala:1219)
	at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:960)
	at org.xxxx.App$.delayedEndpoint$org$xxx$App$1(App.scala:10)
	at org.xxx.App$delayedInit$body.apply(App.scala:6)
	at scala.Function0.apply$mcV$sp(Function0.scala:39)
	at scala.Function0.apply$mcV$sp$(Function0.scala:39)
	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
	at scala.App.$anonfun$main$1$adapted(App.scala:80)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at scala.App.main(App.scala:80)
	at scala.App.main$(App.scala:78)
	at org.xxx.App$.main(App.scala:6)
	at org.xxx.App.main(App.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:973)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1061)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1070)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Supports skipping partial aggregates

Is your feature request related to a problem? Please describe.
as described in SPARK-31973, skipping partial aggregates where data cardinality is high (like group by user_id) sufficiently improves performance.

Describe the solution you'd like
implements partial agg skipping strategy in agg_tables.rs:

  1. mark an AggExec as skippable. (in spark side, where requiredDistribution is empty)
  2. process the first N records.
  3. check the number of input records and the number of aggregated records, if reached threshold, directly outputs all in-memory, spilled and rest records.

Describe alternatives you've considered
A clear and concise description of any alternative solutions or features you've considered.

Additional context
Add any other context or screenshots about the feature request here.

kyuubi on spark error

environment๏ผš
cdp:7.x
spark:3.3.3
kyuubi:1.7.1
Occasionally reporting an error today.
error message๏ผš

Error: org.apache.kyuubi.KyuubiSQLException: org.apache.kyuubi.KyuubiSQLException: Error operating ExecuteStatement: org.apache.spark.SparkException: Job aborted due to stage failure: Task 83 in stage 1411.0 failed 4 times, most recent failure: Lost task 83.3 in stage 1411.0 (TID 429682) (node77 executor 390): java.lang.RuntimeException: native executing [partition=83] panics: Execution error: Arrow error: Invalid argument error: all columns in a record batch must have the same length
	
	Driver stacktrace:
		at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2668)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2604)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2603)
		at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
		at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
		at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
		at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2603)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1178)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1178)
		at scala.Option.foreach(Option.scala:407)
		at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1178)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2856)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2798)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2787)
		at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	Caused by: java.lang.RuntimeException: native executing [partition=83] panics: Execution error: Arrow error: Invalid argument error: all columns in a record batch must have the same length
	
		at org.apache.kyuubi.KyuubiSQLException$.apply(KyuubiSQLException.scala:69)
		at org.apache.kyuubi.engine.spark.operation.SparkOperation$$anonfun$onError$1.$anonfun$applyOrElse$1(SparkOperation.scala:189)
		at org.apache.kyuubi.Utils$.withLockRequired(Utils.scala:395)
		at org.apache.kyuubi.operation.AbstractOperation.withLockRequired(AbstractOperation.scala:51)
		at org.apache.kyuubi.engine.spark.operation.SparkOperation$$anonfun$onError$1.applyOrElse(SparkOperation.scala:177)
		at org.apache.kyuubi.engine.spark.operation.SparkOperation$$anonfun$onError$1.applyOrElse(SparkOperation.scala:172)
		at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
		at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.$anonfun$executeStatement$1(ExecuteStatement.scala:88)
		at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
		at org.apache.kyuubi.engine.spark.operation.SparkOperation.$anonfun$withLocalProperties$1(SparkOperation.scala:155)
		at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
		at org.apache.kyuubi.engine.spark.operation.SparkOperation.withLocalProperties(SparkOperation.scala:139)
		at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.executeStatement(ExecuteStatement.scala:78)
		at org.apache.kyuubi.engine.spark.operation.ExecuteStatement$$anon$1.run(ExecuteStatement.scala:100)
		at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
		at java.util.concurrent.FutureTask.run(FutureTask.java:266)
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
		at java.lang.Thread.run(Thread.java:748)
	Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 83 in stage 1411.0 failed 4 times, most recent failure: Lost task 83.3 in stage 1411.0 (TID 429682) (node77 executor 390): java.lang.RuntimeException: native executing [partition=83] panics: Execution error: Arrow error: Invalid argument error: all columns in a record batch must have the same length
	
	Driver stacktrace:
		at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2668)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2604)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2603)
		at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
		at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
		at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
		at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2603)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1178)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1178)
		at scala.Option.foreach(Option.scala:407)
		at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1178)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2856)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2798)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2787)
		at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	Caused by: java.lang.RuntimeException: native executing [partition=83] panics: Execution error: Arrow error: Invalid argument error: all columns in a record batch must have the same length
	
		at org.apache.kyuubi.KyuubiSQLException$.apply(KyuubiSQLException.scala:69)
		at org.apache.kyuubi.operation.ExecuteStatement.waitStatementComplete(ExecuteStatement.scala:129)
		at org.apache.kyuubi.operation.ExecuteStatement.$anonfun$runInternal$1(ExecuteStatement.scala:161)
		at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
		at java.util.concurrent.FutureTask.run(FutureTask.java:266)
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
		at java.lang.Thread.run(Thread.java:748) (state=,code=0)

SQL has been encrypted๏ผš

with st_xxxxxxxxxxxxxxx_day as (select
dt.xxxxx
,xxxx
,xxxxxx
,xxxxxxx
,maxxxxxxxxx
,ismme5gxxxxxxxx
,iss1u5gxxxxxxxx
,dcnr
,is_5g_on_off
,is_flow_limit
,dv.vendor as fact_id
,dv.vendor as cat_one
,dv.model_id
,null as mt_type
,dv.model_name as terminalname
,dv.sw_version as software_version
,null as card_slot
,dv.support_volte as  is_support_volte
,xxxxxxx_tranoctets
,xxxxxxx_5g_tranoctets
,xxxxxxx_4g_tranoctets
,xxxxxxx_webdlvalidoctets
,xxxxxxx_webdlvaliddelay
,xxxxxxx_5g_webdlvalidoctets
,xxxxxxx_5g_webdlvaliddelay
,xxxxxxx_4g_webdlvalidoctets
,xxxxxxx_4g_webdlvaliddelay
,xxxxxx_cnt_totalattach_req
,xxxxxx_cnt_totalattach_succ
,xxxxxx_cnt_md_totalattach_req
,xxxxxx_cnt_md_totalattach_succ
,xxxxxx_cnt_md_totalattach_fail
,xxxxxx_erbmodify_req
,xxxxxx_erbmodify_succ
,xxxxxx_erbmodify_err
,mdcellcnt
,nomdcellcnt
,xxx_tranoctets
,xxx_webdlvalidoctets
,xxx_webdlvaliddelay
,thzl
,swzl
,cpyzf
,sfzqx
,yytfw
,rexian
,predict
,xxx_nbrallWebReq
,xxx_nbrallWebResSucc
,xxx_WebResDelay
,xxx_nbrTcpConn1stReq
,xxx_nbrTcpConn1stSucc
,xxx_nbrTcpConn2ndSucc
,xxx_tcpConn1stDelay
,xxx_tcpConn2ndDelay
,xxx_tcpConnDelay
,xxx_FirstScreenFincnt
,xxx_FirstScreenFinTime
,xxx_VideoBufferServCnt
,xxx_VideoServCnt
,xxx_VideoDlOctets
,xxx_VideoDlDuxxxxxxxxion
,xxx_videoThroughput_cnt
,xxx_videoThroughput
,xxx_videoBufferCnt
,xxx_VideoPlayTime
,xxx_InitBufferReq
,xxx_InitBufferSucc
,xxx_InitBufferDuxxxxxxxxion
,lte_flux
,g5_under_lte_flux
,g5_flux
from (select
dt.xxxxx as xxxxx
,max(tabletype) as tabletype
,max(xxxx) as xxxx
,max(case when rank=1 then xxxxxx end) as xxxxxx
,max(case when rank=1 then xxxxxxx end) as xxxxxxx
,max(maxxxxxxxxx) as maxxxxxxxxx
,max(ismme5gxxxxxxxx) ismme5gxxxxxxxx
,max(iss1u5gxxxxxxxx) iss1u5gxxxxxxxx
,max(dcnr) as dcnr
,max(is_5g_on_off) as is_5g_on_off
,sum(xxxxxxx_tranoctets) as xxxxxxx_tranoctets
,sum(xxxxxxx_5g_tranoctets) as xxxxxxx_5g_tranoctets
,sum(xxxxxxx_4g_tranoctets) as xxxxxxx_4g_tranoctets
,sum(xxxxxxx_webdlvalidoctets) as xxxxxxx_webdlvalidoctets
,sum(xxxxxxx_webdlvaliddelay) as xxxxxxx_webdlvaliddelay
,sum(xxxxxxx_5g_webdlvalidoctets) as xxxxxxx_5g_webdlvalidoctets
,sum(xxxxxxx_5g_webdlvaliddelay) as xxxxxxx_5g_webdlvaliddelay
,sum(xxxxxxx_4g_webdlvalidoctets) as xxxxxxx_4g_webdlvalidoctets
,sum(xxxxxxx_4g_webdlvaliddelay) as xxxxxxx_4g_webdlvaliddelay
,sum(xxxxxx_cnt_totalattach_req) as xxxxxx_cnt_totalattach_req
,sum(xxxxxx_cnt_totalattach_succ) as xxxxxx_cnt_totalattach_succ
,sum(xxxxxx_cnt_md_totalattach_req) as xxxxxx_cnt_md_totalattach_req
,sum(xxxxxx_cnt_md_totalattach_succ) as xxxxxx_cnt_md_totalattach_succ
,sum(xxxxxx_cnt_md_totalattach_fail) as xxxxxx_cnt_md_totalattach_fail
,sum(xxxxxx_erbmodify_req) as xxxxxx_erbmodify_req
,sum(xxxxxx_erbmodify_succ) as xxxxxx_erbmodify_succ
,sum(xxxxxx_erbmodify_err) as xxxxxx_erbmodify_err
,max(mdcellcnt) as mdcellcnt
,max(nomdcellcnt) as nomdcellcnt
,sum(xxx_tranoctets) as xxx_tranoctets
,sum(xxx_webdlvalidoctets) as xxx_webdlvalidoctets
,sum(xxx_webdlvaliddelay) as xxx_webdlvaliddelay
,max(is_flow_limit) as is_flow_limit
,max(thzl) as thzl
,max(swzl) as swzl
,max(cpyzf) as cpyzf
,max(sfzqx) as sfzqx
,max(yytfw) as yytfw
,max(rexian) as rexian
,max(predict) as predict
,sum(xxx_nbrallWebReq) as xxx_nbrallWebReq
,sum(xxx_nbrallWebResSucc) as xxx_nbrallWebResSucc
,sum(xxx_WebResDelay) as xxx_WebResDelay
,sum(xxx_nbrTcpConn1stReq) as xxx_nbrTcpConn1stReq
,sum(xxx_nbrTcpConn1stSucc) as xxx_nbrTcpConn1stSucc
,sum(xxx_nbrTcpConn2ndSucc) as xxx_nbrTcpConn2ndSucc
,sum(xxx_tcpConn1stDelay) as xxx_tcpConn1stDelay
,sum(xxx_tcpConn2ndDelay) as xxx_tcpConn2ndDelay
,sum(xxx_tcpConnDelay) as xxx_tcpConnDelay
,sum(xxx_FirstScreenFincnt) as xxx_FirstScreenFincnt
,sum(xxx_FirstScreenFinTime) as xxx_FirstScreenFinTime
,sum(xxx_VideoBufferServCnt) as xxx_VideoBufferServCnt
,sum(xxx_VideoServCnt) as xxx_VideoServCnt
,sum(xxx_VideoDlOctets) as xxx_VideoDlOctets
,sum(xxx_VideoDlDuxxxxxxxxion) as xxx_VideoDlDuxxxxxxxxion
,sum(xxx_videoThroughput_cnt) as xxx_videoThroughput_cnt
,sum(xxx_videoThroughput) as xxx_videoThroughput
,sum(xxx_videoBufferCnt) as xxx_videoBufferCnt
,sum(xxx_VideoPlayTime) as xxx_VideoPlayTime
,sum(xxx_InitBufferReq) as xxx_InitBufferReq
,sum(xxx_InitBufferSucc) as xxx_InitBufferSucc
,sum(xxx_InitBufferDuxxxxxxxxion) as xxx_InitBufferDuxxxxxxxxion
,sum(lte_flux) as lte_flux
,sum(g5_under_lte_flux) as g5_under_lte_flux
,sum(g5_flux) as g5_flux
from (
select
xxxxx as xxxxx
,max(tabletype) as tabletype
,xxxx as xxxx
,xxxxxx as xxxxxx
,xxxxxxx as xxxxxxx
,max(case when cellid is not null then cast(xxxxxxxx as bigint) end) as maxxxxxxxxx
,max(case when tabletype=1 and xxxxxxxx in ('10','9') and cellid is not null then 1 else 0 end) ismme5gxxxxxxxx
,max(case when tabletype=2 and xxxxxxxx in ('10','9') and cellid is not null then 1 else 0 end) iss1u5gxxxxxxxx
,max(case when xxxxxxxx in ('10','9') then 1 else dcnr end) as dcnr
,max(is_5g_on_off) as is_5g_on_off
,nvl(sum(xxx_ultranoctets),0)+nvl(sum(xxx_dltranoctets),0) as xxxxxxx_tranoctets
,sum(case when xxxxxxxx in ('10','9') then xxxxxxx_ultranoctets end)+sum(case when xxxxxxxx in ('10','9') then xxxxxxx_dltranoctets end) as xxxxxxx_5g_tranoctets
,sum(case when xxxxxxxx='6' then xxxxxxx_ultranoctets end)+sum(case when xxxxxxxx='6' then xxxxxxx_dltranoctets end) as xxxxxxx_4g_tranoctets
,sum(xxxxxxx_webdlvalidoctets) as xxxxxxx_webdlvalidoctets
,sum(xxxxxxx_webdlvaliddelay) as xxxxxxx_webdlvaliddelay
,sum(case when xxxxxxxx='9' then xxxxxxx_webdlvalidoctets end) as xxxxxxx_5g_webdlvalidoctets
,sum(case when xxxxxxxx='9' then xxxxxxx_webdlvaliddelay end) as xxxxxxx_5g_webdlvaliddelay
,sum(case when xxxxxxxx='6' then xxxxxxx_webdlvalidoctets end) as xxxxxxx_4g_webdlvalidoctets
,sum(case when xxxxxxxx='6' then xxxxxxx_webdlvaliddelay end) as xxxxxxx_4g_webdlvaliddelay
,sum(xxxxxx_cnt_totalattach_req) as xxxxxx_cnt_totalattach_req
,sum(xxxxxx_cnt_totalattach_succ) as xxxxxx_cnt_totalattach_succ
,sum(case when ismaodian=1 then xxxxxx_cnt_totalattach_req end) as xxxxxx_cnt_md_totalattach_req
,sum(case when ismaodian=1 then xxxxxx_cnt_totalattach_succ end) as xxxxxx_cnt_md_totalattach_succ
,sum(case when ismaodian=1 then xxxxxx_cnt_totalattach_req else 0 end)- sum(case when ismaodian=1 then xxxxxx_cnt_totalattach_succ else 0 end) as xxxxxx_cnt_md_totalattach_fail
,sum(xxxxxx_erbmodify_req) as xxxxxx_erbmodify_req
,sum(xxxxxx_erbmodify_succ) as xxxxxx_erbmodify_succ
,sum(xxxxxx_erbmodify_err) as xxxxxx_erbmodify_err
,count(distinct case when ismaodian=1  then cellid end) as mdcellcnt
,count(distinct cellid)-count(distinct case when ismaodian=1  then cellid end) as nomdcellcnt
,sum(case when xxxxxxxx='10' then xxx_ultranoctets end)+sum(case when xxxxxxxx='10' then xxx_dltranoctets end) as xxx_tranoctets
,sum(case when xxxxxxxx='10' then xxx_webdlvalidoctets end) as xxx_webdlvalidoctets
,sum(case when xxxxxxxx='10' then xxx_webdlvaliddelay end) as xxx_webdlvaliddelay
,max(is_flow_limit) as is_flow_limit
,max(thzl) as thzl
,max(swzl) as swzl
,max(cpyzf) as cpyzf
,max(sfzqx) as sfzqx
,max(yytfw) as yytfw
,max(rexian) as rexian
,max(predict) as predict
,sum(xxx_nbrallWebReq) as xxx_nbrallWebReq
,sum(xxx_nbrallWebResSucc) as xxx_nbrallWebResSucc
,sum(xxx_WebResDelay) as xxx_WebResDelay
,sum(xxx_nbrTcpConn1stReq) as xxx_nbrTcpConn1stReq
,sum(xxx_nbrTcpConn1stSucc) as xxx_nbrTcpConn1stSucc
,sum(xxx_nbrTcpConn2ndSucc) as xxx_nbrTcpConn2ndSucc
,sum(xxx_tcpConn1stDelay) as xxx_tcpConn1stDelay
,sum(xxx_tcpConn2ndDelay) as xxx_tcpConn2ndDelay
,sum(xxx_tcpConnDelay) as xxx_tcpConnDelay
,sum(xxx_FirstScreenFincnt) as xxx_FirstScreenFincnt
,sum(xxx_FirstScreenFinTime) as xxx_FirstScreenFinTime
,sum(xxx_VideoBufferServCnt) as xxx_VideoBufferServCnt
,sum(xxx_VideoServCnt) as xxx_VideoServCnt
,sum(xxx_VideoDlOctets) as xxx_VideoDlOctets
,sum(xxx_VideoDlDuxxxxxxxxion) as xxx_VideoDlDuxxxxxxxxion
,sum(xxx_videoThroughput_cnt) as xxx_videoThroughput_cnt
,sum(xxx_videoThroughput) as xxx_videoThroughput
,sum(xxx_videoBufferCnt) as xxx_videoBufferCnt
,sum(xxx_VideoPlayTime) as xxx_VideoPlayTime
,sum(xxx_InitBufferReq) as xxx_InitBufferReq
,sum(xxx_InitBufferSucc) as xxx_InitBufferSucc
,sum(xxx_InitBufferDuxxxxxxxxion) as xxx_InitBufferDuxxxxxxxxion
,sum(xdrcnt) as xdrcnt
,sum(lte_flux) as lte_flux
,sum(g5_under_lte_flux) as g5_under_lte_flux
,sum(g5_flux) as g5_flux
,row_number() over(partition by xxxxx order by (nvl(sum(xxx_ultranoctets),0)+nvl(sum(xxx_dltranoctets),0)) desc,sum(xdrcnt) desc) rank
from (select
1 tabletype
,xxxxxxxx
,eci as cellid
,xxxxx
,xxxx
,substr(xxxxxx,1,8) as xxxxxxx
,xxxxxx as xxxxxx
,cast(null as string) as dcnr
,case when xxxxxx_erbmodify_succ>0 or xxxxxxxx='9' then 1 else 0 end as ismaodian
,null as xxxxxxx_ultranoctets
,null as xxxxxxx_dltranoctets
,null as xxxxxxx_webdlvalidoctets
,null as xxxxxxx_webdlvaliddelay
,xxxxxx_cnt_totalattach_req
,xxxxxx_cnt_totalattach_succ
,xxxxxx_erbmodify_req
,xxxxxx_erbmodify_succ
,xxxxxx_erbmodify_err
,null as xxx_ultranoctets
,null as xxx_dltranoctets
,case when xxxxxxxx='10' then 1 else 0 end as is_5g_on_off
,cast(null as double) as xxx_webdlvalidoctets
,cast(null as double) as xxx_webdlvaliddelay
,cast(null as string) as is_flow_limit
,cast(null as double) as thzl
,cast(null as double) as swzl
,cast(null as double) as cpyzf
,cast(null as double) as sfzqx
,cast(null as double) as yytfw
,cast(null as double) as rexian
,cast(null as double) as predict
,cast(null as double) as xxx_nbrallWebReq
,cast(null as double) as xxx_nbrallWebResSucc
,cast(null as double) as xxx_WebResDelay
,cast(null as double) as xxx_nbrTcpConn1stReq
,cast(null as double) as xxx_nbrTcpConn1stSucc
,cast(null as double) as xxx_nbrTcpConn2ndSucc
,cast(null as double) as xxx_tcpConn1stDelay
,cast(null as double) as xxx_tcpConn2ndDelay
,cast(null as double) as xxx_tcpConnDelay
,cast(null as double) as xxx_FirstScreenFincnt
,cast(null as double) as xxx_FirstScreenFinTime
,cast(null as double) as xxx_VideoBufferServCnt
,cast(null as double) as xxx_VideoServCnt
,cast(null as double) as xxx_VideoDlOctets
,cast(null as double) as xxx_VideoDlDuxxxxxxxxion
,cast(null as double) as xxx_videoThroughput_cnt
,cast(null as double) as xxx_videoThroughput
,cast(null as double) as xxx_videoBufferCnt
,cast(null as double) as xxx_VideoPlayTime
,cast(null as double) as xxx_InitBufferReq
,cast(null as double) as xxx_InitBufferSucc
,cast(null as double) as xxx_InitBufferDuxxxxxxxxion
,xxxxxx_xdrcnt as xdrcnt
,cast(null as string) as lte_flux
,cast(null as string) as g5_under_lte_flux
,cast(null as string) as g5_flux
from xxx_xxx.xxxxxxxxxxxxxxxxxxxx_day
where day=20231002 and interfaceid='5'

union all
select
1 as tabletype
,xxxxxxxx as xxxxxxxx
,nci as cellid
,xxxxx as xxxxx
,xxxx as xxxx
,substr(xxxxxx,1,8) as xxxxxxx
,xxxxxx as xxxxxx
,null as dcnr
,case when xxxxxxxx='9' then 1 else 0 end as ismaodian
,null as xxxxxxx_ultranoctets
,null as xxxxxxx_dltranoctets
,null as xxxxxxx_webdlvalidoctets
,null as xxxxxxx_webdlvaliddelay
,null as xxxxxx_cnt_totalattach_req
,null as xxxxxx_cnt_totalattach_succ
,null as xxxxxx_erbmodify_req
,null as xxxxxx_erbmodify_succ
,null as xxxxxx_erbmodify_err
,null as xxx_ultranoctets
,null as xxx_dltranoctets
,case when xxxxxxxx='10' then 1 else 0 end is_5g_on_off
,null as xxx_webdlvalidoctets
,null as xxx_webdlvaliddelay
,null as is_flow_limit
,null as thzl
,null as swzl
,null as cpyzf
,null as sfzqx
,null as yytfw
,null as rexian
,null as predict
,null as xxx_nbrallWebReq
,null as xxx_nbrallWebResSucc
,null as xxx_WebResDelay
,null as xxx_nbrTcpConn1stReq
,null as xxx_nbrTcpConn1stSucc
,null as xxx_nbrTcpConn2ndSucc
,null as xxx_tcpConn1stDelay
,null as xxx_tcpConn2ndDelay
,null as xxx_tcpConnDelay
,null as xxx_FirstScreenFincnt
,null as xxx_FirstScreenFinTime
,null as xxx_VideoBufferServCnt
,null as xxx_VideoServCnt
,null as xxx_VideoDlOctets
,null as xxx_VideoDlDuxxxxxxxxion
,null as xxx_videoThroughput_cnt
,null as xxx_videoThroughput
,null as xxx_videoBufferCnt
,null as xxx_VideoPlayTime
,null as xxx_InitBufferReq
,null as xxx_InitBufferSucc
,null as xxx_InitBufferDuxxxxxxxxion
,sac_xdrcnt as xdrcnt
,null as lte_flux
,null as g5_under_lte_flux
,null as g5_flux
from xxx_xxx.xxxxxx_day
where day=20231002 and interfaceid='39'

union all
select
2 as tabletype
,xxxxxxxx as xxxxxxxx
,cgi as cellid
,xxxxx
,xxxx
,substr(xxxxxx,1,8) as xxxxxxx
,xxxxxx as xxxxxx
,null dcnr
,case when xxxxxxxx='9' then 1 else 0 end as ismaodian
,xxx_ultranoctets as xxxxxxx_ultranoctets
,xxx_dltranoctets as xxxxxxx_dltranoctets
,xxx_webdlvalidoctets as xxxxxxx_webdlvalidoctets
,xxx_webdlvaliddelay as xxxxxxx_webdlvaliddelay
,null as xxxxxx_cnt_totalattach_req
,null as xxxxxx_cnt_totalattach_succ
,null as xxxxxx_erbmodify_req
,null as xxxxxx_erbmodify_succ
,null as xxxxxx_erbmodify_err
,xxx_ultranoctets
,xxx_dltranoctets
,case when xxxxxxxx='10' then 1 else 0 end as is_5g_on_off
,xxx_webdlvalidoctets
,xxx_webdlvaliddelay
,null as is_flow_limit
,null as thzl
,null as swzl
,null as cpyzf
,null as sfzqx
,null as yytfw
,null as rexian
,null as predict
,xxx_nbrallWebReq
,xxx_nbrallWebResSucc
,xxx_WebResDelay
,xxx_nbrTcpConn1stReq
,xxx_nbrTcpConn1stSucc
,xxx_nbrTcpConn2ndSucc
,xxx_tcpConn1stDelay
,xxx_tcpConn2ndDelay
,xxx_tcpConnDelay
,xxx_FirstScreenFincnt
,xxx_FirstScreenFinTime
,xxx_VideoBufferServCnt
,xxx_VideoServCnt
,xxx_VideoDlOctets
,xxx_VideoDlDuxxxxxxxxion
,cast(null as double) as xxx_videoThroughput_cnt
,cast(null as double) as xxx_videoThroughput
,xxx_videoBufferCnt
,cast(null as double) as xxx_VideoPlayTime
,xxx_InitBufferReq
,xxx_InitBufferSucc
,xxx_InitBufferDuxxxxxxxxion
,xxx_xdrcnt as xdrcnt
,null as lte_flux
,null as g5_under_lte_flux
,null as g5_flux
from xxx_xxx.xxxxxxx_day
where day=20231002 and xxx_xdrcnt>0
) dt group by xxxxx
,xxxx
,xxxxxxx
,xxxxxx
) dt group by dt.xxxxx ) dt
left join (select ue_tac_id,model_id,model_name,vendor,support_volte,sw_version from ods_xxxxx.device where day=20230712 group by ue_tac_id,model_id,model_name,vendor,support_volte,sw_version) dv on dt.xxxxxxx=dv.ue_tac_id
where dt.xxxxx is not null and tabletype>0)
,xxxx_day as (select
da.xxxxxxxxx
,da.xxxxxxxxxxx
,da.xxxxxxxxxxxx
,da.cellid as cellid
,da.earlyresident
,da.nightresident
,da.xxxxx
,da.xxxx
,db.user_lv
,db.nearlthreemonth_arpu
,db.inner_months
,db.dinner_name
,db.dinner_type
,db.is_5gdinner
,da.xxxxxx as xxxxxx
,substr(da.xxxxxx,1,8) as xxxxxxx
,db.xxx_kpimrcnt as xxx_kpimrcnt
,db.xxx_kpimr110cnt as xxx_kpimr110cnt
,db.xxx_kpimrweakcoverage_xxxxxxxxe as xxx_kpimrweakcoverage_xxxxxxxxe
,case when xxx_kpimrweakcoverage_xxxxxxxxe<=10 then 25
when xxx_kpimrweakcoverage_xxxxxxxxe is null then 25
when xxx_kpimrweakcoverage_xxxxxxxxe >=30 then 0
else 25-(xxx_kpimrweakcoverage_xxxxxxxxe-10)*(25/(30-10)) end as xxx_Coverage_score
,xxx_nbrTcpConn2ndSucc
,xxx_nbrTcpConn1stReq
,xxx_tcpConnSuccxxxxxxxxio
,case when xxx_tcpConnSuccxxxxxxxxio<=60 then 0
when xxx_tcpConnSuccxxxxxxxxio>=95 then 5
when xxx_tcpConnSuccxxxxxxxxio is null then 5
else 5-(xxx_tcpConnSuccxxxxxxxxio-95)*(5/(60-95)) end as xxx_tcpConnSucc_score
,xxx_nbrallWebReq
,xxx_nbrallWebResSucc
,xxx_WeballResxxxxxxxxio
,case when xxx_WeballResxxxxxxxxio<=60 then 0
when xxx_WeballResxxxxxxxxio>=95 then 5
when xxx_WeballResxxxxxxxxio is null then 5
else 5-(xxx_WeballResxxxxxxxxio-95)*(5/(60-95)) end as xxx_WebSucc_core
,xxx_shoppingxdrcnt
,xxx_shoppingxdrsucccnt
,xxx_shoppingxdrPoorCnt
,xxx_shoppingtcpDLRttGt200Delay
,xxx_shoppingtcpULRttGt200Delay
,xxx_shoppingtcpRtt200_xxxxxxxxe
,case when xxx_shoppingtcpRtt200_xxxxxxxxe <= 5 then 5
when xxx_shoppingtcpRtt200_xxxxxxxxe is null then 5
when xxx_shoppingtcpRtt200_xxxxxxxxe>=15 then 0
else 5-(xxx_shoppingtcpRtt200_xxxxxxxxe-5)*(5/(15-5)) end as xxx_Shopping_score
,xxx_gamexdrcnt
,xxx_gamexdrsucccnt
,xxx_gamexdrPoorCnt
,xxx_gametcpDLRttGt200Delay
,xxx_gametcpULRttGt200Delay
,xxx_gametcpRtt200_xxxxxxxxe
,case when xxx_gametcpRtt200_xxxxxxxxe<=5 then 10
when xxx_gametcpRtt200_xxxxxxxxe is null then 10
when xxx_gametcpRtt200_xxxxxxxxe>=15 then 0
else 10-(xxx_gametcpRtt200_xxxxxxxxe-5)*(10/(15-5)) end as xxx_game_score
,xxx_avgVideoDlcnt
,xxx_VideoDl3Mbpscnt
,xxx_VideoDl3Mbps_xxxxxxxxe
,case when xxx_VideoDl3Mbps_xxxxxxxxe<=5 then 10
when xxx_VideoDl3Mbps_xxxxxxxxe is null then 10
when xxx_VideoDl3Mbps_xxxxxxxxe>=15 then 0
else 10-(xxx_VideoDl3Mbps_xxxxxxxxe-5)*(10/(15-5)) end as xxx_Video_score
,Call_Union_Req
,Call_Union_Suss
,Call_Union_xxxxxxxxe
,case when Call_Union_xxxxxxxxe<=60 then 0
when Call_Union_xxxxxxxxe>=95 then 20
when Call_Union_xxxxxxxxe is null then 20
else 20-(Call_Union_xxxxxxxxe-95)*(20/(60-95)) end as Volte_successxxxxxxxxe_score
,call_k1
,Drop_Union_Cnt
,Drop_Union_xxxxxxxxe
,case when Drop_Union_xxxxxxxxe<=0.2 then 20
when Drop_Union_xxxxxxxxe is null then 20
when Drop_Union_xxxxxxxxe>=1 then 0
else 20-(Drop_Union_xxxxxxxxe-0.2)*(20/(1-0.2)) end as Volte_dropxxxxxxxxe_score
,case when xxx_kpimrweakcoverage_xxxxxxxxe<=10 then 25
when xxx_kpimrweakcoverage_xxxxxxxxe is null then 25
when xxx_kpimrweakcoverage_xxxxxxxxe >=30 then 0
else 25-(xxx_kpimrweakcoverage_xxxxxxxxe-10)*(25/(30-10)) end +
case when xxx_tcpConnSuccxxxxxxxxio<=60 then 0
when xxx_tcpConnSuccxxxxxxxxio>=95 then 5
when xxx_tcpConnSuccxxxxxxxxio is null then 5
else 5-(xxx_tcpConnSuccxxxxxxxxio-95)*(5/(60-95)) end +
case when xxx_WeballResxxxxxxxxio<=60 then 0
when xxx_WeballResxxxxxxxxio>=95 then 5
when xxx_WeballResxxxxxxxxio is null then 5
else 5-(xxx_WeballResxxxxxxxxio-95)*(5/(60-95)) end +
case when xxx_shoppingtcpRtt200_xxxxxxxxe <= 5 then 5
when xxx_shoppingtcpRtt200_xxxxxxxxe is null then 5
when xxx_shoppingtcpRtt200_xxxxxxxxe>=15 then 0
else 5-(xxx_shoppingtcpRtt200_xxxxxxxxe-5)*(5/(15-5)) end +
case when xxx_gametcpRtt200_xxxxxxxxe<=5 then 10
when xxx_gametcpRtt200_xxxxxxxxe is null then 10
when xxx_gametcpRtt200_xxxxxxxxe>=15 then 0
else 10-(xxx_gametcpRtt200_xxxxxxxxe-5)*(10/(15-5)) end +
case when xxx_VideoDl3Mbps_xxxxxxxxe<=5 then 10
when xxx_VideoDl3Mbps_xxxxxxxxe is null then 10
when xxx_VideoDl3Mbps_xxxxxxxxe>=15 then 0
else 10-(xxx_VideoDl3Mbps_xxxxxxxxe-5)*(10/(15-5)) end +
case when Call_Union_xxxxxxxxe<=60 then 0
when Call_Union_xxxxxxxxe>=95 then 20
when Call_Union_xxxxxxxxe is null then 20
else 20-(Call_Union_xxxxxxxxe-95)*(20/(60-95)) end +
case when Drop_Union_xxxxxxxxe<=0.2 then 20
when Drop_Union_xxxxxxxxe is null then 20
when Drop_Union_xxxxxxxxe>=1 then 0
else 20-(Drop_Union_xxxxxxxxe-0.2)*(20/(1-0.2)) end as Total_perceived_score
,case when (case when xxx_kpimrweakcoverage_xxxxxxxxe<=10 then 25
when xxx_kpimrweakcoverage_xxxxxxxxe is null then 25
when xxx_kpimrweakcoverage_xxxxxxxxe >=30 then 0
else 25-(xxx_kpimrweakcoverage_xxxxxxxxe-10)*(25/(30-10)) end +
case when xxx_tcpConnSuccxxxxxxxxio<=60 then 0
when xxx_tcpConnSuccxxxxxxxxio>=95 then 5
when xxx_tcpConnSuccxxxxxxxxio is null then 5
else 5-(xxx_tcpConnSuccxxxxxxxxio-95)*(5/(60-95)) end +
case when xxx_WeballResxxxxxxxxio<=60 then 0
when xxx_WeballResxxxxxxxxio>=95 then 5
when xxx_WeballResxxxxxxxxio is null then 5
else 5-(xxx_WeballResxxxxxxxxio-95)*(5/(60-95)) end +
case when xxx_shoppingtcpRtt200_xxxxxxxxe <= 5 then 5
when xxx_shoppingtcpRtt200_xxxxxxxxe is null then 5
when xxx_shoppingtcpRtt200_xxxxxxxxe>=15 then 0
else 5-(xxx_shoppingtcpRtt200_xxxxxxxxe-5)*(5/(15-5)) end +
case when xxx_gametcpRtt200_xxxxxxxxe<=5 then 10
when xxx_gametcpRtt200_xxxxxxxxe is null then 10
when xxx_gametcpRtt200_xxxxxxxxe>=15 then 0
else 10-(xxx_gametcpRtt200_xxxxxxxxe-5)*(10/(15-5)) end +
case when xxx_VideoDl3Mbps_xxxxxxxxe<=5 then 10
when xxx_VideoDl3Mbps_xxxxxxxxe is null then 10
when xxx_VideoDl3Mbps_xxxxxxxxe>=15 then 0
else 10-(xxx_VideoDl3Mbps_xxxxxxxxe-5)*(10/(15-5)) end +
case when Call_Union_xxxxxxxxe<=60 then 0
when Call_Union_xxxxxxxxe>=95 then 20
when Call_Union_xxxxxxxxe is null then 20
else 20-(Call_Union_xxxxxxxxe-95)*(20/(60-95)) end +
case when Drop_Union_xxxxxxxxe<=0.2 then 20
when Drop_Union_xxxxxxxxe is null then 20
when Drop_Union_xxxxxxxxe>=1 then 0
else 20-(Drop_Union_xxxxxxxxe-0.2)*(20/(1-0.2)) end) >= 85 then 'ไผ˜็ง€'
when (case when xxx_kpimrweakcoverage_xxxxxxxxe<=10 then 25
when xxx_kpimrweakcoverage_xxxxxxxxe is null then 25
when xxx_kpimrweakcoverage_xxxxxxxxe >=30 then 0
else 25-(xxx_kpimrweakcoverage_xxxxxxxxe-10)*(25/(30-10)) end +
case when xxx_tcpConnSuccxxxxxxxxio<=60 then 0
when xxx_tcpConnSuccxxxxxxxxio>=95 then 5
when xxx_tcpConnSuccxxxxxxxxio is null then 5
else 5-(xxx_tcpConnSuccxxxxxxxxio-95)*(5/(60-95)) end +
case when xxx_WeballResxxxxxxxxio<=60 then 0
when xxx_WeballResxxxxxxxxio>=95 then 5
when xxx_WeballResxxxxxxxxio is null then 5
else 5-(xxx_WeballResxxxxxxxxio-95)*(5/(60-95)) end +
case when xxx_shoppingtcpRtt200_xxxxxxxxe <= 5 then 5
when xxx_shoppingtcpRtt200_xxxxxxxxe is null then 5
when xxx_shoppingtcpRtt200_xxxxxxxxe>=15 then 0
else 5-(xxx_shoppingtcpRtt200_xxxxxxxxe-5)*(5/(15-5)) end +
case when xxx_gametcpRtt200_xxxxxxxxe<=5 then 10
when xxx_gametcpRtt200_xxxxxxxxe is null then 10
when xxx_gametcpRtt200_xxxxxxxxe>=15 then 0
else 10-(xxx_gametcpRtt200_xxxxxxxxe-5)*(10/(15-5)) end +
case when xxx_VideoDl3Mbps_xxxxxxxxe<=5 then 10
when xxx_VideoDl3Mbps_xxxxxxxxe is null then 10
when xxx_VideoDl3Mbps_xxxxxxxxe>=15 then 0
else 10-(xxx_VideoDl3Mbps_xxxxxxxxe-5)*(10/(15-5)) end +
case when Call_Union_xxxxxxxxe<=60 then 0
when Call_Union_xxxxxxxxe>=95 then 20
when Call_Union_xxxxxxxxe is null then 20
else 20-(Call_Union_xxxxxxxxe-95)*(20/(60-95)) end +
case when Drop_Union_xxxxxxxxe<=0.2 then 20
when Drop_Union_xxxxxxxxe is null then 20
when Drop_Union_xxxxxxxxe>=1 then 0
else 20-(Drop_Union_xxxxxxxxe-0.2)*(20/(1-0.2)) end)<85 and (case when xxx_kpimrweakcoverage_xxxxxxxxe<=10 then 25
when xxx_kpimrweakcoverage_xxxxxxxxe is null then 25
when xxx_kpimrweakcoverage_xxxxxxxxe >=30 then 0
else 25-(xxx_kpimrweakcoverage_xxxxxxxxe-10)*(25/(30-10)) end +
case when xxx_tcpConnSuccxxxxxxxxio<=60 then 0
when xxx_tcpConnSuccxxxxxxxxio>=95 then 5
when xxx_tcpConnSuccxxxxxxxxio is null then 5
else 5-(xxx_tcpConnSuccxxxxxxxxio-95)*(5/(60-95)) end +
case when xxx_WeballResxxxxxxxxio<=60 then 0
when xxx_WeballResxxxxxxxxio>=95 then 5
when xxx_WeballResxxxxxxxxio is null then 5
else 5-(xxx_WeballResxxxxxxxxio-95)*(5/(60-95)) end +
case when xxx_shoppingtcpRtt200_xxxxxxxxe <= 5 then 5
when xxx_shoppingtcpRtt200_xxxxxxxxe is null then 5
when xxx_shoppingtcpRtt200_xxxxxxxxe>=15 then 0
else 5-(xxx_shoppingtcpRtt200_xxxxxxxxe-5)*(5/(15-5)) end +
case when xxx_gametcpRtt200_xxxxxxxxe<=5 then 10
when xxx_gametcpRtt200_xxxxxxxxe is null then 10
when xxx_gametcpRtt200_xxxxxxxxe>=15 then 0
else 10-(xxx_gametcpRtt200_xxxxxxxxe-5)*(10/(15-5)) end +
case when xxx_VideoDl3Mbps_xxxxxxxxe<=5 then 10
when xxx_VideoDl3Mbps_xxxxxxxxe is null then 10
when xxx_VideoDl3Mbps_xxxxxxxxe>=15 then 0
else 10-(xxx_VideoDl3Mbps_xxxxxxxxe-5)*(10/(15-5)) end +
case when Call_Union_xxxxxxxxe<=60 then 0
when Call_Union_xxxxxxxxe>=95 then 20
when Call_Union_xxxxxxxxe is null then 20
else 20-(Call_Union_xxxxxxxxe-95)*(20/(60-95)) end +
case when Drop_Union_xxxxxxxxe<=0.2 then 20
when Drop_Union_xxxxxxxxe is null then 20
when Drop_Union_xxxxxxxxe>=1 then 0
else 20-(Drop_Union_xxxxxxxxe-0.2)*(20/(1-0.2)) end) >= 60 then 'ไธ€่ˆฌ'
else 'ๅทฎ' end as Perceived_results
,sum(1) over(partition by da.cellid,da.xxxxxxxxx) as users_cell
,sum(case when db.user_lv like 'ๅ››ๆ˜Ÿ%' or db.user_lv like 'ไบ”ๆ˜Ÿ%' then 1 end) over(partition by da.cellid,da.xxxxxxxxx) as user_lv45_cell
,da.county as xxxxxxxxxx
,dp.preference_app_type1
,dp.preference_app_type2
,dp.preference_app_type3
,maxxxxxxxxx
,cast(null as bigint) as complain_cnt
from (select
max(datatable) as datatable
,xxxxx
,cellid
,max(xxxx) as xxxx
,max(xxxxxx) as xxxxxx
,max(cast(xxxxxxxx as bigint)) as xxxxxxxx
,max(max(cast(xxxxxxxx as bigint))) over(partition by xxxxx) as maxxxxxxxxx
,max(earlyresident) as earlyresident
,max(nightresident) as nightresident
,max(xxxxxxxxx) as xxxxxxxxx
,max(xxxxxxxxxxxx) as xxxxxxxxxxxx
,max(xxxxxxxxxxx) as xxxxxxxxxxx
,max(county) as county
,sum(xxx_nbrTcpConn2ndSucc) as xxx_nbrTcpConn2ndSucc
,sum(xxx_nbrTcpConn1stReq) as xxx_nbrTcpConn1stReq
,round(sum(xxx_nbrTcpConn2ndSucc)/sum(xxx_nbrTcpConn1stReq),4)*100 as xxx_tcpConnSuccxxxxxxxxio
,sum(xxx_nbrallWebReq) as xxx_nbrallWebReq
,sum(xxx_nbrallWebResSucc) as xxx_nbrallWebResSucc
,round(sum(xxx_nbrallWebResSucc)/sum(xxx_nbrallWebReq),4)*100 as xxx_WeballResxxxxxxxxio
,sum(xxx_shoppingxdrcnt) as xxx_shoppingxdrcnt
,sum(xxx_shoppingxdrsucccnt) as xxx_shoppingxdrsucccnt
,sum(xxx_shoppingxdrPoorCnt) as xxx_shoppingxdrPoorCnt
,sum(xxx_shoppingtcpDLRttGt200Delay) as xxx_shoppingtcpDLRttGt200Delay
,sum(xxx_shoppingtcpULRttGt200Delay) as xxx_shoppingtcpULRttGt200Delay
,round((sum(xxx_shoppingtcpDLRttGt200Delay)+sum(xxx_shoppingtcpULRttGt200Delay))/sum(xxx_shoppingxdrcnt),4)*100 as xxx_shoppingtcpRtt200_xxxxxxxxe
,sum(xxx_gamexdrcnt) as xxx_gamexdrcnt
,sum(xxx_gamexdrsucccnt) as xxx_gamexdrsucccnt
,sum(xxx_gamexdrPoorCnt) as xxx_gamexdrPoorCnt
,sum(xxx_gametcpDLRttGt200Delay) as xxx_gametcpDLRttGt200Delay
,sum(xxx_gametcpULRttGt200Delay) as xxx_gametcpULRttGt200Delay
,round((sum(xxx_gametcpDLRttGt200Delay)+sum(xxx_gametcpULRttGt200Delay))/sum(xxx_gamexdrcnt),4)*100 as xxx_gametcpRtt200_xxxxxxxxe
,sum(xxx_avgVideoDlcnt) as xxx_avgVideoDlcnt
,sum(xxx_VideoDl3Mbpscnt) as xxx_VideoDl3Mbpscnt
,round((sum(xxx_VideoDl3Mbpscnt)/sum(xxx_avgVideoDlcnt)),4)*100 as xxx_VideoDl3Mbps_xxxxxxxxe
,sum(Call_Union_Req) as Call_Union_Req
,sum(Call_Union_Suss) as Call_Union_Suss
,round((sum(Call_Union_Suss)/sum(Call_Union_Req)),4)*100 as Call_Union_xxxxxxxxe
,sum(call_k1) as call_k1
,sum(Drop_Union_Cnt) as Drop_Union_Cnt
,round((sum(Drop_Union_Cnt)/sum(call_k1)),4)*100 as Drop_Union_xxxxxxxxe
from (
select
1 as datatable
,xxxxx
,null as xxxxxxxx
,max(case when stathour='4' then cellid end) as cellid
,max(xxxx) as xxxx
,max(xxxxxx) as xxxxxx
,max(case when stathour='1' then cellid end) as earlyresident
,max(case when stathour='3' then cellid end) as nightresident
,max(case when stathour='4' then xxxxxxxxx end) as xxxxxxxxx
,max(case when stathour='4' then xxxxxxxxxxxx end) as xxxxxxxxxxxx
,max(case when stathour='4' then xxxxxxxxxxx end) as xxxxxxxxxxx
,max(case when stathour='4' then county end) as county
,null as xxx_nbrTcpConn2ndSucc
,null as xxx_nbrTcpConn1stReq
,null as xxx_nbrallWebReq
,null as xxx_nbrallWebResSucc
,null as xxx_shoppingxdrcnt
,null as xxx_shoppingxdrsucccnt
,null as xxx_shoppingxdrPoorCnt
,null as xxx_shoppingtcpDLRttGt200Delay
,null as xxx_shoppingtcpULRttGt200Delay
,null as xxx_gamexdrcnt
,null as xxx_gamexdrsucccnt
,null as xxx_gamexdrPoorCnt
,null as xxx_gametcpDLRttGt200Delay
,null as xxx_gametcpULRttGt200Delay
,null as xxx_avgVideoDlcnt
,null as xxx_VideoDl3Mbpscnt
,null as Call_Union_Req
,null as Call_Union_Suss
,null as call_k1
,null as Drop_Union_Cnt
from xxx_xxx.st_xxxxxxxxxxxx_day
where day=20231002 and rank=1
group by xxxxx
union all
select
1 as datatable
,xxxxx
,xxxxxxxx
,cgi as cellid
,xxxx
,xxxxxx
,null as earlyresident
,null as nightresident
,xxxxxxxxx
,xxxxxxxxxxxx
,xxxxxxxxxxx
,null as county
,xxx_nbrTcpConn2ndSucc as xxx_nbrTcpConn2ndSucc
,xxx_nbrTcpConn1stReq as xxx_nbrTcpConn1stReq
,xxx_nbrallWebReq as xxx_nbrallWebReq
,xxx_nbrallWebResSucc as xxx_nbrallWebResSucc
,case when substr(appunicode,1,2)='13' then xxx_xdrcnt end as xxx_shoppingxdrcnt
,case when substr(appunicode,1,2)='13' then xxx_xdrsucccnt end as xxx_shoppingxdrsucccnt
,case when substr(appunicode,1,2)='13' then xxx_xdrPoorCnt end as xxx_shoppingxdrPoorCnt
,case when substr(appunicode,1,2)='13' then xxx_tcpDLRttGt200Delay end as xxx_shoppingtcpDLRttGt200Delay
,case when substr(appunicode,1,2)='13' then xxx_tcpULRttGt200Delay end as xxx_shoppingtcpULRttGt200Delay
,case when substr(appunicode,1,2)='05' then xxx_xdrcnt end as xxx_gamexdrcnt
,case when substr(appunicode,1,2)='05' then xxx_xdrsucccnt end as xxx_gamexdrsucccnt
,case when substr(appunicode,1,2)='05' then xxx_xdrPoorCnt end as xxx_gamexdrPoorCnt
,case when substr(appunicode,1,2)='05' then xxx_tcpDLRttGt200Delay end as xxx_gametcpDLRttGt200Delay
,case when substr(appunicode,1,2)='05' then xxx_tcpULRttGt200Delay end as xxx_gametcpULRttGt200Delay
,case when substr(appunicode,1,2)='02' then xxx_xdrcnt end as xxx_avgVideoDlcnt
,case when substr(appunicode,1,2)='02' then xxx_VideoDl3Mbpscnt end as xxx_VideoDl3Mbpscnt
,null as Call_Union_Req
,null as Call_Union_Suss
,null as call_k1
,null as Drop_Union_Cnt
from xxx_xxx.xxxxxxx_day
where day=20231002
union all
select
0 as datatable
,xxxxx
,null as xxxxxxxx
,cellid
,null as xxxx
,null as xxxxxx
,null as earlyresident
,null as nightresident
,null as xxxxxxxxx
,null as xxxxxxxxxxxx
,null as xxxxxxxxxxx
,null as county
,null as xxx_nbrTcpConn2ndSucc
,null as xxx_nbrTcpConn1stReq
,null as xxx_nbrallWebReq
,null as xxx_nbrallWebResSucc
,null as xxx_shoppingxdrcnt
,null as xxx_shoppingxdrsucccnt
,null as xxx_shoppingxdrPoorCnt
,null as xxx_shoppingtcpDLRttGt200Delay
,null as xxx_shoppingtcpULRttGt200Delay
,null as xxx_gamexdrcnt
,null as xxx_gamexdrsucccnt
,null as xxx_gamexdrPoorCnt
,null as xxx_gametcpDLRttGt200Delay
,null as xxx_gametcpULRttGt200Delay
,null as xxx_avgVideoDlcnt
,null as xxx_VideoDl3Mbpscnt
,Call_Gm_Req as Call_Union_Req
,Call_Gm_Suss as Call_Union_Suss
,Call_Gm_Ring as call_k1
,Drop_Gm_Cnt as Drop_Union_Cnt
from xxx_xxx.xxxxxxxxxxxxxxxxxxxxxx_hour
where day=20231002
) da
where xxxxx is not null
group by
xxxxx
,cellid
) da
left join (select
service_nbr
,max(install_month) as inner_months
,max(ofr_name) as dinner_name
,max(case when ofr_5g_flag='1' then '5G' when ofr_5g_flag='0' then '4G' end) as dinner_type
,max(ofr_5g_flag) as is_5gdinner
,max(star_flag) as user_lv
,max(net_fee) as nearlthreemonth_arpu
,sum(nvl(cast(rsrp_count_5g as double),0)+nvl(cast(rsrp_count as double),0)) as xxx_kpimrcnt
,round(sum(nvl(cast(rsrp_count_5g as double)*(1-cast(mr_cover_xxxxxxxxe_110_5g as double)/100),0)+nvl(cast(rsrp_count as double)*(1-cast(mr_cover_xxxxxxxxe_110 as double)/100),0))) as xxx_kpimr110cnt
,round(round(sum(nvl(cast(rsrp_count_5g as double)*(1-cast(mr_cover_xxxxxxxxe_110_5g as double)/100),0)
+nvl(cast(rsrp_count as double)*(1-cast(mr_cover_xxxxxxxxe_110 as double)/100),0)))/sum(nvl(cast(rsrp_count_5g as double),0)+nvl(cast(rsrp_count as double),0)),4)*100 as xxx_kpimrweakcoverage_xxxxxxxxe
from xxx_xxx.xxxxxxxxxxxxxxxxxxxxxxx_day
where day=20231002 and net_num='1'
group by service_nbr) db on da.xxxxx=db.service_nbr
left join (select
xxxxx
,max(case when rank=1 then app_type end) as preference_app_type1
,max(case when rank=2 then app_type end) as preference_app_type2
,max(case when rank=3 then app_type end) as preference_app_type3
from(select
xxxxx
,app_type
,sum(xxx_xdrcnt) as xxx_xdrcnt
,row_number() over(partition by xxxxx order by sum(xxx_xdrcnt) desc) rank
from xxx_xxx.xxxxxxxxxxxxxxxxxxxxxxxx_hour
where day=20231002 and appunicode not in ('22006500','22999900')
group by xxxxx
,app_type) da where rank<=3
group by xxxxx) dp on da.xxxxx=dp.xxxxx
where datatable=1)
insert overwrite table xxx_xxx.st_userportrait_day partition(day=20231002,minute='0000',vprovince=200)
select /* +REPARTITION(2) */
cast(from_unixtime(unix_timestamp('20231002','yyyyMMdd')+28800) as timestamp) as timevalue
,xxxxxxxxx
,xxxxxxxxxx
,xxxxxxxxxxx
,xxxxxxxxxxxx
,fullresident
,earlyresident
,nightresident
,da.xxxxx
,xxxx
,xxxxxx
,da.xxxxxxx
,fact_id
,cat_one
,model_id
,terminalname
,mt_type
,null as software_version
,card_slot
,is_support_volte
,dcnr
,maxxxxxxxxx
,ismme5gxxxxxxxx
,iss1u5gxxxxxxxx
,is_5g_on_off
,preference_app_type1
,preference_app_type2
,preference_app_type3
,dinner_type
,is_5gdinner
,is_flow_limit
,user_lv
,inner_months
,nearlthreemonth_arpu
,complain_cnt
,thzl
,swzl
,cpyzf
,sfzqx
,yytfw
,rexian
,predict
,xxxxxx_cnt_totalattach_req
,xxxxxx_cnt_totalattach_succ
,xxxxxx_cnt_md_totalattach_req
,xxxxxx_cnt_md_totalattach_succ
,xxxxxx_cnt_md_totalattach_fail
,mdcellcnt
,nomdcellcnt
,xxxxxx_erbmodify_req
,xxxxxx_erbmodify_succ
,xxxxxx_erbmodify_err
,xxxxxxx_tranoctets
,xxxxxxx_5g_tranoctets
,xxxxxxx_4g_tranoctets
,xxxxxxx_webdlvalidoctets
,xxxxxxx_webdlvaliddelay
,xxxxxxx_5g_webdlvalidoctets
,xxxxxxx_5g_webdlvaliddelay
,xxxxxxx_4g_webdlvalidoctets
,xxxxxxx_4g_webdlvaliddelay
,xxx_tranoctets
,xxx_webdlvalidoctets
,xxx_webdlvaliddelay
,xxx_nbrTcpConn1stReq
,xxx_nbrTcpConn1stSucc
,xxx_tcpConn1stDelay
,xxx_nbrTcpConn2ndSucc
,xxx_tcpConn2ndDelay
,xxx_tcpConnDelay
,xxx_nbrallWebReq
,xxx_nbrallWebResSucc
,xxx_WebResDelay
,xxx_FirstScreenFincnt
,xxx_FirstScreenFinTime
,xxx_VideoBufferServCnt
,xxx_VideoServCnt
,xxx_VideoDlOctets
,xxx_VideoDlDuxxxxxxxxion
,xxx_videoThroughput_cnt
,xxx_videoThroughput
,xxx_videoBufferCnt
,xxx_VideoPlayTime
,xxx_InitBufferReq
,xxx_InitBufferSucc
,xxx_InitBufferDuxxxxxxxxion
,cast(null as double) as xxx_kpimrcnt
,cast(null as double) as xxx_kpimr110cnt
,cast(null as double) as xxx_kpimrweakcoverage_xxxxxxxxe
,cast(null as double) as xxx_coverage_score
,cast(null as double) as xxx_nbrtcpconn2ndsucc_cell
,cast(null as double) as xxx_nbrtcpconn1streq_cell
,cast(null as double) as xxx_tcpconnsuccxxxxxxxxio_cell
,cast(null as double) as xxx_tcpconnsucc_score
,cast(null as double) as xxx_nbrallwebreq_cell
,cast(null as double) as xxx_nbrallwebressucc_cell
,cast(null as double) as xxx_weballresxxxxxxxxio_cell
,cast(null as double) as xxx_websucc_core
,cast(null as double) as xxx_shoppingxdrcnt
,cast(null as double) as xxx_shoppingxdrsucccnt
,cast(null as double) as xxx_shoppingxdrpoorcnt
,cast(null as double) as xxx_shoppingtcpdlrttgt200delay
,cast(null as double) as xxx_shoppingtcpulrttgt200delay
,cast(null as double) as xxx_shoppingtcprtt200_xxxxxxxxe
,cast(null as double) as xxx_shopping_score
,cast(null as double) as xxx_gamexdrcnt
,cast(null as double) as xxx_gamexdrsucccnt
,cast(null as double) as xxx_gamexdrpoorcnt
,cast(null as double) as xxx_gametcpdlrttgt200delay
,cast(null as double) as xxx_gametcpulrttgt200delay
,cast(null as double) as xxx_gametcprtt200_xxxxxxxxe
,cast(null as double) as xxx_game_score
,cast(null as double) as xxx_avgvideodlcnt
,cast(null as double) as xxx_videodl3mbpscnt
,cast(null as double) as xxx_videodl3mbps_xxxxxxxxe
,cast(null as double) as xxx_video_score
,cast(null as double) as call_union_req
,cast(null as double) as call_union_suss
,cast(null as double) as call_union_xxxxxxxxe
,cast(null as double) as volte_successxxxxxxxxe_score
,cast(null as double) as call_k1
,cast(null as double) as drop_union_cnt
,cast(null as double) as drop_union_xxxxxxxxe
,cast(null as double) as volte_dropxxxxxxxxe_score
,cast(null as double) as total_perceived_score
,cast(null as double) as perceived_results
,cast(null as double) as users_cell
,cast(null as double) as user_lv45_cell
,cast(null as double) as dinner_name
,cast(null as double) as user_tag
,cast(null as double) as terminallist_flag
,cast(null as double) as imsflag
,cast(null as double) as lte_flux
,cast(null as double) as g5_under_lte_flux
,cast(null as double) as g5_flux
from (select
max(xxxxxxxxx) as xxxxxxxxx
,max(xxxxxxxxxx) as xxxxxxxxxx
,max(xxxxxxxxxxx) as xxxxxxxxxxx
,max(xxxxxxxxxxxx) as xxxxxxxxxxxx
,max(fullresident) as fullresident
,max(earlyresident) as earlyresident
,max(nightresident) as nightresident
,xxxxx
,max(xxxx) as xxxx
,max(xxxxxx) as xxxxxx
,max(xxxxxxx) as xxxxxxx
,max(fact_id) as fact_id
,max(cat_one) as cat_one
,max(model_id) as model_id
,max(terminalname) as terminalname
,max(mt_type) as mt_type
,max(software_version) as software_version
,max(card_slot) as card_slot
,max(is_support_volte) as is_support_volte
,max(case when mt_type='5' then dcnr end) as dcnr
,max(cast(maxxxxxxxxx as bigint)) as maxxxxxxxxx
,max(ismme5gxxxxxxxx) as ismme5gxxxxxxxx
,max(iss1u5gxxxxxxxx) as iss1u5gxxxxxxxx
,max(is_5g_on_off) as is_5g_on_off
,max(preference_app_type1) as preference_app_type1
,max(preference_app_type2) as preference_app_type2
,max(preference_app_type3) as preference_app_type3
,max(dinner_type) as dinner_type
,max(is_5gdinner) as is_5gdinner
,max(case when is_flow_limit is not null and is_flow_limit<>'' then cast(is_flow_limit as string) else '0' end) as is_flow_limit
,max(user_lv) as user_lv
,max(cast(inner_months as double)) as inner_months
,max(cast(nearlthreemonth_arpu as double)) as nearlthreemonth_arpu
,max(complain_cnt) as complain_cnt
,sum(thzl) as thzl
,sum(swzl) as swzl
,sum(cpyzf) as cpyzf
,sum(sfzqx) as sfzqx
,sum(yytfw) as yytfw
,sum(rexian) as rexian
,sum(predict) as predict
,sum(xxxxxx_cnt_totalattach_req) as xxxxxx_cnt_totalattach_req
,sum(xxxxxx_cnt_totalattach_succ) as xxxxxx_cnt_totalattach_succ
,sum(xxxxxx_cnt_md_totalattach_req) as xxxxxx_cnt_md_totalattach_req
,sum(xxxxxx_cnt_md_totalattach_succ) as xxxxxx_cnt_md_totalattach_succ
,sum(xxxxxx_cnt_md_totalattach_fail) as xxxxxx_cnt_md_totalattach_fail
,sum(mdcellcnt) as mdcellcnt
,sum(nomdcellcnt) as nomdcellcnt
,sum(xxxxxx_erbmodify_req) as xxxxxx_erbmodify_req
,sum(xxxxxx_erbmodify_succ) as xxxxxx_erbmodify_succ
,sum(xxxxxx_erbmodify_err) as xxxxxx_erbmodify_err
,sum(xxxxxxx_tranoctets) as xxxxxxx_tranoctets
,sum(xxxxxxx_5g_tranoctets) as xxxxxxx_5g_tranoctets
,sum(xxxxxxx_4g_tranoctets) as xxxxxxx_4g_tranoctets
,sum(xxxxxxx_webdlvalidoctets) as xxxxxxx_webdlvalidoctets
,sum(xxxxxxx_webdlvaliddelay) as xxxxxxx_webdlvaliddelay
,sum(xxxxxxx_5g_webdlvalidoctets) as xxxxxxx_5g_webdlvalidoctets
,sum(xxxxxxx_5g_webdlvaliddelay) as xxxxxxx_5g_webdlvaliddelay
,sum(xxxxxxx_4g_webdlvalidoctets) as xxxxxxx_4g_webdlvalidoctets
,sum(xxxxxxx_4g_webdlvaliddelay) as xxxxxxx_4g_webdlvaliddelay
,sum(xxx_tranoctets) as xxx_tranoctets
,sum(xxx_webdlvalidoctets) as xxx_webdlvalidoctets
,sum(xxx_webdlvaliddelay) as xxx_webdlvaliddelay
,sum(xxx_nbrTcpConn1stReq) as xxx_nbrTcpConn1stReq
,sum(xxx_nbrTcpConn1stSucc) as xxx_nbrTcpConn1stSucc
,sum(xxx_tcpConn1stDelay) as xxx_tcpConn1stDelay
,sum(xxx_nbrTcpConn2ndSucc) as xxx_nbrTcpConn2ndSucc
,sum(xxx_tcpConn2ndDelay) as xxx_tcpConn2ndDelay
,sum(xxx_tcpConnDelay) as xxx_tcpConnDelay
,sum(xxx_nbrallWebReq) as xxx_nbrallWebReq
,sum(xxx_nbrallWebResSucc) as xxx_nbrallWebResSucc
,sum(xxx_WebResDelay) as xxx_WebResDelay
,sum(xxx_FirstScreenFincnt) as xxx_FirstScreenFincnt
,sum(xxx_FirstScreenFinTime) as xxx_FirstScreenFinTime
,sum(xxx_VideoBufferServCnt) as xxx_VideoBufferServCnt
,sum(xxx_VideoServCnt) as xxx_VideoServCnt
,sum(xxx_VideoDlOctets) as xxx_VideoDlOctets
,sum(xxx_VideoDlDuxxxxxxxxion) as xxx_VideoDlDuxxxxxxxxion
,sum(xxx_videoThroughput_cnt) as xxx_videoThroughput_cnt
,sum(xxx_videoThroughput) as xxx_videoThroughput
,sum(xxx_videoBufferCnt) as xxx_videoBufferCnt
,sum(xxx_VideoPlayTime) as xxx_VideoPlayTime
,sum(xxx_InitBufferReq) as xxx_InitBufferReq
,sum(xxx_InitBufferSucc) as xxx_InitBufferSucc
,sum(xxx_InitBufferDuxxxxxxxxion) as xxx_InitBufferDuxxxxxxxxion
from (
select
null as xxxxxxxxx
,null as xxxxxxxxxxx
,null as xxxxxxxxxxxx
,null as xxxxxxxxxx
,null as earlyresident
,null as nightresident
,null as fullresident
,dcnr
,case when mt_type='4' then '6' else maxxxxxxxxx end as maxxxxxxxxx
,ismme5gxxxxxxxx
,iss1u5gxxxxxxxx
,case when mt_type='4' then '0' else is_5g_on_off end as is_5g_on_off
,xxxxx
,xxxx
,xxxxxx
,xxxxxxx
,fact_id
,cat_one
,model_id
,terminalname
,mt_type
,software_version
,card_slot
,is_support_volte
,null as dinner_name
,null as dinner_type
,null as is_5gdinner
,is_flow_limit
,null as inner_months
,null as user_lv
,null as nearlthreemonth_arpu
,null as complain_cnt
,null as preference_app_type1
,null as preference_app_type2
,null as preference_app_type3
,thzl
,swzl
,cpyzf
,sfzqx
,yytfw
,rexian
,predict
,xxxxxx_cnt_totalattach_req
,xxxxxx_cnt_totalattach_succ
,xxxxxx_cnt_md_totalattach_req
,xxxxxx_cnt_md_totalattach_succ
,xxxxxx_cnt_md_totalattach_fail
,mdcellcnt
,nomdcellcnt
,xxxxxx_erbmodify_req
,xxxxxx_erbmodify_succ
,xxxxxx_erbmodify_err
,xxxxxxx_tranoctets
,xxxxxxx_5g_tranoctets
,xxxxxxx_4g_tranoctets
,xxxxxxx_webdlvalidoctets
,xxxxxxx_webdlvaliddelay
,xxxxxxx_5g_webdlvalidoctets
,xxxxxxx_5g_webdlvaliddelay
,xxxxxxx_4g_webdlvalidoctets
,xxxxxxx_4g_webdlvaliddelay
,xxx_tranoctets
,xxx_webdlvalidoctets
,xxx_webdlvaliddelay
,xxx_nbrTcpConn1stReq
,xxx_nbrTcpConn1stSucc
,xxx_tcpConn1stDelay
,xxx_nbrTcpConn2ndSucc
,xxx_tcpConn2ndDelay
,xxx_tcpConnDelay
,xxx_nbrallWebReq
,xxx_nbrallWebResSucc
,xxx_WebResDelay
,xxx_FirstScreenFincnt
,xxx_FirstScreenFinTime
,xxx_VideoBufferServCnt
,xxx_VideoServCnt
,xxx_VideoDlOctets
,xxx_VideoDlDuxxxxxxxxion
,xxx_videoThroughput_cnt
,xxx_videoThroughput
,xxx_videoBufferCnt
,xxx_VideoPlayTime
,xxx_InitBufferReq
,xxx_InitBufferSucc
,xxx_InitBufferDuxxxxxxxxion
from st_xxxxxxxxxxxxxxx_day
union all
select
xxxxxxxxx
,xxxxxxxxxxx
,xxxxxxxxxxxx
,xxxxxxxxxx
,earlyresident
,nightresident
,cellid as fullresident
,null as dcnr
,null as maxxxxxxxxx
,null as ismme5gxxxxxxxx
,null as iss1u5gxxxxxxxx
,null as is_5g_on_off
,xxxxx
,xxxx
,null as xxxxxx
,null as xxxxxxx
,null as fact_id
,null as cat_one
,null as model_id
,null as terminalname
,null as mt_type
,null as software_version
,null as card_slot
,null as is_support_volte
,dinner_name
,dinner_type
,is_5gdinner
,null as is_flow_limit
,inner_months
,user_lv
,nearlthreemonth_arpu
,complain_cnt
,preference_app_type1
,preference_app_type2
,preference_app_type3
,null as thzl
,null as swzl
,null as cpyzf
,null as sfzqx
,null as yytfw
,null as rexian
,null as predict
,null as xxxxxx_cnt_totalattach_req
,null as xxxxxx_cnt_totalattach_succ
,null as xxxxxx_cnt_md_totalattach_req
,null as xxxxxx_cnt_md_totalattach_succ
,null as xxxxxx_cnt_md_totalattach_fail
,null as mdcellcnt
,null as nomdcellcnt
,null as xxxxxx_erbmodify_req
,null as xxxxxx_erbmodify_succ
,null as xxxxxx_erbmodify_err
,null as xxxxxxx_tranoctets
,null as xxxxxxx_5g_tranoctets
,null as xxxxxxx_4g_tranoctets
,null as xxxxxxx_webdlvalidoctets
,null as xxxxxxx_webdlvaliddelay
,null as xxxxxxx_5g_webdlvalidoctets
,null as xxxxxxx_5g_webdlvaliddelay
,null as xxxxxxx_4g_webdlvalidoctets
,null as xxxxxxx_4g_webdlvaliddelay
,null as xxx_tranoctets
,null as xxx_webdlvalidoctets
,null as xxx_webdlvaliddelay
,null as xxx_nbrTcpConn1stReq
,null as xxx_nbrTcpConn1stSucc
,null as xxx_tcpConn1stDelay
,null as xxx_nbrTcpConn2ndSucc
,null as xxx_tcpConn2ndDelay
,null as xxx_tcpConnDelay
,null as xxx_nbrallWebReq
,null as xxx_nbrallWebResSucc
,null as xxx_WebResDelay
,null as xxx_FirstScreenFincnt
,null as xxx_FirstScreenFinTime
,null as xxx_VideoBufferServCnt
,null as xxx_VideoServCnt
,null as xxx_VideoDlOctets
,null as xxx_VideoDlDuxxxxxxxxion
,null as xxx_videoThroughput_cnt
,null as xxx_videoThroughput
,null as xxx_videoBufferCnt
,null as xxx_VideoPlayTime
,null as xxx_InitBufferReq
,null as xxx_InitBufferSucc
,null as xxx_InitBufferDuxxxxxxxxion
from xxxx_day
) da group by xxxxx) da
left join xxx_xxx.cfg_xxxxx dc on da.xxxxxxx=dc.xxxxxxx
left join xxx_xxx.cfg_xxxxxxl_core dt on da.fullresident=dt.cgi

Compile failure

When I use the rustc version of 1.73.0 , the build of cargo build failed.

error[E0554]: `#![feature]` may not be used on the stable release channel
   --> /Volumes/alex-disk/.cargo/registry/src/index.crates.io-6f17d22bba15001f/packed_simd_2-0.3.8/src/lib.rs:214:1
    |
214 | / #![feature(
215 | |     adt_const_params,
216 | |     repr_simd,
217 | |     rustc_attrs,
...   |
224 | |     custom_inner_attributes,
225 | | )]
    | |__^

   Compiling lexical-write-integer v0.8.5
   Compiling regex v1.9.5
   Compiling arrow-data v45.0.0 (https://github.com/blaze-init/arrow-rs.git?rev=35b6fded9#35b6fded)
error[E0554]: `#![feature]` may not be used on the stable release channel
   --> /Volumes/alex-disk/.cargo/registry/src/index.crates.io-6f17d22bba15001f/packed_simd_2-0.3.8/src/lib.rs:219:5
    |
219 |     stdsimd,
    |     ^^^^^^^

error[E0554]: `#![feature]` may not be used on the stable release channel
   --> /Volumes/alex-disk/.cargo/registry/src/index.crates.io-6f17d22bba15001f/packed_simd_2-0.3.8/src/lib.rs:222:5
    |
222 |     core_intrinsics,
    |     ^^^^^^^^^^^^^^^
error: unrecognized platform-specific intrinsic function: `simd_shuffle2`
  --> /Volumes/alex-disk/.cargo/registry/src/index.crates.io-6f17d22bba15001f/packed_simd_2-0.3.8/src/codegen/llvm.rs:10:5
   |
10 |     pub fn simd_shuffle2<T, U>(x: T, y: T, idx: [u32; 2]) -> U;
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

error: unrecognized platform-specific intrinsic function: `simd_shuffle4`
  --> /Volumes/alex-disk/.cargo/registry/src/index.crates.io-6f17d22bba15001f/packed_simd_2-0.3.8/src/codegen/llvm.rs:11:5
   |
11 |     pub fn simd_shuffle4<T, U>(x: T, y: T, idx: [u32; 4]) -> U;
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

error: unrecognized platform-specific intrinsic function: `simd_shuffle8`
  --> /Volumes/alex-disk/.cargo/registry/src/index.crates.io-6f17d22bba15001f/packed_simd_2-0.3.8/src/codegen/llvm.rs:12:5
   |
12 |     pub fn simd_shuffle8<T, U>(x: T, y: T, idx: [u32; 8]) -> U;
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

error: unrecognized platform-specific intrinsic function: `simd_shuffle16`
  --> /Volumes/alex-disk/.cargo/registry/src/index.crates.io-6f17d22bba15001f/packed_simd_2-0.3.8/src/codegen/llvm.rs:13:5
   |
13 |     pub fn simd_shuffle16<T, U>(x: T, y: T, idx: [u32; 16]) -> U;
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

error: unrecognized platform-specific intrinsic function: `simd_shuffle32`
  --> /Volumes/alex-disk/.cargo/registry/src/index.crates.io-6f17d22bba15001f/packed_simd_2-0.3.8/src/codegen/llvm.rs:14:5
   |
14 |     pub fn simd_shuffle32<T, U>(x: T, y: T, idx: [u32; 32]) -> U;
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

error: unrecognized platform-specific intrinsic function: `simd_shuffle64`
  --> /Volumes/alex-disk/.cargo/registry/src/index.crates.io-6f17d22bba15001f/packed_simd_2-0.3.8/src/codegen/llvm.rs:15:5
   |
15 |     pub fn simd_shuffle64<T, U>(x: T, y: T, idx: [u32; 64]) -> U;
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Evaluate Profile-Guided Optimization (PGO) and LLVM BOLT

Hi!

Recently I checked Profile-Guided Optimization (PGO) improvements on multiple projects. The results are here. According to the multiple tests, PGO can help with improving performance in many cases. That's why I think trying to optimize the Blaze (its Rust part) with PGO can be a good idea.

I can suggest the following action points:

  • Perform PGO benchmarks on Blaze. And if it shows improvements - add a note about possible improvements in Blaze performance with PGO.
  • Providing an easier way (e.g. a build option) to build scripts with PGO can be helpful for the end-users and maintainers since they will be able to optimize Blaze according to their own workloads.
  • Optimize pre-built binaries

Since it's the library, I can suggest the following way to optimize with PGO:

  • Prepare a binary with the Blaze's Rust part (don't know how hard is to implement this for Blaze)
  • Compile this binary with cargo-pgo (the link is below)
  • Run this binary on a sample workload
  • Collect all profiles and recompile the library once again with the collected profiles
  • Benchmark usual vs PGO-optimized builds

Maybe testing Post-Link Optimization techniques (like LLVM BOLT) would be interesting too (Clang and Rustc already use BOLT as an addition to PGO) but I recommend starting from the usual PGO.

For the Rust projects, I recommend starting experimenting with PGO with cargo-pgo.

Here are some examples of how PGO optimization is integrated in other projects:

supports spark.files.ignoreCorruptFiles=true

Is your feature request related to a problem? Please describe.
spark provided such configuration to skip corrupted files in data scanning, which should be also implemented in blaze.

Describe the solution you'd like

Describe alternatives you've considered

Additional context

[BUG] TPCH test with exception: Filesystem closed

Describe the bug
I use blaze for TPCH testing, the execution is abnormal, is TPCH testing not supported now? Thank you

**Error log information **

3/10/11 16:01:41 INFO Executor: Finished task 5.0 in stage 5.0 (TID 421). 3033 bytes result sent to driver
23/10/11 16:01:41 ERROR TaskContextImpl: Error in TaskCompletionListener
java.lang.RuntimeException: native executing [partition=3] panics: Execution error: Execution error: output_with_sender[Filter] error: Execution error: output_with_sender[ParquetScan] error: Execution error: output_with_sender[ParquetScan]: output() returns error: Parquet error: External: External error: Java exception thrown at native-engine/datafusion-ext-commons/src/hadoop_fs.rs:77: Filesystem closed
23/10/11 16:01:41 ERROR Executor: Exception in task 9.0 in stage 5.0 (TID 425)
java.lang.RuntimeException: native executing [partition=9] panics: Execution error: Execution error: output_with_sender[Filter] error: Execution error: output_with_sender[ParquetScan] error: Execution error: output_with_sender[ParquetScan]: output() returns error: Arrow error: External error: External: External error: Java exception thrown at native-engine/datafusion-ext-commons/src/hadoop_fs.rs:77: Filesystem closed
23/10/11 16:01:41 ERROR Executor: Exception in task 3.0 in stage 5.0 (TID 419)
org.apache.spark.util.TaskCompletionListenerException: native executing [partition=3] panics: Execution error: Execution error: output_with_sender[Filter] error: Execution error: output_with_sender[ParquetScan] error: Execution error: output_with_sender[ParquetScan]: output() returns error: Parquet error: External: External error: Java exception thrown at native-engine/datafusion-ext-commons/src/hadoop_fs.rs:77: Filesystem closed
	at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:206)
	at org.apache.spark.TaskContextImpl.invokeTaskCompletionListeners(TaskContextImpl.scala:143)
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:136)
	at org.apache.spark.scheduler.Task.run(Task.scala:152)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
23/10/11 16:01:41 INFO CoarseGrainedExecutorBackend: Got assigned task 446
23/10/11 16:01:41 INFO Executor: Running task 9.1 in stage 5.0 (TID 446)
23/10/11 16:01:41 INFO CoarseGrainedExecutorBackend: Got assigned task 447
23/10/11 16:01:41 INFO Executor: Running task 3.1 in stage 5.0 (TID 447)
23/10/11 16:01:41 INFO BlazeCallNativeWrapper: Start executing native plan
23/10/11 16:01:41 INFO BlazeCallNativeWrapper: Start executing native plan
23/10/11 16:01:41 INFO Executor: Finished task 3.1 in stage 5.0 (TID 447). 3033 bytes result sent to driver
23/10/11 16:01:41 ERROR Executor: Exception in task 9.1 in stage 5.0 (TID 446)
java.lang.RuntimeException: native executing [partition=9] panics: Execution error: Execution error: output_with_sender[Filter] error: Execution error: output_with_sender[ParquetScan] error: Execution error: output_with_sender[ParquetScan]: output() returns error: Arrow error: External error: External: External error: Java exception thrown at native-engine/datafusion-ext-commons/src/hadoop_fs.rs:77: Filesystem closed
23/10/11 16:01:41 INFO ShuffleBlockFetcherIterator: Getting 2000 (517.0 KiB) non-empty blocks including 0 (0.0 B) local and 2000 (517.0 KiB) host-local and 0 (0.0 B) push-merged-local and 0 (0.0 B) remote blocks
23/10/11 16:01:41 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 70 ms
23/10/11 16:01:41 INFO CoarseGrainedExecutorBackend: Got assigned task 451
23/10/11 16:01:41 INFO Executor: Running task 17.1 in stage 5.0 (TID 451)
23/10/11 16:01:41 INFO BlazeCallNativeWrapper: Start executing native plan
23/10/11 16:01:41 INFO Executor: Finished task 17.1 in stage 5.0 (TID 451). 3033 bytes result sent to driver
23/10/11 16:01:42 INFO Executor: Finished task 4.0 in stage 7.0 (TID 440). 381619 bytes result sent to driver

To Reproduce

My CPU information๏ผšIntel(R) Xeon(R) Gold 6130 CPU
software: Spark 3.3.3 Hive:3.1.2 Hadoop 3.2.0
Spark Conf is the base configuration๏ผ
DataFormat๏ผšParquet

output_with_sender[Shuffle] error

environment๏ผš
cdp:7.x
spark:3.3.3
Occasionally reporting an error today.
error message๏ผš

Error: org.apache.kyuubi.KyuubiSQLException: org.apache.kyuubi.KyuubiSQLException: Error operating ExecuteStatement: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 468.0 failed 4 times, most recent failure: Lost task 3.3 in stage 468.0 (TID 130034) (node76 executor 169): java.lang.RuntimeException: native executing [partition=3] panics: Execution error: Execution error: output_with_sender[Shuffle] error: Execution error: output_with_sender[Project] error: Execution error: output_with_sender[ParquetScan] error: Execution error: output_with_sender[ParquetScan]: output() returns error: Parquet error: External: Parquet error: External: External error: Java exception thrown at native-engine/datafusion-ext-commons/src/hadoop_fs.rs:134: Couldn't create proxy provider class org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
	
	Driver stacktrace:
		at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2668)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2604)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2603)
		at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
		at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
		at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
		at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2603)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1178)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1178)
		at scala.Option.foreach(Option.scala:407)
		at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1178)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2856)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2798)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2787)
		at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	Caused by: java.lang.RuntimeException: native executing [partition=3] panics: Execution error: Execution error: output_with_sender[Shuffle] error: Execution error: output_with_sender[Project] error: Execution error: output_with_sender[ParquetScan] error: Execution error: output_with_sender[ParquetScan]: output() returns error: Parquet error: External: Parquet error: External: External error: Java exception thrown at native-engine/datafusion-ext-commons/src/hadoop_fs.rs:134: Couldn't create proxy provider class org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
	
		at org.apache.kyuubi.KyuubiSQLException$.apply(KyuubiSQLException.scala:69)
		at org.apache.kyuubi.engine.spark.operation.SparkOperation$$anonfun$onError$1.$anonfun$applyOrElse$1(SparkOperation.scala:189)
		at org.apache.kyuubi.Utils$.withLockRequired(Utils.scala:395)
		at org.apache.kyuubi.operation.AbstractOperation.withLockRequired(AbstractOperation.scala:51)
		at org.apache.kyuubi.engine.spark.operation.SparkOperation$$anonfun$onError$1.applyOrElse(SparkOperation.scala:177)
		at org.apache.kyuubi.engine.spark.operation.SparkOperation$$anonfun$onError$1.applyOrElse(SparkOperation.scala:172)
		at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
		at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.$anonfun$executeStatement$1(ExecuteStatement.scala:88)
		at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
		at org.apache.kyuubi.engine.spark.operation.SparkOperation.$anonfun$withLocalProperties$1(SparkOperation.scala:155)
		at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
		at org.apache.kyuubi.engine.spark.operation.SparkOperation.withLocalProperties(SparkOperation.scala:139)
		at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.executeStatement(ExecuteStatement.scala:78)
		at org.apache.kyuubi.engine.spark.operation.ExecuteStatement$$anon$1.run(ExecuteStatement.scala:100)
		at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
		at java.util.concurrent.FutureTask.run(FutureTask.java:266)
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
		at java.lang.Thread.run(Thread.java:748)
	Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 468.0 failed 4 times, most recent failure: Lost task 3.3 in stage 468.0 (TID 130034) (node76 executor 169): java.lang.RuntimeException: native executing [partition=3] panics: Execution error: Execution error: output_with_sender[Shuffle] error: Execution error: output_with_sender[Project] error: Execution error: output_with_sender[ParquetScan] error: Execution error: output_with_sender[ParquetScan]: output() returns error: Parquet error: External: Parquet error: External: External error: Java exception thrown at native-engine/datafusion-ext-commons/src/hadoop_fs.rs:134: Couldn't create proxy provider class org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
	
	Driver stacktrace:
		at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2668)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2604)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2603)
		at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
		at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
		at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
		at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2603)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1178)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1178)
		at scala.Option.foreach(Option.scala:407)
		at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1178)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2856)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2798)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2787)
		at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	Caused by: java.lang.RuntimeException: native executing [partition=3] panics: Execution error: Execution error: output_with_sender[Shuffle] error: Execution error: output_with_sender[Project] error: Execution error: output_with_sender[ParquetScan] error: Execution error: output_with_sender[ParquetScan]: output() returns error: Parquet error: External: Parquet error: External: External error: Java exception thrown at native-engine/datafusion-ext-commons/src/hadoop_fs.rs:134: Couldn't create proxy provider class org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
	
		at org.apache.kyuubi.KyuubiSQLException$.apply(KyuubiSQLException.scala:69)
		at org.apache.kyuubi.operation.ExecuteStatement.waitStatementComplete(ExecuteStatement.scala:129)
		at org.apache.kyuubi.operation.ExecuteStatement.$anonfun$runInternal$1(ExecuteStatement.scala:161)
		at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
		at java.util.concurrent.FutureTask.run(FutureTask.java:266)
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
		at java.lang.Thread.run(Thread.java:748) (state=,code=0)

Support sort-merge join skew optimization

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

currently, blaze hangs on TPC-DS q40 case (1TB) because of a severely skewed join (1,463,474,467 join 5 records). spark supports an option skew=true in SMJ which blaze does not.
without this feature, we should not convert skewed SMJ to native. otherwise the joining is very slow or even hangs.

Describe the solution you'd like
understand how spark performs skew join optimization. implement the same logic in blaze.

Describe alternatives you've considered

Additional context

Spark 3.2 Support

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

I would like to run blaze on Spark 3.2

Describe the solution you'd like

Building blaze against 3.2, or having an option to choose a Spark version between 3.0 and 3.2

Describe alternatives you've considered

Remaining on 3.0

Additional context

I found this while benchmarking some SQL queries, I had to make sure that I use 3.0, but my local stack runs on 3.2

VerifyError exception

Describe the bug
I got the verify error exception when I run spark 3.2.1

To Reproduce

  1. Spark 3.2.1
  2. submit job with yarn client mode
  3. run tpcds sql with spark-sql-perf tool

Expected behavior
run tpcds successfully.

Additional context
log as blow

2022-10-06 17:25:18,329 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@13bdf540{/static/sql,null,AVAILABLE,@spark}
Exception in thread "main" java.lang.VerifyError: class org.apache.spark.sql.execution.blaze.plan.ArrowBroadcastExchangeExec overrides final method withNewChildren.(Lscala/collection/Seq;)Lorg/apache/spark/sql/catalyst/trees/TreeNode;
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:473)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.apache.spark.sql.blaze.BlazeConvertStrategy$.$anonfun$apply$4(BlazeConvertStrategy.scala:95)
at org.apache.spark.sql.blaze.BlazeConvertStrategy$.$anonfun$apply$4$adapted(BlazeConvertStrategy.scala:94)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:253)
at org.apache.spark.sql.blaze.BlazeConvertStrategy$.apply(BlazeConvertStrategy.scala:94)
at org.apache.spark.sql.blaze.BlazeColumnarOverrides.org$apache$spark$sql$blaze$BlazeColumnarOverrides$$$anonfun$preColumnarTransitions$1(BlazeSparkSessionExtension.scala:43)
at org.apache.spark.sql.blaze.BlazeColumnarOverrides$$anonfun$preColumnarTransitions$3.apply(BlazeSparkSessionExtension.scala:42)
at org.apache.spark.sql.blaze.BlazeColumnarOverrides$$anonfun$preColumnarTransitions$3.apply(BlazeSparkSessionExtension.scala:42)
at org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.$anonfun$apply$1(Columnar.scala:545)
at org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.$anonfun$apply$1$adapted(Columnar.scala:544)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.apply(Columnar.scala:544)
at org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.apply(Columnar.scala:505)
at org.apache.spark.sql.execution.QueryExecution$.$anonfun$prepareForExecution$1(QueryExecution.scala:449)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.execution.QueryExecution$.prepareForExecution(QueryExecution.scala:448)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$2(QueryExecution.scala:170)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:170)
at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:163)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:163)
at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:214)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:259)
at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:228)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
at org.apache.spark.sql.Dataset.(Dataset.scala:219)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
at com.databricks.spark.sql.perf.tpcds.TPCDS_Bench_RunAllQuery$.main(TPCDS_Bench_RunAllQuery.scala:25)
at com.databricks.spark.sql.perf.tpcds.TPCDS_Bench_RunAllQuery.main(TPCDS_Bench_RunAllQuery.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
2022-10-06 17:25:20,207 INFO spark.SparkContext: Invoking stop() from shutdown hook

Adding TPC-DS benchmarking code

Hey,

I am trying the run benchmarking locally, and its a huge effort to write all the TPC-DS queries and relevant spark code and analysis of performance.

Can you please add to the repo the benchmarking code?

Thanks!

implement native BroadcastNestedLoopJoin

Is your feature request related to a problem? Please describe.
some queries uses BroadcastNestedLoopJoin for cartesian joining, which is not yet supported in blaze and cause falling back to spark.

Describe the solution you'd like
implement NativeBroadcastNestedLoopJoinExec which transforms spark's operator to datafusion's NestedLoopJoinExec.

Describe alternatives you've considered

Additional context

Consider falling back to Spark on unsupported data sources

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

It appears that when Blaze encounters a data source that's not yet supported, it panics. An example is the Databricks file system (dbfs), which is interpreted as an object store. I haven't yet tested JDBC sources.

Describe the solution you'd like

If Blaze encounters an unsupported object store or other data source, it should defer back to Spark reading the data.
I appreaciate that this might not result in good performance gains on some workloads, but it at least prevents the entire query from failing.

Describe alternatives you've considered

Supporting custom data sources directly, which will take time. I have a DataFusion RDBMS WIP that I'd be able to extend Blaze with, but it's still far away from being generally usable.

Additional context

I was trying out some TPC-H queries with Blaze, so i'm opening issues for things that I observed.

Support more file format

Is your feature request related to a problem? Please describe.
Blaze only support parquet file format so far, and it is cusotmerize, but in fact datafusion have implement parquet source

Describe the solution you'd like
Can we use datafusion reader interface? I think it more easier to extend, btw datafusion have provided multiple reader so far

Supports apache-celeborn

Is your feature request related to a problem? Please describe.
apache-celeborn is widely used as an RSS (remote shuffle service) for spark. currently blaze has supported an internal RSS implementation from Kuaishou.Inc, however celeborn is not yet supported.

Describe the solution you'd like
integrate blaze with celeborn, this should be done by extending rss-xxx-repartitioner.rs. furthermore, we can support an individual rss shim module for supporting different RSS implementations.

Describe alternatives you've considered

Additional context

Implement native broadcast-join

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

In recent profiling we find that BHJ has become a bottleneck to achieve better performance. a BHJ takes twice columnar <-> row conversions which severely slow down the execution.

Describe the solution you'd like

since implementing the whole broadcast logic is complicated, we can first implement a native BHJ which only consumes a native streamed input. the broadcasted input can still executing on JVM because it should always be small.

Describe alternatives you've considered

Additional context

can the parquet reader read local data files ?

I am trying to let blaze run some of my UTs, with a plan like this:

input {
  filter {
    input {
      parquet_scan {
        base_conf {
          num_partitions: 1
          file_group {
            files {
              path: "file:/var/folders/lg/c5s_ql996m55ktdwp3y63r7w0000gn/T/hive5923654099953933335/default.db/table/data/00001-1-32c4054e-5a89-4a21-9f8d-6c8901318f18-00001.parquet"
              size: 813
              range {
                start: 4
                end: 817
              }
            }
          }
          schema {
            columns {
              name: "id"
              arrow_type {
                INT64 {
                }
              }
              nullable: true
            }
            columns {
              name: "data"
              arrow_type {
                UTF8 {
                }
              }
              nullable: true
            }
            columns {
              name: "float"
              arrow_type {
                FLOAT32 {
                }
              }
              nullable: true
            }
          }
          projection: 0
          projection: 1
          statistics {
          }
          partition_schema {
          }
        }
        pruning_predicates {
          binary_expr {
            l {
              is_not_null_expr {
                expr {
                  column {
                    name: "id"
                  }
                }
              }
            }
            r {
              binary_expr {
                l {
                  column {
                    name: "id"
                  }
                }
                r {
                  literal {
                    int64_value: 2
                  }
                }
                op: "Eq"
              }
            }
            op: "And"
          }
        }
        fsResourceId: "NativeParquetScanExec:eee3064d-ce28-40f3-9477-118475c03c34"
      }
    }
    expr {
      is_not_null_expr {
        expr {
          column {
            name: "id"
          }
        }
      }
    }
    expr {
      binary_expr {
        l {
          column {
            name: "id"
          }
        }
        r {
          literal {
            int64_value: 2
          }
        }
        op: "Eq"
      }
    }
  }
}
expr {
  column {
    name: "data"
  }
}
expr_name: "data"

is complaining with :

        java.lang.RuntimeException: native executing [partition=0] panics: Execution error: Execution error: output_with_sender[ParquetScan] error: Execution error: output_with_sender[ParquetScan]: output() returns error: Parquet error: Parquet error: Invalid Parquet file. Corrupt footer
        

release version v2.0.7

Features

  • Supports native BroadcastNestedLoopJoinExec.
  • Supports multithread UDF evaluation.
  • Supports spark.files.ignoreCorruptFiles.
  • Supports input batch statistics.

Performance

  • Improves get_json_object() performance by reducing duplicated json parsing.
  • Improves parquet reading performance by skipping utf-8 validation.
  • Supports cached expression evaluator in native AggExec.
  • Supports column pruning during native evaluation.
  • Prefer native sort even if child is non-native.

Bugfix

  • Fix missing outputPartitioning in NativeParquetExec.
  • Fix missing native converting checks in parquet scan.
  • Fix inconsistency: implement spark-compatible float to int casting.
  • Avoid closing hadoop fs for reusing in cache.

Implement native ShuffledHashJoin

Is your feature request related to a problem? Please describe.
spark uses SortMergeJoin for symmetric large data by default and ShuffledHashJoin is optional. can we implement a native ShuffledHashJoin for better performance?

Describe the solution you'd like
Implement native ShuffledHashJoin.

Describe alternatives you've considered

Additional context
It is easy to implement native ShuffledHashJoin by integrating datafusion's HashJoin operator. but due to low off-heap memory configuration in production environment, building an in-memory hashmap is likely to cause OOM.

Compress in-memory shuffled record batches

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

currently, shuffle writer keeps record batches uncompressed until they are spilled or flushed to next stage. if we compress them before spilling, we can reduce memory usage and the amount of spilled data.

Describe the solution you'd like

when a record batch is finished, instead of buffering it into a Vec<RecordBatch>, we compress it into aVec[u8] which is later spilled or flushed.

Describe alternatives you've considered

Additional context

`spark.blaze.enable.shuffle` doesn't work

Describe the bug
I try to disable coverting ShuffleExchangeExec, but it doesn't work. And i found the code

def convertSparkPlan(exec: SparkPlan): SparkPlan = {
    exec match {
      case e: ShuffleExchangeExec => tryConvert(e, convertShuffleExchangeExec)
    .....

It's not dependent on the configuration spark.blaze.enable.shuffle.
To Reproduce
Add configuration --conf spark.blaze.enable.shuffle=false when submit

Expected behavior
Blaze extension should not covert ShuffleExchangeExec if spark.blaze.enable.shuffle is true.

Additional context

Simplify build to `./gradlew buildRelease` and `./gradlew buildDebug`

Currently, we are building libblaze.so and blaze-engine-1.0-SNAPSHOT.jar separately and rely on submitting them both through the Spark command line:

"$SPARK_HOME"/bin/spark-sql \
....
--jars /path/to/blaze-engine-1.0-SNAPSHOT.jar \
--driver-java-options "-Djava.library.path=$JAVA_HOME:/path/to/libblaze.so" \
....
$@

It would be nice to have two customized Gradle tasks that build and bundle the lib file into the blaze-engine jar.

Refer to https://github.com/datafusion-contrib/datafusion-java/blob/main/datafusion-java/build.gradle for an example.

Implement BHJ by building the hashmap on driver side.

Is your feature request related to a problem? Please describe.
currently this is not supported in v2.0.6. we have to broadcast the original small table data to executor and build the hashmap in each task, which significantly slows down BHJ operator.

Describe the solution you'd like
a better approach is implementing the hashmap construction on driver side and broadcast the built data (just like what we do in spark). however we have to re-implement a HashJoin because datafusion has no support to use a prebuilt hashmap for joining.

Describe alternatives you've considered
currently we have a fallback strategy for executing BHJ with moderate/big size table. broadcasted data exceeds threshold are fallen-back to SMJ. with such strategy the query can be successfully executed, but the performance is bad.

Additional context

Support splitting parquet input files

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

A big parquet input file should be splitted into smaller partitions, which is not currently supported.

Describe the solution you'd like

Implement ParquetExec with filename, offset and length parameters.

Describe alternatives you've considered

does parquet2 provide better supports?

Additional context

use lossy utf-8 parser in parquet reading

Is your feature request related to a problem? Please describe.
when reading a parquet file containing invalid utf-8 strings. arrow-rs will fail with Arrow: Parquet argument error: Parquet error: encountered non UTF-8 data.
in such situation, it is better to just return a junk string instead of failing the whole data processing in production usage.

Describe the solution you'd like
replace String::from_utf8 with String::from_utf8_lossy in arrow-rs/parquet codes.

Describe alternatives you've considered
provide both implementation by using a swith configuration like spark.blaze.enableLossyUtf8=true.

Does blaze or datafusion support Orc/Parquet?

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
(This section helps Arrow developers understand the context and why for this feature, in addition to the what)

Describe the solution you'd like
A clear and concise description of what you want to happen.

Describe alternatives you've considered
A clear and concise description of any alternative solutions or features you've considered.

Additional context
Add any other context or screenshots about the feature request here.

Implement batch updating/merging in aggregates

Is your feature request related to a problem? Please describe.
currently aggregating values (aka AggBuf) are updated one by one. this introduces overheads like datatype parsing and downcasting. we will gain some performance profits if we do this batch by batch.

Describe the solution you'd like

  1. add partial_batch_update()/partial_batch_merge() in Agg trait.
  2. while updating, we first get/create agg buffers of current batch and collect them in a vector, then call partial_batch_update()/partial_batch_merge() once to update all buffers.

Describe alternatives you've considered

Additional context

this feature shows significant improvement in some tpcds big cases:

without this feature:
q25 40509927424 181.5s
q64 69013241856 350.2s

with this feature:
q25 26700609536 119.2s
q64 57453035520 260.4s

Running TPCDS query1.sql yields errors

Describe the bug
When run TPCDS query1.sql I got the error:
Error: Error running query: java.lang.RuntimeException: called Result::unwrap() on an Err value: External("Java exception thrown at native-engine/datafusion-ext/src/ipc_reader_exec.rs:141") (state=,code=0)

To Reproduce
Run the TPCDS query1.sql :
with customer_total_return as
(
select sr_customer_sk as ctr_customer_sk, sr_store_sk as ctr_store_sk, sum(SR_FEE) as ctr_total_return
from store_returns_orc,date_dim_orc
where sr_returned_date_sk = d_date_sk and d_year =2000
group by sr_customer_sk,sr_store_sk
)
select /+MAPJOIN(store)/ c_customer_id
from (select ctr_total_return, ctr_store_sk,ctr_customer_sk from customer_total_return ctr0 ,store_orc
where s_store_sk = ctr0.ctr_store_sk
and s_state = 'TN'
) ctr1
,customer_orc
where ctr1.ctr_total_return > (
select avg(ctr_total_return)*1.2
from customer_total_return ctr2
where ctr1.ctr_store_sk = ctr2.ctr_store_sk)
and ctr1.ctr_customer_sk = c_customer_sk
order by c_customer_id
limit 100;

Expected behavior
Expected behavior: the query is executed without error.

Additional context
Run spark with this command:
sbin/start-thriftserver.sh --master yarn
--num-executors 1
--conf spark.executor.cores=12
--conf spark.executor.memory=20g
--driver-memory 6g --jars "/usr/local/blaze-engine-1.0-SNAPSHOT.jar"
--conf spark.sql.extensions=org.apache.spark.sql.blaze.BlazeSparkSessionExtension
--conf spark.executor.extraClassPath="./blaze-engine-1.0-SNAPSHOT.jar"
--conf spark.speculation=false
--conf spark.executor.memoryOverhead=16384
--conf spark.blaze.memoryFraction=0.55
--conf spark.blaze.batchSize=5000
--conf spark.sql.autoBroadcastJoinThreshold=-1

Use a newer protoc version for aarch64

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

When trying to build blaze inside Docker on an M1, I get an error that protoc for linux aarch64 is not avaialbe.
Blaze uses protoc:3.0.0 which might be before aarch64 builds were generated.

Describe the solution you'd like

To be able to use a newer protoc version

Describe alternatives you've considered

I couldn't find any alternative.

Additional context

My local Spark environment is on aarch64, so I can't easily test blaze without starting up some x64 cluster.

performance: native get_json_object() should reuse parsed json value.

Is your feature request related to a problem? Please describe.
in many production queries, get_json_object() is used more than one times to decode the same json string with different paths. for example:

select
  get_json_object(json, '$.key') as key,
  get_json_object(json, '$.value') as value
from
  json_table

in hive/spark, the UDFJson is implemented with an internal cache for the recently parsed json objects. since hive/spark's execution is row-based, the second get_json_object call can use json object cached by the first call.

however this strategy is hard to implement in blaze because the execution is column based. we have to cache parsed values of the whole column, which will use a lot of memory and may cause OOM.

Describe the solution you'd like
in blaze, we have already supported expression caching. so we can rewrite get_json_object(jsonstr, path) into get_parsed_json_object(parse_json(jsonstr), path), then the result of inner call parse_json(jsonstr) can be cached an reused across multiple get_json_object calls.

cons: get_json_object inside short circuiting expressions like if/casewhen cannot be reused yet.

Describe alternatives you've considered

Additional context

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.