apache / submarine

Submarine is a cloud-native machine learning platform.

Home Page: https://submarine.apache.org/

License: Apache License 2.0

Shell 5.06% Dockerfile 0.61% Python 14.52% Java 60.43% JavaScript 1.04% TypeScript 8.49% HTML 3.24% Makefile 0.18% Go 4.17% Jupyter Notebook 0.50% SCSS 0.96% Less 0.24% Smarty 0.02% Roff 0.21% CSS 0.35%
machine-learning deep-learning ai notebook kubernetes docker

submarine's Introduction


What is Apache Submarine?

Apache Submarine (Submarine for short) is an End-to-End Machine Learning Platform to allow data scientists to create end-to-end machine learning workflows. On Submarine, data scientists can finish each stage in the ML model lifecycle, including data exploration, data pipeline creation, model training, serving, and monitoring.

Why Submarine?

Some open-source and commercial projects are trying to build an end-to-end ML platform. What's the vision of Submarine?

Problems

  1. Many platforms lack easy-to-use user interfaces (API, SDK, IDE, etc.)
  2. Within the same company, data scientists in different teams often spend significant time re-developing existing feature sets and models.
  3. Data scientists focus on domain-specific tasks (e.g. Click-Through-Rate prediction), but they have to implement their models from scratch with the SDKs provided by existing platforms.
  4. Many platforms lack a unified workbench to manage each component in the ML lifecycle.

Theodore Levitt once said:

“People don’t want to buy a quarter-inch drill. They want a quarter-inch hole.”

Goals of Submarine

Model Training (Experiment)

  • Run and track distributed training experiments on-premises or in the cloud via an easy-to-use UI/API/SDK.
  • Make it easy for data scientists to manage experiment versions and environment dependencies.
  • Support popular machine learning frameworks, including TensorFlow, PyTorch, Horovod, and MXNet.
  • Provide pre-defined templates so data scientists can implement domain-specific tasks easily (e.g. using the DeepFM template to build a CTR prediction model).
  • Support multiple compute resources (e.g. CPU and GPU).
  • Support Kubernetes and YARN.
  • Training pipelines are also on the backlog; we will look into pipeline support in the future.

Notebook Service

  • Submarine aims to provide a notebook service (e.g. Jupyter notebook) which allows users to manage notebook instances running on the cluster.

Model Management (Serving/versioning/monitoring, etc.)

  • Model management for model-serving/versioning/monitoring is on the roadmap.

Easy-to-use User Interface

As mentioned above, Submarine aims to provide a data-scientist-friendly UI so that data scientists have a good user experience. Here are some examples.

Example: Submit a distributed TensorFlow experiment via the Submarine Python SDK

Run a TensorFlow MNIST experiment

# Import the Submarine Python SDK. The spec classes used below (EnvironmentSpec,
# ExperimentMeta, ExperimentTaskSpec, ExperimentSpec) also come from the SDK;
# the exact module path depends on the SDK version.
import submarine

# Create a Submarine client pointing at the Submarine server
submarine_client = submarine.ExperimentClient(host='http://localhost:8080')

# The experiment's environment; it can be based on a Docker image or a Conda environment
environment = EnvironmentSpec(image='apache/submarine:tf-dist-mnist-test-1.0')

# Specify the experiment's name, the framework it uses, the namespace it will run in,
# and the entry point. It can also accept environment variables, etc.
# For a PyTorch job, the framework should be 'Pytorch'.
experiment_meta = ExperimentMeta(name='mnist-dist',
                                 namespace='default',
                                 framework='Tensorflow',
                                 cmd='python /var/tf_dist_mnist/dist_mnist.py --train_steps=100')
# 1 PS task with 2 CPUs and 1 GB of memory
ps_spec = ExperimentTaskSpec(resources='cpu=2,memory=1024M',
                             replicas=1)
# 1 worker task with the same resources
worker_spec = ExperimentTaskSpec(resources='cpu=2,memory=1024M',
                                 replicas=1)

# Wrap the meta, environment, and task specs into an experiment.
# For a PyTorch job, the spec keys would be 'Master' and 'Worker'.
experiment_spec = ExperimentSpec(meta=experiment_meta,
                                 environment=environment,
                                 spec={'Ps':ps_spec, 'Worker': worker_spec})

# Submit the experiment to the Submarine server
experiment = submarine_client.create_experiment(experiment_spec=experiment_spec)

# Get the experiment ID
id = experiment['experimentId']

Query a specific experiment

submarine_client.get_experiment(id)

Wait for finish

submarine_client.wait_for_finish(id)

Get the experiment's log

submarine_client.get_log(id)

Get all running experiments

submarine_client.list_experiments(status='running')
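
Putting the calls above together, a minimal end-to-end flow looks roughly like the sketch below. It assumes the submarine_client and experiment_spec objects from the earlier example; error handling and polling details are omitted.

# Minimal end-to-end sketch using only the SDK calls shown above.
# Assumes `submarine_client` and `experiment_spec` from the earlier example.
experiment = submarine_client.create_experiment(experiment_spec=experiment_spec)
experiment_id = experiment['experimentId']

# Inspect the experiment, block until it finishes, then fetch its log.
submarine_client.get_experiment(experiment_id)
submarine_client.wait_for_finish(experiment_id)
submarine_client.get_log(experiment_id)

# List whatever is still running on the server.
submarine_client.list_experiments(status='running')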

For a quick-start, see Submarine On K8s

Example: Submit a pre-defined experiment template job

Example: Submit an experiment via Submarine UI

(Available in 0.5.0; see the Roadmap)

Architecture, Design and requirements

If you want to know more about Submarine's architecture, components, requirements, and design, see Architecture-and-requirement.

Detailed design documentation and implementation notes can be found at: Implementation notes

Apache Submarine Community

Read the Apache Submarine Community Guide

How to contribute: see the Contributing Guide

Join the Submarine Slack channel: https://join.slack.com/t/asf-submarine/shared_invite

Issue Tracking: https://issues.apache.org/jira/projects/SUBMARINE

User Document

See User Guide Home Page

Developer Document

See Developer Guide Home Page

Roadmap

Want to know more about what's coming for Submarine? Check out the roadmap: https://cwiki.apache.org/confluence/display/SUBMARINE/Roadmap

Changelog

Here you can find the changelog and issue tracker for each version of Apache Submarine.

Resources

Apache Submarine: a unified machine learning platform made simple, at EuroMLSys '22

License

The Apache Submarine project is licensed under the Apache 2.0 License. See the LICENSE file for details.

submarine's People

Contributors

aeioulisa, atosystem, byronhsu, cchung100m, cdmikechen, charlie0220, dependabot[bot], eroschang, fatallin, featherchen, jasoonn, jeff-901, jiwq, johnting, jojochuang, joshvictor1024, kenchu123, kevin85421, kobe860219, kuan-hsun-li, linhaozhu, lowc1012, mortalhappiness, noidname01, pingsutw, tangzhankun, wangdatan, xunliu, yaooqinn, yuanzac


submarine's Issues

Migrate to the next version of Python requests when released

Hello Maintainers,

I am a PMC member of Apache Airflow, and I wanted to give you a bit of a heads-up about a rather important migration to the upcoming version of the requests library in your Python releases.

Since you are using the requests library in your project, you are affected.

As discussed at length in https://issues.apache.org/jira/browse/LEGAL-572, we found out that the chardet library used by the requests library was a mandatory dependency of requests, and since it has an LGPL licence, we should not release any Apache software with it.

Since then (and since in Airflow we rely on requests heavily) we have been working with the requests maintainers and the charset-normalizer maintainer to make it possible to replace chardet with the MIT-licensed charset-normalizer, so that the requests library can be used in Python releases by Apache projects.

This was a bumpy road, but the PR by @ashb has finally been merged (psf/requests#5797), and we hope a new version of the requests library will be released soon.

This is just a heads-up. I will let you know when it is released, but I have a kind request as well: I might ask the maintainers to publish a release candidate of requests, and maybe you could help test it before the final release. That would be some reassurance for the requests maintainers, who are very concerned about the stability of their releases.

Let me know if you need any more information and whether you would like to help in testing the candidate when it is out.

Error getting ranger policies

I built the submarine security plugin for Ranger 1.2 and Spark 2.3. When I'm using pyspark and spark-submit, it is able to retrieve the Hive policy file from Ranger and store it on the file system. However, when I'm using Jupyter and Livy to connect to an HDP 3.1.5 cluster, it does not succeed in getting the policy file. Instead I get this error:

WARN RangerAdminRESTClient: Error getting policies. secureMode=true, user=xxxxxxx (auth:SIMPLE), response={"httpStatusCode":401,"statusCode":0}, serviceName=XXXX_hive

The user ID that is being used is the ID that owns the Spark application in YARN. If it's trying to pass a password for this user to Ranger Admin, I don't know where it would be getting it. What concerns me is that my cluster is Kerberized and the error above shows auth:SIMPLE and not auth:KERBEROS. I would expect it to authenticate to Ranger with Kerberos. Can you offer any advice on why I'm not able to retrieve the policy in this situation but I am in others? Is there a configuration I can add to set the user ID/password that is used to pull the policy from Ranger?

submarine-security [release-0.6.0] cannot manage different users' privileges on different databases or tables

In release-0.6.0, submarine-security cannot control the privileges of different users (except spark) on different databases or tables when we use spark-thrift to connect to Hive.

I once tried to modify the source code of submarine-security/spark-security, but it did not work.
The file I tried to modify is \submarine\submarine-security\spark-security\src\main\scala\org\apache\submarine\spark\security\RangerSparkAuthorizer.scala

Can anyone give me some advice? Thanks.
Release 0.6.0 information is below:
submarine-security/spark-security

spark-security Ranger plugin is not masking when a function is used around the column

When a column has masking policies configured and the column is used inside a function, the output of the function does not return masked data.

For example, let's say a column abc from table default_table has a HASH_MASK policy configured for a user XYZ. Below are a couple of queries where the user might get the actual abc value instead of the masked value.

select substr(abc, 0, 100000000000)  from default_table
select concat(abc, '-', 'dummy') from default_table

how to run it on yarn

I didn't find a Submarine usage guide for running on YARN. Does version 0.7 support running tasks on YARN? Are there any guide documents? Thanks.

[DESIGN] Submarine CLI need to support k8s native connection

Submarine CLI currently only supports REST-based connections.
If we have connected to a remote k8s cluster or deployed a local minikube, we should be able to access the submarine server directly through the k8s client for data interaction.

For example, we deploy submarine as a CR with the submarine operator, so the Submarine CLI can get the deployment/pod information of the submarine server through submarine custom resources. In this way, we can get relevant information about submarine/notebook in a more cloud-native way.

Config

This is the submarine client config, shown by running submarine config list:

{                                                          
   "connection": {                                          
     "hostname": "localhost",                               
     "port": 32080                                          
   }                                                         
} 

We can change this config to Native Mode like this:

{
  "type": "native",
  "connection": {
    "operator": "submarine-example"
  }
}

or

TYPE:     native
CURRENT   CONNECTION            NAMESPACE
*         submarine-example-1   default
          submarine-example-2   test

If we have just one submarine server managed by the operator, we can set this CR as the default.

Design

The design idea is very simple: data interaction in native mode can directly call the k8s client API to execute commands in the pod. We can use Java commands (like the submarine-k8s-submitter jars) or REST to get any information we want from the pod. A rough sketch of this idea follows.
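
As a rough, non-authoritative illustration of this native mode (not the actual CLI implementation), the following Python sketch uses the Kubernetes Python client to look up the submarine server pod by a label selector and execute a command inside it; the namespace, label selector, and command are placeholder assumptions.

# Hypothetical sketch of "native mode": locate the submarine server pod via
# the Kubernetes Python client and exec a command inside it. The namespace,
# label selector, and command below are illustrative assumptions.
from kubernetes import client, config
from kubernetes.stream import stream

config.load_kube_config()          # use the local kubeconfig
core_v1 = client.CoreV1Api()

# Find the submarine server pod created by the operator (label is assumed).
pods = core_v1.list_namespaced_pod(namespace='default',
                                   label_selector='app=submarine-server')
pod_name = pods.items[0].metadata.name

# Run a command in the pod and collect its output.
output = stream(core_v1.connect_get_namespaced_pod_exec,
                pod_name,
                'default',
                command=['ls', '/opt'],
                stderr=True, stdin=False, stdout=True, tty=False)
print(output)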

Double-Checked Locking

// Note: 'conf' must be declared volatile for double-checked locking to be safe.
if (conf == null) {
  synchronized (SubmarineConfiguration.class) {
    if (conf == null) {
      conf = newInstance();
    }
  }
}

Double-checked locking is widely cited and used as an efficient method for implementing lazy initialization in a multithreaded environment.
Unfortunately, when implemented in Java it will not work reliably in a platform-independent way without additional synchronization (for example, declaring the lazily-initialized field volatile).

[Submarine Spark Security] "Drop Table" Access Control Bypassed / Ignored

Hi, I am using Spark 2.4.5 and Ranger 1.2.0, and built the submarine-spark-security plugin on commit 2ff3339 with mvn clean package -Dmaven.javadoc.skip=true -DskipTests -pl :submarine-spark-security -Pspark-2.4 -Pranger-1.2.

Upon creating a user in Ranger with no permissions (or, in my case, precisely: I created a user in OpenLDAP, synced it via ranger-usersync, and did not assign any permissions to the new user), it is expected that the user will get a permission-denied error (e.g. SparkAccessControlException) for all SQL operations (e.g. SELECT, INSERT, DROP).

However, the permission denial only works for SELECT & INSERT.
"DROP TABLE" was still allowed despite the user having no permissions at all, and the table was dropped as a result.

I am setting spark.sql.extensions=org.apache.submarine.spark.security.api.RangerSparkSQLExtension, if it matters. hive.server2.authentication is also set to LDAP in /spark/conf/hive-site.xml.

Can't build helm chart submarine from quick start guide

W0709 19:38:47.901294 57381 warnings.go:70] apiextensions.k8s.io/v1beta1 CustomResourceDefinition is deprecated in v1.16+, unavailable in v1.22+; use apiextensions.k8s.io/v1 CustomResourceDefinition
Error: INSTALLATION FAILED: unable to build kubernetes objects from release manifest: resource mapping not found for name: "submarine-gateway" namespace: "" from "": no matches for kind "Gateway" in version "networking.istio.io/v1alpha3"
ensure CRDs are installed first

URI is not absolute

I encountered two errors.

  • First, URI is not absolute

I use beeline to connect, and it returns the error log below, but it does not affect the usage.

21/03/02 22:31:22 INFO policyengine.RangerPolicyRepository: This policy engine contains 1 policy evaluators
21/03/02 22:31:23 ERROR contextenricher.RangerTagEnricher$RangerTagRefresher: Encountered unexpected exception. Ignoring
submarine_spark_ranger_project.com.sun.jersey.api.client.ClientHandlerException: java.lang.IllegalArgumentException: URI is not absolute
        at submarine_spark_ranger_project.com.sun.jersey.client.urlconnection.URLConnectionClientHandler.handle(URLConnectionClientHandler.java:155)
        at submarine_spark_ranger_project.com.sun.jersey.api.client.Client.handle(Client.java:652)
        at submarine_spark_ranger_project.com.sun.jersey.api.client.WebResource.handle(WebResource.java:682)
        at submarine_spark_ranger_project.com.sun.jersey.api.client.WebResource.access$200(WebResource.java:74)
        at submarine_spark_ranger_project.com.sun.jersey.api.client.WebResource$Builder.get(WebResource.java:509)
        at org.apache.ranger.admin.client.RangerAdminRESTClient.getServiceTagsIfUpdated(RangerAdminRESTClient.java:311)
        at org.apache.ranger.plugin.contextenricher.RangerAdminTagRetriever.retrieveTags(RangerAdminTagRetriever.java:57)
        at org.apache.ranger.plugin.contextenricher.RangerTagEnricher$RangerTagRefresher.populateTags(RangerTagEnricher.java:606)
        at org.apache.ranger.plugin.contextenricher.RangerTagEnricher$RangerTagRefresher.access$000(RangerTagEnricher.java:524)
        at org.apache.ranger.plugin.contextenricher.RangerTagEnricher.init(RangerTagEnricher.java:127)
        at org.apache.ranger.plugin.policyengine.RangerPolicyRepository.buildContextEnricher(RangerPolicyRepository.java:783)
        at org.apache.ranger.plugin.policyengine.RangerPolicyRepository.init(RangerPolicyRepository.java:712)
        at org.apache.ranger.plugin.policyengine.RangerPolicyRepository.<init>(RangerPolicyRepository.java:187)
        at org.apache.ranger.plugin.policyengine.RangerPolicyEngineImpl.<init>(RangerPolicyEngineImpl.java:128)
        at org.apache.ranger.plugin.service.RangerBasePlugin.setPolicies(RangerBasePlugin.java:264)
        at org.apache.ranger.plugin.util.PolicyRefresher.loadPolicy(PolicyRefresher.java:222)
        at org.apache.ranger.plugin.util.PolicyRefresher.startRefresher(PolicyRefresher.java:149)
        at org.apache.ranger.plugin.service.RangerBasePlugin.init(RangerBasePlugin.java:222)
        at org.apache.submarine.spark.security.RangerSparkPlugin$.init(RangerSparkPlugin.scala:42)
        at org.apache.submarine.spark.security.RangerSparkPlugin$.<init>(RangerSparkPlugin.scala:57)
        at org.apache.submarine.spark.security.RangerSparkPlugin$.<clinit>(RangerSparkPlugin.scala)
        at org.apache.submarine.spark.security.RangerSparkAuthorizer$.org$apache$submarine$spark$security$RangerSparkAuthorizer$$getSparkResource(RangerSparkAuthorizer.scala:257)
        at org.apache.submarine.spark.security.RangerSparkAuthorizer$$anonfun$addAccessRequest$1$1.apply(RangerSparkAuthorizer.scala:75)
        at org.apache.submarine.spark.security.RangerSparkAuthorizer$$anonfun$addAccessRequest$1$1.apply(RangerSparkAuthorizer.scala:74)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.submarine.spark.security.RangerSparkAuthorizer$.addAccessRequest$1(RangerSparkAuthorizer.scala:74)
        at org.apache.submarine.spark.security.RangerSparkAuthorizer$.checkPrivileges(RangerSparkAuthorizer.scala:98)
        at org.apache.spark.sql.catalyst.optimizer.SubmarineSparkRangerAuthorizationExtension.apply(SubmarineSparkRangerAuthorizationExtension.scala:65)
        at org.apache.spark.sql.catalyst.optimizer.SubmarineSparkRangerAuthorizationExtension.apply(SubmarineSparkRangerAuthorizationExtension.scala:40)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
        at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
        at scala.collection.immutable.List.foldLeft(List.scala:84)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:67)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:67)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:73)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:69)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:78)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:78)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3365)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:643)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:694)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLSessionManager.openSession(SparkSQLSessionManager.scala:68)
        at org.apache.hive.service.cli.CLIService.openSessionWithImpersonation(CLIService.java:202)
        at org.apache.hive.service.cli.thrift.ThriftCLIService.getSessionHandle(ThriftCLIService.java:351)
        at org.apache.hive.service.cli.thrift.ThriftCLIService.OpenSession(ThriftCLIService.java:246)
        at org.apache.hive.service.cli.thrift.TCLIService$Processor$OpenSession.getResult(TCLIService.java:1253)
        at org.apache.hive.service.cli.thrift.TCLIService$Processor$OpenSession.getResult(TCLIService.java:1238)
        at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
        at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
        at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:53)
        at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: URI is not absolute
        at java.net.URI.toURL(URI.java:1088)
        at submarine_spark_ranger_project.com.sun.jersey.client.urlconnection.URLConnectionClientHandler._invoke(URLConnectionClientHandler.java:163)
        at submarine_spark_ranger_project.com.sun.jersey.client.urlconnection.URLConnectionClientHandler.handle(URLConnectionClientHandler.java:153)
        ... 61 more
21/03/02 22:31:23 INFO util.RangerResourceTrie: builderThreadCount is set to [1]
21/03/02 22:31:23 INFO resourcetrie.init: builderThreadCount is set to [1]
21/03/02 22:31:23 INFO service.RangerBasePlugin: Policies will NOT be reordered based on number of evaluations
21/03/02 22:31:23 INFO security.RangerSparkPlugin$: Policy cache directory successfully set to /opt/spark/spark-2.4.7-bin-hadoop2.6/policycache
  • Second, java.lang.ClassNotFoundException: org.apache.ranger.authorization.hive.authorizer.RangerHiveAuthorizerFactory
    I execute Spark SQL like select * from table limit 3, and it returns an error. But when I ran it a second time, it was fine and returned the correct result:
21/03/02 22:34:14 ERROR session.SessionState: Error setting up authorization: java.lang.ClassNotFoundException: org.apache.ranger.authorization.hive.authorizer.RangerHiveAuthorizerFactory
org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassNotFoundException: org.apache.ranger.authorization.hive.authorizer.RangerHiveAuthorizerFactory
        at org.apache.hadoop.hive.ql.metadata.HiveUtils.getAuthorizeProviderManager(HiveUtils.java:391)
        at org.apache.hadoop.hive.ql.session.SessionState.setupAuth(SessionState.java:720)
        at org.apache.hadoop.hive.ql.session.SessionState.getAuthenticator(SessionState.java:1391)
        at org.apache.hadoop.hive.ql.session.SessionState.getUserFromAuthenticator(SessionState.java:984)
        at org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(Table.java:177)
        at org.apache.hadoop.hive.ql.metadata.Table.<init>(Table.java:119)
        at org.apache.spark.sql.hive.client.HiveClientImpl$.toHiveTable(HiveClientImpl.scala:922)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitions$1.apply(HiveClientImpl.scala:665)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitions$1.apply(HiveClientImpl.scala:664)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:277)
        at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:215)
        at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:214)
        at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:260)
        at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitions(HiveClientImpl.scala:664)
        at org.apache.spark.sql.hive.client.HiveClient$class.getPartitions(HiveClient.scala:210)
        at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitions(HiveClientImpl.scala:84)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$listPartitions$1.apply(HiveExternalCatalog.scala:1195)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$listPartitions$1.apply(HiveExternalCatalog.scala:1193)
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
        at org.apache.spark.sql.hive.HiveExternalCatalog.listPartitions(HiveExternalCatalog.scala:1193)
        at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.listPartitions(ExternalCatalogWithListener.scala:246)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitions(SessionCatalog.scala:948)
        at org.apache.spark.sql.hive.execution.HiveTableScanExec.rawPartitions$lzycompute(HiveTableScanExec.scala:178)
        at org.apache.spark.sql.hive.execution.HiveTableScanExec.rawPartitions(HiveTableScanExec.scala:166)
        at org.apache.spark.sql.hive.execution.HiveTableScanExec$$anonfun$11.apply(HiveTableScanExec.scala:192)
        at org.apache.spark.sql.hive.execution.HiveTableScanExec$$anonfun$11.apply(HiveTableScanExec.scala:192)
        at org.apache.spark.util.Utils$.withDummyCallSite(Utils.scala:2470)
        at org.apache.spark.sql.hive.execution.HiveTableScanExec.doExecute(HiveTableScanExec.scala:191)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
        at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2788)
        at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2788)
        at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3369)
        at org.apache.spark.sql.Dataset.collect(Dataset.scala:2788)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:246)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1$$anon$2.run(SparkExecuteStatementOperation.scala:175)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1$$anon$2.run(SparkExecuteStatementOperation.scala:171)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1.run(SparkExecuteStatementOperation.scala:185)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.ranger.authorization.hive.authorizer.RangerHiveAuthorizerFactory
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.hadoop.hive.ql.metadata.HiveUtils.getAuthorizeProviderManager(HiveUtils.java:381)
        ... 57 more

[DESIGN] Add a CRD to install and control jupyter image in k8s

At present, the jupyter image is very large, so when users deploy the jupyter service on a new k8s cluster or node, there is a long waiting process.
This issue is mainly to discuss the design idea of building a CRD-based operator that can connect with existing submarine services and provides a certain degree of controllability/predictability. Based on a new CRD, we can automatically trigger the image pull on every suitable node before the jupyter service is deployed, so that every node in k8s has the corresponding image.

In this case, we need to create a CRD which contains a list of images to be obtained, the refresh time, and the pull secret of each image (if necessary). An example of the CRD is as follows:

apiVersion: org.apache.submarine/v1
kind: JupyterImagePuller
metadata:
  name: example-image-puller
  namespace: submarine
spec:
  images: # the list of images to pre-pull
    - name: jupyter # environment name
      image: apache/submarine:jupyter-notebook-0.7.0 # image name
    - name: jupyter-gpu
      image: xxx.harbor.com/5000/apache/submarine:jupyter-notebook-gpu-0.7.0
      auth: # docker registry authentication
        username: xxxx
        password: xxxx
        email: [email protected] # Optional
    - name: jupyter 
      image: apache/submarine:jupyter-notebook-0.7.0-chinese
      auth: 
        secret: xxxx # If there is already a specified secret, we can fill in the secret name 
  nodeSelector: {} # node selector applied to pods created by the daemonset
  refreshHours: '2' # number of hours between health checks
status:
  images:
    - name: apache/submarine:jupyter-notebook-0.7.0
      state: success/failure/pulling
      message: Reasons for pull failure ...
      digest: sha256:f04468d5ec5bdcda7a6ebdd65b20a7b363f348f1caef915df4a6cc8d1eb09029
      nodes:
        - worker1.xxxx.com

Every time submarine updates the environments, it will update the image list in the CRD. After reading the CRD spec and detecting an addition/modification, the operator can create a DaemonSet in the specified namespace (with the nodeSelector). The DaemonSet will contain N (the size of the images list) containers, each of which pulls one image from the CRD.
This operation overrides the entrypoint script of the docker image so that it only outputs words like "Pulling complete", so it's a lightweight task (see the snippet and the sketch below).

spec:
 initContainers:
    - name: image-pull-{image-name}
      command:
        - /bin/sh
        - -c
        - echo "Pulling complete"

Docker image registry authorization

Docker authentication should be provided in the environment. We should consider private clouds and private image registries (like Harbor); in some cases, we need to provide docker authentication for downloading.
We can let users directly enter a username and password, or use authentication information already stored in a k8s Secret.

{
  "name": "notebook-gpu-env",
  "dockerImage": "apache/submarine:jupyter-notebook-gpu-0.7.0",
  "dockerAuthSpec": {
    "type": "password",
    "username": "xxxx",
    "password": "xxxx",
    "email": null
  }
} 

Image nodeSelector

At present, this chapter mainly considers GPU images. Some k8s GPU resources may only exist on certain exclusive nodes, so we need to add a nodeSelector to the pod deployment. Meanwhile, the environment also needs a nodeSelector. In this way, the GPU image pod can be deployed on the correct node.

Docker image version update strategy

Build a ConfigMap to save the image and tag information.
When the refresh interval is reached, the recorded image will be compared with the latest tag/digest to determine whether there is an image update. If there is an update, a new pull operation is automatically triggered (see the sketch after the ConfigMap below).

kind: ConfigMap
apiVersion: v1
metadata:
  name: submarine-image-pull-operator-records
  namespace: submarine
data:
  images.list: |-
    apache/submarine:jupyter-notebook-0.7.0@sha256:f04468d5ec5bdcda7a6ebdd65b20a7b363f348f1caef915df4a6cc8d1eb09029
    xxx.harbor.com/5000/apache/submarine:jupyter-notebook-gpu-0.7.0@sha256:1ccc0a0ca577e5fb5a0bdf2150a1a9f842f47c8865e861fa0062c5d343eb8cac
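
A minimal sketch of that comparison step, assuming the ConfigMap above and leaving digest resolution (e.g. querying the registry) out of scope; the helper names here are hypothetical:

# Parse the image records stored in the ConfigMap above and decide whether a
# re-pull is needed. Resolving the latest digest from the registry is out of
# scope here; it is passed in directly. Function names are hypothetical.
from kubernetes import client, config

def load_recorded_digests(namespace='submarine',
                          name='submarine-image-pull-operator-records'):
    config.load_kube_config()
    cm = client.CoreV1Api().read_namespaced_config_map(name, namespace)
    records = {}
    for line in cm.data['images.list'].splitlines():
        image, _, digest = line.strip().partition('@')
        records[image] = digest
    return records

def needs_repull(image, latest_digest, records):
    # Re-pull when the image is not recorded yet or its digest is stale.
    return records.get(image) != latest_digest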

It should be noted that there may be jupyter and jupyter-gpu images under each tenant. Since the repeated image pull operation will not bring too much additional burden, we allow different tenants to have the same image resources.


There are still some parts to be designed, which will be explained later.

  • TODO 1: How to init/replace the docker image name and authorization when deploying a new submarine service.

Restarting hadoop services produced an error after I finished the GPU settings for the RM, NM, and container-executor.cfg

version:
submarine-v0.3.0
hadoop-v3.2.1

I am following the documentation to set up GPU for ResourceManager, NodeManager and container-executor.cfg in my environment.
Then I restarted hadoop with the following commands:
YARN_LOGFILE=resourcemanager.log ./sbin/yarn-daemon.sh start resourcemanager
YARN_LOGFILE=nodemanager.log ./sbin/yarn-daemon.sh start nodemanager
YARN_LOGFILE=timeline.log ./sbin/yarn-daemon.sh start timelineserver
YARN_LOGFILE=mr-historyserver.log ./sbin/mr-jobhistory-daemon.sh start historyserver

I used the jps command to check whether the services were running. Unfortunately, I found that the nodemanager service was not started. Then I found some errors in hadoop-root-nodemanager-71192c388b55.log:

2020-03-01 09:52:38,744 ERROR org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler: Failed to bootstrap configured resource subsystems! org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException: Controller devices not mounted. You either need to mount it with yarn.nodemanager.linux-container-executor.cgroups.mount or mount cgroups before launching Yarn at org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandlerImpl.initializePreMountedCGroupController(CGroupsHandlerImpl.java:392) at org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandlerImpl.initializeCGroupController(CGroupsHandlerImpl.java:370) at org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.gpu.GpuResourceHandlerImpl.bootstrap(GpuResourceHandlerImpl.java:93) at org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain.bootstrap(ResourceHandlerChain.java:58) at org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler.serviceInit(ContainerScheduler.java:146) at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164) at org.apache.hadoop.service.CompositeService.serviceInit(CompositeService.java:108) at org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl.serviceInit(ContainerManagerImpl.java:323) at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164) at org.apache.hadoop.service.CompositeService.serviceInit(CompositeService.java:108) at org.apache.hadoop.yarn.server.nodemanager.NodeManager.serviceInit(NodeManager.java:516) at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164) at org.apache.hadoop.yarn.server.nodemanager.NodeManager.initAndStartNodeManager(NodeManager.java:974) at org.apache.hadoop.yarn.server.nodemanager.NodeManager.main(NodeManager.java:1054) 2020-03-01 09:52:38,744 INFO org.apache.hadoop.service.AbstractService: Service org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler failed in state INITED java.io.IOException: Failed to bootstrap configured resource subsystems! at org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler.serviceInit(ContainerScheduler.java:150) at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164) at org.apache.hadoop.service.CompositeService.serviceInit(CompositeService.java:108) at org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl.serviceInit(ContainerManagerImpl.java:323) at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164) at org.apache.hadoop.service.CompositeService.serviceInit(CompositeService.java:108) at org.apache.hadoop.yarn.server.nodemanager.NodeManager.serviceInit(NodeManager.java:516) at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164) at org.apache.hadoop.yarn.server.nodemanager.NodeManager.initAndStartNodeManager(NodeManager.java:974) at org.apache.hadoop.yarn.server.nodemanager.NodeManager.main(NodeManager.java:1054) 2020-03-01 09:52:38,745 INFO org.apache.hadoop.service.AbstractService: Service org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl failed in state INITED org.apache.hadoop.service.ServiceStateException: java.io.IOException: Failed to bootstrap configured resource subsystems! 
at org.apache.hadoop.service.ServiceStateException.convert(ServiceStateException.java:105) at org.apache.hadoop.service.AbstractService.init(AbstractService.java:173) at org.apache.hadoop.service.CompositeService.serviceInit(CompositeService.java:108) at org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl.serviceInit(ContainerManagerImpl.java:323) at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164) at org.apache.hadoop.service.CompositeService.serviceInit(CompositeService.java:108) at org.apache.hadoop.yarn.server.nodemanager.NodeManager.serviceInit(NodeManager.java:516) at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164) at org.apache.hadoop.yarn.server.nodemanager.NodeManager.initAndStartNodeManager(NodeManager.java:974) at org.apache.hadoop.yarn.server.nodemanager.NodeManager.main(NodeManager.java:1054) Caused by: java.io.IOException: Failed to bootstrap configured resource subsystems! at org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler.serviceInit(ContainerScheduler.java:150) at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164) ... 8 more

It seems the environment didn't mount "/sys/fs/cgroup". Here's the docker command I used to start the container:
➜ Downloads docker run -it -v /data/docker-images/:/sys/fs/cgroup -m 10G 968d612886ee bash
Can somebody help me?

Spark security does not use the user connected via beeline.

I am using ranger 2.1 and spark 3.1.

I use spark-submit (as, e.g., user root) to run HiveThriftServer2. After a successful submit, I use beeline to connect to the thrift server, but it does not use the user (e.g. user test) that I connected with when making the beeline connection.

Expected result: spark security should check permissions for user test, which I used for the beeline connection.
Actual: spark security is using root, with which I executed the spark-submit to run the thrift server. I can change it with the --proxy-user option, but it will still use the same user for all connections.

Username and Password

I ran ./submarine.sh and pointed my browser to the URL; however, I'm having issues logging on.
I tried to create an account, but after I enter my username and password, it never lets me log on.

How to run it from source code

How do I run it from source code, not with docker or minikube?
I develop with the IntelliJ IDEA IDE and want to run it from the IDE. Which services should I run?

Failed to import project Maven dependency

When I introduced Maven dependencies for a project, the following error occurred:

[INFO] ------------------------------------------------------------------------ [INFO] BUILD FAILURE [INFO] ------------------------------------------------------------------------ [INFO] Total time: 08:56 min [INFO] Finished at: 2020-03-04T01:34:24Z [INFO] ------------------------------------------------------------------------ [ERROR] Failed to execute goal on project submarine-client: Could not resolve dependencies for project org.apache.submarine:submarine-client:jar:0.4.0-SNAPSHOT: Failed to collect dependencies at org.apache.submarine:commons-runtime:jar:0.4.0-SNAPSHOT: Failed to read artifact descriptor for org.apache.submarine:commons-runtime:jar:0.4.0-SNAPSHOT: Could not transfer artifact org.apache.submarine:commons-runtime:pom:0.4.0-SNAPSHOT from/to apache.snapshots (http://repository.apache.org/snapshots): Connect to repository.apache.org:80 [repository.apache.org/207.244.88.140] failed: Connection timed out (Connection timed out) -> [Help 1] [ERROR] [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch. [ERROR] Re-run Maven using the -X switch to enable full debug logging. [ERROR] [ERROR] For more information about the errors and possible solutions, please read the following articles: [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException

How can we introduce the necessary dependencies correctly? Or which online repository should we use?
Looking forward to your answer. Thanks a lot!

org.apache.submarine.spark.security.SparkAccessControlException: Permission denied: user [tools] does not have [USE] privilege on [spark_catalog]

spark version: 3.0
ranger version: 1.2
I execute ./bin/beeline -u jdbc:hive2://ip:50033/tools -n tools, and get the following error log:

21/03/11 22:08:19 WARN thrift.ThriftCLIService: Error opening session: 
org.apache.hive.service.cli.HiveSQLException: Failed to open new session: org.apache.submarine.spark.security.SparkAccessControlException: Permission denied: user [tools] does not have [USE] privilege on [spark_catalog]
        at org.apache.spark.sql.hive.thriftserver.SparkSQLSessionManager.openSession(SparkSQLSessionManager.scala:85)
        at org.apache.hive.service.cli.CLIService.openSession(CLIService.java:196)
        at org.apache.hive.service.cli.thrift.ThriftCLIService.getSessionHandle(ThriftCLIService.java:374)
        at org.apache.hive.service.cli.thrift.ThriftCLIService.OpenSession(ThriftCLIService.java:243)
        at org.apache.hive.service.rpc.thrift.TCLIService$Processor$OpenSession.getResult(TCLIService.java:1497)
        at org.apache.hive.service.rpc.thrift.TCLIService$Processor$OpenSession.getResult(TCLIService.java:1482)
        at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:38)
        at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
        at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:53)
        at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:310)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.submarine.spark.security.SparkAccessControlException: Permission denied: user [tools] does not have [USE] privilege on [spark_catalog]
        at org.apache.submarine.spark.security.RangerSparkAuthorizer$.$anonfun$checkPrivileges$3(RangerSparkAuthorizer.scala:126)
        at org.apache.submarine.spark.security.RangerSparkAuthorizer$.$anonfun$checkPrivileges$3$adapted(RangerSparkAuthorizer.scala:100)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.submarine.spark.security.RangerSparkAuthorizer$.checkPrivileges(RangerSparkAuthorizer.scala:100)
        at org.apache.spark.sql.catalyst.optimizer.SubmarineSparkRangerAuthorizationExtension.apply(SubmarineSparkRangerAuthorizationExtension.scala:65)
        at org.apache.spark.sql.catalyst.optimizer.SubmarineSparkRangerAuthorizationExtension.apply(SubmarineSparkRangerAuthorizationExtension.scala:40)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:216)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:89)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:213)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:205)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:205)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:183)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:183)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:87)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:143)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:143)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:84)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:84)
        at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:95)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:113)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:110)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$simpleString$2(QueryExecution.scala:161)
        at org.apache.spark.sql.execution.ExplainUtils$.processPlan(ExplainUtils.scala:115)
        at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:161)
        at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:206)
        at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:175)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:98)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:228)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:615)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:610)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLSessionManager.openSession(SparkSQLSessionManager.scala:73)
        ... 12 more
21/03/11 22:08:19 ERROR server.TThreadPoolServer: Thrift error occurred during processing of message.
org.apache.thrift.transport.TTransportException
        at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
        at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
        at org.apache.thrift.transport.TSaslTransport.readLength(TSaslTransport.java:374)
        at org.apache.thrift.transport.TSaslTransport.readFrame(TSaslTransport.java:451)
        at org.apache.thrift.transport.TSaslTransport.read(TSaslTransport.java:433)
        at org.apache.thrift.transport.TSaslServerTransport.read(TSaslServerTransport.java:43)
        at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
        at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:425)
        at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:321)
        at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:225)
        at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:27)
        at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:53)
        at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:310)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

[DESIGN] Submarine can support a service to customize new images for jupyter notebook/ experiment

At present, the jupyter image is relatively large; if users want to add some jupyter or Python dependencies on top of the existing image, it is very troublesome.
I think submarine can provide a service similar to S2I (source-to-image), or use buildah, to directly build a new image based on the existing jupyter image and register it as an environment in submarine. In this way, users can easily use the packages they want on the new notebook service.

The specific details will be discussed later.

When I do an association query, it throws an error

@yaooqinn @liuxunorg

First of all, thank you for your help, but there is also a small problem:

When I do a single-table query, it works well, but when I do an association (join) query, such as “select a.name, a.idno from test.table_user a join table_user B on a.idno = b.idno”, it throws an error: “Error:org.apache.spark.sql.AnalysisException :Reference name is ambiguous,could be: a.name,b.name.;line 1 pos 0 (state=,code=0)”
https://github.com/apache/submarine/issues/313#issue-639673105

[Submarine Spark Security] Permission denied: user [xxx] does not have [USE] privilege on [spark_catalog]

We use Submarine Spark Security to enable the Kyuubi ACL, and we build with -Pspark3.1 on v0.6.0 like #774, but when we submit SQL we always encounter the errors below, like issue #533.
Our Spark version is 3.1.2 and our Ranger version is 2.1. When we try to add a policy for spark_catalog, it doesn't work.

21/11/18 18:22:49 WARN service.ThriftFrontendService: Error executing statement:
org.apache.kyuubi.KyuubiSQLException: Error operating EXECUTE_STATEMENT: org.apache.submarine.spark.security.SparkAccessControlException: Permission denied: user [xxx] does not have [USE] privilege on [spark_catalog]
        at org.apache.submarine.spark.security.RangerSparkAuthorizer$.$anonfun$checkPrivileges$3(RangerSparkAuthorizer.scala:126)
        at org.apache.submarine.spark.security.RangerSparkAuthorizer$.$anonfun$checkPrivileges$3$adapted(RangerSparkAuthorizer.scala:100)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.submarine.spark.security.RangerSparkAuthorizer$.checkPrivileges(RangerSparkAuthorizer.scala:100)
        at org.apache.spark.sql.catalyst.optimizer.SubmarineSparkRangerAuthorizationExtension.apply(SubmarineSparkRangerAuthorizationExtension.scala:65)
        at org.apache.spark.sql.catalyst.optimizer.SubmarineSparkRangerAuthorizationExtension.apply(SubmarineSparkRangerAuthorizationExtension.scala:40)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:216)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:89)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:213)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:205)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:205)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:183)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:183)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:87)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:143)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:143)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:84)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:84)
        at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:95)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:113)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:110)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$simpleString$2(QueryExecution.scala:161)
        at org.apache.spark.sql.execution.ExplainUtils$.processPlan(ExplainUtils.scala:115)
        at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:161)
        at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:206)
        at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:175)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:98)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:228)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
        at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.$anonfun$executeStatement$1(ExecuteStatement.scala:92)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.withLocalProperties(ExecuteStatement.scala:143)
        at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.org$apache$kyuubi$engine$spark$operation$ExecuteStatement$$executeStatement(ExecuteStatement.scala:86)
        at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.runInternal(ExecuteStatement.scala:129)
        at org.apache.kyuubi.operation.AbstractOperation.run(AbstractOperation.scala:129)
        at org.apache.kyuubi.session.AbstractSession.runOperation(AbstractSession.scala:95)
        at org.apache.kyuubi.engine.spark.session.SparkSessionImpl.runOperation(SparkSessionImpl.scala:44)
        at org.apache.kyuubi.session.AbstractSession.$anonfun$executeStatement$1(AbstractSession.scala:123)
        at org.apache.kyuubi.session.AbstractSession.withAcquireRelease(AbstractSession.scala:78)
        at org.apache.kyuubi.session.AbstractSession.executeStatement(AbstractSession.scala:120)
        at org.apache.kyuubi.service.AbstractBackendService.executeStatement(AbstractBackendService.scala:61)
        at org.apache.kyuubi.service.ThriftFrontendService.ExecuteStatement(ThriftFrontendService.scala:256)
        at org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1557)
        at org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1542)
        at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:38)
        at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
        at org.apache.kyuubi.service.authentication.TSetIpAddressProcessor.process(TSetIpAddressProcessor.scala:36)
        at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:310)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
        
        at org.apache.kyuubi.KyuubiSQLException$.apply(KyuubiSQLException.scala:68)
        at org.apache.kyuubi.engine.spark.operation.SparkOperation$$anonfun$onError$1.applyOrElse(SparkOperation.scala:97)
        at org.apache.kyuubi.engine.spark.operation.SparkOperation$$anonfun$onError$1.applyOrElse(SparkOperation.scala:81)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
        at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.$anonfun$executeStatement$1(ExecuteStatement.scala:100)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.withLocalProperties(ExecuteStatement.scala:143)
        at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.org$apache$kyuubi$engine$spark$operation$ExecuteStatement$$executeStatement(ExecuteStatement.scala:86)
        at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.runInternal(ExecuteStatement.scala:129)
        at org.apache.kyuubi.operation.AbstractOperation.run(AbstractOperation.scala:129)
        at org.apache.kyuubi.session.AbstractSession.runOperation(AbstractSession.scala:95)
        at org.apache.kyuubi.engine.spark.session.SparkSessionImpl.runOperation(SparkSessionImpl.scala:44)
        at org.apache.kyuubi.session.AbstractSession.$anonfun$executeStatement$1(AbstractSession.scala:123)
        at org.apache.kyuubi.session.AbstractSession.withAcquireRelease(AbstractSession.scala:78)
        at org.apache.kyuubi.session.AbstractSession.executeStatement(AbstractSession.scala:120)
        at org.apache.kyuubi.service.AbstractBackendService.executeStatement(AbstractBackendService.scala:61)
        at org.apache.kyuubi.service.ThriftFrontendService.ExecuteStatement(ThriftFrontendService.scala:256)
        at org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1557)
        at org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1542)
        at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:38)
        at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
        at org.apache.kyuubi.service.authentication.TSetIpAddressProcessor.process(TSetIpAddressProcessor.scala:36)
        at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:310)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.submarine.spark.security.SparkAccessControlException: Permission denied: user [xxx] does not have [USE] privilege on [spark_catalog]
        at org.apache.submarine.spark.security.RangerSparkAuthorizer$.$anonfun$checkPrivileges$3(RangerSparkAuthorizer.scala:126)
        at org.apache.submarine.spark.security.RangerSparkAuthorizer$.$anonfun$checkPrivileges$3$adapted(RangerSparkAuthorizer.scala:100)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.submarine.spark.security.RangerSparkAuthorizer$.checkPrivileges(RangerSparkAuthorizer.scala:100)
        at org.apache.spark.sql.catalyst.optimizer.SubmarineSparkRangerAuthorizationExtension.apply(SubmarineSparkRangerAuthorizationExtension.scala:65)
        at org.apache.spark.sql.catalyst.optimizer.SubmarineSparkRangerAuthorizationExtension.apply(SubmarineSparkRangerAuthorizationExtension.scala:40)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:216)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:89)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:213)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:205)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:205)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:183)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:183)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:87)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:143)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:143)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:84)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:84)
        at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:95)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:113)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:110)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$simpleString$2(QueryExecution.scala:161)
        at org.apache.spark.sql.execution.ExplainUtils$.processPlan(ExplainUtils.scala:115)
        at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:161)
        at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:206)
        at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:175)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:98)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:228)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
        at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.$anonfun$executeStatement$1(ExecuteStatement.scala:92)

[Submarine Spark Security] ClassCastException

reference : https://submarine.apache.org/docs/userDocs/submarine-security/spark-security/build-submarine-spark-security-plugin/

spark beeline -> kyuubi -> spark sql
spark 3.1.2
kyuubi 1.2.0
submarine 0.6.0

The first error:
21/09/16 19:07:33 ERROR spark.SparkSQLEngine: Error start SparkSQLEngine
java.lang.ClassCastException: java.util.ArrayList cannot be cast to scala.collection.Seq
at org.apache.spark.sql.catalyst.optimizer.SubmarineSparkRangerAuthorizationExtension.apply(SubmarineSparkRangerAuthorizationExtension.scala:63)
at org.apache.spark.sql.catalyst.optimizer.SubmarineSparkRangerAuthorizationExtension.apply(SubmarineSparkRangerAuthorizationExtension.scala:40)

The second error:
Error: Error operating EXECUTE_STATEMENT: java.lang.ClassCastException: org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject cannot be cast to org.apache.submarine.spark.security.SparkPrivilegeObject
at scala.collection.immutable.Stream.foreach(Stream.scala:533)
at org.apache.submarine.spark.security.RangerSparkAuthorizer$.addAccessRequest$1(RangerSparkAuthorizer.scala:74)
at org.apache.submarine.spark.security.RangerSparkAuthorizer$.checkPrivileges(RangerSparkAuthorizer.scala:98)

[Submarine Spark Security] Policy Download Performance

In a large cluster with hundreds of Spark jobs running simultaneously and tens of thousands of Ranger policies created, restarting the cluster causes all of these Spark jobs to try to download the Ranger policies at the same time, putting Ranger Admin under huge pressure. Can Ranger Admin withstand such a load? Does anyone know how to deal with this situation?

Combine k8s and database transactions

The current k8s and database transactions are separate, which results in situations where a k8s commit/processing failure does not trigger a rollback of the database transaction.
We need to design unified transaction management or processing logic so that submarine-server can handle exceptions and transaction rollbacks correctly.

For example, by using @Transactional, or by putting the relevant operation code/methods into one of our transaction services.
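As a rough illustration of the @Transactional approach (a sketch only, not Submarine's actual code), the example below invokes the database write and the Kubernetes submission from one Spring-managed transactional method, so an exception from the k8s call rolls back the database insert. ExperimentMapper and K8sSubmitter are hypothetical placeholders for whatever mapper and submitter classes submarine-server really uses, and a failure after the k8s call would still need explicit compensation, because Kubernetes itself is outside the database transaction.

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

// Hypothetical collaborators, declared here only to keep the sketch self-contained.
interface ExperimentMapper { void insert(String experimentJson); }
interface K8sSubmitter { void submit(String experimentJson); }

@Service
public class ExperimentTxService {

  private final ExperimentMapper mapper;
  private final K8sSubmitter submitter;

  public ExperimentTxService(ExperimentMapper mapper, K8sSubmitter submitter) {
    this.mapper = mapper;
    this.submitter = submitter;
  }

  // If submit() throws, Spring rolls back the insert; a failure after the k8s call
  // still needs a compensating delete of the created custom resource.
  @Transactional(rollbackFor = Exception.class)
  public void createExperiment(String experimentJson) {
    mapper.insert(experimentJson);
    submitter.submit(experimentJson);
  }
}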

[DESIGN] Submarine should provide notebook startup and shutdown functionality

(This proposal was originally written in Chinese so that the implementation details could be described more precisely.)

Currently, every time Submarine creates a new notebook, it creates a new instance as a K8s CRD resource. Based on the service-discovery mechanism of notebook-controller, a Jupyter StatefulSet resource and a Service resource are created. This is the normal workflow: once the notebook has started, the user can click its name in Submarine to access the instance.
However, after different users create and use their instances, those instances are not running code all the time. In many cases, once an idea has been tried out or some expected scenarios and data have been verified, the notebook instance is simply left idle. Even though the user later logs out of Submarine, the instance still exists, which from the cluster's point of view is a useless Pod that keeps occupying resources.

My current thinking is that Submarine should provide a notebook start/stop feature, implemented in two parts.

  1. Add start/stop buttons to the action column of the current notebook list. When users confirm they no longer need a notebook, they can shut down its Pod while resources such as the workspace are kept; when the notebook is started again, the service restarts with its original resources.
  2. Define an automatic-stop mechanism: after a user-specified idle period with no activity, Submarine stops the instance automatically. The user can start the notebook again when it is needed.

For option 1, start/stop buttons can be added as described above so that users control it themselves (a minimal sketch of the stop action follows).
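One possible way to implement the stop action, sketched with the official Kubernetes Java client: patch the notebook's StatefulSet to 0 replicas so the Pod is removed while the workspace PVC is kept, and patch it back to 1 to start it again. The namespace submarine and the StatefulSet name test match the test notebook used in the validation below and are only assumptions here; whether notebook-controller would reconcile the replica count back still needs to be verified.

import io.kubernetes.client.custom.V1Patch;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.models.V1StatefulSet;
import io.kubernetes.client.openapi.models.V1StatefulSetList;
import io.kubernetes.client.util.ClientBuilder;
import io.kubernetes.client.util.generic.GenericKubernetesApi;
import io.kubernetes.client.util.generic.KubernetesApiResponse;

public class NotebookScaleToZeroSketch {

  public static void main(String[] args) throws Exception {
    // Build a client from the local kubeconfig / in-cluster config.
    ApiClient client = ClientBuilder.standard().build();

    GenericKubernetesApi<V1StatefulSet, V1StatefulSetList> statefulSets =
        new GenericKubernetesApi<>(V1StatefulSet.class, V1StatefulSetList.class,
            "apps", "v1", "statefulsets", client);

    // Scale the notebook StatefulSet down to 0; the PVC (workspace) is not touched.
    KubernetesApiResponse<V1StatefulSet> resp = statefulSets.patch(
        "submarine", "test",
        V1Patch.PATCH_FORMAT_JSON_MERGE_PATCH,
        new V1Patch("{\"spec\":{\"replicas\":0}}"));

    System.out.println("stop notebook succeeded: " + resp.isSuccess());
    // Starting the notebook again would send the same patch with "replicas": 1.
  }
}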
For option 2, under the conditions known so far, Jupyter's metrics can be collected with Prometheus; the Submarine server then polls the metrics and reclaims resources for any Pod that has had no execution for a specified period (say, 30 minutes). I ran the following tests to validate this approach against an existing Jupyter service in my own test environment:

The Jupyter container is registered as a custom resource via notebook-controller, but its Service lacks labels, so the Prometheus Operator ServiceMonitor cannot be used for service discovery yet; PodMonitor is used for the verification instead. Here I created a notebook instance named test. The PodMonitor manifest is as follows:

apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: notebook-test
  namespace: submarine
  labels:
    k8s-app-pod: 'true'
spec: 
  podMetricsEndpoints:
    - path: /notebook/submarine/test/metrics
      port: notebook-port
  namespaceSelector:
    matchNames:
      - submarine
  selector:
    matchLabels:
      statefulset: test

[screenshot: Jupyter notebook metrics collected by Prometheus via the PodMonitor]

As the screenshot shows, among the collected metrics, kernel_currently_running_total{type="python3"} can be used to judge whether the notebook is actively running.
At the same time, we can also add a separate Service to achieve the same effect:

kind: Service
apiVersion: v1
metadata:
  name: notebook-test-metric
  namespace: submarine
  labels:
    statefulset: test
spec:
  ports:
    - name: http-test
      protocol: TCP
      port: 8888
      targetPort: 8888
  selector:
    statefulset: test
  type: ClusterIP
---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: notebook-test
  namespace: submarine
  labels:
    k8s-app-service: 'true'
spec:
  endpoints:
    - path: /notebook/submarine/test/metrics
      port: http-test
  namespaceSelector:
    matchNames:
      - submarine
  selector:
    matchLabels:
      statefulset: test

The Prometheus metrics after service discovery are shown below:
[screenshot: Prometheus metrics after ServiceMonitor-based service discovery]
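A minimal sketch of the polling idea described above, using only the JDK HTTP client. The Prometheus address http://prometheus.submarine:9090 and the pod label test-0 are assumptions for illustration; a real implementation would parse the JSON result, track how long the gauge stays at 0, and then ask the notebook-controller (or the scaling logic above) to stop the notebook.

import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class NotebookIdleCheckSketch {

  // Assumed Prometheus address inside the cluster.
  private static final String PROMETHEUS = "http://prometheus.submarine:9090";

  public static void main(String[] args) throws Exception {
    // Query the running-kernel gauge of one notebook pod (the pod name is an assumption).
    String promql = "kernel_currently_running_total{type=\"python3\",pod=\"test-0\"}";
    String url = PROMETHEUS + "/api/v1/query?query="
        + URLEncoder.encode(promql, StandardCharsets.UTF_8);

    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
    HttpResponse<String> response =
        client.send(request, HttpResponse.BodyHandlers.ofString());

    // The JSON body contains the current gauge value; 0 over a 30-minute window
    // would be the signal to stop the notebook.
    System.out.println(response.body());
  }
}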

[Submarine Spark Security] Errors when building with Spark 3.1 support

Edit the pom file of the spark-security project:

    <profile>
      <id>spark-3.1</id>
      <properties>
        <spark.version>3.1.1</spark.version>
        <scala.version>2.12.10</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <!--<scalatest.version>3.2.0</scalatest.version>-->
        <spark.compatible.version>3</spark.compatible.version>
        <commons-lang3.version>3.10</commons-lang3.version>
        <jackson-databind.version>2.10.5</jackson-databind.version>
        <jackson-annotations.version>2.10.5</jackson-annotations.version>
      </properties>
    </profile>

build command:

 mvn clean package -Pspark-3.1 -Pranger-2.1 -DskipTests

error message:

[ERROR] /spark-security/spark-3/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala:113: value parseRawDataType is not a member of org.apache.spark.sql.catalyst.parser.ParserInterface
[ERROR]     delegate.parseRawDataType(sqlText)
[ERROR]              ^
[ERROR] /spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkRangerAuthorizationExtension.scala:60: pattern type is incompatible with expected type;
 found   : org.apache.spark.sql.execution.command.ResetCommand.type
 required: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
Note: if you intended to match against the class, try `case ResetCommand(_)`
[ERROR]       case ResetCommand => SubmarineResetCommand
[ERROR]            ^
[ERROR] two errors found

Ranger Admin returning 401 when trying to pull policies in a kerberized cluster using Livy and YARN

I built the submarine security plugin for Ranger 1.2 and Spark 2.3. When I use pyspark or spark-submit, it is able to retrieve the Hive policy file from Ranger and store it on the file system. However, when I use Jupyter and Livy to connect to an HDP 3.1.5 cluster, it does not succeed in getting the policy file. Instead I get this error:

WARN RangerAdminRESTClient: Error getting policies. secureMode=true, user=xxxxxxx (auth:SIMPLE), response={"httpStatusCode":401,"statusCode":0}, serviceName=XXXX_hive

My cluster is kerberized, and when Livy starts a YARN application it uses a proxy ID to do so, so I believe some level of impersonation is occurring. In the YARN application there is no TGT created for the user ID that is running the application. Incidentally, in the scenario above where I use a pyspark shell or spark-submit, I do have a TGT for the user ID. I believe that because I have a TGT in the scenario that works, I am able to authenticate to Ranger and successfully pull the policies; in the scenario where no TGT exists, I cannot authenticate to Ranger and it returns a 401. So my questions are: how is Ranger Admin called to pull the policies, and what credentials are used to log in? Can you point me to the code that does the authentication? Any ideas on how to fix this?

[DESIGN] Add k8s mock client and server test case

At present, Submarine's tests of k8s services are mainly in the submarine-k8s package. Because a k8s service is a complex composition of services, it is difficult to write unit tests for many k8s-related functions (such as those in the submarine-server-submitter package) in a simple way.

I think that since most of the methods in the submarine-server-submitter package are CRUD-like k8s API calls, we should keep/create enough test cases to verify these methods. I have searched for some k8s-client-related test case information, and I think the main problem is how to support a mock server, so that we can break away from the constraints of a real k8s cluster and write test cases freely.
I think we can get some inspiration from kubernetes-client/java#1842. Most k8s APIs can be simulated by returning canned messages based on the HTTP request, so we can refer to the approach mentioned there to change the existing test cases.

Mock Client and Server

I've written a test case showing how to test the k8s client with a mock client and server (WireMockRule). The core of the whole test case is to respond to the client's requests with fake messages.
It refers to the official kubernetes-client/java test cases.

The following code can be found in https://github.com/shangyuantech/submarine/blob/SUBMARINE-1174/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SubmitterK8sMockApiTest.java

public class SubmitterK8sMockApiTest {

  private K8sClient k8sClient;

  @Rule
  public WireMockRule wireMockRule = K8sMockClient.getWireMockRule();

  @Before
  public void setup() throws IOException {
    this.k8sClient = new K8sMockClient(post(urlPathEqualTo("/api/v1/namespaces/foo/configmaps"))
            .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
            .willReturn(
                    aResponse()
                            .withStatus(200)
                            .withBody("{\"metadata\":{\"name\":\"bar\",\"namespace\":\"foo\"}}")));
  }

  @Test
  public void testApplyConfigMap() {
    V1ConfigMap configMap = new V1ConfigMap()
            .apiVersion("v1")
            .metadata(new V1ObjectMeta().namespace("foo").name("bar"))
            .data(Collections.singletonMap("key1", "value1"));

    KubernetesApiResponse<V1ConfigMap> configMapResp = k8sClient.getConfigMapClient().create(configMap);
    V1ConfigMap rtnConfigmap = configMapResp.getObject();

    assertNotNull(rtnConfigmap);
    assertEquals(rtnConfigmap.getMetadata().getNamespace(), "foo");
    assertEquals(rtnConfigmap.getMetadata().getName(), "bar");
  }
}

In this case, I create a K8sMockClient. In this class I initialize the URL and the WireMockRule for the client.
The following code can be found in https://github.com/shangyuantech/submarine/blob/SUBMARINE-1174/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/client/K8sMockClient.java

private static String getResourceFileContent(String resource) {
    File file = new File(Objects.requireNonNull(
            K8sMockClient.class.getClassLoader().getResource(resource)).getPath()
    );
    try {
      return new String(Files.readAllBytes(Paths.get(file.toString())));
    } catch (IOException e) {
      e.printStackTrace();
      return null;
    }
  }

  private static final String DISCOVERY_API = getResourceFileContent("client/discovery-api.json");
  private static final String DISCOVERY_APIV1 = getResourceFileContent("client/discovery-api-v1.json");
  private static final String DISCOVERY_APIS = getResourceFileContent("client/discovery-apis.json");

  private static final WireMockRule wireMockRule = new WireMockRule(8384);

  public static WireMockRule getWireMockRule() {
    return wireMockRule;
  }

  public K8sMockClient() throws IOException {
    apiClient = new ClientBuilder().setBasePath("http://localhost:" + 8384).build();
    wireMockRule.stubFor(
            get(urlPathEqualTo("/api"))
                    .willReturn(
                            aResponse()
                                    .withStatus(200)
                                    .withBody(DISCOVERY_API)));
    wireMockRule.stubFor(
            get(urlPathEqualTo("/apis"))
                    .willReturn(
                            aResponse()
                                    .withStatus(200)
                                    .withBody(DISCOVERY_APIS)));
    wireMockRule.stubFor(
            get(urlPathEqualTo("/api/v1"))
                    .willReturn(
                            aResponse()
                                    .withStatus(200)
                                    .withBody(DISCOVERY_APIV1)));
    coreApi = new CoreV1Api();
    appsV1Api = new AppsV1Api();
    customObjectsApi = new CustomObjectsApi();
    configMapClient =
            new GenericKubernetesApi<>(
                    V1ConfigMap.class, V1ConfigMapList.class,
                    "", "v1", "configmaps", apiClient);
  }

  public K8sMockClient(MappingBuilder... mappingBuilders) throws IOException {
    this();
    addMappingBuilders(mappingBuilders);
  }

  public void addMappingBuilders(MappingBuilder... mappingBuilders) {
    // add MappingBuilder to WireMockRule
    for (MappingBuilder mappingBuilder : mappingBuilders) {
      wireMockRule.stubFor(mappingBuilder);
    }
  }

When we add a new test case, we first determine the response content (usually the JSON representation of the resource) for the resources involved, and then define and register the URL, request method, response code, and response body with the WireMockRule.
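For example, here is a minimal sketch of registering one more stub, assuming the K8sMockClient above is available: a GET of the ConfigMap bar in namespace foo returns a canned 200 response. The URL, status code, and JSON body are made up for illustration.

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;

public class ExtraStubExample {

  // Register a stub so that reading the ConfigMap "bar" in namespace "foo"
  // returns a fixed JSON body, without any real API server.
  public static void registerConfigMapGetStub(K8sMockClient mockClient) {
    mockClient.addMappingBuilders(
        get(urlPathEqualTo("/api/v1/namespaces/foo/configmaps/bar"))
            .willReturn(aResponse()
                .withStatus(200)
                .withBody("{\"metadata\":{\"name\":\"bar\",\"namespace\":\"foo\"}}")));
  }
}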

Operation not allowed when using spark-security in my Thrift server; maybe the table owner is lost?

I imported spark-security into my Thrift server with Ranger 2.0 and Spark 2.4.5.
But I cannot insert into, or even select from, a table I created myself.

Error Msg:

2021-05-13 17:59:08,180 DEBUG [OperationManager-Background-Pool: Thread-164] org.apache.ranger.plugin.policyengine.RangerPolicyEngineImpl:516  - <== RangerPolicyEngineImpl.evaluatePolicies(RangerAccessRequestImpl={resource={RangerResourceImpl={ownerUser={null} elements={database=default; table=t1; } }} accessType={select} user={user1} userGroups={hive hadoop ficommon default_1000 } accessTime={Thu May 13 17:59:08 CST 2021} clientIPAddress={null} forwardedAddresses={} remoteIPAddress={null} clientType={null} action={QUERY} requestData={null} sessionId={null} resourceMatchingScope={SELF} clusterName={} clusterType={} context={token:USER={user1} } }, policyType=0): RangerAccessResult={isAccessDetermined={false} isAllowed={false} isAuditedDetermined={true} isAudited={true} policyType={0} policyId={-1} zoneName={null} auditPolicyId={3} policyVersion={null} evaluatedPoliciesCount={4} reason={null} additionalInfo={}}

....

org.apache.ranger.authorization.spark.authorizer.SparkAccessControlException: Permission denied: user [user1] does not have [SELECT] privilege on [default/t1]
        at org.apache.ranger.authorization.spark.authorizer.RangerSparkAuthorizer$$anonfun$checkPrivileges$1.apply(RangerSparkAuthorizer.scala:123)
        at org.apache.ranger.authorization.spark.authorizer.RangerSparkAuthorizer$$anonfun$checkPrivileges$1.apply(RangerSparkAuthorizer.scala:98)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.ranger.authorization.spark.authorizer.RangerSparkAuthorizer$.checkPrivileges(RangerSparkAuthorizer.scala:98)
        at org.apache.spark.sql.catalyst.optimizer.RangerSparkAuthorizerExtension.apply(RangerSparkAuthorizerExtension.scala:62)
        at org.apache.spark.sql.catalyst.optimizer.RangerSparkAuthorizerExtension.apply(RangerSparkAuthorizerExtension.scala:36)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
        at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
        at scala.collection.immutable.List.foldLeft(List.scala:84)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$optimizedPlan$1.apply(QueryExecution.scala:74)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$optimizedPlan$1.apply(QueryExecution.scala:74)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:789)

After I set the table owner on the RangerSparkResource, the problem disappeared.

[Submarine Spark Security] Running SQL with an expression returns an "UnresolvedAttribute.exprId" error

env:
spark-3.1.1
ranger-2.1
submarine-security-0.6.0

SQL code:

spark-sql> create or replace temporary view user_city_temp as 
select id,case when if(city_zip_active >0,city_zip_active,if(city_zip_check >0 ,city_zip_check,city_zip_phone)) in ('0100','0200','0300','0400') then if(city_zip_active >0,city_zip_active,if(city_zip_check >0 ,city_zip_check,city_zip_phone)) else 'other' end as city from user_city;

column_name        type    comment
city_zip_phone     int     NULL
city_zip_check     int     NULL
city_zip_active    int     NULL

It returns this error:

org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to exprId on unresolved object, tree: 'city_zip_active
	at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.exprId(unresolved.scala:157)
	at org.apache.spark.sql.catalyst.optimizer.SubmarineDataMaskingExtension$$anonfun$collectAllAliases$1$$anonfun$applyOrElse$2.$anonfun$applyOrElse$3(SubmarineDataMaskingExtension.scala:191)
	at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
	at org.apache.spark.sql.catalyst.optimizer.SubmarineDataMaskingExtension$$anonfun$collectAllAliases$1$$anonfun$applyOrElse$2.applyOrElse(SubmarineDataMaskingExtension.scala:191)
	at org.apache.spark.sql.catalyst.optimizer.SubmarineDataMaskingExtension$$anonfun$collectAllAliases$1$$anonfun$applyOrElse$2.applyOrElse(SubmarineDataMaskingExtension.scala:189)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:352)

The same SQL runs fine in spark-3.1.1 without spark-security.

How to authorize SparkSQL with submarine/spark-security?

Here is my question: I was using the spark-security plugin to authorize SparkSQL, but when I execute "show tables" or any other SQL command, it reports the errors below:

22/01/11 10:23:26 WARN PolicyRefresher: cache file does not exist or not readable '/etc/ranger/sparkServer/policycache/sparkSql_sparkServer.json'

I have no idea why this happens, even though I configured a Spark service under Hive in Ranger.


Any suggestions would be appreciated!
