kinesis-sql's People

Contributors

abhishekd0907, arnehuang, chadlagore, dependabot[bot], ggeorgiadis, guoming-xu, hyeonseop-lee, itsvikramagr, mayankahuja, ms32035, nhampiholi, roncemer, sjatlyft, snorochevskiy, vrajat

kinesis-sql's Issues

Certificate Verify Failed

An AWS deprecation appears to be causing failures when using the sink. The KPL logs the following errors:
[2018-08-16 12:21:06.864637] [0x0000700009927000] [error] [http_client.cc:148] Failed to open connection to kinesis.eu-west-1.amazonaws.com:443 : certificate verify failed
[2018-08-16 12:21:06.964754] [0x0000700009927000] [error] [http_client.cc:148] Failed to open connection to kinesis.eu-west-1.amazonaws.com:443 : certificate verify failed

As far as I can see, the versions were updated on Spark's master branch (commit 4d8ae0d1c846560e1cac3480d73f8439968430a6), but this library's pom does not pull them in, which causes the errors.

Hardcoding the latest KPL and AWS SDK versions on branch 2.2.1 fixed the issue for me.

Shall I make a PR for this?

Encryption/Decryption mechanism for awsSecretKey and awsAccessKeyId

Hi @itsvikramagr

I'm using the Qubole connector to read from Amazon Kinesis streams in Spark Structured Streaming.
If I change the log level from INFO to DEBUG in log4j.properties, the physical plan is dumped to target/unit-tests.log. The plan contains the sensitive values awsSecretKey and awsAccessKeyId in plain text, which is a security concern.

Snippet of the physical plan:

`
18/10/25 20:04:05.023 main TRACE BaseSessionStateBuilder$$anon$1:
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences ===
!'Project [unresolvedalias(cast('approximateArrivalTimestamp as timestamp), None)] 'Project [unresolvedalias(cast(approximateArrivalTimestamp#4 as timestamp), None)]
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@4d0b0fd4,kinesis,List(),None,List(),None,Map(awsAccessKeyId -> AKIAF6LDVCA3FMAD5FCV, endpointUrl -> kinesis.us-west-1.amazonaws.com, awsSecretKey -> 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, startingposition -> LATEST, streamName -> bdstest),None), kinesis, [data#0, streamName#1, partitionKey#2, sequenceNumber#3, approximateArrivalTimestamp#4] +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@4d0b0fd4,kinesis,List(),None,List(),None,Map(awsAccessKeyId -> AKIAF6LDVCA3FMAD5FCV, endpointUrl -> kinesis.us-west-1.amazonaws.com, awsSecretKey -> 80vlwE5oxcvar9XPlFr1hooYuG9At47nB/ujbgKi8, startingposition -> LATEST, streamName -> bdstest),None), kinesis, [data#0, streamName#1, partitionKey#2, sequenceNumber#3, approximateArrivalTimestamp#4]

18/10/25 20:04:05.031 main TRACE BaseSessionStateBuilder$$anon$1:
`

The driver passes awsSecretKey and awsAccessKeyId to Spark in plain text, so they end up in the plan that Spark writes to its logs. Ideally the driver should take care of encrypting and decrypting these credentials, which it currently does not.
Can an encryption/decryption mechanism be added for awsSecretKey and awsAccessKeyId before they are passed to Spark?

Thanks,
Anuja
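
The kind of masking asked for above could be sketched as follows. This is only an illustration in plain Scala, not part of the connector or of Spark: the `PlanRedactor` object and its `redact` method are hypothetical names, and the regular expression assumes the credentials appear as `key -> value` pairs inside a rendered `Map(...)`, as in the snippet above. It masks text before that text is written somewhere; it does not change what Spark itself logs.

```scala
import scala.util.matching.Regex

// Hypothetical helper (not a kinesis-sql or Spark API): masks AWS credential
// values in a plan or options string before it is logged.
object PlanRedactor {
  // Matches "awsSecretKey -> value" or "awsAccessKeyId -> value"; the value
  // is assumed to end at the next comma, closing parenthesis, or whitespace.
  private val Sensitive: Regex =
    """(?i)\b(awsSecretKey|awsAccessKeyId)\s*->\s*[^,)\s]+""".r

  /** Returns `text` with every matched credential value replaced by "***". */
  def redact(text: String): String =
    Sensitive.replaceAllIn(
      text,
      m => Regex.quoteReplacement(s"${m.group(1)} -> ***")
    )
}
```

For example, `PlanRedactor.redact("Map(awsAccessKeyId -> EXAMPLEID, streamName -> bdstest)")` leaves `streamName` untouched and replaces only the value after `awsAccessKeyId`.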

mvn install -DskipTests fails

This is similar to #40.
On my laptop and then on a fresh Ubuntu Server 18.04 install, I ran:

sudo apt-get update && sudo apt install maven
git clone https://github.com/qubole/kinesis-sql.git
cd kinesis-sql/
git checkout 2.4.0
mvn install -DskipTests -e

This is the output:

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by com.google.inject.internal.cglib.core.$ReflectUtils$1 (file:/usr/share/maven/lib/guice.jar) to method java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain)
WARNING: Please consider reporting this to the maintainers of com.google.inject.internal.cglib.core.$ReflectUtils$1
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
[INFO] Error stacktraces are turned on.
[INFO] Scanning for projects...
[INFO] 
[INFO] --------------< org.apache.spark:spark-sql-kinesis_2.11 >---------------
[INFO] Building Kinesis Integration for Structured Streaming 2.4.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (enforce-versions) @ spark-sql-kinesis_2.11 ---
[INFO] 
[INFO] --- scala-maven-plugin:3.2.2:add-source (eclipse-add-source) @ spark-sql-kinesis_2.11 ---
[INFO] Add Source directory: /home/ubuntu/kinesis-sql/src/main/scala
[INFO] Add Test Source directory: /home/ubuntu/kinesis-sql/src/test/scala
[INFO] 
[INFO] --- maven-dependency-plugin:3.0.2:build-classpath (default-cli) @ spark-sql-kinesis_2.11 ---
[INFO] Dependencies classpath:
/home/ubuntu/.m2/repository/com/amazonaws/aws-java-sdk-s3/1.11.271/aws-java-sdk-s3-1.11.271.jar:/home/ubuntu/.m2/repository/com/amazonaws/aws-java-sdk-dynamodb/1.11.271/aws-java-sdk-dynamodb-1.11.271.jar:/home/ubuntu/.m2/repository/log4j/log4j/1.2.17/log4j-1.2.17.jar:/home/ubuntu/.m2/repository/com/google/protobuf/protobuf-java/2.5.0/protobuf-java-2.5.0.jar:/home/ubuntu/.m2/repository/org/apache/httpcomponents/httpcore/4.4.10/httpcore-4.4.10.jar:/home/ubuntu/.m2/repository/org/apache/commons/commons-compress/1.8.1/commons-compress-1.8.1.jar:/home/ubuntu/.m2/repository/com/thoughtworks/paranamer/paranamer/2.8/paranamer-2.8.jar:/home/ubuntu/.m2/repository/org/apache/avro/avro/1.8.2/avro-1.8.2.jar:/home/ubuntu/.m2/repository/com/amazonaws/aws-java-sdk-cloudwatch/1.11.271/aws-java-sdk-cloudwatch-1.11.271.jar:/home/ubuntu/.m2/repository/org/apache/directory/server/apacheds-kerberos-codec/2.0.0-M15/apacheds-kerberos-codec-2.0.0-M15.jar:/home/ubuntu/.m2/repository/org/apache/curator/curator-recipes/2.6.0/curator-recipes-2.6.0.jar:/home/ubuntu/.m2/repository/org/apache/hadoop/hadoop-hdfs/2.6.5/hadoop-hdfs-2.6.5.jar:/home/ubuntu/.m2/repository/com/amazonaws/aws-java-sdk-kms/1.11.271/aws-java-sdk-kms-1.11.271.jar:/home/ubuntu/.m2/repository/org/apache/hadoop/hadoop-auth/2.6.5/hadoop-auth-2.6.5.jar:/home/ubuntu/.m2/repository/org/apache/parquet/parquet-common/1.10.0/parquet-common-1.10.0.jar:/home/ubuntu/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-shuffle/2.6.5/hadoop-mapreduce-client-shuffle-2.6.5.jar:/home/ubuntu/.m2/repository/org/fusesource/leveldbjni/leveldbjni-all/1.8/leveldbjni-all-1.8.jar:/home/ubuntu/.m2/repository/org/slf4j/slf4j-api/1.7.16/slf4j-api-1.7.16.jar:/home/ubuntu/.m2/repository/org/tukaani/xz/1.5/xz-1.5.jar:/home/ubuntu/.m2/repository/org/apache/orc/orc-shims/1.5.2/orc-shims-1.5.2.jar:/home/ubuntu/.m2/repository/org/mortbay/jetty/jetty-util/6.1.26/jetty-util-6.1.26.jar:/home/ubuntu/.m2/repository/com/amazonaws/aws-java-sdk-kinesis/1.11.271/aws
-java-sdk-kinesis-1.11.271.jar:/home/ubuntu/.m2/repository/org/xerial/snappy/snappy-java/1.1.7.1/snappy-java-1.1.7.1.jar:/home/ubuntu/.m2/repository/com/amazonaws/jmespath-java/1.11.271/jmespath-java-1.11.271.jar:/home/ubuntu/.m2/repository/org/apache/orc/orc-core/1.5.2/orc-core-1.5.2-nohive.jar:/home/ubuntu/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-core/2.6.5/hadoop-mapreduce-client-core-2.6.5.jar:/home/ubuntu/.m2/repository/commons-io/commons-io/2.4/commons-io-2.4.jar:/home/ubuntu/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.6.7.1/jackson-databind-2.6.7.1.jar:/home/ubuntu/.m2/repository/org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.jar:/home/ubuntu/.m2/repository/org/codehaus/jackson/jackson-jaxrs/1.9.13/jackson-jaxrs-1.9.13.jar:/home/ubuntu/.m2/repository/io/netty/netty/3.9.9.Final/netty-3.9.9.Final.jar:/home/ubuntu/.m2/repository/org/apache/avro/avro-ipc/1.8.2/avro-ipc-1.8.2.jar:/home/ubuntu/.m2/repository/xmlenc/xmlenc/0.52/xmlenc-0.52.jar:/home/ubuntu/.m2/repository/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar:/home/ubuntu/.m2/repository/org/apache/directory/api/api-util/1.0.0-M20/api-util-1.0.0-M20.jar:/home/ubuntu/.m2/repository/com/google/code/gson/gson/2.2.4/gson-2.2.4.jar:/home/ubuntu/.m2/repository/org/apache/directory/api/api-asn1-api/1.0.0-M20/api-asn1-api-1.0.0-M20.jar:/home/ubuntu/.m2/repository/joda-time/joda-time/2.9.3/joda-time-2.9.3.jar:/home/ubuntu/.m2/repository/com/amazonaws/aws-java-sdk-core/1.11.271/aws-java-sdk-core-1.11.271.jar:/home/ubuntu/.m2/repository/org/apache/curator/curator-client/2.6.0/curator-client-2.6.0.jar:/home/ubuntu/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.6.7/jackson-annotations-2.6.7.jar:/home/ubuntu/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-common/2.6.5/hadoop-mapreduce-client-common-2.6.5.jar:/home/ubuntu/.m2/repository/org/apache/parquet/parquet-format/2.4.0/parquet-format-2.4.0.jar:/home/ubuntu/.
m2/repository/commons-configuration/commons-configuration/1.6/commons-configuration-1.6.jar:/home/ubuntu/.m2/repository/com/google/code/findbugs/jsr305/1.3.9/jsr305-1.3.9.jar:/home/ubuntu/.m2/repository/commons-net/commons-net/3.1/commons-net-3.1.jar:/home/ubuntu/.m2/repository/commons-digester/commons-digester/1.8/commons-digester-1.8.jar:/home/ubuntu/.m2/repository/org/apache/parquet/parquet-hadoop/1.10.0/parquet-hadoop-1.10.0.jar:/home/ubuntu/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-jobclient/2.6.5/hadoop-mapreduce-client-jobclient-2.6.5.jar:/home/ubuntu/.m2/repository/io/airlift/aircompressor/0.10/aircompressor-0.10.jar:/home/ubuntu/.m2/repository/org/apache/directory/server/apacheds-i18n/2.0.0-M15/apacheds-i18n-2.0.0-M15.jar:/home/ubuntu/.m2/repository/org/scala-lang/scala-library/2.11.12/scala-library-2.11.12.jar:/home/ubuntu/.m2/repository/com/fasterxml/jackson/dataformat/jackson-dataformat-cbor/2.6.7/jackson-dataformat-cbor-2.6.7.jar:/home/ubuntu/.m2/repository/commons-beanutils/commons-beanutils/1.7.0/commons-beanutils-1.7.0.jar:/home/ubuntu/.m2/repository/com/amazonaws/amazon-kinesis-producer/0.12.8/amazon-kinesis-producer-0.12.8.jar:/home/ubuntu/.m2/repository/javax/xml/bind/jaxb-api/2.2.2/jaxb-api-2.2.2.jar:/home/ubuntu/.m2/repository/org/htrace/htrace-core/3.0.4/htrace-core-3.0.4.jar:/home/ubuntu/.m2/repository/commons-lang/commons-lang/2.6/commons-lang-2.6.jar:/home/ubuntu/.m2/repository/org/apache/parquet/parquet-jackson/1.10.0/parquet-jackson-1.10.0.jar:/home/ubuntu/.m2/repository/org/apache/zookeeper/zookeeper/3.4.6/zookeeper-3.4.6.jar:/home/ubuntu/.m2/repository/org/apache/hadoop/hadoop-client/2.6.5/hadoop-client-2.6.5.jar:/home/ubuntu/.m2/repository/javax/activation/activation/1.1.1/activation-1.1.1.jar:/home/ubuntu/.m2/repository/com/amazonaws/amazon-kinesis-client/1.8.10/amazon-kinesis-client-1.8.10.jar:/home/ubuntu/.m2/repository/commons-cli/commons-cli/1.2/commons-cli-1.2.jar:/home/ubuntu/.m2/repository/commons-codec/commons-co
dec/1.10/commons-codec-1.10.jar:/home/ubuntu/.m2/repository/xerces/xercesImpl/2.9.1/xercesImpl-2.9.1.jar:/home/ubuntu/.m2/repository/org/slf4j/slf4j-log4j12/1.7.16/slf4j-log4j12-1.7.16.jar:/home/ubuntu/.m2/repository/org/apache/hadoop/hadoop-yarn-api/2.6.5/hadoop-yarn-api-2.6.5.jar:/home/ubuntu/.m2/repository/org/spark-project/spark/unused/1.0.0/unused-1.0.0.jar:/home/ubuntu/.m2/repository/org/apache/hadoop/hadoop-common/2.6.5/hadoop-common-2.6.5.jar:/home/ubuntu/.m2/repository/org/apache/spark/spark-tags_2.11/2.4.0/spark-tags_2.11-2.4.0.jar:/home/ubuntu/.m2/repository/commons-beanutils/commons-beanutils-core/1.8.0/commons-beanutils-core-1.8.0.jar:/home/ubuntu/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.6.7/jackson-core-2.6.7.jar:/home/ubuntu/.m2/repository/org/apache/orc/orc-mapreduce/1.5.2/orc-mapreduce-1.5.2-nohive.jar:/home/ubuntu/.m2/repository/org/apache/commons/commons-math3/3.4.1/commons-math3-3.4.1.jar:/home/ubuntu/.m2/repository/commons-collections/commons-collections/3.2.2/commons-collections-3.2.2.jar:/home/ubuntu/.m2/repository/javax/xml/stream/stax-api/1.0-2/stax-api-1.0-2.jar:/home/ubuntu/.m2/repository/com/amazonaws/aws-java-sdk-sts/1.11.271/aws-java-sdk-sts-1.11.271.jar:/home/ubuntu/.m2/repository/org/apache/hadoop/hadoop-yarn-client/2.6.5/hadoop-yarn-client-2.6.5.jar:/home/ubuntu/.m2/repository/org/codehaus/jackson/jackson-xc/1.9.13/jackson-xc-1.9.13.jar:/home/ubuntu/.m2/repository/org/apache/parquet/parquet-column/1.10.0/parquet-column-1.10.0.jar:/home/ubuntu/.m2/repository/commons-logging/commons-logging/1.1.3/commons-logging-1.1.3.jar:/home/ubuntu/.m2/repository/org/apache/httpcomponents/httpclient/4.5.6/httpclient-4.5.6.jar:/home/ubuntu/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-app/2.6.5/hadoop-mapreduce-client-app-2.6.5.jar:/home/ubuntu/.m2/repository/org/apache/hadoop/hadoop-yarn-server-common/2.6.5/hadoop-yarn-server-common-2.6.5.jar:/home/ubuntu/.m2/repository/software/amazon/ion/ion-java/1.0.2/ion-java-1.0.2.jar
:/home/ubuntu/.m2/repository/org/apache/hadoop/hadoop-yarn-common/2.6.5/hadoop-yarn-common-2.6.5.jar:/home/ubuntu/.m2/repository/org/apache/parquet/parquet-encoding/1.10.0/parquet-encoding-1.10.0.jar:/home/ubuntu/.m2/repository/org/apache/avro/avro-mapred/1.8.2/avro-mapred-1.8.2-hadoop2.jar:/home/ubuntu/.m2/repository/org/apache/hadoop/hadoop-annotations/2.6.5/hadoop-annotations-2.6.5.jar:/home/ubuntu/.m2/repository/org/apache/curator/curator-framework/2.6.0/curator-framework-2.6.0.jar:/home/ubuntu/.m2/repository/commons-httpclient/commons-httpclient/3.1/commons-httpclient-3.1.jar
[INFO] 
[INFO] --- maven-remote-resources-plugin:1.5:process (process-resource-bundles) @ spark-sql-kinesis_2.11 ---
[INFO] 
[INFO] --- maven-resources-plugin:2.7:resources (default-resources) @ spark-sql-kinesis_2.11 ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 1 resource
[INFO] Copying 3 resources
[INFO] 
[INFO] --- maven-compiler-plugin:3.7.0:compile (default-compile) @ spark-sql-kinesis_2.11 ---
[INFO] Not compiling main sources
[INFO] 
[INFO] --- scala-maven-plugin:3.2.2:compile (scala-compile-first) @ spark-sql-kinesis_2.11 ---
[WARNING] Zinc server is not available at port 3030 - reverting to normal incremental compile
[INFO] Using incremental compilation
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  13.343 s
[INFO] Finished at: 2019-07-12T12:45:19Z
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile (scala-compile-first) on project spark-sql-kinesis_2.11: Execution scala-compile-first of goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed.: NullPointerException -> [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile (scala-compile-first) on project spark-sql-kinesis_2.11: Execution scala-compile-first of goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed.
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:215)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:156)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:148)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:117)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:81)
    at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:56)
    at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:128)
    at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:305)
    at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192)
    at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
    at org.apache.maven.cli.MavenCli.execute (MavenCli.java:956)
    at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:288)
    at org.apache.maven.cli.MavenCli.main (MavenCli.java:192)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:566)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:289)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:229)
    at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:415)
    at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:356)
Caused by: org.apache.maven.plugin.PluginExecutionException: Execution scala-compile-first of goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed.
    at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo (DefaultBuildPluginManager.java:148)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:210)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:156)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:148)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:117)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:81)
    at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:56)
    at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:128)
    at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:305)
    at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192)
    at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
    at org.apache.maven.cli.MavenCli.execute (MavenCli.java:956)
    at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:288)
    at org.apache.maven.cli.MavenCli.main (MavenCli.java:192)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:566)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:289)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:229)
    at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:415)
    at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:356)
Caused by: java.lang.NullPointerException
    at java.util.regex.Matcher.getTextLength (Matcher.java:1770)
    at java.util.regex.Matcher.reset (Matcher.java:416)
    at java.util.regex.Matcher.<init> (Matcher.java:253)
    at java.util.regex.Pattern.matcher (Pattern.java:1133)
    at java.util.regex.Pattern.split (Pattern.java:1261)
    at java.util.regex.Pattern.split (Pattern.java:1334)
    at sbt.IO$.pathSplit (IO.scala:727)
    at sbt.IO$.parseClasspath (IO.scala:825)
    at sbt.compiler.CompilerArguments.extClasspath (CompilerArguments.scala:64)
    at sbt.compiler.AggressiveCompile.withBootclasspath (AggressiveCompile.scala:51)
    at sbt.compiler.AggressiveCompile.compile2 (AggressiveCompile.scala:84)
    at sbt.compiler.AggressiveCompile.compile1 (AggressiveCompile.scala:71)
    at com.typesafe.zinc.Compiler.compile (Compiler.scala:184)
    at com.typesafe.zinc.Compiler.compile (Compiler.scala:164)
    at sbt_inc.SbtIncrementalCompiler.compile (SbtIncrementalCompiler.java:92)
    at scala_maven.ScalaCompilerSupport.incrementalCompile (ScalaCompilerSupport.java:303)
    at scala_maven.ScalaCompilerSupport.compile (ScalaCompilerSupport.java:119)
    at scala_maven.ScalaCompilerSupport.doExecute (ScalaCompilerSupport.java:99)
    at scala_maven.ScalaMojoSupport.execute (ScalaMojoSupport.java:482)
    at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo (DefaultBuildPluginManager.java:137)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:210)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:156)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:148)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:117)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:81)
    at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:56)
    at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:128)
    at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:305)
    at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192)
    at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
    at org.apache.maven.cli.MavenCli.execute (MavenCli.java:956)
    at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:288)
    at org.apache.maven.cli.MavenCli.main (MavenCli.java:192)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:566)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:289)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:229)
    at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:415)
    at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:356)
[ERROR] 
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] 
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/PluginExecutionException

What could be the issue?
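
The last `Caused by` in the trace shows `Pattern.split` receiving a null string from `sbt.IO$.parseClasspath`, called by `CompilerArguments.extClasspath`, and the `jdk.internal.reflect` frames indicate that Maven is running on Java 9 or later. A likely explanation, stated here as an assumption rather than a confirmed diagnosis, is that the compiler bundled with scala-maven-plugin 3.2.2 reads the `java.ext.dirs` system property, which Java 9 and later no longer define. The small check below (the `ExtDirsCheck` object is a hypothetical name) reports whether the running JVM defines that property:

```scala
// Hypothetical diagnostic, not part of kinesis-sql or Maven.
object ExtDirsCheck {
  /** Looks up `java.ext.dirs` through `lookup`, mapping a null result to None. */
  def extDirs(lookup: String => String): Option[String] =
    Option(lookup("java.ext.dirs"))

  def main(args: Array[String]): Unit = {
    val javaVersion = System.getProperty("java.version")
    extDirs(key => System.getProperty(key)) match {
      case Some(dirs) =>
        println(s"Java $javaVersion defines java.ext.dirs = $dirs")
      case None =>
        // A null here is what would reach Pattern.split in the trace above.
        println(s"Java $javaVersion does not define java.ext.dirs")
    }
  }
}
```

If the property is missing, building with a Java 8 JDK is one thing to try.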

NullPointerException on cluster

The application works in local mode, but on a cluster in client mode it fails with this error:

java.lang.NullPointerException at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:216)

Full stack trace:

`2018-08-10 12:52:37 ERROR MicroBatchExecution:91 - Query [id = a1c2934d-a65e-4b4c-b111-e0ecb2d62133, runId = bb95b4e8-6ce4-4bd9-b4b5-61c0a44939ae] terminated with error
java.lang.NullPointerException
at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:216)
at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:105)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5$$anonfun$apply$5.apply(MicroBatchExecution.scala:268)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5$$anonfun$apply$5.apply(MicroBatchExecution.scala:268)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5.apply(MicroBatchExecution.scala:267)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5.apply(MicroBatchExecution.scala:264)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:264)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:128)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: null
=== Streaming Query ===
Identifier: [id = a1c2934d-a65e-4b4c-b111-e0ecb2d62133, runId = bb95b4e8-6ce4-4bd9-b4b5-61c0a44939ae]
Current Committed Offsets: {KinesisSource[testing-kinesis-spark]: {"metadata":{"streamName":"testing-kinesis-spark","batchId":"0"},"shardId-000000000000":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1533905549906"}}}
Current Available Offsets: {KinesisSource[testing-kinesis-spark]: {"metadata":{"streamName":"testing-kinesis-spark","batchId":"0"},"shardId-000000000000":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1533905549906"}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
SerializeFromObject [encodeusingserializer(input[0, java.lang.Object, true], false) AS value#44]
+- MapPartitions , obj#43: com.test.app.AppTask$AppOutput
+- DeserializeToObject decodeusingserializer(cast(value#41 as binary), com.tr.common.messagebroker.MessageType$ComputationMessage, false), obj#42: com.tr.common.messagebroker.MessageType$ComputationMessage
+- SerializeFromObject [encodeusingserializer(input[0, java.lang.Object, true], false) AS value#41]
+- MapPartitions , obj#40: com.tr.common.messagebroker.MessageType$ComputationMessage
+- DeserializeToObject decodeusingserializer(cast(value#37 as binary), com.tr.common.messagebroker.MessageType$ComputationMessage, false), obj#39: com.tr.common.messagebroker.MessageType$ComputationMessage
+- SerializeFromObject [encodeusingserializer(input[0, java.lang.Object, true], false) AS value#37]
+- MapElements , class scala.Tuple4, [StructField(_1,StringType,true), StructField(_2,StringType,true), StructField(_3,StringType,true), StructField(_4,DecimalType(38,0),true)], obj#36: com.tr.common.messagebroker.MessageType$ComputationMessage
+- DeserializeToObject newInstance(class scala.Tuple4), obj#35: scala.Tuple4
+- Aggregate [streamName#15, partitionKey#16, data#17], [streamName#15, partitionKey#16, data#17, count(1) AS count#25L]
+- Project [cast(streamName#6 as string) AS streamName#15, cast(partitionKey#7 as string) AS partitionKey#16, cast(data#5 as string) AS data#17]
+- StreamingExecutionRelation KinesisSource[testing-kinesis-spark], [data#5, streamName#6, partitionKey#7, sequenceNumber#8, approximateArrivalTimestamp#9]

    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)

Caused by: java.lang.NullPointerException
at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:216)
at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:105)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5$$anonfun$apply$5.apply(MicroBatchExecution.scala:268)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5$$anonfun$apply$5.apply(MicroBatchExecution.scala:268)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5.apply(MicroBatchExecution.scala:267)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5.apply(MicroBatchExecution.scala:264)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:264)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:128)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
... 1 more
`

Consuming from multiple regions with different records per second

Hi,

First of all, thanks for this wonderful library. It is awesome.

We have a Kinesis stream that is spread across 5 different regions. The number of records in each region varies drastically. For instance, one region has 20,000 records/sec while another has just 500 records/sec.

I am creating these 5 streams and unioning them so that I can treat them as a single stream and perform the required operations.

  def createStreams(config: Config)(implicit spark: SparkSession): DataFrame = {
    config.getStringList("regions").asScala
      .map(region => createStreamForRegion(region, config))
      .reduce((df1, df2) => df1.union(df2))
  }

  def createStreamForRegion(region: String, config: Config)(implicit spark: SparkSession): DataFrame = {
    spark.readStream
      .format("kinesis")
      .option("streamName", config.getString("stream.name"))
      .option("kinesis.executor.metadata.committer", "hdfs")
      .option("kinesis.executor.metadata.path", s"${config.getString("checkPoints.kinesis")}/$region")
      .option("endpointUrl", s"https://kinesis.$region.amazonaws.com")
      .option("startingPosition", config.getString("stream.startPosition"))
      .option("kinesis.client.describeShardInterval", config.getString("kinesis.describeShardInterval"))
      .option("kinesis.executor.maxRecordPerRead", config.getString("kinesis.maxRecordsPerRead"))
      .option("kinesis.executor.maxFetchRecordsPerShard", config.getString("kinesis.maxRecordsReadPerShard"))
      .option("kinesis.client.numRetries", config.getString("kinesis.numberOfRetries"))
      .option("kinesis.client.retryIntervalMs", config.getString("kinesis.retryIntervalMs"))
      .load
  }

With this approach, we sometimes see the KinesisReader lagging behind the current time in one particular region while the other regions are doing just fine.

With the Spark Streaming approach, we could configure the number of Kinesis readers per region, so we would configure more readers for regions with higher volume and fewer for the others. Is there something similar here? How do we make sure that the Kinesis readers in all regions keep up with consumption and do not lag behind?

Thanks,
Jegan

NullPointerException when output dir already exists for CSV sink

Hi folks. Thanks for writing this awesome library. It's been really useful. I wanted to report a small problem I'm seeing. Here's a small fragment of my code:

def main(args: Array[String]): Unit = {

  import sparkSession.implicits._

  val kinesis = sparkService.getKinesisStructuredStream(
    config.getString("kinesis.streamName"),
    config.getString("kinesis.endpointURL"),
    config.getString("kinesis.startingPosition")
  )

  val processedEvent = kinesis.as[Event]
    .map(SNSEventProcessor.process)
    .filter(col("eventId").isNotNull)

  processedEvent.writeStream
    .outputMode("append")
    .format("csv")
    .option("checkpointLocation", config.getString("spark.app.checkpointDir"))
    .option("path", "./output/csv_output")
    .start()
    .awaitTermination()
}

When ./output doesn't exist, the stream runs without problems. However, when I terminate it and then start it again, I get the following exception:

Caused by: java.lang.NullPointerException
	at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:216)
	at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:105)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5$$anonfun$apply$5.apply(MicroBatchExecution.scala:268)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5$$anonfun$apply$5.apply(MicroBatchExecution.scala:268)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5.apply(MicroBatchExecution.scala:267)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5.apply(MicroBatchExecution.scala:264)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:264)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:128)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
	... 1 more

I may well be doing something wrong. If that's the case, I'd be very grateful if you could point it out. If you need more info to find the cause, let me know. Thanks!

Empty batches

Is it normal for KinesisSource to generate empty microbatches when there is no new data in Kinesis? On Spark 2.4.0, when running a simple query with no trigger and console or CSV writer, a new batch is generated about every second. I remember that it wasn't the case with Kafka source.

Like the awsAccessKeyId option, it would be good to have proxy host and port options

For example:

# The four proxy* options below are the proposed additions.
kinesisDF = spark \
  .readStream \
  .format("kinesis") \
  .option("proxyHost", "host") \
  .option("proxyPort", "port") \
  .option("proxyUser", "user") \
  .option("proxyPass", "pass") \
  .option("awsAccessKeyId", "mykey") \
  .option("awsSecretKey", "mykey") \
  .option("streamName", "TEST_STREAM") \
  .option("initialPosition", "earliest") \
  .option("endpointUrl", "https://kinesis.eu-central-1.amazonaws.com") \
  .option("region", "eu-central-1") \
  .load()

How to increase records read per second?

Thanks for the great library. My current stream consists of 50 shards, processing 500k+ records per minute. I've set up this library in a Spark job, but it's only reading ~150k records per minute. How can I increase this number? I have already changed all the kinesis.executor settings, but this didn't have any effect.
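As a rough sanity check, the figures above can be set against the documented per-shard read limits of Kinesis Data Streams (5 GetRecords calls per second, 2 MB per second, at most 10,000 records per call). The sketch below assumes records are spread evenly across the 50 shards. The average record size is not stated in the issue, so the 1 KiB used in the last line is purely an assumption.

```python
# Per-shard arithmetic for the figures quoted in the issue.
SHARDS = 50
PRODUCED_PER_MIN = 500_000
CONSUMED_PER_MIN = 150_000

# Documented Kinesis Data Streams read limits per shard.
MAX_GET_RECORDS_CALLS_PER_SEC = 5
MAX_READ_BYTES_PER_SEC = 2 * 1024 * 1024  # 2 MB/s, taken here as MiB
MAX_RECORDS_PER_CALL = 10_000


def per_shard_per_sec(records_per_min: int, shards: int) -> float:
    """Average records per second per shard, assuming an even spread."""
    return records_per_min / shards / 60


def read_ceiling_per_shard(avg_record_bytes: int) -> float:
    """Upper bound on records/sec readable from one shard."""
    by_calls = MAX_GET_RECORDS_CALLS_PER_SEC * MAX_RECORDS_PER_CALL
    by_bytes = MAX_READ_BYTES_PER_SEC / avg_record_bytes
    return min(by_calls, by_bytes)


produced = per_shard_per_sec(PRODUCED_PER_MIN, SHARDS)
consumed = per_shard_per_sec(CONSUMED_PER_MIN, SHARDS)
print(f"produced per shard: {produced:.1f} rec/s")
print(f"consumed per shard: {consumed:.1f} rec/s")
# Assumed record size of 1 KiB (not given in the issue).
print(f"read ceiling per shard: {read_ceiling_per_shard(1024):.0f} rec/s")
```

If the assumed record size is anywhere near reality, both rates sit well below the per-shard ceiling, which would point away from the shard limits themselves and towards how often and how much each task fetches. That is an inference from the arithmetic only, not something the issue confirms.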

passing aws_iam_role instead of awsSecretKey/awsAccessKeyId

Not in the docs so I'm asking here... could we use an option with aws_iam_role instead of the aws credentials? As in:

val kinesisDF = spark.readStream
    .format("kinesis")
    .option("streamName",streamName)
    .option("initialPosition",initialPosition)
    .option("region", regionName)
    .option("aws_iam_role", roleArn)
    .load()

There's another similar issue that's been asked before but was closed without providing a solution.

Thanks.

reprocessing of already processed records after scaling the shards.

We are currently working on a streaming job that reads data from a Kinesis stream using version 2.4.0 of the connector library. We have observed that records already processed in previous batches are processed again whenever the shards are scaled up or down. This happens irrespective of offset checkpointing. Whenever new shards are created, the records of the closed/parent shards are read again using TRIM_HORIZON as the start position.

Do we have support for foreachBatch sinks?

Hi,

I am using this repo to build Structured Streaming jobs over Kinesis data streams using Spark 2.4.0.

I wanted to understand whether foreachBatch sinks are supported.

https://spark.apache.org/docs/2.4.0/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch

To explain further, my use case is:

  1. Read the stream from Kinesis.
  2. Batch the data using foreachBatch and then write the data into S3.
  3. Use S3 as the checkpoint location, although using HDFS is also possible.

The problem I am facing is that when I use foreachBatch, the shard-commits folder is not updated for each batch; nothing is written beyond shard-commit/0.

Attached is the checkpoint dump it generates when writing to the local filesystem.
The behaviour is the same when I write the checkpoint to S3.

checkpoint.zip

Sample code is below

val roundStreamDf = sc.readStream
  .format("kinesis")
  .option("streamName", streamName)
  .option("endpointUrl", s"https://kinesis.${region}.amazonaws.com")
  .option("awsAccessKeyId", acceskeyId)
  .option("awsSecretKey", secretAccessKey)
  .option("startingposition", START_POS)
  .option("kinesis.client.describeShardInterval", "3600")
  .option("kinesis.client.avoidEmptyBatches", true)
  .load()

val roundStreamIntermediateDf = roundStreamDf
  .selectExpr("cast (data as STRING) jsonData")
  .select(from_json(col("jsonData"), ScalaReflection.schemaFor[Round].dataType.asInstanceOf[StructType]).as("round"))
  .select("round.*")

roundStreamIntermediateDf
  .writeStream
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    val RoundDf = commonRoundDataProcess(batchDf)
    RoundDf.persist()
    while (opNameIterator.hasNext) {
      val opName = opNameIterator.next()
      val finalRoundDf = RoundDf.filter(col("OperatorShortName") === opName)
      if (!finalRoundDf.head(1).isEmpty) {
        accessDataS3.writeDataToRefinedHudiS3(sc, finalRoundDf, extraOptions, opName, ROUND_TYPE_OF_DATA)
      }
    }
    RoundDf.unpersist()
  }
  .outputMode("Update")
  .trigger(Trigger.ProcessingTime("60 seconds"))
  .option("checkpointLocation", s"${checkPointBucket}/checkpoint/${ROUND_TYPE_OF_DATA}/")
  .start()
  .awaitTermination()

The code basically reads the stream, carries out some processing in the foreachBatch sink, and then pushes the results to S3.

The error log is attached.
error.log

I have tried increasing the number of attempts to 20, thinking it might be a problem with the eventual consistency of S3, but I have the same problem when running locally or on HDFS.

Quick help will be much appreciated.

Minimize use of Timestamp as Kinesis position

If a shard returns no new data in the current execution, we use a timestamp as the Kinesis position for the next trigger. Could we instead always use the last fetched sequence number from earlier triggers? A timestamp might not be unique across records within a shard. The less we rely on it, the lower the chance of mistakes.
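The proposal can be sketched as a small position-selection rule. This is only an illustration of the idea, not the connector's actual code: `ShardPosition` and `next_position` are hypothetical names, while `AFTER_SEQUENCE_NUMBER` and `AT_TIMESTAMP` are the Kinesis shard iterator types the two strategies correspond to.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ShardPosition:
    """Where to resume reading a shard on the next trigger (hypothetical type)."""
    iterator_type: str  # "AFTER_SEQUENCE_NUMBER" or "AT_TIMESTAMP"
    value: str


def next_position(
    fetched_sequence_number: Optional[str],
    previous: Optional[ShardPosition],
    fallback_timestamp_ms: int,
) -> ShardPosition:
    """Prefer sequence numbers; fall back to a timestamp only as a last resort."""
    # New data arrived in this trigger: resume after the last record read.
    if fetched_sequence_number is not None:
        return ShardPosition("AFTER_SEQUENCE_NUMBER", fetched_sequence_number)
    # No new data, but an earlier trigger recorded a sequence number: reuse it.
    if previous is not None and previous.iterator_type == "AFTER_SEQUENCE_NUMBER":
        return previous
    # Nothing has ever been read from this shard: a timestamp is all we have.
    return ShardPosition("AT_TIMESTAMP", str(fallback_timestamp_ms))


first = next_position("495", None, 1000)
idle = next_position(None, first, 2000)
print(first)
print(idle == first)  # True: the idle trigger keeps the sequence number
```

With this rule, a timestamp is used only for shards that have never returned a record, which is the narrowing the issue asks for.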

Spark structured streaming goes in infinite loop

When using this library to read a stream from Kinesis, Spark goes into an infinite loop.

In our code, we use Spark Structured Streaming to read published events from Kinesis. Although we have published only one event, the kinesis-sql code reads the same event again and again and never finishes reading it, which makes our executor node go into recursion.

Any idea what the problem could be?

@itsvikramagr

Multiple Sinks

Hi guys,

Does the library support multiple sinks?

I am using two sinks (KinesisSink & FileSink) in the same application. Both sinks use their own checkpoint locations.

  1. Read a stream.
  2. Apply some transformations on the DataFrame.
  3. writeStream to a FileSink.
  4. writeStream to a KinesisSink.
  5. spark.streams.awaitAnyTermination()

However, I encountered weird behaviors like

  • slow processing on KinesisSink
  • jumping iterator_age on Kinesis stream metrics
  • data loss

Could you give me some information about the functionalities of the library on multiple sinks?

Thanks.

Maven build error

I want to test your connector and first tried to build it with Maven. My JDK version is 13.0.2. I did exactly as you described in the README:

git clone...
git checkout 2.4.0
mvn clean install -DskipTests

But I get the following error (also on other branches like master branch):

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.7.0:compile (default-compile) on project spark-sql-kinesis_2.11: Compilation failure: Compilation failure:
[ERROR] Source option 5 is no longer supported. Use 7 or later.
[ERROR] Target option 5 is no longer supported. Use 7 or later.

It seems that the maven-compiler-plugin has Java version 5 as default setting. But I wonder if I'm the only one with this issue...

If I set those versions manually in the pom.xml like this:

...
<configuration>
  <source>1.8</source>
  <target>1.8</target>
</configuration>
...

I get the following error: java.lang.NoClassDefFoundError: javax/tools/ToolProvider

Can anyone point me in the right direction as to what's wrong here? Thanks!

It takes a very long time before consumption from the stream starts

Although there are elements in the stream, I keep receiving empty batches for an unspecified amount of time (many minutes), and then suddenly it starts working. After that, it works as expected: the app starts consuming events from the stream immediately. If I send new events after the stream has been emptied, it takes only a few seconds to start processing them once they reach the stream.

Could this be a problem with the library, kinesis itself or normal behavior I'm unaware of?

sink low write throughput

I have a Spark job that reads from a Kinesis input stream and simply writes it to a Kinesis output stream.
The write speed to the input stream is >800 rec/sec. The problem is that when I consume from the final output stream, the speed is only about 20 rec/sec.
When I change the Kinesis sink to S3 or console, I see very high throughput.
Here is my code:

val kinesisDF = spark
  .readStream
  .format("kinesis")
  .option("streamName", "input_stream")
  .option("startingposition", "trim_horizon")
  .option("endpointUrl", "https://kinesis.eu-central-1.amazonaws.com")
  .option("region", "eu-central-1")
  .load()

val kinesis2 = kinesisDF
  .selectExpr("cast (data as STRING) jsonData")
  .withColumn("data", functions.from_json(functions.col("jsonData"), dataSchema))
  .select("data.*")

val kinesis_result = kinesis2
  .withColumn("data", functions.to_json(functions.struct("ts", "unit_name", "name", "val")))
  .withColumn("partitionKey", kinesis2("unit_name"))
  .selectExpr("cast (data as BINARY)", "partitionKey")
  .repartition(10)

kinesis_result
  .select("data", "partitionKey")
  .writeStream
  .partitionBy("partitionKey")
  .option("kinesis.executor.maxConnections", "20")
  .format("kinesis")
  .option("endpointUrl", "https://kinesis.eu-central-1.amazonaws.com")
  .outputMode("append")
  .option("checkpointLocation", "s3_path")
  .option("streamName", "output_stream")
  .start()
  .awaitTermination()

From the Spark UI I can see the following:

[Screenshot: kinesis]

Another thing is that when I run
aws kinesis get-records --shard-iterator "?????"
for the output stream, I can see that records arrive sequentially and not in batches (their approximate arrival times differ).

Are there any limitations in the Kinesis sink that cause low throughput?
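One thing worth checking with a setup like this is how the partition key spreads records over shards. Kinesis hashes each partition key with MD5 into a 128-bit hash key and routes the record to the shard whose hash key range contains it, and each shard accepts at most 1,000 records (or 1 MB) per second. The sketch below is a hypothetical illustration: the unit names and the shard count are made up, and it assumes the shards split the hash space evenly, which need not hold after resharding. Whether this is the cause of the low throughput here is not established by the issue.

```python
import hashlib
from collections import Counter

HASH_SPACE = 2 ** 128  # MD5 yields a 128-bit hash key


def shard_index(partition_key: str, shard_count: int) -> int:
    """Map a partition key to a shard, assuming evenly split hash key ranges."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    hash_key = int.from_bytes(digest, "big")
    return hash_key * shard_count // HASH_SPACE


def shard_load(partition_keys, shard_count: int) -> Counter:
    """Count how many records would land on each shard."""
    return Counter(shard_index(key, shard_count) for key in partition_keys)


# Hypothetical data: 300 records that carry only three distinct unit names.
keys = ["unit-a", "unit-b", "unit-c"] * 100
load = shard_load(keys, shard_count=4)
print(dict(load))
```

Because every record with the same key lands on the same shard, a column with few distinct values, such as a unit name, can use at most that many shards no matter how many the stream has.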

Add project to maven repository

It would be nice to submit the project to the Maven repository to make it easier to include in existing projects without having to use the --jars flag.

source startingposition doesn't work for LATEST

When I try to read the stream from the latest point with .option("startingposition", "LATEST"), it doesn't read from the latest point but from some other point, seemingly from a checkpoint.
I reset the application name or .option("checkpointLocation", "hdfs_path"), but it doesn't help; consumption does not start from the latest point in the stream.

In contrast, when I use "TRIM_HORIZON", I can observe records from the beginning of the stream.

Here is my source code:

val kinesisDF = spark
  .readStream
  .format("kinesis")
  .option("streamName", "input_stream")
  .option("startingposition", "LATEST")
  .option("region", "eu-central-1")
  .load()

How can I force it to read from the latest point?

Spark 2.4 support

Is support for Spark 2.4 planned? Already being worked on? Do you already have an overview of what would need to be done to run with Spark 2.4?

Spark Streaming doesn't receive messages

I followed your tutorial, but my Spark application doesn't consume messages from Kinesis, whereas another Java application works fine. I am even able to view the stream schema with the printSchema method:

root
|-- data: binary (nullable = true)
|-- streamName: string (nullable = true)
|-- partitionKey: string (nullable = true)
|-- sequenceNumber: string (nullable = true)
|-- approximateArrivalTimestamp: timestamp (nullable = true)

But that's it, and no errors. Please help

Error while Fetching Shard Iterator

18/07/27 10:38:14 WARN KinesisSourceRDD: Neither LastSequenceNumber nor LastTimeStamp was recorded.
18/07/27 10:38:14 ERROR Executor: Exception in task 0.0 in stage 120.0 (TID 330)
java.lang.IllegalStateException: Error while Fetching Shard Iterator
at org.apache.spark.sql.kinesis.KinesisReader.org$apache$spark$sql$kinesis$KinesisReader$$retryOrTimeout(KinesisReader.scala:222)
at org.apache.spark.sql.kinesis.KinesisReader$$anonfun$3.apply(KinesisReader.scala:106)
at org.apache.spark.sql.kinesis.KinesisReader$$anonfun$3.apply(KinesisReader.scala:106)
at org.apache.spark.sql.kinesis.KinesisReader.runUninterruptibly(KinesisReader.scala:188)
at org.apache.spark.sql.kinesis.KinesisReader.getShardIterator(KinesisReader.scala:104)
at org.apache.spark.sql.kinesis.KinesisSourceRDD$$anon$1.getShardIterator(KinesisSourceRDD.scala:123)
at org.apache.spark.sql.kinesis.KinesisSourceRDD$$anon$1.getNext(KinesisSourceRDD.scala:139)
at org.apache.spark.sql.kinesis.KinesisSourceRDD$$anon$1.getNext(KinesisSourceRDD.scala:115)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.model.InvalidArgumentException: The timestampInMillis parameter cannot be greater than the currentTimestampInMillis. timestampInMillis: 1532659093716, currentTimestampInMillis: 1532659091607 (Service: AmazonKinesis; Status Code: 400; Error Code: InvalidArgumentException; Request ID: 370a77ae-1ec2-4e9d-a61e-abccb99485cd)
at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1586)
at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1254)
at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1035)
at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:747)
at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:721)
at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:704)
at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:672)
at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:654)
at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:518)
at org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1805)
at org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1781)
at org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.AmazonKinesisClient.getShardIterator(AmazonKinesisClient.java:980)
at org.apache.spark.sql.kinesis.KinesisReader$$anonfun$3$$anonfun$apply$1.apply(KinesisReader.scala:107)
at org.apache.spark.sql.kinesis.KinesisReader$$anonfun$3$$anonfun$apply$1.apply(KinesisReader.scala:107)
at org.apache.spark.sql.kinesis.KinesisReader.org$apache$spark$sql$kinesis$KinesisReader$$retryOrTimeout(KinesisReader.scala:212)
... 23 more
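The `Caused by` line above carries both timestamps, so the gap between them can be computed directly. This is a small stand-alone sketch, not part of the library; `timestamp_skew_ms` is a hypothetical helper name.

```python
import re

# The service's error text, copied from the "Caused by" line of the trace.
MESSAGE = (
    "The timestampInMillis parameter cannot be greater than the "
    "currentTimestampInMillis. timestampInMillis: 1532659093716, "
    "currentTimestampInMillis: 1532659091607"
)


def timestamp_skew_ms(message: str) -> int:
    """Return the requested timestamp minus the service-side one, in ms."""
    requested = re.search(r"\btimestampInMillis: (\d+)", message)
    current = re.search(r"\bcurrentTimestampInMillis: (\d+)", message)
    if requested is None or current is None:
        raise ValueError("message does not contain both timestamps")
    return int(requested.group(1)) - int(current.group(1))


skew = timestamp_skew_ms(MESSAGE)
print(skew)  # 2109
```

The requested position is about 2.1 seconds ahead of the time the Kinesis service reports. That would be consistent with the clock on the Spark host running ahead of the service, though the trace alone does not confirm the cause.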

Kinesis sink bug: child process has been shutdown (including fix)

There is a merged PR that received further commits after merging, and those commits fix a bug.

The bug, as described in the PR, manifests itself like so:

The child process has been shutdown and can no longer accept messages.

As the PR mentions, the fix appears to be implemented but still needs to be merged into the code.

pyspark.sql.utils.StreamingQueryException

I am trying to execute this code in Python, in much the same way as described in the main README.md of this repository:

kinesis = spark \
    .readStream \
    .format("kinesis") \
    .option("streamName", streamName) \
    .option("endpointUrl", endpointUrl)\
    .option("initialPosition", "earliest") \
    .option("region", region) \
    .option("awsAccessKey", awsAccessKey) \
    .option("awsSecretKey", awsSecretKey) \
    .option("startingposition", "TRIM_HORIZON")\
    .load()

kinesis\
    .writeStream\
    .outputMode("append")\
    .format("console")\
    .start()\
    .awaitTermination()

But unfortunately I get this weird error

ERROR MicroBatchExecution: Query [id = 677e8cd0-9cee-4120-8602-7e9335112f80, runId = 65b8213f-8aa2-44d7-853c-6c7e5ead986b] terminated with error
java.lang.AbstractMethodError
	at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
	at org.apache.spark.sql.kinesis.KinesisSource.initializeLogIfNecessary(KinesisSource.scala:51)
	at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
	at org.apache.spark.sql.kinesis.KinesisSource.log(KinesisSource.scala:51)
	at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
	at org.apache.spark.sql.kinesis.KinesisSource.logInfo(KinesisSource.scala:51)
	at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:211)
	at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:105)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5$$anonfun$apply$5.apply(MicroBatchExecution.scala:268)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5$$anonfun$apply$5.apply(MicroBatchExecution.scala:268)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5.apply(MicroBatchExecution.scala:267)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5.apply(MicroBatchExecution.scala:264)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:264)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:237)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:124)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Exception in thread "stream execution thread for [id = 677e8cd0-9cee-4120-8602-7e9335112f80, runId = 65b8213f-8aa2-44d7-853c-6c7e5ead986b]" java.lang.AbstractMethodError
	at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
	at org.apache.spark.sql.kinesis.KinesisSource.initializeLogIfNecessary(KinesisSource.scala:51)
	at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
	at org.apache.spark.sql.kinesis.KinesisSource.log(KinesisSource.scala:51)
	at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
	at org.apache.spark.sql.kinesis.KinesisSource.logInfo(KinesisSource.scala:51)
	at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:211)
	at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:105)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5$$anonfun$apply$5.apply(MicroBatchExecution.scala:268)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5$$anonfun$apply$5.apply(MicroBatchExecution.scala:268)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5.apply(MicroBatchExecution.scala:267)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5.apply(MicroBatchExecution.scala:264)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:264)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:237)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:124)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Traceback (most recent call last):
  File "/home/main.py", line 27, in <module>
    .format("console")\
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 106, in awaitTermination
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 75, in deco
pyspark.sql.utils.StreamingQueryException: u'null\n=== Streaming Query ===\nIdentifier: [id = 677e8cd0-9cee-4120-8602-7e9335112f80, runId = 65b8213f-8aa2-44d7-853c-6c7e5ead986b]\nCurrent Committed Offsets: {}\nCurrent Available Offsets: {}\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical Plan:\nKinesisSource[streamName]'

I can't debug this or understand what is wrong. Could you please help me understand what the problem might be?

P.S. The same issue is reported in an Apache Spark Jira ticket as well. And yes, I am using Spark 2.3.2. I raised the issue here as requested in that Jira ticket.

UPDATE: I was running that code through spark-submit. When I used pyspark (the Python counterpart of the interactive spark-shell CLI), it seems to work. At this point I would say that the problem lies in the use of spark-submit.

I look forward to your reply.

Issue with connector when there is resharding and the streaming job is not running

We have the following setup on EMR

We have a streaming job that reads from the kinesis stream. This streaming job gets kicked off when there is put activity on the stream and if the stream is inactive for a certain period of time the streaming job is terminated. This is being done so that we reduce the cost.

The issue arises when the stream is scaled (up or down) while the streaming job is down. When the job is restarted and tries to process events in the stream, we see the following exception.

We are currently stuck because we have to delete the checkpoint directory completely, which is something I don't like. Would it be possible to ignore the unknown shards and continue?

at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.model.ResourceNotFoundException: Shard shardId-000000000000 in stream dmp-fico-field-tenant-dms-k8s-dev under account 780944133628 does not exist (Service: AmazonKinesis; Status Code: 400; Error Code: ResourceNotFoundException; Request ID: d6e60a00-9054-5187-830f-a66d56e735dc)
at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2276)
at org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2252)
at org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.AmazonKinesisClient.executeGetShardIterator(AmazonKinesisClient.java:1162)
at org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.AmazonKinesisClient.getShardIterator(AmazonKinesisClient.java:1138)

    at org.apache.spark.sql.kinesis.KinesisReader$$anonfun$3$$anonfun$apply$1.apply(KinesisReader.scala:121)

    at org.apache.spark.sql.kinesis.KinesisReader$$anonfun$3$$anonfun$apply$1.apply(KinesisReader.scala:121)

    at org.apache.spark.sql.kinesis.KinesisReader.org$apache$spark$sql$kinesis$KinesisReader$$retryOrTimeout(KinesisReader.scala:226)

    ... 24 more

19/09/04 12:12:29 ERROR MicroBatchExecution: Query [id = 9e5246c7-7d57-4127-af32-0c9633bdc6f2, runId = d96d1f92-5cf3-4ea6-9ff7-0ba0eb98a980] terminated with error

org.apache.spark.SparkException: Job aborted.

    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)

    at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:131)

    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:475)

    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)

    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:473)

    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)

    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)

    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:472)

    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)

    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)

    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)

    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)

    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)

    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)

    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)

    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)

    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)

    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 8, ip-10-15-135-71.us-west-2.compute.internal, executor 3): java.lang.IllegalStateException: Error while Fetching Shard Iterator
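
To make the "ignore the unknown shards and continue" request more concrete, here is a minimal sketch in plain Scala. It is not the connector's code: `ShardReconciler`, `Reconciled`, and the input shapes (a map from shard ID to last committed sequence number taken from the checkpoint, plus the set of shard IDs the stream currently reports) are assumptions made purely for illustration.

```scala
// Hypothetical helper, not part of kinesis-sql.
object ShardReconciler {
  // known:   checkpointed shards that still exist in the stream
  // unknown: checkpointed shard IDs the stream no longer reports
  final case class Reconciled(known: Map[String, String], unknown: Set[String])

  // checkpointed: shardId -> last committed sequence number (assumed shape)
  // live:         shard IDs currently reported for the stream (assumed input)
  def reconcile(checkpointed: Map[String, String], live: Set[String]): Reconciled = {
    val (known, missing) =
      checkpointed.partition { case (shardId, _) => live.contains(shardId) }
    Reconciled(known, missing.keySet)
  }
}
```

With something like this, the job could request shard iterators only for `known` and log `unknown` instead of failing. Any records that were never read from the dropped shards would be skipped, so this would probably have to be opt-in.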

TimeOut errors are not retried at KinesisReader

Hi,
We often see the error below in our application logs, and the application stops immediately without any retry.

2019-05-24 11:17:37 ERROR MicroBatchExecution:91 - Query [id = 05bd7633-d5d3-460b-84aa-2210a812eb35, runId = e026fa0a-7b1e-428f-8da0-fb0edf35337e] terminated with error
java.lang.IllegalStateException: Timed out after 2000 ms while Describe Streams, last exception:
        at org.apache.spark.sql.kinesis.KinesisReader$$anonfun$org$apache$spark$sql$kinesis$KinesisReader$$retryOrTimeout$6.apply(KinesisReader.scala:248)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.sql.kinesis.KinesisReader.org$apache$spark$sql$kinesis$KinesisReader$$retryOrTimeout(KinesisReader.scala:246)
        at org.apache.spark.sql.kinesis.KinesisReader$$anonfun$5.apply(KinesisReader.scala:168)
        at org.apache.spark.sql.kinesis.KinesisReader$$anonfun$5.apply(KinesisReader.scala:168)
        at org.apache.spark.sql.kinesis.KinesisReader.runUninterruptibly(KinesisReader.scala:202)
        at org.apache.spark.sql.kinesis.KinesisReader.describeKinesisStream(KinesisReader.scala:167)
        at org.apache.spark.sql.kinesis.KinesisReader.getShards(KinesisReader.scala:84)
        at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:117)

The actual cause of the above error is an UnknownHostException from Kinesis in certain regions. The application should not stop on these errors, because they are retryable: they go away if the call is retried.

Caused by: java.net.UnknownHostException: kinesis.us-west-2.amazonaws.com
        at java.net.InetAddress.getAllByName0(InetAddress.java:1280)
        at java.net.InetAddress.getAllByName(InetAddress.java:1192)
        at java.net.InetAddress.getAllByName(InetAddress.java:1126)
        at org.apache.spark.sql.kinesis.shaded.amazonaws.SystemDefaultDnsResolver.resolve(SystemDefaultDnsResolver.java:27)

I see that the line below skips retrying for timeout errors.

https://github.com/qubole/kinesis-sql/blob/master/src/main/scala/org/apache/spark/sql/kinesis/KinesisReader.scala#L220

Is there any particular reason why these timeout errors are handled differently and not retried?
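
To show the behaviour I would expect, here is a minimal sketch of a retry loop that treats DNS and socket timeouts as transient. It is plain Scala and not the code in KinesisReader; `RetrySketch`, `isRetryable`, the doubling backoff, and the choice of which exceptions count as transient are all assumptions for illustration.

```scala
import java.net.{SocketTimeoutException, UnknownHostException}
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Hypothetical sketch, not the connector's retryOrTimeout.
object RetrySketch {
  // Assumed policy: DNS failures and socket timeouts are transient,
  // including when they are wrapped as the cause of another exception.
  def isRetryable(e: Throwable): Boolean = e match {
    case _: UnknownHostException   => true
    case _: SocketTimeoutException => true
    case other =>
      val cause = other.getCause
      cause != null && (cause ne other) && isRetryable(cause)
  }

  // Runs `body` up to `attemptsLeft` times, sleeping `waitMs` before the
  // next attempt and doubling the wait each time. Non-retryable failures
  // and the final failed attempt are rethrown unchanged.
  @tailrec
  def retry[T](attemptsLeft: Int, waitMs: Long)(body: => T): T =
    Try(body) match {
      case Success(value) => value
      case Failure(e) if attemptsLeft > 1 && isRetryable(e) =>
        Thread.sleep(waitMs)
        retry(attemptsLeft - 1, waitMs * 2)(body)
      case Failure(e) => throw e
    }
}
```

A call such as `RetrySketch.retry(3, 1000L) { describeStream() }` (where `describeStream` stands in for whatever call is being made) would then survive a short DNS outage instead of terminating the query.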

Null Pointer Exception in intermittent micro batches

Hi @itsvikramagr

We are getting a NullPointerException in our micro-batches (Spark Structured Streaming). We are using S3 for checkpointing, with a trigger interval of 1 second.

2018-09-11 20:08:24 ERROR MicroBatchExecution:91 - Query [id = a87275fe-875f-41bf-a892-57602f8385b8, runId = f8dce2f3-7efd-4cbf-b04f-d0b2e5f769a3] terminated with error
java.lang.NullPointerException
at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:216)
at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:105)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5$$anonfun$apply$5.apply(MicroBatchExecution.scala:268)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5$$anonfun$apply$5.apply(MicroBatchExecution.scala:268)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5.apply(MicroBatchExecution.scala:267)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5.apply(MicroBatchExecution.scala:264)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:264)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:128)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)

java.lang.IllegalStateException: /checkpoint/round/sources/0/shard-commit/1 does not exist

Hey,

I am using the 2.4.0 branch of the kinesis-sql repo, and I am facing this issue when writing the checkpoint to S3.

Initially I thought this might be caused by the eventual consistency of S3, but that does not seem to be the case: it happens on my local machine as well.

There seems to be an issue with folder creation in shard-commit.

Code snapshot:

val roundStreamDf = sc.readStream
  .format("kinesis")
  .option("streamName", streamName)
  .option("endpointUrl", s"https://kinesis.${region}.amazonaws.com")
  .option("awsAccessKeyId", acceskeyId)
  .option("awsSecretKey", secretAccessKey)
  .option("startingposition", START_POS)
  .option("kinesis.client.describeShardInterval", "3600")
  .load()

val roundStreamData1 = roundStreamDf
  .selectExpr("cast (data as STRING) jsonData")
  .select(from_json(col("jsonData"), ScalaReflection.schemaFor[Round].dataType.asInstanceOf[StructType]).as("round"))
  .select("round.*")

val query = roundStreamData1
  .writeStream
  .foreachBatch { (batchDf: DataFrame, _) =>
    val RoundDf = commonRoundDataProcess(batchDf)
    RoundDf.persist()
    while (opNameIterator.hasNext) {
      val opName = opNameIterator.next()
      val finalRoundDf = RoundDf.filter(col("OperatorShortName") === opName)
      accessDataS3.writeDataToRefinedHudiS3(sc, finalRoundDf, extraOptions, opName, ROUND_TYPE_OF_DATA)
    }
    RoundDf.unpersist()
  }
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .option("checkpointLocation", s"${checkPointBucket}/checkpoint/${ROUND_TYPE_OF_DATA}/")
  .start()

query.awaitTermination()

Log:
2019-10-30 15:06:44 ERROR MicroBatchExecution:91 - Query [id = 56565a9a-c8bc-4818-97ab-0cfe6b345b79, runId = e9623150-3306-4cc0-8b8e-d85edc811cff] terminated with error
java.lang.IllegalStateException: Gave up after 3 retries while fetching MetaData, last exception:
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$withRetry$2.apply(HDFSMetadataCommitter.scala:241)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$withRetry$2.apply(HDFSMetadataCommitter.scala:241)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:240)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.get(HDFSMetadataCommitter.scala:150)
at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:272)
at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:157)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:344)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.IllegalStateException: s3a://gat-datastreaming-resources-dev/checkpoint/round/sources/0/shard-commit/1 does not exist
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:163)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:151)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:229)
... 30 more
org.apache.spark.sql.streaming.StreamingQueryException: Gave up after 3 retries while fetching MetaData, last exception:
=== Streaming Query ===
Identifier: [id = 56565a9a-c8bc-4818-97ab-0cfe6b345b79, runId = e9623150-3306-4cc0-8b8e-d85edc811cff]
Current Committed Offsets: {KinesisSource[gat-rounds-stream]: {"metadata":{"streamName":"gat-rounds-stream","batchId":"1"},"shardId-000000000000":{"iteratorType":"AFTER_SEQUENCE_NUMBER","iteratorPosition":"49600867143952561714236237710058640910088621183742246914"}}}
Current Available Offsets: {KinesisSource[gat-rounds-stream]: {"metadata":{"streamName":"gat-rounds-stream","batchId":"1"},"shardId-000000000000":{"iteratorType":"AFTER_SEQUENCE_NUMBER","iteratorPosition":"49600867143952561714236237710058640910088621183742246914"}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [round#17.Type AS Type#19, round#17.PId AS PId#20L, round#17.RId AS RId#21, round#17.UId AS UId#22, round#17.GId AS GId#23, round#17.GS AS GS#24, round#17.STS AS STS#25, round#17.FGExtId AS FGExtId#26, round#17.Dev AS Dev#27, round#17.CTS AS CTS#28, round#17.PCcy AS PCcy#29, round#17.PSysXR AS PSysXR#30, round#17.BetTotal AS BetTotal#31, round#17.BetBonus AS BetBonus#32, round#17.WinTotal AS WinTotal#33, round#17.WinBonus AS WinBonus#34, round#17.GGR AS GGR#35, round#17.JpContr AS JpContr#36, round#17.JpPO AS JpPO#37, round#17.JpSeed AS JpSeed#38, round#17.OpName AS OpName#39]
+- Project [jsontostructs(StructField(Type,StringType,true), StructField(PId,LongType,true), StructField(RId,StringType,true), StructField(UId,StringType,true), StructField(GId,StringType,true), StructField(GS,StringType,true), StructField(STS,StringType,true), StructField(FGExtId,StringType,true), StructField(Dev,StringType,true), StructField(CTS,StringType,true), StructField(PCcy,StringType,true), StructField(PSysXR,DoubleType,true), StructField(BetTotal,DoubleType,true), StructField(BetBonus,DoubleType,true), StructField(WinTotal,DoubleType,true), StructField(WinBonus,DoubleType,true), StructField(GGR,DoubleType,true), StructField(JpContr,DoubleType,true), StructField(JpPO,DoubleType,true), StructField(JpSeed,DoubleType,true), StructField(OpName,StringType,true), jsonData#15, Some(Europe/Berlin)) AS round#17]
+- Project [cast(data#5 as string) AS jsonData#15]
+- StreamingExecutionRelation KinesisSource[gat-rounds-stream], [data#5, streamName#6, partitionKey#7, sequenceNumber#8, approximateArrivalTimestamp#9]

at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)

Caused by: java.lang.IllegalStateException: Gave up after 3 retries while fetching MetaData, last exception:
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$withRetry$2.apply(HDFSMetadataCommitter.scala:241)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$withRetry$2.apply(HDFSMetadataCommitter.scala:241)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:240)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.get(HDFSMetadataCommitter.scala:150)
at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:272)
at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:157)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:344)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
... 1 more
Caused by: java.lang.IllegalStateException: s3a://gat-datastreaming-resources-dev/checkpoint/round/sources/0/shard-commit/1 does not exist
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:163)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:151)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:229)
... 30 more

Process finished with exit code 0

Avoid Polling of DescribeStream Kinesis API call

Can we avoid polling Kinesis for resharding in getOffset? We can detect that a shard has closed from the previous batch's offsets, and ask Kinesis for new shard information only when there is a closed shard.
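
A rough sketch of that check in plain Scala. I do not know how the connector's offsets record a closed shard, so `ShardState`, its `reachedShardEnd` flag, and `ReshardCheck` are hypothetical names invented for this illustration. The idea rests on the fact that a split or merge always closes the parent shard(s), so new shards can only appear once some shard has been closed.

```scala
// Hypothetical model of the per-shard state from the previous batch.
object ReshardCheck {
  // reachedShardEnd is an assumed flag: true once the shard is closed
  // and has been read to its end.
  final case class ShardState(
      shardId: String,
      lastSequenceNumber: String,
      reachedShardEnd: Boolean)

  // Ask Kinesis for the shard list only on the first batch (no previous
  // state) or when at least one previously tracked shard has closed.
  def shouldRefreshShards(previousBatch: Seq[ShardState]): Boolean =
    previousBatch.isEmpty || previousBatch.exists(_.reachedShardEnd)
}
```

With a check like this, getOffset would call DescribeStream only when `shouldRefreshShards` returns true and would otherwise reuse the shard list from the previous batch.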

When running <mvn install -DskipTests> on branch 2.3.2 mvn fails

Running <mvn install -DskipTests> on branch 2.3.2 fails with the following errors:

[ERROR] Failed to execute goal org.scalastyle:scalastyle-maven-plugin:1.0.0:check (default) on project spark-sql-kinesis_2.11: Failed during scalastyle execution: You have 3 Scalastyle violation(s). -> [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.scalastyle:scalastyle-maven-plugin:1.0.0:check (default) on project spark-sql-kinesis_2.11: Failed during scalastyle execution
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:213)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:154)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:146)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:117)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:81)
    at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:56)
    at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:128)
    at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:305)
    at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192)
    at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
    at org.apache.maven.cli.MavenCli.execute (MavenCli.java:954)
    at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:288)
    at org.apache.maven.cli.MavenCli.main (MavenCli.java:192)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:289)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:229)
    at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:415)
    at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:356)
Caused by: org.apache.maven.plugin.MojoExecutionException: Failed during scalastyle execution
    at org.scalastyle.maven.plugin.ScalastyleViolationCheckMojo.performCheck (ScalastyleViolationCheckMojo.java:246)
    at org.scalastyle.maven.plugin.ScalastyleViolationCheckMojo.execute (ScalastyleViolationCheckMojo.java:205)
    at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo (DefaultBuildPluginManager.java:137)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:208)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:154)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:146)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:117)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:81)
    at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:56)
    at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:128)
    at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:305)
    at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192)
    at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
    at org.apache.maven.cli.MavenCli.execute (MavenCli.java:954)
    at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:288)
    at org.apache.maven.cli.MavenCli.main (MavenCli.java:192)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:289)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:229)
    at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:415)
    at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:356)
Caused by: org.apache.maven.plugin.MojoFailureException: You have 3 Scalastyle violation(s).
    at org.scalastyle.maven.plugin.ScalastyleViolationCheckMojo.performCheck (ScalastyleViolationCheckMojo.java:238)
    at org.scalastyle.maven.plugin.ScalastyleViolationCheckMojo.execute (ScalastyleViolationCheckMojo.java:205)
    at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo (DefaultBuildPluginManager.java:137)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:208)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:154)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:146)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:117)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:81)
    at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:56)
    at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:128)
    at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:305)
    at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192)
    at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
    at org.apache.maven.cli.MavenCli.execute (MavenCli.java:954)
    at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:288)
    at org.apache.maven.cli.MavenCli.main (MavenCli.java:192)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:289)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:229)
    at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:415)
    at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:356)
[ERROR]
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException

publish to maven central

Hi,

It seems that only the POM files are published to Maven Central, so the --packages option of spark-submit does not work.

Thanks,

Israel

Proxy support

Hi,
Does the library support proxy settings when connecting to the Kinesis public endpoint?
My Spark application runs within a VPC with a proxy configured to reach AWS public endpoints.
Thank you

Cannot access Kinesis. Status Code: 400; Error Code: UnrecognizedClientException

We configured an IAM role with full access to Kinesis and assigned it to our ec2-user. But when the application tries to connect to Kinesis with your library, we get an error:
WARN KinesisReader: Error while Describe stream The security token included in the request is invalid. (Service: AmazonKinesis; Status Code: 400; Error Code: UnrecognizedClientException
We don't set any keys.
Please help

Mistake in example with buggy writeStream

"kinesis
.selectExpr("CAST(rand() AS STRING) as partitionKey","CAST(data AS STRING)").as[(String,String)]
.groupBy("data").count()
.writeStream
.format("kinesis")
.outputMode("update")
.option("streamName", "spark-sink-example")
.option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
.option("awsAccessKeyId", [ACCESS_KEY])
.option("awsSecretKey", [SECRET_KEY])
.start()
.awaitTermination()"

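This cannot work as written: groupBy("data").count() drops the partitionKey column, so you have to get rid of the groupBy. Otherwise the sink fails with: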
"java.lang.IllegalStateException: Required attribute 'partitionKey' not found"

I would add that readStream works but writeStream does not work at all, even with simple data such as: "streamingDdf.selectExpr("CAST(rand() AS STRING) as partitionKey","CAST(rand() AS STRING) as data")"

Kinesis Application State in Dynamo DB

Hi @itsvikramagr,

Thanks again for writing this library and providing constant support to the community.

Our Spark Structured Streaming code doesn't need to be stateful. We are OK with the cluster or executors going down as long as Kinesis is managing the consumer's application state.

https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-ddb.html

Could you shed some light on -

  1. Why isn't Kinesis able to manage the offset/application state in DynamoDB, even though kinesis-sql is calling the DescribeStream API?
  2. I don't see any place in the kinesis-sql code that sets the application name for Kinesis.

Thanks in advance for your response!

Migrate to DataSource V2 APIs

The current work is developed against Spark 2.2.x. Spark 2.3.0 introduces new data source APIs, and we need to migrate to them.

Kinesis Consumer lag behavior in multiple regions

Hi,
We have a stream in 5 regions, and the application (deployed in one region) reads from these 5 regions, does a union on the streams, and then performs the operations.

What we see from time to time is that the application lags in some regions. It is not necessarily a region with a high workload. We use the metric "iteratorAgeMilliSeconds" to measure the lag. As you can see from the attached image, the job is able to catch up to the current traffic and then immediately falls behind. This lag is at times multiple hours. Such an abrupt increase and decrease within a few seconds is surprising. I am trying to understand why it behaves like this and how we can keep the lag under a minute.

Below are the 3 configurations we use. We have already tried tweaking these params.

          maxRecordsPerRead = "9000"
          maxRecordsReadPerShard = "100000"
          maxFetchTimeInMs = "20000"

Please let me know if you need any additional parameters.

Kinesis Enhanced fan-out consumer ( using consumer ARN)

I am able to consume the Kinesis stream using this jar as a normal consumer. When I updated the user account to an enhanced fan-out consumer, I was unable to access the stream.

Is there any way to access the stream as an enhanced fan-out consumer?

Need to add option --jars spark-sql-kinesis_2.11-2.3.0.jar when running spark-submit

Hi,

I am using version 2.3.0 of this library with Spark 2.3.0, which I package together into a fat jar to be run on a remote env via spark-submit.

But when I run the fat jar with spark-submit, I get the error:

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kinesis. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:635)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:159)

Caused by: java.lang.ClassNotFoundException: kinesis.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:618)
... 28 more

To fix this I need to add the --jars spark-sql-kinesis_2.11-2.3.0.jar option when running spark-submit, which solves the issue. But I am wondering why I see this error even though I can see that the classes are packaged within the fat jar?

Thanks in advance
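
One possible cause, not confirmed in this thread: Spark resolves a short name such as "kinesis" through Java's ServiceLoader, which reads the file META-INF/services/org.apache.spark.sql.sources.DataSourceRegister. When a fat jar is assembled, service files from different dependencies can overwrite each other, so the classes are present but the registration is lost. The sketch below only inspects a jar for that entry. The class name ServiceFileCheck and its methods are made up for this example.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Hypothetical helper, not part of kinesis-sql or Spark.
class ServiceFileCheck {
    // Service file consulted when Spark maps a short format name to a provider class.
    static final String SERVICE_ENTRY =
        "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister";

    /** Returns the provider class names listed in the jar's service file, or an empty list if the entry is missing. */
    static List<String> registeredProviders(String jarPath) throws IOException {
        List<String> providers = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            JarEntry entry = jar.getJarEntry(SERVICE_ENTRY);
            if (entry == null) {
                return providers; // no registration file at all
            }
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(jar.getInputStream(entry), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    String trimmed = line.trim();
                    // Skip blank lines and comments, as ServiceLoader does.
                    if (!trimmed.isEmpty() && !trimmed.startsWith("#")) {
                        providers.add(trimmed);
                    }
                }
            }
        }
        return providers;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java ServiceFileCheck <path-to-fat-jar>");
            return;
        }
        List<String> providers = registeredProviders(args[0]);
        if (providers.isEmpty()) {
            System.out.println("No DataSourceRegister service file found (or it is empty).");
        } else {
            providers.forEach(System.out::println);
        }
    }
}
```

If the Kinesis provider is missing from the output for the fat jar but present for spark-sql-kinesis_2.11-2.3.0.jar, merging the service files at build time (for example with the Maven Shade plugin's ServicesResourceTransformer) may be worth trying.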

Streaming code results in small parquet files

I am trying to use this code to read from a Kinesis stream (10 shards) and write Parquet files into S3.

Here is a fragment of my code:
Dataset streamReader = sparkSession.readStream()
.format("kinesis")
.option("streamName", streamName)
.option("region", regionName)
.option("endpointUrl", endpointURL)
.option("initialPosition", "TRIM_HORIZON")
.option("executor.maxFetchTimeInMs", "100000")
.option("executor.maxFetchRecordsPerShard", "80000")
.option("executor.maxRecordPerRead", "10000")
.load();

query = eventsToWriteOut.writeStream()
.format("parquet")
.outputMode("append")
.option("checkpointLocation", checkpointLocation)
.option("path", path)
.option("charset", "UTF-8")
.partitionBy("solution_id", "env", "event_date")
.trigger(Trigger.ProcessingTime("5 secs"))
.start();

spark-submit --class --conf spark.default.parallelism=42 --conf spark.sql.shuffle.partitions=60 --conf spark.executor.cores=5 --num-executors=5 --master yarn --deploy-mode cluster --jars spark-sql-kinesis_2.11-2.3.0.jar event-ingress-transform.jar --streamName databricks-tenant1 --path --checkpointDirectory

The records on my stream are pretty large (~300k), and what I get in the Parquet files is ~10-20 records each. What configuration should I use in order to get a larger set of records in each Parquet file?
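
A rough back-of-the-envelope sketch of why the files come out small, under stated assumptions: with a file sink, every task in a micro-batch writes at least one file per partition directory it has rows for, so a short trigger interval multiplied by many tasks produces many small files. The 5-second trigger and the 10 shards come from the post above. One task per shard, one partition directory per task, and 30 records per second are assumptions chosen only for illustration. The class and method names are made up.

```java
// Hypothetical estimate, not a measurement of this job.
class SmallFileEstimate {
    /** Lower bound on the number of files written per hour. */
    static long filesPerHour(int triggerSeconds, int tasksWithData, int partitionDirsPerTask) {
        long batchesPerHour = 3600L / triggerSeconds;
        return batchesPerHour * tasksWithData * partitionDirsPerTask;
    }

    /** Average records per file if a batch's records spread evenly over all written files. */
    static double recordsPerFile(double recordsPerSecond, int triggerSeconds,
                                 int tasksWithData, int partitionDirsPerTask) {
        double recordsPerBatch = recordsPerSecond * triggerSeconds;
        return recordsPerBatch / ((double) tasksWithData * partitionDirsPerTask);
    }

    public static void main(String[] args) {
        // Trigger and shard count are from the post; the other inputs are assumed.
        System.out.println(filesPerHour(5, 10, 1));         // prints 7200
        System.out.println(recordsPerFile(30.0, 5, 10, 1)); // prints 15.0
    }
}
```

Under these assumptions the levers are a longer trigger interval (more records per batch) and fewer writing tasks per partition directory, for example by repartitioning on the partitionBy columns before the write. Whether either fits this job depends on its latency requirements.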

Not supporting sessionToken/securityToken?

I'm experimenting with this locally, where only a session-based token is available. The credential details are stored in the credentials file under a profile name. I don't see a way to authenticate this way. Is one supported?

Add Support for Kinesis Sink

We need to add support for a Kinesis sink. In the first cut, the support needs to be added using the older data source APIs. We would migrate it to use the v2 APIs once it is more stable.

Protobuf version conflict for deaggregation of Kinesis records

Hello,

Great library! I am trying to use it with aggregated Kinesis records, and I am running into some kind of conflict, compiling for Spark 2.3.1. When the aggregated record is read:

Caused by: java.lang.NoClassDefFoundError: com/google/protobuf/ProtocolStringList
	at org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.clientlibrary.types.UserRecord.deaggregate(UserRecord.java:235)
	at org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.clientlibrary.types.UserRecord.deaggregate(UserRecord.java:176)
	at org.apache.spark.sql.kinesis.KinesisReader.deaggregateRecords(KinesisReader.scala:151)
	at org.apache.spark.sql.kinesis.KinesisSourceRDD$$anon$1.getNext(KinesisSourceRDD.scala:147)
	at org.apache.spark.sql.kinesis.KinesisSourceRDD$$anon$1.getNext(KinesisSourceRDD.scala:119)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
...

I see that spark-parent:2.3.0 has <protobuf.version>2.5.0</protobuf.version> (which doesn't have ProtocolStringList, as far as I understand), while the Amazon Kinesis Client Library uses 2.6.1.

Any suggestion on how to best fix this? Using the JAR for 2.6.1 directly in $SPARK_HOME/jars/ seems to work but I am a bit worried about unexpected consequences of using a newer version overall.
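
Before swapping jars, it can help to confirm which protobuf jar is actually being picked up. The sketch below is a generic diagnostic using only the JDK. The class name ClassOrigin is made up, and the check runs on whichever JVM executes it, so it would need to run inside a Spark task to reflect the executor classpath.

```java
import java.security.CodeSource;
import java.util.Optional;

// Hypothetical diagnostic helper, not part of kinesis-sql.
class ClassOrigin {
    /** Returns where the class was loaded from, or empty if it cannot be loaded. */
    static Optional<String> locate(String className) {
        try {
            // Load without initializing the class; we only want its origin.
            Class<?> cls = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return Optional.of("(bootstrap or unknown source)");
            }
            return Optional.of(source.getLocation().toString());
        } catch (ClassNotFoundException | LinkageError e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // ProtocolStringList is the class named in the NoClassDefFoundError above.
        String[] names = {
            "com.google.protobuf.ProtocolStringList",
            "com.google.protobuf.Message"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name).orElse("NOT FOUND"));
        }
    }
}
```

If Message resolves to a protobuf 2.5.0 jar while ProtocolStringList is not found, that matches the version mismatch described above.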

Consuming from multiple Kinesis Streams

Hi,
I have a consumer application which needs to connect/consume from multiple Kinesis Streams.
However, using this library, I see that we can consume from only a single stream.
While trying to read from multiple Kinesis streams, I got the following error:
Caused by: org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.model.AmazonKinesisException: 1 validation error detected: Value 'Stream_1,Stream_2' at 'streamName' failed to satisfy constraint: Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+ (Service: AmazonKinesis; Status Code: 400; Error Code: ValidationException; Request ID: f5bf9ad6-471f-17d7-a1e2-f6de108bdde7)

Could you shed some light on how we can mention multiple streams or if that is even possible from the same consumer?

Thanks in advance.
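
The error itself can be reproduced without AWS: the comma in 'Stream_1,Stream_2' does not match the pattern quoted in the ValidationException, so the whole string is rejected as a single stream name. The sketch below checks names against that pattern and splits a comma-separated list into individual names. The class StreamNames is made up for this example. Using one reader per name and then a union of the resulting streams is an assumption based on another issue on this page, not a confirmed feature of the library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not part of kinesis-sql.
class StreamNames {
    // Pattern copied from the ValidationException message above.
    static final Pattern VALID = Pattern.compile("[a-zA-Z0-9_.-]+");

    static boolean isValid(String name) {
        return VALID.matcher(name).matches();
    }

    /** Splits a comma-separated list into stream names, rejecting any that fail the pattern. */
    static List<String> split(String commaSeparated) {
        List<String> names = new ArrayList<>();
        for (String part : commaSeparated.split(",")) {
            String name = part.trim();
            if (!isValid(name)) {
                throw new IllegalArgumentException("Invalid stream name: '" + name + "'");
            }
            names.add(name);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(isValid("Stream_1,Stream_2")); // prints false
        System.out.println(split("Stream_1,Stream_2"));   // prints [Stream_1, Stream_2]
    }
}
```

Each name returned by split could then be passed as the streamName option of its own reader.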
