googlecloudplatform / flink-on-k8s-operator
[DEPRECATED] Kubernetes operator for managing the lifecycle of Apache Flink and Beam applications.
License: Apache License 2.0
Provide a property in the custom resource to enable creating savepoints periodically.
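A minimal sketch of the trigger check the controller could run on each reconcile. The property name (an `autoSavepointSeconds`-style interval in `jobSpec`) and the disabled-when-zero semantics are assumptions for illustration, not part of the current CRD:

```python
def savepoint_due(last_trigger_time: float, interval_sec: int, now: float) -> bool:
    """Return True when a periodic savepoint should be triggered.

    interval_sec <= 0 means periodic savepoints are disabled
    (assumed semantics for the hypothetical CR property).
    """
    return interval_sec > 0 and now - last_trigger_time >= interval_sec
```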
Currently we determine the JM service status based on the service alone, so it can be reported as ready even when its target pods are not, i.e., when there are no corresponding endpoints. It would be more accurate to verify that the corresponding endpoints are available.
Currently the operator uses the env variable EXTRA_FLINK_PROPERTIES to pass Flink properties to the container, but the Flink image actually uses FLINK_PROPERTIES, see docker-flink/docker-flink#82. We should rename EXTRA_FLINK_PROPERTIES to FLINK_PROPERTIES.
Currently, when the job of a job cluster finishes or fails, the controller immediately starts deleting the JM/TM resources. We want to delay the process for a while (e.g., 5 minutes by default) to allow checking logs and metrics. Maybe make the delay configurable as a property of the CR.
To make it easier to set up the development environment, we can add a Dockerfile that creates an image with the required build dependencies (Go 1.12+, Kustomize, Kubebuilder...). Then developers can simply mount the current source directory into the container and run builds and tests.
Sometimes, after deleting and recreating a CR very quickly, I notice the job pod from the old CR stays in a crash loop for a while, and the Flink "get jobs" API returns 2 jobs, which is unexpected. We need to avoid this race condition. The approach I am considering is to generate a random ID suffix for the CR's child resources, similar to what a Deployment does for its Pods.
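As a sketch of the proposed fix (the helper name and suffix length are illustrative assumptions), the controller could derive child resource names the way a Deployment derives Pod names:

```python
import secrets
import string

# Lowercase alphanumerics, similar in spirit to the suffixes
# Kubernetes generates for Pods owned by a Deployment.
SUFFIX_ALPHABET = string.ascii_lowercase + string.digits

def child_resource_name(cr_name: str, component: str, length: int = 5) -> str:
    """Build a child resource name with a random suffix so that resources
    from a deleted CR cannot collide with those of a quickly recreated one."""
    suffix = "".join(secrets.choice(SUFFIX_ALPHABET) for _ in range(length))
    return f"{cr_name}-{component}-{suffix}"
```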
When following the developer guide, I created a cluster with Minikube and registered the image on GCR, then ran into a problem where the pod status was always ImagePullBackOff. This is a current limitation; creating the cluster with GKE should work.
Sometimes I notice that the JM/TM deployment status changes from Ready to NotReady, which is unexpected; we need to investigate why.
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal StatusUpdate 2m47s FlinkOperator Cluster status: Creating
Normal StatusUpdate 2m45s FlinkOperator JobManager deployment status: Ready
Normal StatusUpdate 2m45s FlinkOperator JobManager service status: Ready
Normal StatusUpdate 2m45s FlinkOperator TaskManager deployment status: Ready
Normal StatusUpdate 2m45s FlinkOperator Cluster status changed: Creating -> Running
Normal StatusUpdate 2m45s FlinkOperator JobManager deployment status changed: Ready -> NotReady
Normal StatusUpdate 2m45s FlinkOperator Cluster status changed: Running -> Reconciling
Normal StatusUpdate 2m43s FlinkOperator TaskManager deployment status changed: Ready -> NotReady
Normal StatusUpdate 2m26s FlinkOperator JobManager deployment status changed: NotReady -> Ready
Normal StatusUpdate 2m23s (x2 over 2m23s) FlinkOperator (combined from similar events): Cluster status changed: Reconciling -> Running
We want to support cancelling a job with an automatic savepoint. The approach I am considering is to add a cancelled property in jobSpec and support updating the CR with it. After detecting cancelled set to true, the controller calls the Flink API to take a savepoint and then cancels the job. We might introduce another property to control whether to force-cancel the job if taking the savepoint fails.
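A sketch of the REST calls involved, assuming Flink's asynchronous savepoint trigger endpoint (which accepts a cancel-job flag) and its status endpoint; the force-cancel fallback corresponds to the hypothetical extra property and uses Flink's job termination endpoint. Returning (method, path, body) tuples keeps the flow testable without a live cluster:

```python
def trigger_cancel_with_savepoint(job_id: str, savepoint_dir: str):
    """Ask Flink to take a savepoint and cancel the job in one request."""
    return ("POST", f"/jobs/{job_id}/savepoints",
            {"target-directory": savepoint_dir, "cancel-job": True})

def poll_savepoint(job_id: str, trigger_id: str):
    """Poll the async savepoint operation until it completes or fails."""
    return ("GET", f"/jobs/{job_id}/savepoints/{trigger_id}", None)

def force_cancel(job_id: str):
    """Fallback if the savepoint failed and force-cancel is enabled
    (hypothetical CR property)."""
    return ("PATCH", f"/jobs/{job_id}?mode=cancel", None)
```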
Sometimes job submission fails with an akka.pattern.AskTimeoutException, and the retry then succeeds.
+exec /docker-entrypoint.sh /opt/flink/bin/flink run --jobmanager flinkjobcluster-sample-jobmanager:8081 --class org.apache.beam.examples.WordCount --parallelism 2 /opt/flink/job/word-count-beam-bundled-0.1.jar --runner=FlinkRunner --inputFile=./README.txt --output=gs://dagang-test/beam/wordcount/output
Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
Starting execution of program
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Pipeline execution failed
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: java.lang.RuntimeException: Pipeline execution failed
at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:116)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
at org.apache.beam.examples.WordCount.runWordCount(WordCount.java:185)
at org.apache.beam.examples.WordCount.main(WordCount.java:192)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
... 12 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 606c01ee993b6870c27a7e9a703e2f93)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:139)
at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
... 21 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:382)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#-1594246162]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
at java.lang.Thread.run(Thread.java:748)
End of exception on server side>]
at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
... 4 more
Currently, after a session cluster is up and running, we have to submit jobs to it through Flink's API endpoint, which usually means users have to install the Flink CLI on their local machine.
We could consider adding a new CRD and controller for job submission: after a job CR is created, the controller gets notified and submits the job to the cluster automatically; it also polls the job status and updates the CR. We might migrate the current job submission for job clusters to this approach as well.
Add liveness probes for the JM/TM containers to make the component status reflected in the CR more reliable.
Currently, when taking a savepoint, users specify a savepoints (parent) dir, and Flink automatically generates a savepoint ID and puts the data in the corresponding subdir. When starting a job from a savepoint, you have to specify the savepoint ID, which means that when using the operator, the job args part of the CR has to be changed to point to the specific savepoint.
We want to allow users to specify only the savepoints dir in jobSpec and have the operator automatically select the latest savepoint in that dir. This will in turn allow us to update jobs seamlessly, because the savepoint ID becomes transparent.
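A sketch of the selection logic. The listing mechanism is left out; the code assumes the operator can list the savepoints dir as (subdir name, modification time) pairs, and relies only on Flink's savepoint- naming prefix for the generated subdirectories:

```python
from typing import Optional, Sequence, Tuple

def latest_savepoint(entries: Sequence[Tuple[str, float]]) -> Optional[str]:
    """Pick the most recent savepoint subdirectory from a savepoints dir.

    Flink names savepoint dirs 'savepoint-<short-jobid>-<random>', so the
    name alone carries no ordering; fall back to modification timestamps.
    """
    savepoints = [e for e in entries if e[0].startswith("savepoint-")]
    if not savepoints:
        return None
    return max(savepoints, key=lambda e: e[1])[0]
```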
Provide a Dockerfile with GCS connector integration based on the official Flink image; the resulting image could then be used to support checkpoints and savepoints on GCS.
Currently, reconcile is triggered only by status changes of watched resources; we want to trigger reconcile periodically as well to make sure no events are missed.
When setting the JobManager service access scope to VPC, GKE creates a set of GCE resources (forwarding rule, target HTTP proxy, URL map, backend service) for the service. I noticed that after deleting the service (along with the cluster CR), the GCE resources are not deleted automatically. Need to investigate.
Support scaling TaskManagers up/down by updating the replicas property of the CR.
Currently, the operator doesn't reject unknown fields in the CR, which makes it error-prone. We need to add validation for this.
Currently, the Flink job submission client runs as a Kubernetes job, which uses the same container as the Flink JobManager and TaskManager. The job jar has to be locally available in the job container, which means users need to create a custom image based on the official Flink image and add their job jar.
We want to support a remote job jar and have the job submission client automatically download it before submitting the job to Flink.
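A minimal staging sketch. Only http(s) URLs and local paths are handled here; gs://, s3://, etc. are deliberately left to the corresponding client tools, and the function name is illustrative:

```python
import shutil
import urllib.request
from pathlib import Path
from urllib.parse import urlparse

def stage_job_jar(jar_uri: str, dest_dir: str) -> str:
    """Make the job jar locally available before invoking 'flink run'.

    http(s) URLs are downloaded with urllib; bare paths are copied as-is;
    other schemes (gs://, s3://, ...) would need their own clients.
    """
    parsed = urlparse(jar_uri)
    dest = Path(dest_dir) / Path(parsed.path or jar_uri).name
    if parsed.scheme in ("http", "https"):
        with urllib.request.urlopen(jar_uri) as resp, open(dest, "wb") as out:
            shutil.copyfileobj(resp, out)
    elif parsed.scheme == "":
        shutil.copy(jar_uri, dest)  # already a local file
    else:
        raise ValueError(f"unsupported scheme: {parsed.scheme}")
    return str(dest)
```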
Currently, the Flink job client runs as a Kubernetes job, which uses the same container as the Flink JobManager and TaskManager. There is a benefit to separating the two: functionality such as downloading a remote job jar can be supported in the job client image, while the official Flink image is used for JM/TM.
We could create a dedicated Docker image for the job client, or we could reuse the operator image (by adding the capability of job submission) for it. The latter approach seems easier to manage for end users.
There is no need to fetch the Flink job ID from the Flink API again once it is already set in the custom resource.
Currently, there are 4 steps in the Reconcile() method:
The sequence is actually not quite right. A better sequence would be:
It would be nice to have the option to control what happens to the Flink cluster when the job finishes. Three options would be possible:
- Delete the entire cluster: if the cluster is no longer needed after the job finishes.
- Leave only the JobManager: if you want to keep only the JobManager for checking the dashboard after the job finishes. If all logs are collected in a separate repository, only the JobManager may be needed for the dashboard.
- Leave the entire cluster: if you want to apply the task-local recovery feature when replaying a job or restarting a failed job, you need to preserve the cluster. You may also want to preserve the cluster to inspect the running environment and logs for debugging.
We need an easy way to get an infinite input stream to test long-running stream processing. Sometimes setting up Kafka or Pub/Sub is too complex for test purposes. Flink's socketTextStream seems to be an option. We can consider providing a simple TCP server that serves an infinite text stream.
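A minimal sketch of such a server in Python: each client connection receives an endless numbered word stream, which env.socketTextStream(host, port) can consume on the Flink side.

```python
import itertools
import socket
import threading

def serve_text_stream(host: str = "127.0.0.1", port: int = 0,
                      lines=None) -> socket.socket:
    """Start a tiny TCP server that sends each client an endless text
    stream. Returns the listening socket (port 0 picks a free port)."""
    lines = lines or ["hello", "world", "flink"]
    srv = socket.create_server((host, port))

    def handle(conn):
        try:
            # Cycle through the words forever, numbering each line.
            for i, word in enumerate(itertools.cycle(lines)):
                conn.sendall(f"{word} {i}\n".encode())
        except OSError:
            pass  # client disconnected
        finally:
            conn.close()

    def accept_loop():
        while True:
            try:
                conn, _ = srv.accept()
            except OSError:
                return  # listening socket closed
            threading.Thread(target=handle, args=(conn,), daemon=True).start()

    threading.Thread(target=accept_loop, daemon=True).start()
    return srv
```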
Noticed strange behavior in the log: in the controller for job clusters, getting the Flink job ID always fails while the job is running and succeeds once the job is finished.
We can test this with a session cluster by launching a client container and manually calling the jobs endpoint of the Flink REST API.
Currently, when the user issues a request to delete the session/job cluster CR, the controller simply deletes all the resources. We need to call the Flink API to cancel the jobs and (optionally) take savepoints before actually deleting the resources.
Looking at the Docker image, it downloads the provided jar file from GCS using gsutil, and it looks like it is using the default permissions of the kube node.
There should be a way to provide a key instead (as is possible for the task and job managers).
It can be as simple as:
gcloud auth activate-service-account --key-file=...
Sometimes we see an error when updating status in the controller, saying "the object has been modified; please apply your changes to the latest version and try again". Need to investigate the cause and fix it.
Example:
2019-10-30T20:31:49.207Z ERROR controllers.FlinkCluster Failed to update cluster status {"cluster": "default/flinkjobcluster-sample", "error": "Operation cannot be fulfilled on flinkclusters.flinkoperator.k8s.io \"flinkjobcluster-sample\": the object has been modified; please apply your changes to the latest version and try again"}
github.com/go-logr/zapr.(*zapLogger).Error
/root/go/pkg/mod/github.com/go-logr/[email protected]/zapr.go:128
github.com/googlecloudplatform/flink-operator/controllers.(*FlinkClusterHandler).reconcile
/workspace/controllers/flinkcluster_controller.go:141
github.com/googlecloudplatform/flink-operator/controllers.(*FlinkClusterReconciler).Reconcile
/workspace/controllers/flinkcluster_controller.go:74
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).reconcileHandler
/root/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:216
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).processNextWorkItem
/root/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:192
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).worker
/root/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:171
k8s.io/apimachinery/pkg/util/wait.JitterUntil.func1
/root/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:152
k8s.io/apimachinery/pkg/util/wait.JitterUntil
/root/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:153
k8s.io/apimachinery/pkg/util/wait.Until
/root/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:88
2019-10-30T20:31:49.208Z ERROR controller-runtime.controller Reconciler error {"controller": "flinkcluster", "request": "default/flinkjobcluster-sample", "error": "Operation cannot be fulfilled on flinkclusters.flinkoperator.k8s.io \"flinkjobcluster-sample\": the object has been modified; please apply your changes to the latest version and try again"}
github.com/go-logr/zapr.(*zapLogger).Error
/root/go/pkg/mod/github.com/go-logr/[email protected]/zapr.go:128
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).reconcileHandler
/root/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:218
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).processNextWorkItem
/root/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:192
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).worker
/root/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:171
k8s.io/apimachinery/pkg/util/wait.JitterUntil.func1
/root/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:152
k8s.io/apimachinery/pkg/util/wait.JitterUntil
/root/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:153
k8s.io/apimachinery/pkg/util/wait.Until
/root/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:88
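This is the Kubernetes API server's optimistic-concurrency conflict; the standard fix is a read-modify-write retry (client-go ships this as retry.RetryOnConflict). A Python sketch of the idea, with hypothetical get_latest/try_update callbacks standing in for the API client:

```python
def update_status_with_retry(get_latest, try_update, max_attempts: int = 5) -> bool:
    """Retry a status update on resourceVersion conflicts.

    get_latest() re-reads the current object; try_update(obj) reapplies the
    status change and returns True on success or False on a 409 conflict
    (both callbacks are hypothetical stand-ins for the real API client).
    """
    for _ in range(max_attempts):
        obj = get_latest()
        if try_update(obj):
            return True
    return False  # still conflicting after max_attempts
```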
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/pull/2/files added sidecars to TM pods, but the doc was not updated.
A Flink job can take various forms. It would be nice if the Flink job spec were extensible enough to support various submission types, for example Apache Beam Python as well as Flink jars.
It would also be nice if various types of resource staging were supported: downloading the resource if it is located remotely, or extracting it if it is archived.
In summary:
Support for various job submission types:
- Flink application jar
- Apache Beam Python
Staging processing by job resource type:
- Remote file download
- Archived file extraction
We found a case where the first job submission was successful but the job client somehow timed out, so the operator submitted the job again, which resulted in 2 jobs running in a job cluster.
2019-11-01T21:49:18.413Z INFO controllers.FlinkCluster Observed Flink job status list {"cluster": "default/flinkjobcluster-sample", "jobs": [{"ID":"cd8dca05e24e8c8f55b56bb9e8b40e61","Status":"RUNNING"},{"ID":"8ba52ec4edb730bdcc340b1f07d30a05","Status":"RUNNING"}]}
2019-11-01T21:49:18.413Z ERROR controllers.FlinkCluster {"cluster": "default/flinkjobcluster-sample", "jobs": [{"ID":"cd8dca05e24e8c8f55b56bb9e8b40e61","Status":"RUNNING"},{"ID":"8ba52ec4edb730bdcc340b1f07d30a05","Status":"RUNNING"}], "error": "more than one Flink job were found"}
Some users on Stack Overflow are asking about using Pub/Sub with Flink. We need to test and document the integration with this operator.
Currently, to upgrade a streaming job cluster or session cluster, you must delete the existing job or cluster and then create a new CR. In this case, the order is as follows.
If the operator can automate the upgrade, it will save users from doing this manually.
Considerations by upgrade type: