pyflink / playgrounds Goto Github PK
View Code? Open in Web Editor NEWProvide docker environment and examples for PyFlink
License: Apache License 2.0
Provide docker environment and examples for PyFlink
License: Apache License 2.0
当我运行:
docker-compose exec jobmanager ./bin/flink run -py /opt/examples/1-word_count.py
报错:
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:87) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
尝试过将python切换成python3,没有解决问题,有没有人遇到同样错误的?
docker 安装失败
Hi,
I followed the readme. After start docker, run example, there is an error occurred.
Anyone can help ?
ubuntu@VM-0-8-ubuntu:~/playgrounds$ docker-compose exec jobmanager ./bin/flink run -py /opt/examples/table/1-word_count.py
Job has been submitted with JobID 0a2a94133f5b8d1216e99b59f8e6d843
Traceback (most recent call last):
File "/opt/examples/table/1-word_count.py", line 34, in <module>
t_env.execute("1-word_count")
File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 1315, in execute
File "/opt/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
File "/opt/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o4.execute.
: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 0a2a94133f5b8d1216e99b59f8e6d843)
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123)
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:50)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1459)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 0a2a94133f5b8d1216e99b59f8e6d843)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123)
... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.table.api.TableException: Exception in writeRecord
at org.apache.flink.table.filesystem.FileSystemOutputFormat.writeRecord(FileSystemOutputFormat.java:131)
at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87)
at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:49)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:93)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at HashAggregateWithKeys$71.endInput(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:423)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:419)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Mkdirs failed to create /opt/examples/table/output/word_count_output/.staging_1627632423381/cp-0/task-0
at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:262)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:130)
at org.apache.flink.table.filesystem.FileSystemTableSink$4.open(FileSystemTableSink.java:456)
at org.apache.flink.table.filesystem.PartitionWriter$Context.createNewOutputFormat(PartitionWriter.java:61)
at org.apache.flink.table.filesystem.SingleDirectoryWriter.createFormat(SingleDirectoryWriter.java:57)
at org.apache.flink.table.filesystem.SingleDirectoryWriter.write(SingleDirectoryWriter.java:67)
at org.apache.flink.table.filesystem.FileSystemOutputFormat.writeRecord(FileSystemOutputFormat.java:129)
... 19 more
org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
... 13 more
(base) @MacBook-Pro-2 playgrounds % docker-compose exec jobmanager ./bin/flink run -py /opt/examples/2-from_kafka_to_kafka.py
Traceback (most recent call last):
File "/opt/examples/2-from_kafka_to_kafka.py", line 98, in
from_kafka_to_kafka_demo()
File "/opt/examples/2-from_kafka_to_kafka.py", line 21, in from_kafka_to_kafka_demo
register_rides_source(st_env)
File "/opt/examples/2-from_kafka_to_kafka.py", line 34, in register_rides_source
Kafka()
File "/opt/flink/opt/python/pyflink.zip/pyflink/table/descriptors.py", line 900, in init
File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 184, in wrapped_call
TypeError: Could not found the Java class 'Kafka'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'
org.apache.flink.client.program.ProgramAbortException
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
There is an errors "RROR [13/15] ADD ./flink-1.13.3 /opt/flink " when I execute docker build. would you like to how to fix the issues. thanks!
[internal] load metadata for docker.io/library/openjdk:8-jre:
[13/15] ADD ./flink-1.13.3 /opt/flink:
[14/15] COPY ./flink-sql-connector-kafka_2.11-1.13.3.jar /opt/flink/lib/:
[13/15] ADD ./flink-1.13.3 /opt/flink:
[14/15] COPY ./flink-sql-connector-kafka_2.11-1.13.3.jar /opt/flink/lib/:
failed to compute cache key: "/flink-sql-connector-kafka_2.11-1.13.3.jar" not found: not found
I noticed there are better enchainment for Flink 1.11 especially for Kubernetes deployment. Is it possible upgrade current playground to support flink 1.11 version?
I am looking forward to get your feedback soon.
George Hu
[email protected]
When trying to run the examples that use the file system as a sink I get permission errors:
Caused by: java.io.FileNotFoundException: /opt/examples/data/word_count_output (Permission denied)
This happens for examples 1,3 and 4.
But the other examples work fine, (i.e. 5-word_count-mysql.py ).
I have not done anything other than docker-compose up and run the example code.
It looks like the flink
user, which the java process runs as does not have access ot the mounted volumes.
2020-05-25 17:30:26
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.FileNotFoundException: /opt/examples/data/word_count_output (Permission denied)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
at org.apache.flink.core.fs.local.LocalDataOutputStream.<init>(LocalDataOutputStream.java:47)
at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:273)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
at org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:248)
at org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:88)
at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:205)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
container Info:
(base) [root@iz8vb6evwfagx3tyjx4fl8z data]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
efc1d310eb14 pyflink/playgrounds:1.10.0 "/docker-entrypoint.…" 3 weeks ago Up 3 weeks 6121-6123/tcp, 8081/tcp playgrounds_taskmanager_1
af466f98e914 wurstmeister/kafka:2.12-2.2.1 "start-kafka.sh" 3 weeks ago Up 3 weeks 0.0.0.0:32768->9092/tcp playgrounds_kafka_1
637b020df406 mysql "docker-entrypoint.s…" 3 weeks ago Up 2 days 0.0.0.0:3306->3306/tcp, 33060/tcp playgrounds_db_1
6cc0b28bb45a pyflink/playgrounds:1.10.0 "/docker-entrypoint.…" 3 weeks ago Up 3 weeks 6123/tcp, 8081/tcp, 0.0.0.0:8003->8088/tcp playgrounds_jobmanager_1
2ef57cfa9494 docker.elastic.co/elasticsearch/elasticsearch-oss:6.3.1 "/usr/local/bin/dock…" 3 weeks ago Up 3 weeks 0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp playgrounds_elasticsearch_1
8087b3554674 wurstmeister/zookeeper:3.4.6 "/bin/sh -c '/usr/sb…" 3 weeks ago Up 3 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp playgrounds_zookeeper_1
cfbfb51770b8 adminer "entrypoint.sh docke…" 3 weeks ago Up 3 weeks 0.0.0.0:8080->8080/tcp playgrounds_adminer_1
42a9f050933b puckel/docker-airflow "/entrypoint.sh webs…" 4 weeks ago Up 4 weeks 5555/tcp, 8793/tcp, 0.0.0.0:8004->8080/tcp gallant_villani
sql for table and data:
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for stu
-- ----------------------------
DROP TABLE IF EXISTS `stu`;
CREATE TABLE `stu` (
`id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '自增id',
`name` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '学生名字',
`school` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '学校名字',
`nickname` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '学生小名',
`age` int(11) NOT NULL COMMENT '学生年龄',
`class_num` int(11) NOT NULL COMMENT '班级人数',
`score` decimal(4, 2) NOT NULL COMMENT '成绩',
`phone` bigint(20) NOT NULL COMMENT '电话号码',
`email` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '家庭网络邮箱',
`ip` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'IP地址',
`address` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '家庭地址',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1000001 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of stu
-- ----------------------------
INSERT INTO `stu` VALUES (1, 'Ukq', '复旦附中', '敌法师', 12, 59, 38.89, 15573293938, '[email protected]', '203.0.117.157', '宁夏回族自治区帆县静安何街f座 126696');
INSERT INTO `stu` VALUES (2, 'Ukq', '上海中学', '幽鬼', 12, 59, 38.89, 15573293938, '[email protected]', '203.0.117.157', '宁夏回族自治区帆县静安何街f座 126696');
INSERT INTO `stu` VALUES (3, 'Ukq', '人和中心', '敌法师', 12, 59, 38.89, 15573293938, '[email protected]', '203.0.117.157', '宁夏回族自治区帆县静安何街f座 126696');
INSERT INTO `stu` VALUES (4, 'Ukq', '广东中学', '影魔', 12, 59, 38.89, 15573293938, '[email protected]', '203.0.117.157', '宁夏回族自治区帆县静安何街f座 126696');
INSERT INTO `stu` VALUES (5, 'kulwWkwsZ', '广东中学', '鬼泣', 12, 55, 99.59, 15789193091, '[email protected]', '192.4.223.99', '**维吾尔自治区郑州市梁平丁路a座 750587');
INSERT INTO `stu` VALUES (6, 'kulwWkwsZ', '猪场', '影魔', 12, 55, 99.59, 15789193091, '[email protected]', '192.4.223.99', '**维吾尔自治区郑州市梁平丁路a座 750587');
INSERT INTO `stu` VALUES (7, 'kulwWkwsZ', '鹅厂', '影魔', 12, 55, 99.59, 15789193091, '[email protected]', '192.4.223.99', '**维吾尔自治区郑州市梁平丁路a座 750587');
INSERT INTO `stu` VALUES (8, 'kulwWkwsZ', '上海中学', '影魔', 12, 55, 99.59, 15789193091, '[email protected]', '192.4.223.99', '**维吾尔自治区郑州市梁平丁路a座 750587');
INSERT INTO `stu` VALUES (9, 'eHFOyHtIGfiduV', '旧大院', '高小王子', 82, 43, 90.96, 18506504233, '[email protected]', '198.51.173.171', '湖南省辛集市海港王路a座 115439');
INSERT INTO `stu` VALUES (10, 'eHFOyHtIGfiduV', '人和中心', '影魔', 82, 43, 90.96, 18506504233, '[email protected]', '198.51.173.171', '湖南省辛集市海港王路a座 115439');
INSERT INTO `stu` VALUES (11, 'eHFOyHtIGfiduV', '上海中学', '歌神', 82, 43, 90.96, 18506504233, '[email protected]', '198.51.173.171', '湖南省辛集市海港王路a座 115439');
INSERT INTO `stu` VALUES (12, 'eHFOyHtIGfiduV', '华师大附中', '影魔', 82, 43, 90.96, 18506504233, '[email protected]', '198.51.173.171', '湖南省辛集市海港王路a座 115439');
INSERT INTO `stu` VALUES (13, 'kWZjrT', '猪场', '逗比', 94, 63, 94.24, 18870125400, '[email protected]', '192.88.93.128', '宁夏回族自治区南京县涪城辛集街C座 115529');
INSERT INTO `stu` VALUES (14, 'kWZjrT', '华师大二附中', '鬼泣', 94, 63, 94.24, 18870125400, '[email protected]', '192.88.93.128', '宁夏回族自治区南京县涪城辛集街C座 115529');
INSERT INTO `stu` VALUES (15, 'kWZjrT', '清华中学', '影魔', 94, 63, 94.24, 18870125400, '[email protected]', '192.88.93.128', '宁夏回族自治区南京县涪城辛集街C座 115529');
INSERT INTO `stu` VALUES (16, 'Jec', '华师大二附中', '高小王子', 38, 68, 51.16, 13248136245, '[email protected]', '198.51.103.171', '云南省西安市西峰张家港路j座 882348');
INSERT INTO `stu` VALUES (17, 'kWZjrT', '猪场', '高小王子', 94, 63, 94.24, 18870125400, '[email protected]', '192.88.93.128', '宁夏回族自治区南京县涪城辛集街C座 115529');
mysql_transfer.py:
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
s_env.set_parallelism(1)
# use blink table planner
st_env = StreamTableEnvironment \
.create(s_env, environment_settings=EnvironmentSettings
.new_instance()
.in_streaming_mode()
.use_blink_planner().build())
source_ddl = """CREATE TABLE StuSourceTable (id int , name varchar, school varchar, nickname varchar, age int, class_num int, score decimal, phone int, email varchar, ip varchar) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://db:3306/flink-test',
'connector.table' = 'stu',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'root',
'connector.password' = 'example')
"""
sink_ddl = """CREATE TABLE StuSinkTable (
id int,
name varchar,
school varchar,
age int
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://db:3306/flink-test',
'connector.table' = 'stu_result',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'root',
'connector.password' = 'example')
"""
st_env.sql_update(source_ddl)
st_env.sql_update(sink_ddl)
t = st_env.from_path('StuSourceTable')
data = t.select("id,name,school,age")
data.insert_into('StuSinkTable')
st_env.execute("mysql_transfer")
call function:
docker-compose exec jobmanager ./bin/flink run -py /opt/examples/mysql_transfer.py
logs for bug report:
2020-07-08 09:54:41
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Integer
at org.apache.flink.api.common.typeutils.base.IntSerializer.copy(IntSerializer.java:32)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:639)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:93)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
不清楚是什么原因
I am trying to get about 14million data and want this process to work faster.Is there any way PyFlink could help?
I do not see an open to use license on the image - I am doing a POC with PYFLINK and found the image to be exactly sufficing my use case - hence checking if this image could be used and does this have an open source license ? Please could it be possible to add the license in the repo and the image on docker.
py4j.protocol.Py4JError: org.apache.flink.streaming.api.utils.PythonTypeUtils.getCollectionInputFormat does not exist in the JVM
我不知道这种报错到底什么意思,初学者,py文件如下:
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
def data_stream_api_demo():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
ds = env.from_collection(collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],type_info=Types.ROW([Types.INT(), Types.STRING()]))
def split(s):
splits = s[1].split("|")
for sp in splits:
yield s[0], sp
ds = ds.map(lambda i: (i[0] + 1, i[1])).flat_map(split).key_by(lambda i: i[1]).reduce(lambda i, j: (i[0] + j[0], i[1]))
ds.print()
env.execute()
if name == 'main':
data_stream_api_demo()
应该是我使用的有问题
Job has been submitted with JobID 6cc03bd88ca1d89828729a826dc71ee1
Traceback (most recent call last):
File "/opt/examples/table/2-from_kafka_to_kafka.py", line 96, in
from_kafka_to_kafka_demo()
File "/opt/examples/table/2-from_kafka_to_kafka.py", line 26, in from_kafka_to_kafka_demo
st_env.execute("2-from_kafka_to_kafka")
File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 1315, in execute
File "/opt/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in call
File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
File "/opt/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o7.execute.
: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 6cc03bd88ca1d89828729a826dc71ee1)
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123)
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:50)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1459)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 6cc03bd88ca1d89828729a826dc71ee1)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123)
... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
... 13 more
C:\playgrounds>
docker-compose exec db mysql -u root -pexample
mysql: [Warning] Using a password on the command line interface can be insecure.
ERROR 2002 (HY000): Can't connect to local MySQL server through socket '/var/run/mysqld/mysqld.sock' (2)
Hi
i wish to check if there are support for Pyflink for Pattern API which is used for the Java equivalent? thank you
taskmanager_1 | 2020-10-29 05:26:36,073 main ERROR Could not create plugin of type class org.apache.logging.log4j.core.appender.RollingFileAppender for element RollingFile: java.lang.IllegalStateException: ManagerFactory [org.apache.logging.log4j.core.appender.rolling.RollingFileManager$RollingFileManagerFactory@64c87930] unable to create manager for [/opt/flink/log/flink--taskexecutor-0-e7be7ed60128.log] with data [org.apache.logging.log4j.core.appender.rolling.RollingFileManager$FactoryData@400cff1a[pattern=/opt/flink/log/flink--taskexecutor-0-e7be7ed60128.log.%i, append=false, bufferedIO=true, bufferSize=8192, policy=CompositeTriggeringPolicy(policies=[SizeBasedTriggeringPolicy(size=104857600)]), strategy=DefaultRolloverStrategy(min=1, max=10, useMax=true), advertiseURI=null, layout=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n, filePermissions=null, fileOwner=null]] java.lang.IllegalStateException: ManagerFactory [org.apache.logging.log4j.core.appender.rolling.RollingFileManager$RollingFileManagerFactory@64c87930] unable to create manager for [/opt/flink/log/flink--taskexecutor-0-e7be7ed60128.log] with data [org.apache.logging.log4j.core.appender.rolling.RollingFileManager$FactoryData@400cff1a[pattern=/opt/flink/log/flink--taskexecutor-0-e7be7ed60128.log.%i, append=false, bufferedIO=true, bufferSize=8192, policy=CompositeTriggeringPolicy(policies=[SizeBasedTriggeringPolicy(size=104857600)]), strategy=DefaultRolloverStrategy(min=1, max=10, useMax=true), advertiseURI=null, layout=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n, filePermissions=null, fileOwner=null]]
docker-compose exec jobmanager ./bin/flink run -py /opt/examples/2-from_kafka_to_kafka.py
Traceback (most recent call last):
File "/opt/examples/2-from_kafka_to_kafka.py", line 98, in
from_kafka_to_kafka_demo()
File "/opt/examples/2-from_kafka_to_kafka.py", line 21, in from_kafka_to_kafka_demo
register_rides_source(st_env)
File "/opt/examples/2-from_kafka_to_kafka.py", line 34, in register_rides_source
Kafka()
File "/opt/flink/opt/python/pyflink.zip/pyflink/table/descriptors.py", line 900, in init
File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 184, in wrapped_call
TypeError: Could not found the Java class 'Kafka'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'
org.apache.flink.client.program.ProgramAbortException
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
The example program reads from a Kafka topic named "Rides". However, there is no code for a producer to write messages to that topic. Is there supposed to be a producer in the playground repo or does the user need to produce these messages?
Job has been submitted with JobID d6cca72a392a0ff68ac8cc285ebed1b9
Traceback (most recent call last):
File "/opt/examples/table/1-word_count.py", line 34, in
t_env.execute("1-word_count")
File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 1315, in execute
File "/opt/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in call
File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
File "/opt/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o4.execute.
: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: d6cca72a392a0ff68ac8cc285ebed1b9)
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123)
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:50)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1459)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: d6cca72a392a0ff68ac8cc285ebed1b9)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123)
... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.table.api.TableException: Exception in writeRecord
at org.apache.flink.table.filesystem.FileSystemOutputFormat.writeRecord(FileSystemOutputFormat.java:131)
at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87)
at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:49)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:93)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at HashAggregateWithKeys$71.endInput(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:423)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:419)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Mkdirs failed to create /opt/examples/table/output/word_count_output/.staging_1621992750053/cp-0/task-0
at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:262)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:130)
at org.apache.flink.table.filesystem.FileSystemTableSink$4.open(FileSystemTableSink.java:456)
at org.apache.flink.table.filesystem.PartitionWriter$Context.createNewOutputFormat(PartitionWriter.java:61)
at org.apache.flink.table.filesystem.SingleDirectoryWriter.createFormat(SingleDirectoryWriter.java:57)
at org.apache.flink.table.filesystem.SingleDirectoryWriter.write(SingleDirectoryWriter.java:67)
at org.apache.flink.table.filesystem.FileSystemOutputFormat.writeRecord(FileSystemOutputFormat.java:129)
... 19 more
org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
... 13 more
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.