
pulsar-spark's Introduction

pulsar-spark


Unified data processing with Apache Pulsar and Apache Spark.

Prerequisites

  • Java 8 or later
  • Spark 3.4.0 or later
  • Pulsar 2.10.2 or later

Preparations

Link

Client library

For Scala/Java applications using SBT/Maven project definitions, link your application with the following artifact:

    groupId = io.streamnative.connectors
    artifactId = pulsar-spark-connector_{{SCALA_BINARY_VERSION}}
    version = {{PULSAR_SPARK_VERSION}}
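
For example, with SBT the dependency can be declared as follows (a minimal sketch; the %% operator appends your project's Scala binary version, and the version placeholder should be replaced with the release you want):

    libraryDependencies += "io.streamnative.connectors" %% "pulsar-spark-connector" % "{{PULSAR_SPARK_VERSION}}"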

Deploy

Client library

As with any Spark application, spark-submit is used to launch your application.
pulsar-spark-connector_{{SCALA_BINARY_VERSION}} and its dependencies can be directly added to spark-submit using --packages.

Example

$ ./bin/spark-submit \
  --packages io.streamnative.connectors:pulsar-spark-connector_{{SCALA_BINARY_VERSION}}:{{PULSAR_SPARK_VERSION}} \
  ...

CLI

For experimenting on spark-shell (or pyspark for Python), you can also use --packages to add pulsar-spark-connector_{{SCALA_BINARY_VERSION}} and its dependencies directly.

Example

$ ./bin/spark-shell \
  --packages io.streamnative.connectors:pulsar-spark-connector_{{SCALA_BINARY_VERSION}}:{{PULSAR_SPARK_VERSION}} \
  ...

When locating an artifact or library, the --packages option checks the following repositories in order:

  1. The local Maven repository

  2. The Maven Central repository

  3. Other repositories specified by --repositories

The format for the coordinates should be groupId:artifactId:version.

For more information about submitting applications with external dependencies, see Application Submission Guide.

Usage

Read data from Pulsar

Create a Pulsar source for streaming queries

The following examples are in Scala.

// Subscribe to 1 topic
val df = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topic", "topic1")
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics
val df = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topics", "topic1,topic2")
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a topic pattern
val df = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topicsPattern", "topic.*")
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
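
For a quick local check, the streaming source above can be wired to Spark's console sink. This is a minimal sketch, assuming a Pulsar broker at pulsar://localhost:6650, a topic named topic1, and an existing SparkSession named spark:

// Print incoming Pulsar messages to the console for a local smoke test.
val query = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topic", "topic1")
  .load()
  .selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("console")
  .option("truncate", "false")
  .start()

query.awaitTermination()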

Tip

For more information on how to use other language bindings for Spark Structured Streaming, see Structured Streaming Programming Guide.

Create a Pulsar source for batch queries

If you have a use case that is better suited to batch processing, you can create a Dataset/DataFrame for a defined range of offsets.

The following examples are in Scala.

// Subscribe to 1 topic, defaulting to the earliest and latest offsets
val df = spark
  .read
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topic", "topic1")
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics, specifying explicit Pulsar offsets
import org.apache.spark.sql.pulsar.JsonUtils._
val startingOffsets = topicOffsets(Map("topic1" -> messageId1, "topic2" -> messageId2))
val endingOffsets = topicOffsets(...)
val df = spark
  .read
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("endingOffsets", endingOffsets)
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
  .read
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topicsPattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
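
The JSON offset strings used above can be produced with the connector's JsonUtils helper. The following is a minimal sketch; MessageId.earliest and MessageId.latest are Pulsar client constants standing in for concrete message IDs you would normally record yourself:

import org.apache.pulsar.client.api.MessageId
import org.apache.spark.sql.pulsar.JsonUtils._

// Build the JSON strings passed to the startingOffsets / endingOffsets options.
// Replace the constants with concrete MessageIds captured from your own topics.
val startingOffsets = topicOffsets(Map("topic1" -> MessageId.earliest, "topic2" -> MessageId.earliest))
val endingOffsets = topicOffsets(Map("topic1" -> MessageId.latest, "topic2" -> MessageId.latest))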

Write data to Pulsar

The DataFrame written to Pulsar can have an arbitrary schema. Each record in the DataFrame is transformed into one message sent to Pulsar, and its fields are divided into two groups: the __key, __eventTime, and __messageProperties fields are encoded as metadata of the Pulsar message, while the other fields are grouped together, encoded using AVRO, and put in value():

producer.newMessage().key(__key).value(avro_encoded_fields).eventTime(__eventTime)

Create a Pulsar sink for streaming queries

The following examples are in Scala.

// Write key-value data from a DataFrame to a specific Pulsar topic specified in an option
val ds = df
  .selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topic", "topic1")
  .start()

// Write key-value data from a DataFrame to Pulsar using a topic specified in the data
val ds = df
  .selectExpr("__topic", "CAST(__key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .start()

Write the output of batch queries to Pulsar

The following examples are in Scala.

// Write key-value data from a DataFrame to a specific Pulsar topic specified in an option
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topic", "topic1")
  .save()

// Write key-value data from a DataFrame to Pulsar using a topic specified in the data
df.selectExpr("__topic", "CAST(__key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .save()
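
As a sketch of how the metadata columns map onto the produced messages, the following populates __key and __eventTime explicitly before writing; the id and payload column names are hypothetical, and the service URL and topic are assumptions:

import org.apache.spark.sql.functions._

// Populate the metadata columns the connector recognizes before writing.
df.select(
    col("id").cast("string").as("__key"),   // becomes the Pulsar message key
    current_timestamp().as("__eventTime"),  // becomes the Pulsar event time
    col("payload").as("value")              // encoded into the message value
  )
  .write
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topic", "topic1")
  .save()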

Limitations

Currently, the connector provides at-least-once semantics. Consequently, when writing either streaming queries or batch queries to Pulsar, some records may be duplicated. A possible way to remove duplicates is to introduce a primary (unique) key that can be used for de-duplication when reading the written data, as in the sketch below.
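
For example, if the written records carry a unique eventId field (a hypothetical name used only for illustration), duplicates can be dropped after reading:

// De-duplicate at-least-once output when reading it back.
// Assumes each record carries a unique "eventId" field.
val deduped = spark
  .read
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topic", "topic1")
  .load()
  .dropDuplicates("eventId")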

Configurations

| Option | Value | Required | Default | Query Type | Description |
|--------|-------|----------|---------|------------|-------------|
| `service.url` | The Pulsar `serviceUrl` string | Yes | None | Streaming and batch | The Pulsar `serviceUrl` configuration for the Pulsar service. Example: `pulsar://localhost:6650`. |
| `admin.url` | A service HTTP URL of your Pulsar cluster | No | None | Streaming and batch | The Pulsar `serviceHttpUrl` configuration. Only needed when `maxBytesPerTrigger` is specified. |
| `maxBytesPerTrigger` | A long value, in bytes | No | None | Streaming and batch | A soft limit on the maximum number of bytes to process per micro-batch. If this is specified, `admin.url` must also be specified. |
| `predefinedSubscription` | A subscription name string | No | None | Streaming and batch | The predefined subscription name used by the connector to track the Spark application's progress. |
| `subscriptionPrefix` | A subscription prefix string | No | None | Streaming and batch | A prefix used by the connector to generate a random subscription to track the Spark application's progress. |
| `topic` | A topic name string | Yes | None | Streaming and batch | The topic to be consumed. Only one of the `topic`, `topics`, or `topicsPattern` options can be specified for a Pulsar source. |
| `topics` | A comma-separated list of topics | Yes | None | Streaming and batch | The topic list to be consumed. Only one of the `topic`, `topics`, or `topicsPattern` options can be specified for a Pulsar source. |
| `topicsPattern` | A Java regex string | Yes | None | Streaming and batch | The pattern used to subscribe to topic(s). Only one of the `topic`, `topics`, or `topicsPattern` options can be specified for a Pulsar source. |
| `pollTimeoutMs` | A number string, in milliseconds | No | "120000" | Streaming and batch | The timeout for reading messages from Pulsar. Example: `6000`. |
| `waitingForNonExistedTopic` | `true` or `false` | No | "false" | Streaming and batch | Whether the connector should wait until the desired topics are created. By default, the connector does not wait for the topic. |
| `startingOffsets` | `"earliest"` (streaming and batch), `"latest"` (streaming only), or a JSON string, for example `{"topic-1":[8,11,16,101,24,1,32,1],"topic-5":[8,15,16,105,24,5,32,5]}` | No | `"earliest"` (batch), `"latest"` (streaming) | Streaming and batch | Controls where a reader starts reading. `"earliest"`: with no valid offset, the reader reads all the data in the partition, starting from the very beginning. `"latest"`: with no valid offset, the reader reads only records written after it starts running. A JSON string specifies a starting offset for each topic; use `org.apache.spark.sql.pulsar.JsonUtils.topicOffsets(Map[String, MessageId])` to convert message IDs to a JSON string. Note: for batch queries, `"latest"` is not allowed, whether specified implicitly or as `MessageId.latest` (`[8,-1,-1,-1,-1,-1,-1,-1,-1,127,16,-1,-1,-1,-1,-1,-1,-1,-1,127]`) in JSON. For streaming queries, `"latest"` only applies when a new query is started; a resumed query always picks up from where it left off, and partitions discovered during a query start at `"earliest"`. |
| `endingOffsets` | `"latest"` (batch) or a JSON string, for example `{"topic-1":[8,12,16,102,24,2,32,2],"topic-5":[8,16,16,106,24,6,32,6]}` | No | `"latest"` | Batch | Controls where a reader stops reading. `"latest"`: the reader stops at the latest record. A JSON string specifies an ending offset for each topic. Note: `MessageId.earliest` (`[8,-1,-1,-1,-1,-1,-1,-1,-1,-1,1,16,-1,-1,-1,-1,-1,-1,-1,-1,-1,1]`) is not allowed. |
| `startingTime` | A number, in milliseconds | No | None | Batch | Sets the starting offset for a batch query as a timestamp in milliseconds, compared against each message's `publishTime`. Example: `1709254800000` (2024-03-01 01:00:00 UTC). |
| `endingTime` | A number, in milliseconds | No | None | Batch | Sets the ending offset for a batch query as a timestamp in milliseconds, compared against each message's `publishTime`. Example: `1709254800000` (2024-03-01 01:00:00 UTC). |
| `failOnDataLoss` | `true` or `false` | No | `true` | Streaming | Controls whether a query fails when data is lost (for example, topics are deleted, or messages are deleted because of a retention policy). This may cause false alarms; you can set it to `false` if it does not behave as expected. A batch query always fails if it cannot read any data from the provided offsets due to data loss. |
| `allowDifferentTopicSchemas` | Boolean | No | `false` | Streaming | When reading multiple topics with different schemas, set this to `true` to turn off automatic schema-based value deserialization. Topics with different schemas can then be read in the same pipeline, which becomes responsible for deserializing the raw values. Because only raw values are returned when this is `true`, Pulsar topic schemas are not taken into account during operation. |
| `pulsar.client.*` | Pulsar client configurations | No | None | Streaming and batch | Client configurations, for example `pulsar.client.authPluginClassName`. See the Pulsar Client Configuration documentation for more details. |
| `pulsar.admin.*` | Pulsar admin configurations | No | None | Streaming and batch | Admin configurations, for example `pulsar.admin.tlsAllowInsecureConnection`. See the Pulsar Admin Configuration documentation for more details. |
| `pulsar.reader.*` | Pulsar reader configurations | No | None | Streaming and batch | Reader configurations, for example `pulsar.reader.subscriptionName`. See the Pulsar Reader Configuration documentation for more details. |
| `pulsar.producer.*` | Pulsar producer configurations | No | None | Streaming and batch | Producer configurations, for example `pulsar.producer.blockIfQueueFull`. See the Pulsar Producer Configuration documentation for more details. |
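
For instance, combining maxBytesPerTrigger with admin.url could look like the following minimal sketch (both URLs and the topic name are assumptions):

// Cap each micro-batch at roughly 128 MB of Pulsar data.
// maxBytesPerTrigger requires admin.url to be set as well.
val df = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("maxBytesPerTrigger", (128L * 1024 * 1024).toString)
  .option("topic", "topic1")
  .load()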

Authentication

Should the Pulsar cluster require authentication, credentials can be set in the following way.

The following examples are in Scala.

// Secure connection with authentication, using the same credentials on the
// Pulsar client and admin interface (if not given explicitly, the client configuration
// is used for admin as well).
val df = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("pulsar.client.authPluginClassName","org.apache.pulsar.client.impl.auth.AuthenticationToken")
  .option("pulsar.client.authParams","token:<valid client JWT token>")
  .option("topicsPattern", "sensitiveTopic")
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Secure connection with client TLS enabled.
// Note that the certificate file has to be present at the specified
// path on every machine of the cluster!
val df = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar+ssl://localhost:6651")
  .option("pulsar.admin.authPluginClassName","org.apache.pulsar.client.impl.auth.AuthenticationToken")
  .option("pulsar.admin.authParams","token:<valid admin JWT token>")
  .option("pulsar.client.authPluginClassName","org.apache.pulsar.client.impl.auth.AuthenticationToken")
  .option("pulsar.client.authParams","token:<valid client JWT token>")
  .option("pulsar.client.tlsTrustCertsFilePath","/path/to/tls/cert/cert.pem")
  .option("pulsar.client.tlsAllowInsecureConnection","false")
  .option("pulsar.client.tlsHostnameVerificationenable","true")
  .option("topicsPattern", "sensitiveTopic")
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Schema of Pulsar source

  • For topics without a schema or with a primitive schema in Pulsar, the message payload is loaded into a value column, with the type corresponding to the Pulsar schema.
  • For topics with an Avro or JSON schema, their field names and field types are kept in the resulting rows.
  • If the topicsPattern matches topics that have different schemas, setting allowDifferentTopicSchemas to true allows the connector to read their content in raw form. In this case it is the responsibility of the pipeline to apply a schema to this content, which is loaded into the value column (see the sketch after this list).
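
A minimal sketch of this raw-value mode, assuming a topic pattern that matches topics with different schemas and a pipeline that knows the payloads are UTF-8 text:

// With allowDifferentTopicSchemas=true the connector returns raw bytes in `value`;
// the pipeline decides how to decode them (here: cast them to UTF-8 strings).
val raw = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topicsPattern", "persistent://public/default/topic-.*")
  .option("allowDifferentTopicSchemas", "true")
  .load()

val decoded = raw.selectExpr("__topic", "CAST(value AS STRING) AS value_string")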

In addition, each row in the source has the following metadata columns.

| Column | Type |
|--------|------|
| `__key` | Binary |
| `__topic` | String |
| `__messageId` | Binary |
| `__publishTime` | Timestamp |
| `__eventTime` | Timestamp |
| `__messageProperties` | Map<String, String> |

Example

A topic whose AVRO schema s in Pulsar is defined as below:

  case class Foo(i: Int, f: Float, bar: Bar)
  case class Bar(b: Boolean, s: String)
  val s = Schema.AVRO(Foo.getClass)

has the following schema as a DataFrame/Dataset in Spark:

root
 |-- i: integer (nullable = false)
 |-- f: float (nullable = false)
 |-- bar: struct (nullable = true)
 |    |-- b: boolean (nullable = false)
 |    |-- s: string (nullable = true)
 |-- __key: binary (nullable = true)
 |-- __topic: string (nullable = true)
 |-- __messageId: binary (nullable = true)
 |-- __publishTime: timestamp (nullable = true)
 |-- __messageProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

For a Pulsar topic with Schema.DOUBLE, its schema as a DataFrame is:

root
|-- value: double (nullable = false)
|-- __key: binary (nullable = true)
|-- __topic: string (nullable = true)
|-- __messageId: binary (nullable = true)
|-- __publishTime: timestamp (nullable = true)
|-- __eventTime: timestamp (nullable = true)
|-- __messageProperties: map (nullable = true)
|    |-- key: string
|    |-- value: string (valueContainsNull = true)
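
A minimal sketch of consuming such a Schema.DOUBLE topic as a typed Dataset (the topic name is an assumption, and spark.implicits must be in scope):

import spark.implicits._

// Read a Schema.DOUBLE topic and keep only the payload column as Dataset[Double].
val doubles = spark
  .read
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topic", "topic-double") // hypothetical topic carrying double values
  .load()
  .select("value")
  .as[Double]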

Build Spark Pulsar Connector

If you want to build a Spark-Pulsar connector reading data from Pulsar and writing results to Pulsar, follow the steps below.

  1. Check out the source code.

$ git clone https://github.com/streamnative/pulsar-spark.git
$ cd pulsar-spark

  2. Install Docker.

The pulsar-spark connector uses Testcontainers for integration tests. To run the integration tests, make sure you have Docker installed.

  3. Set a Scala version.

Change scala.version and scala.binary.version in pom.xml.

Note

The Scala version should be consistent with the Scala version of the Spark you use.

  4. Build the project.

$ mvn clean install -DskipTests

If you get the following error during compilation, try running Maven with Java 8:

[ERROR] [Error] : Source option 6 is no longer supported. Use 7 or later.
[ERROR] [Error] : Target option 6 is no longer supported. Use 7 or later.
  5. Run the tests.

$ mvn clean install

Note: individual tests can be executed by configuring scalatest-maven-plugin in the usual way, if needed:

mvn -Dsuites=org.apache.spark.sql.pulsar.CachedPulsarClientSuite clean install

This might be handy if test execution is slow, or if you get a java.io.IOException: Too many open files exception during a full suite run.

Once the build finishes, a fat JAR is generated under both the local Maven repository and the target directory.

pulsar-spark's People

Contributors

amehochan, atezs82, caseyrathbone, chaoqin-li1123, dependabot[bot], ericm-db, fossabot, honderdzevenduuzend, imaffe, jsteggink, nathluu, nlu90, obobj, sijie, streamnativebot, syhily, tisonkun, violetkjs, yaalsn, yjshen


pulsar-spark's Issues

[FEATURE] Upgrade Pulsar client lib version to 2.9.2


Describe the solution you'd like
The Pulsar version currently used is 2.4.2, which is quite old.
We need to upgrade to newer pulsar releases.


[BUG] Failed to get the subscription indicator information related to Spark's consumption of Pulsar

Describe the bug
The subscription consumption information obtained through the command bin/pulsar-admin topics partitioned-stats persistent://qlm-ns/qlm-test/qlm-test1 --per-partition shows all zeros, so it does not reflect the actual consumption.

To Reproduce

Steps to reproduce the behavior:
Consumption configuration information:

    val df = spark.readStream
      .format("pulsar")
      .option("service.url", serviceUrl)
      .option("admin.url", adminUrl)
      .option("topic", "persistent://qlm-ns/qlm-test/qlm-test1")
      .option("pulsar.client.authPluginClassName","org.apache.pulsar.client.impl.auth.AuthenticationToken")
      .option("pulsar.client.authParams",token)
      .load()

After starting to consume Pulsar, execute the command bin/pulsar-admin topics partitioned-stats persistent://qlm-ns/qlm-test/qlm-test1 --per-partition

All subscription consumption rates and throughput are displayed as 0.

[FEATURE] Expose Pulsar-Client Metrics with Prometheus


[BUG] Failed to create producer: Namespace is being unloaded, cannot add topic

Describe the bug
A running Spark application stopped working after a micro-batch execution failed (HTTP 500 error from Pulsar). After the failure, the Spark application seems to try to reconnect to the output topic but only gets "Could not get connection to broker: Namespace is being unloaded, cannot add topic".

Other consumers/producers are connected to the same topic and are still working correctly.

To Reproduce
Pulsar (2.5.0) and Spark (2.4.4) are deployed on the same kubernetes cluster. Pulsar is deployed with default mini-helm charts. Spark is deployed with Spark-Operator.

The Spark application is a streaming Dataset (input and output are Pulsar topics). The output topic name is dynamically created based on information in the input message.

The application had been working correctly for hours, and then an error occurred (it seems to be a temporary error from Pulsar):

20/01/31 19:35:20 ERROR MicroBatchExecution: Query [id = 27bd0ffe-0c41-4a43-8d1b-2636549b6314, runId = 86420f14-ff24-4e1d-b2da-40627d6e5c6a] terminated with error
java.lang.RuntimeException: Failed to get last messageId for persistent://public/default/aom-emp
at org.apache.spark.sql.pulsar.PulsarMetadataReader$$anonfun$fetchLatestOffsets$1.liftedTree1$1(PulsarMetadataReader.scala:203)
at org.apache.spark.sql.pulsar.PulsarMetadataReader$$anonfun$fetchLatestOffsets$1.apply(PulsarMetadataReader.scala:197)
at org.apache.spark.sql.pulsar.PulsarMetadataReader$$anonfun$fetchLatestOffsets$1.apply(PulsarMetadataReader.scala:195)
...
org.apache.spark.sql.streaming.StreamingQueryException: Failed to get last messageId for persistent://public/default/aom-emp
=== Streaming Query ===
Identifier: [id = 27bd0ffe-0c41-4a43-8d1b-2636549b6314, runId = 86420f14-ff24-4e1d-b2da-40627d6e5c6a]
Current Committed Offsets: {org.apache.spark.sql.pulsar.PulsarMicroBatchReader@6d62ae90: {"persistent://public/default/aom-emp":[8,-114,-123,11,16,6]}}
Current Available Offsets: {org.apache.spark.sql.pulsar.PulsarMicroBatchReader@6d62ae90: {"persistent://public/default/aom-emp":[8,-114,-123,11,16,6]}}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
Project [concat(objet#8, _, action#9) AS __topic#40, cast(window#24-T3660000ms as string) AS __key#41, sum(nb)#34L AS value#42L]
+- Aggregate [window#35-T3660000ms, objet#8, action#9], [window#35-T3660000ms AS window#24-T3660000ms, objet#8, action#9, sum(cast(nb#10 as bigint)) AS sum(nb)#34L]
+- Filter isnotnull(__eventTime#15-T3660000ms)
+- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(__eventTime#15-T3660000ms, TimestampType, LongType) - 0) as double) / cast(3600000000 as double))) as double) = (cast((precisetimestampconversion(__eventTime#15-T3660000ms, TimestampType, LongType) - 0) as double) / cast(3600000000 as double))) THEN (CEIL((cast((precisetimestampconversion(__eventTime#15-T3660000ms, TimestampType, LongType) - 0) as double) / cast(3600000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(__eventTime#15-T3660000ms, TimestampType, LongType) - 0) as double) / cast(3600000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 3600000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(__eventTime#15-T3660000ms, TimestampType, LongType) - 0) as double) / cast(3600000000 as double))) as double) = (cast((precisetimestampconversion(__eventTime#15-T3660000ms, TimestampType, LongType) - 0) as double) / cast(3600000000 as double))) THEN (CEIL((cast((precisetimestampconversion(__eventTime#15-T3660000ms, TimestampType, LongType) - 0) as double) / cast(3600000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(__eventTime#15-T3660000ms, TimestampType, LongType) - 0) as double) / cast(3600000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 3600000000) + 0) + 3600000000), LongType, TimestampType)) AS window#35-T3660000ms, objet#8, action#9, nb#10, __key#11, __topic#12, __messageId#13, __publishTime#14, __eventTime#15-T3660000ms]
+- EventTimeWatermark __eventTime#15: timestamp, interval 1 hours 1 minutes
+- StreamingExecutionRelation org.apache.spark.sql.pulsar.PulsarMicroBatchReader@6d62ae90, [objet#8, action#9, nb#10, __key#11, __topic#12, __messageId#13, __publishTime#14, __eventTime#15]
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: java.lang.RuntimeException: Failed to get last messageId for persistent://public/default/aom-emp
at org.apache.spark.sql.pulsar.PulsarMetadataReader$$anonfun$fetchLatestOffsets$1.liftedTree1$1(PulsarMetadataReader.scala:203)
...
Caused by: org.apache.pulsar.shade.javax.ws.rs.InternalServerErrorException: HTTP 500 Internal Server Error
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.convertToException(JerseyInvocation.java:1098)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.access$700(JerseyInvocation.java:99)
... 50 more

After this error, the Spark application seems to try to reconnect regularly, but only gets these errors:

20/01/31 23:57:20 ERROR ClientCnx: [id: 0xce703541, L:/10.42.0.28:37144 - R:172.16.101.114/172.16.101.114:30002] Close connection because received internal-server error org.apache.pulsar.client.api.PulsarClientException$LookupException: java.lang.IllegalStateException: Namespace bundle public/default/0x70000000_0x80000000 is being unloaded
20/01/31 23:57:20 WARN BinaryProtoLookupService: [persistent://public/default/AbsenceEnseignant_creation] failed to send lookup request : org.apache.pulsar.client.api.PulsarClientException$LookupException: org.apache.pulsar.client.api.PulsarClientException$LookupException: java.lang.IllegalStateException: Namespace bundle public/default/0x70000000_0x80000000 is being unloaded
20/01/31 23:57:20 WARN ConnectionHandler: [AbsenceEnseignant_creation] [terrific-dragonfly-pulsar-0-109] Error connecting to broker: org.apache.pulsar.client.api.PulsarClientException$LookupException: org.apache.pulsar.client.api.PulsarClientException$LookupException: java.lang.IllegalStateException: Namespace bundle public/default/0x70000000_0x80000000 is being unloaded
20/01/31 23:57:20 WARN ConnectionHandler: [AbsenceEnseignant_creation] [terrific-dragonfly-pulsar-0-109] Could not get connection to broker: org.apache.pulsar.client.api.PulsarClientException$LookupException: org.apache.pulsar.client.api.PulsarClientException$LookupException: java.lang.IllegalStateException: Namespace bundle public/default/0x70000000_0x80000000 is being unloaded -- Will try again in 0.195 s
20/02/01 01:42:20 ERROR ClientCnx: [id: 0x9c396ec5, L:/10.42.0.28:38560 - R:172.16.101.114/172.16.101.114:30002] Close connection because received internal-server error org.apache.pulsar.client.api.PulsarClientException$LookupException: java.lang.IllegalStateException: Namespace bundle public/default/0x70000000_0x80000000 is being unloaded
20/02/01 01:42:20 WARN BinaryProtoLookupService: [persistent://public/default/AbsenceEnseignant_creation] failed to send lookup request : org.apache.pulsar.client.api.PulsarClientException$LookupException: org.apache.pulsar.client.api.PulsarClientException$LookupException: java.lang.IllegalStateException: Namespace bundle public/default/0x70000000_0x80000000 is being unloaded
20/02/01 01:42:20 WARN ConnectionHandler: [AbsenceEnseignant_creation] [terrific-dragonfly-pulsar-0-109] Error connecting to broker: org.apache.pulsar.client.api.PulsarClientException$LookupException: org.apache.pulsar.client.api.PulsarClientException$LookupException: java.lang.IllegalStateException: Namespace bundle public/default/0x70000000_0x80000000 is being unloaded
20/02/01 01:42:20 WARN ConnectionHandler: [AbsenceEnseignant_creation] [terrific-dragonfly-pulsar-0-109] Could not get connection to broker: org.apache.pulsar.client.api.PulsarClientException$LookupException: org.apache.pulsar.client.api.PulsarClientException$LookupException: java.lang.IllegalStateException: Namespace bundle public/default/0x70000000_0x80000000 is being unloaded -- Will try again in 0.191 s
20/02/01 16:30:20 ERROR ClientCnx: [id: 0x9e3cd9fd, L:/10.42.0.28:60278 - R:172.16.101.114/172.16.101.114:30002] Close connection because received internal-server error org.apache.pulsar.client.api.PulsarClientException$LookupException: java.lang.IllegalStateException: Namespace bundle public/default/0x70000000_0x80000000 is being unloaded
20/02/01 16:30:20 WARN BinaryProtoLookupService: [persistent://public/default/AbsenceEnseignant_creation] failed to send lookup request : org.apache.pulsar.client.api.PulsarClientException$LookupException: org.apache.pulsar.client.api.PulsarClientException$LookupException: java.lang.IllegalStateException: Namespace bundle public/default/0x70000000_0x80000000 is being unloaded
20/02/01 16:30:20 WARN ConnectionHandler: [AbsenceEnseignant_creation] [terrific-dragonfly-pulsar-0-109] Error connecting to broker: org.apache.pulsar.client.api.PulsarClientException$LookupException: org.apache.pulsar.client.api.PulsarClientException$LookupException: java.lang.IllegalStateException: Namespace bundle public/default/0x70000000_0x80000000 is being unloaded
20/02/01 16:30:20 WARN ConnectionHandler: [AbsenceEnseignant_creation] [terrific-dragonfly-pulsar-0-109] Could not get connection to broker: org.apache.pulsar.client.api.PulsarClientException$LookupException: org.apache.pulsar.client.api.PulsarClientException$LookupException: java.lang.IllegalStateException: Namespace bundle public/default/0x70000000_0x80000000 is being unloaded -- Will try again in 0.193 s
20/02/02 16:51:21 WARN ClientCnx: [id: 0x035d4a85, L:/10.42.0.28:37146 - R:172.16.101.114/172.16.101.114:30002] Received error from server: Namespace is being unloaded, cannot add topic persistent://public/default/AbsenceEnseignant_creation
20/02/02 16:51:21 ERROR ProducerImpl: [AbsenceEnseignant_creation] [terrific-dragonfly-pulsar-0-109] Failed to create producer: Namespace is being unloaded, cannot add topic persistent://public/default/AbsenceEnseignant_creation
20/02/02 16:51:21 WARN ConnectionHandler: [AbsenceEnseignant_creation] [terrific-dragonfly-pulsar-0-109] Could not get connection to broker: Namespace is being unloaded, cannot add topic persistent://public/default/AbsenceEnseignant_creation -- Will try again in 0.185 s
20/02/03 20:24:20 WARN ClientCnx: [id: 0x035d4a85, L:/10.42.0.28:37146 - R:172.16.101.114/172.16.101.114:30002] Received error from server: Namespace is being unloaded, cannot add topic persistent://public/default/AbsenceEnseignant_creation
20/02/03 20:24:20 ERROR ProducerImpl: [AbsenceEnseignant_creation] [terrific-dragonfly-pulsar-0-109] Failed to create producer: Namespace is being unloaded, cannot add topic persistent://public/default/AbsenceEnseignant_creation
20/02/03 20:24:20 WARN ConnectionHandler: [AbsenceEnseignant_creation] [terrific-dragonfly-pulsar-0-109] Could not get connection to broker: Namespace is being unloaded, cannot add topic persistent://public/default/AbsenceEnseignant_creation -- Will try again in 0.198 s

Other producers and consumers have no issues connecting to the topic persistent://public/default/AbsenceEnseignant_creation.

Expected behavior
After a temporary Pulsar error, the streaming Spark application should succeed in reconnecting to the topic.

[BUG] Exception on seek to a non-existent position

In micro-batch execution, when we get a MessageId from the last batch execution, we need to skip to the next position to start the current batch. Currently, the next position is calculated as:

case (_: MessageIdImpl, cbmid: BatchMessageIdImpl) =>
          // we seek using a message id, this is supposed to be read by previous task since it's
          // inclusive for the last batch (start, end], so we skip this batch
          val newStart =
            new MessageIdImpl(cbmid.getLedgerId, cbmid.getEntryId + 1, cbmid.getPartitionIndex)
          consumer.seek(newStart)

However, when the current message is the last entry in a ledger, Pulsar complains with SubscriptionInvalidCursorPosition, since the messageId we generated does not exist.

The broker log is attached (broker-log.txt).

[FEATURE] Make this library available to maven central repository

Is your feature request related to a problem? Please describe.
Maven Central is the go-to place to resolve any Java dependency. Relying on the StreamNative-hosted repository does not provide as much confidence.

Describe the solution you'd like
Make this library available in the Maven Central repository for better reliability.

Additional context
Sometimes organization policies or firewalls do not allow resolving dependencies from third-party sources. This feature will help adoption of the library.

[FEATURE] Upgrade Spark DataSource V2 APIs to unify streaming and batch

Thanks to @hanguyen6's proposal in PR #42.

The Spark DataSource V2 APIs have been refactored to unify streaming and batch. Should we upgrade our code to follow the updated abstraction:

https://issues.apache.org/jira/browse/SPARK-25390

write API refactor:
https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit#

read API refactoring:
https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit#

I have updated the code to follow that abstraction and can help if needed.

[BUG] Incorrect subscription name used by PulsarProvider when reader subscription name is provided already

Describe the bug
Consider a scenario:

The predefinedSubscription option is not provided.
The pulsar.reader.subscriptionName option is provided.
Optionally, subscriptionPrefix may or may not be provided.

In this case, as per the logic, pulsar-spark will go ahead and subscribe to the topic using a subscription name derived from $driverGroupIdPrefix-$topicPartition (here), which in turn is made up of $subscriptionPrefix-$topicPartition, as that is what is passed to the PulsarMetadataReader object here and in other places.

But the issue is that since the option pulsar.reader.subscriptionName is set, the actual reader created here will use a partition-agnostic common subscription name.

To Reproduce
mentioned above

Expected behavior
Ideally, if a low-level (reader-level) override is used, PulsarProvider and PulsarMetadataReader should use that value instead of deriving it from the subscriptionPrefix and topicPartition combination.

[FEATURE] Allow configuring a predefined subscription for the metadata reader


Describe the solution you'd like
Allow users to provide a predefined Pulsar subscription to manage the cursor.


Add auth token, error: illegal cyclic reference involving object InterfaceAudience


dependencies

      <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client</artifactId>
            <version>2.7.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/io.streamnative.connectors/pulsar-spark-connector -->
    <dependency>
        <groupId>io.streamnative.connectors</groupId>
        <artifactId>pulsar-spark-connector_2.11</artifactId>
        <version>2.4.5</version>
        <scope>provided</scope>
    </dependency>

Expected behavior

val df = spark
      .readStream
      .format("pulsar")
      .option("service.url", "pulsar://xxx:6650")
      .option("admin.url", "http://xxx:8080")
      .option("topic", "persistent://sd-bigdata/xxx")
      .option("auto.offset.reset",false)
      .option("startingOffsets", "earliest")
      .option("pulsar.admin.authPluginClassName","org.apache.pulsar.client.impl.auth.AuthenticationToken")
      .option("pulsar.admin.authParams","token:eyJhbGciOiJIUzI1Nixxxx")
      .option("pulsar.client.authpluginclassname","org.apache.pulsar.client.impl.auth.AuthenticationToken")
      .option("pulsar.client.authparams","token:eyJhbGciOiJIUzI1Nixxxx")
      .option("failOnDataLoss", false)
      .load()


Exception in thread "main" java.lang.ExceptionInInitializerError
at org.apache.spark.sql.pulsar.PulsarProvider$$anonfun$getClientParams$1.apply(PulsarProvider.scala:340)
at org.apache.spark.sql.pulsar.PulsarProvider$$anonfun$getClientParams$1.apply(PulsarProvider.scala:339)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Map$Map2.foreach(Map.scala:137)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.pulsar.PulsarProvider$.getClientParams(PulsarProvider.scala:339)
at org.apache.spark.sql.pulsar.PulsarProvider$.org$apache$spark$sql$pulsar$PulsarProvider$$prepareConfForReader(PulsarProvider.scala:570)
at org.apache.spark.sql.pulsar.PulsarProvider.sourceSchema(PulsarProvider.scala:67)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:209)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:95)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:95)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:33)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:171)
at process.Pulsar2Hudi$.main(Pulsar2Hudi.scala:37)
at process.Pulsar2Hudi.main(Pulsar2Hudi.scala)
Caused by: scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference involving object InterfaceAudience
at scala.reflect.internal.Symbols$Symbol$$anonfun$info$3.apply(Symbols.scala:1523)
at scala.reflect.internal.Symbols$Symbol$$anonfun$info$3.apply(Symbols.scala:1521)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.reflect.internal.Symbols$Symbol.lock(Symbols.scala:567)
at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1521)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:171)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:171)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.info(SynchronizedSymbols.scala:127)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.info(SynchronizedSymbols.scala:171)
at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$coreLookup$1(JavaMirrors.scala:992)
at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$lookupClass$1(JavaMirrors.scala:998)
at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$classToScala1(JavaMirrors.scala:1003)
at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToScala$1.apply(JavaMirrors.scala:980)
at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToScala$1.apply(JavaMirrors.scala:980)
at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$toScala$1.apply(JavaMirrors.scala:97)
at scala.reflect.runtime.TwoWayCaches$TwoWayCache$$anonfun$toScala$1.apply(TwoWayCaches.scala:38)
at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toScala(TwoWayCaches.scala:33)
at scala.reflect.runtime.JavaMirrors$JavaMirror.toScala(JavaMirrors.scala:95)
at scala.reflect.runtime.JavaMirrors$JavaMirror.classToScala(JavaMirrors.scala:980)
at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy.(JavaMirrors.scala:163)
at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy$.apply(JavaMirrors.scala:162)
at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy$.apply(JavaMirrors.scala:162)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$copyAnnotations(JavaMirrors.scala:683)
at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter.load(JavaMirrors.scala:733)
at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter.complete(JavaMirrors.scala:744)
at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1535)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:171)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:171)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.info(SynchronizedSymbols.scala:127)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.info(SynchronizedSymbols.scala:171)
at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$coreLookup$1(JavaMirrors.scala:992)
at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$lookupClass$1(JavaMirrors.scala:998)
at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$classToScala1(JavaMirrors.scala:1003)
at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToScala$1.apply(JavaMirrors.scala:980)
at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToScala$1.apply(JavaMirrors.scala:980)
at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$toScala$1.apply(JavaMirrors.scala:97)
at scala.reflect.runtime.TwoWayCaches$TwoWayCache$$anonfun$toScala$1.apply(TwoWayCaches.scala:38)
at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toScala(TwoWayCaches.scala:33)
at scala.reflect.runtime.JavaMirrors$JavaMirror.toScala(JavaMirrors.scala:95)
at scala.reflect.runtime.JavaMirrors$JavaMirror.classToScala(JavaMirrors.scala:980)
at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy.(JavaMirrors.scala:163)
at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy$.apply(JavaMirrors.scala:162)
at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy$.apply(JavaMirrors.scala:162)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$copyAnnotations(JavaMirrors.scala:683)
at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter.load(JavaMirrors.scala:733)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$typeParams$1.apply(SynchronizedSymbols.scala:142)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$typeParams$1.apply(SynchronizedSymbols.scala:133)
at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$8.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:168)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.typeParams(SynchronizedSymbols.scala:132)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$8.typeParams(SynchronizedSymbols.scala:168)
at scala.reflect.internal.Types$class.isRawIfWithoutArgs(Types.scala:3791)
at scala.reflect.internal.SymbolTable.isRawIfWithoutArgs(SymbolTable.scala:16)
at scala.reflect.internal.tpe.TypeMaps$$anon$1.apply(TypeMaps.scala:328)
at scala.reflect.runtime.JavaMirrors$JavaMirror.typeToScala(JavaMirrors.scala:1086)
at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$jfieldAsScala1(JavaMirrors.scala:1131)
at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$scala$reflect$runtime$JavaMirrors$JavaMirror$$jfieldAsScala$1.apply(JavaMirrors.scala:1126)
at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$scala$reflect$runtime$JavaMirrors$JavaMirror$$jfieldAsScala$1.apply(JavaMirrors.scala:1126)
at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$toScala$1.apply(JavaMirrors.scala:97)
at scala.reflect.runtime.TwoWayCaches$TwoWayCache$$anonfun$toScala$1.apply(TwoWayCaches.scala:38)
at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toScala(TwoWayCaches.scala:33)
at scala.reflect.runtime.JavaMirrors$JavaMirror.toScala(JavaMirrors.scala:95)
at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$jfieldAsScala(JavaMirrors.scala:1126)
at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter$$anonfun$completeRest$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$3.apply(JavaMirrors.scala:781)
at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter$$anonfun$completeRest$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$3.apply(JavaMirrors.scala:781)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter$$anonfun$completeRest$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(JavaMirrors.scala:781)
at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter$$anonfun$completeRest$1.apply$mcV$sp(JavaMirrors.scala:791)
at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter$$anonfun$completeRest$1.apply(JavaMirrors.scala:749)
at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter$$anonfun$completeRest$1.apply(JavaMirrors.scala:749)
at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter.completeRest(JavaMirrors.scala:749)
at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter.complete(JavaMirrors.scala:745)
at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1535)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$8.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:168)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$8.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:168)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.info(SynchronizedSymbols.scala:127)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$8.info(SynchronizedSymbols.scala:168)
at scala.reflect.internal.Types$TypeRef.thisInfo(Types.scala:2199)
at scala.reflect.internal.Types$TypeRef.baseClasses(Types.scala:2204)
at scala.reflect.internal.tpe.FindMembers$FindMemberBase.(FindMembers.scala:17)
at scala.reflect.internal.tpe.FindMembers$FindMembers.(FindMembers.scala:194)
at scala.reflect.internal.Types$Type.scala$reflect$internal$Types$Type$$findMembersInternal$1(Types.scala:1004)
at scala.reflect.internal.Types$Type.findMembers(Types.scala:1005)

[BUG] A persistent random subscription is created each time consumption is started

Describe the bug
A persistent random subscription name is created every time consumption starts. It is neither in the form of a reader with a subscription-name prefix nor in the form of a persistent subscription with a fixed name, which causes the cluster to generate a large number of subscriptions.

To Reproduce
Steps to reproduce the behavior:
Consumption configuration information:
val df = spark.readStream
  .format("pulsar")
  .option("service.url", serviceUrl)
  .option("admin.url", adminUrl)
  .option("topic", "persistent://qlm-ns/qlm-test/qlm-test1")
  .option("pulsar.client.authPluginClassName","org.apache.pulsar.client.impl.auth.AuthenticationToken")
  .option("pulsar.client.authParams",token)
  .load()

"persistent://qlm-ns/qlm-test/qlm-test1" has 5 partitions, execute the command: "bin/pulsar-admin topics subscriptions persistent://qlm-ns/qlm-test/qlm-test1 "View:
https://tva1.sinaimg.cn/large/008vxvgGgy1h8vgzhirlxj30ve0u0alx.jpg

Every time the Spark task is started, 5 persistent random subscription names are created.

Expected behavior
It would be good to provide either a reader-based consumption mode with a subscription-name prefix or a persistent consumption mode with a fixed subscription name.

[Release] The requirements of pulsar-spark release

To release pulsar-spark via streamnative-ci, the following items are required:

  1. get-project-version.sh and set-project-version.sh
    Please make a new directory named scripts in the repository and create the corresponding scripts to get/set the project version.

  2. Create a project directory under the streamnative-ci/projects/
    Please submit a project directory to streamnative-ci/projects. In general, the submitted project directory should contain three shell scripts: publish.sh, get-version.sh, and set-version.sh. To add this directory, you can refer to this document and the directories submitted by other projects in streamnative-ci/projects. publish.sh indicates how to deploy this project to Maven Central (for example, commands like mvn -U -q clean deploy -DskipTests -Dcheckstyle.skip=true).

  3. Please add release plugins and change the groupId from org.apache.pulsar to io.streamnative.
    You can refer to this part in pulsar-flink.

Please contact me if you have any questions :)

[FEATURE] Allow configuring the connector to wait indefinitely if a topic does not exist


[FEATURE] Add an endingTime feature

Is your feature request related to a problem? Please describe.
Sometimes we need to specify a start time and an end time to pull a batch of data for that period, but currently only the startingTime can be specified.

Describe the solution you'd like
At the start of the program, seek to the messageId corresponding to the specified endingTime.

#95

[FEATURE] Using custom subscription name

Is your feature request related to a problem? Please describe.
When the Spark job starts, it currently uses random subscription names for consuming data, which makes logging and debugging harder.

Describe the solution you'd like
The subscription name could be taken as an option on the Spark reader format. If the subscription name is not provided, we can continue generating a random subscription name.

Need a newer pulsar-spark version with pulsar-client version 2.9+

We are trying to use this adapter with PySpark. One of the challenges we are facing is setting a custom subscription name for the individual readers that are created. The current version, 3.1.1.4, uses Pulsar client 2.4, which does not have a way to set the full subscription name to something custom; it always appends a random UUID string.

The latest Pulsar client, 2.9, fixes that. This repo's master branch is already updated to the 2.9 Pulsar client, but there has been no release from master since.
When we tried locally building from master, which has the Pulsar client 2.9 dependency (or taking the 3.1 branch and updating the pulsar-client version to 2.9), we get the following error from PySpark when we eventually use the jar:

java.lang.ExceptionInInitializerError
        at org.apache.spark.sql.pulsar.PulsarProvider$.$anonfun$getClientParams$3(PulsarProvider.scala:259)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at scala.collection.immutable.Map$Map2.foreach(Map.scala:273)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at org.apache.spark.sql.pulsar.PulsarProvider$.getClientParams(PulsarProvider.scala:258)
        at org.apache.spark.sql.pulsar.PulsarProvider$.org$apache$spark$sql$pulsar$PulsarProvider$$prepareConfForReader(PulsarProvider.scala:513)
        at org.apache.spark.sql.pulsar.PulsarProvider.sourceSchema(PulsarProvider.scala:60)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:236)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
        at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:34)
        at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:168)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:144)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:748)
Caused by: scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference involving class InterfaceAudience

When setting the 'checkpointLocation' checkpoint option, commons-io is missing from the shaded jar

VERSION: pulsar-spark-connector_2.11-2.4.1

StreamingQuery query = df.select(col("__eventTime").as("event_time"))
.withWatermark("event_time","1 minute")
.groupBy(window(col("event_time"),"1 minute","1 minute"))
//.groupBy("event_time")
.count().withColumn("process_time",current_timestamp())
.writeStream()
.outputMode("append")
.format("console")
//.trigger(Trigger.ProcessingTime("1 minute"))
.option("checkpointLocation","G:/output/checkpoint")
.start();
query.awaitTermination();

java.lang.NoClassDefFoundError: org/apache/pulsar/shade/org/apache/commons/io/IOUtils
at org.apache.spark.sql.pulsar.PulsarSourceInitialOffsetWriter.deserialize(PulsarSources.scala:140)
at org.apache.spark.sql.pulsar.PulsarSourceInitialOffsetWriter.deserialize(PulsarSources.scala:127)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:145)
at org.apache.spark.sql.pulsar.PulsarSourceInitialOffsetWriter.getInitialOffset(PulsarSources.scala:164)

[BUG] Error when resetting subscription

Describe the bug
Randomly (or at least I haven't found the exact moment when it happens), when the Spark application receives a message from Pulsar, I get this exception: "org.apache.pulsar.client.api.PulsarClientException: Error when resetting subscription: 142926:2" and it stops working.

To Reproduce
I'm using Pulsar 2.5.0, deployed on kubernetes with the default mini helm configuration (no custom configuration) and Spark 2.4.4 running locally (spark-sql_2.11 and pulsar-spark-connector_2.11 packages).

I use this code in the Spark application:

String serviceUrl = "pulsar://172.16.101.114:30002";
String adminUrl = "http://172.16.101.114:30001";

SparkSession spark = SparkSession.builder().appName("PulsarTest").master("local[2]").getOrCreate();

spark.sparkContext().setLogLevel("WARN");

Dataset<Row> dataset = spark
        .readStream()
        .format("pulsar")
        .option("service.url", serviceUrl)
        .option("admin.url", adminUrl)
        .option("topic", "aom-emp")
        .load();

StreamingQuery query = dataset
        .withWatermark("__eventTime", "10 minutes")
        .groupBy(window(dataset.col("__eventTime"), "1 minute"), col("objet"), col("action"))
        .sum("nb")
        .select(
                concat(col("objet"), lit("-"), col("action")).as("__topic"),
                col("window").cast(DataTypes.StringType).as("__key"),
                col("sum(nb)").as("value"))
        .writeStream()
        .outputMode("update")
        .format("pulsar")
        .option("service.url", serviceUrl)
        .option("admin.url", adminUrl)
        .option("checkpointLocation", "/tmp/checkpoint")
        .start();

query.awaitTermination();

172.16.101.114 is the IP of the Pulsar Proxy.

When I send messages to "aom-emp", most of the time it works, but sometimes the exception occurs and the Spark application crashes.

Expected behavior
The application runs forever with no error.

[BUG] The requirements of pulsar-spark with token release

Describe the bug
A clear and concise description of what the bug is.

To Reproduce
Steps to reproduce the behavior:

  1. Go to '...'
  2. Click on '....'
  3. Scroll down to '....'
  4. See error

Expected behavior
A clear and concise description of what you expected to happen.

Screenshots
If applicable, add screenshots to help explain your problem.

Additional context
Add any other context about the problem here.

[BUG] Pulsar SQL cannot correctly read topic data that was written by Spark-Pulsar

Describe the bug
I wrote the numbers 1 to 10 into a Pulsar topic through the spark-pulsar connector. When I use Pulsar SQL to read those 10 numbers, this exception occurs: "Query 20190902_101219_00035_9r62s failed: Topic persistent://public/default/topic_test1 does not have a valid schema"

Screenshots

  1. A simple Spark-Pulsar snippet that writes the values 1-10 into a Pulsar topic (screenshot)

  2. Exception info from the Pulsar SQL CLI while reading the topic data (screenshot)

[BUG] unnecessary subscription created when processing batches

Describe the bug

Why do we need to create a subscription when the reader API is used and Spark manages offsets on its own?
I can see new subscriptions being created and deleted by PulsarMetadataReader before and after committing a batch, but I don't see how they are useful. Is that legacy code that should be removed?

Expected behavior

  • No subscription creation needed when reader API is used

Additional context
Spark version 2.4.5 and Pulsar version 2.5.0

Pulsar-spark connector for Spark Streaming.

We already have a Structured Streaming based Pulsar connector. However, I am unable to find any DStream-based Pulsar connector for Spark Streaming.
Is it possible today with StreamNative?
Are there any workaround suggestions? (One possibility is sketched below.)
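
One possible workaround, sketched below under the assumption that Structured Streaming is acceptable and a SparkSession named spark is available: consume with the existing connector and drop to batch-at-a-time processing via foreachBatch. This is not a DStream connector, only an approximation of DStream-style per-batch handling.

import org.apache.spark.sql.DataFrame

val df = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topic", "topic1")
  .load()

// An explicitly typed function value sidesteps the foreachBatch overload ambiguity in Scala 2.12.
val processBatch: (DataFrame, Long) => Unit = (batch, batchId) => {
  // Per-micro-batch access to a plain DataFrame, similar to what a DStream job would do.
  println(s"batch $batchId contains ${batch.count()} rows")
}

df.writeStream
  .foreachBatch(processBatch)
  .option("checkpointLocation", "/tmp/checkpoints/dstream-workaround")
  .start()
  .awaitTermination()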

[FEATURE] Can I have one topic for each subscription name

Is your feature request related to a problem? Please describe.
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]

Describe the solution you'd like
A clear and concise description of what you want to happen.

Describe alternatives you've considered
A clear and concise description of any alternative solutions or features you've considered.

Additional context
Add any other context or screenshots about the feature request here.

Need a newer pulsar-spark version with pulsar-client version 2.9+

Creating a new ticket as #88 was wrongfully closed.

We are trying to use this adapter with pyspark. One of the challenges we are facing is setting a custom subscription name for the individual readers that are created. The current version, 3.1.1.4, uses Pulsar client 2.4, which doesn't have a way to set the full subscription name to something custom; it always appends a random UUID string.

The latest Pulsar client, 2.9, fixes that. This repo's master branch has already been updated to the 2.9 Pulsar client, but there has been no release from master since.
When we build locally from master (which has the Pulsar client 2.9 dependency), or take the 3.1 branch and update the pulsar-client version to 2.9, we get the following error from pyspark when we eventually use the jar:

java.lang.ExceptionInInitializerError
        at org.apache.spark.sql.pulsar.PulsarProvider$.$anonfun$getClientParams$3(PulsarProvider.scala:259)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at scala.collection.immutable.Map$Map2.foreach(Map.scala:273)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at org.apache.spark.sql.pulsar.PulsarProvider$.getClientParams(PulsarProvider.scala:258)
        at org.apache.spark.sql.pulsar.PulsarProvider$.org$apache$spark$sql$pulsar$PulsarProvider$$prepareConfForReader(PulsarProvider.scala:513)
        at org.apache.spark.sql.pulsar.PulsarProvider.sourceSchema(PulsarProvider.scala:60)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:236)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
        at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:34)
        at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:168)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:144)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:748)
Caused by: scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference involving class InterfaceAudience

[BUG] Durable subscription created, and unable to delete the subscription on the partitioned topic

Describe the bug
My understanding of this Spark adapter is that it creates a reader interface for each partition of the topic, and that once the Spark job quits, these subscriptions should be removed by Pulsar since they are supposed to be non-durable.

To Reproduce
Steps to reproduce the behavior:

  1. Create a partitioned topic
  2. Use spark-shell with the command below:
val df = spark.readStream.format("pulsar").option("service.url", "pulsar://pulsar.vpc:6650").option("admin.url", "http://pulsar.vpc:8080/").option("topics", "test.json").load()
  3. You will see subscriptions being created in the following format:
 spark-pulsar-c51b97c9-2c9a-4d5a-af55-21312df9072a--252816474-persistent://public/default/test.json-partition-0

Expected behavior
After quitting the spark-shell, this subscription should be removed; however, it stays there forever.
Using the pulsar-admin unsubscribe command doesn't remove these subscriptions either.

bin/pulsar-admin topics unsubscribe -s 'spark-pulsar-c51b97c9-2c9a-4d5a-af55-21312df9072a--252816474-persistent://public/default/test.json-partition-0' persistent://public/default/test.json
2023-01-06T03:56:31,942+0000 [AsyncHttpClient-7-1] WARN  org.apache.pulsar.client.admin.internal.BaseResource - [http://localhost:8080/admin/v2/persistent/public/default/test.json/subscription/spark-pulsar-c51b97c9-2c9a-4d5a-af55-21312df9072a--252816474-persistent%253A%252F%252Fpublic%252Fdefault%252Ftest.json-partition-0?force=false] Failed to perform http delete request: javax.ws.rs.NotFoundException: HTTP 404 Subscription not found
Subscription not found

Reason: Subscription not found

The only way to get rid of these subscriptions is to treat each partition as an individual non-partitioned topic, find its subscriptions, and then delete them (see the sketch below).
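
A sketch of that per-partition cleanup using the Pulsar Java admin client, assuming access to the admin HTTP endpoint; the subscription prefix below is the one from this report.

import org.apache.pulsar.client.admin.PulsarAdmin

val admin = PulsarAdmin.builder()
  .serviceHttpUrl("http://pulsar.vpc:8080")
  .build()

val topic     = "persistent://public/default/test.json"
val subPrefix = "spark-pulsar-c51b97c9-2c9a-4d5a-af55-21312df9072a--252816474-"

try {
  val partitions = admin.topics().getPartitionedTopicMetadata(topic).partitions
  (0 until partitions).foreach { i =>
    val partitionTopic = s"$topic-partition-$i"
    // Each partition carries its own leftover subscription, so delete it partition by partition.
    admin.topics().deleteSubscription(partitionTopic, s"$subPrefix$partitionTopic")
  }
} finally {
  admin.close()
}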

When checking the internal stats, these subscriptions were marked as durable:

bin/pulsar-admin topics  partitioned-stats   persistent://kafka/archiving/useractions.json

...
   "spark-pulsar-c51b97c9-2c9a-4d5a-af55-21312df9072a--252816474-persistent://public/default/test.json-partition-0" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesOutCounter" : 0,
      "msgOutCounter" : 0,
      "msgRateRedeliver" : 0.0,
      "messageAckRate" : 0.0,
      "chunkedMessageRate" : 0,
      "msgBacklog" : 3,
      "backlogSize" : 0,
      "earliestMsgPublishTimeInBacklog" : 0,
      "msgBacklogNoDelayed" : 3,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "msgRateExpired" : 0.0,
      "totalMsgExpired" : 0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 0,
      "lastConsumedTimestamp" : 0,
      "lastAckedTimestamp" : 0,
      "lastMarkDeleteAdvancedTimestamp" : 0,
      "consumers" : [ ],
      "isDurable" : true,
      "isReplicated" : false,
      "allowOutOfOrderDelivery" : false,
      "consumersAfterMarkDeletePosition" : { },
      "nonContiguousDeletedMessagesRanges" : 0,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
      "subscriptionProperties" : { },
 >>>>     "durable" : true,
      "replicated" : false
    },
...

[FEATURE] Pulsar Spark Integration

The Spark Pulsar connector can be roughly divided into two parts: support for batch tasks and support for stream tasks; furthermore, to enable stream-mode processing, we need to incorporate continuous mode as well as micro-batch mode (see the trigger sketch after the list below).

Basics:

  • Batch Mode
  • Stream Mode
    • Continuous
    • MicroBatch
  • Documentation and examples on how to use Pulsar in Spark
  • Integration tests
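
For reference, the two stream modes listed above correspond to Spark's trigger settings on the write side. A minimal sketch, assuming a SparkSession named spark and that the continuous reader is available in the build in use:

import org.apache.spark.sql.streaming.Trigger

val df = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topic", "topic1")
  .load()

// Micro-batch mode (the default): poll and process on a fixed interval.
val microBatchQuery = df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// Continuous mode: long-running low-latency tasks with periodic checkpoints.
val continuousQuery = df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()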

Schema Part

  • Basic types and Avro
  • read with a different versioned schema
  • Avro complex types -- union, map, array, decimal 112425b
  • JSON deserialization 7dc3005
  • event time -- metadata field write 4fe2917
  • revisit nullable semantics #70

Multi-partitioned topic

  • partition add / delete discovery c26b612

[BUG] Spark can't start read stream- NullPointerException in pulsar-spark-connector V3.1.1.2

Describe the bug
While trying to start a Spark Structured Streaming read stream, the connector throws a NullPointerException.

pulsar-connector jar - “io.streamnative.connectors:pulsar-spark-connector_2.12:3.1.1.2”
PySpark version = 3.1.2
python version  = 3.7

The code snippet throwing the error is:


eventsDF = spark_session.readStream \
    .format("pulsar") \
    .option("service.url", service_url) \
    .option("admin.url", admin_url) \
    .option("topics", topic) \
    .option("subscriptionprefix", "nlu-test") \
    .load() \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json("value",schema_item).alias("event")) \
    .select("event.*").repartition(partitions)

The error trace is:

Traceback (most recent call last):
  File "/Users/ashi/MyDev/workspace/src/post_call_etl_job/main_local.py", line 361, in <module>
    compute_df = pci.events_queueDF(spark, topic_compute, schema, partition_count)
  File "/Users/ashi/MyDev/workspace/src/post_call_etl_job/main_local.py", line 102, in events_queueDF
    .option("subscriptionprefix", "nlu-test") \
  File "/Users/ashi/MyDev/workspace/.venv/lib/python3.7/site-packages/pyspark/sql/streaming.py", line 482, in load
    return self._df(self._jreader.load())
  File "/Users/ashi/MyDev/workspace/.venv/lib/python3.7/site-packages/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/ashi/MyDev/workspace/.venv/lib/python3.7/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/Users/ashi/MyDev/workspace/.venv/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o63.load.
: java.lang.NullPointerException
	at org.apache.spark.sql.pulsar.PulsarMetadataReader.getPulsarSchema(PulsarMetadataReader.scala:170)
	at org.apache.spark.sql.pulsar.PulsarMetadataReader.getSchema(PulsarMetadataReader.scala:164)
	at org.apache.spark.sql.pulsar.PulsarMetadataReader.getAndCheckCompatible(PulsarMetadataReader.scala:148)
	at org.apache.spark.sql.pulsar.PulsarProvider.$anonfun$sourceSchema$2(PulsarProvider.scala:71)
	at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2622)
	at org.apache.spark.sql.pulsar.PulsarProvider.sourceSchema(PulsarProvider.scala:70)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:236)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:117)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:117)
	at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:33)
	at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:219)
	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:194)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


To Reproduce
Steps to reproduce the behavior:

  1. Run the Apache Pulsar docker container (apachepulsar/pulsar:2.6.1)
  2. Use the Pulsar Python client to send a message to a persistent topic.
  3. Start the Spark streaming app. I load the Pulsar connector library in the Spark app code.
  4. As soon as it tries to parse the options and create the data stream, I get the error.

Expected behavior
Spark should be able to create read stream.

@nlu90 - for your kind attention.

[FEATURE] StartingOffsets doesn't work with BatchMessageId

Hi,

I'm trying to use startingOffsets on my topic, but the message IDs of my topic are instances of BatchMessageIdImpl (with batchIndex set).
I have tried with a topic whose message IDs are plain message IDs (MessageIdImpl) and everything works well.
For example: I have 10 messages (sent with a batching producer) in my topic. I set startingOffsets to this messageId: 39637:0:-1:4.
Spark should show the next 6 messages, but it shows nothing.

Here is my producer code:

public class Producer {

    public static void main(String[] args) throws Exception {

        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build()) {

            Producer<String> producer = client
                    .newProducer(Schema.STRING)
                    .topic("topic")
                    // if enableBatching is set to false, startingOffsets in the Spark application works well
                    .enableBatching(true)
                    .batchingMaxMessages(10)
                    .blockIfQueueFull(false)
                    .batchingMaxPublishDelay(100, TimeUnit.SECONDS)
                    .create();

            CompletableFuture<MessageId> messageIdCompletableFuture = null;
            for (int i = 0; i < 10; i++) {
                messageIdCompletableFuture = producer.newMessage()
                        .value(String.valueOf(i))
                        .sendAsync();
            }

            messageIdCompletableFuture.get();
        }
    }
}

And my Spark application:

public class SparkFunctionBatch {

    private static final String TOPIC = "persistent://public/default/topic2";

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("AppName").master("local[2]").getOrCreate();
        spark.sparkContext().setLogLevel("WARN");

        spark.udf().register("convert", (UDF1<byte[], String>)
                (messageIdBytes) -> MessageId.fromByteArray(messageIdBytes).toString(), DataTypes.StringType);

        Dataset<Row> kdecoleAppLog = spark
                .read()
                .format("pulsar")
                .option("service.url", "pulsar://localhost:6650")
                .option("admin.url", "http://localhost:8080")
                .option("topic", TOPIC)
                .option("startingOffsets", getStartingOffsets())
                .load();

        kdecoleAppLog
                .withColumn("__messageIdToString", callUDF("convert", col("__messageId")))
                .show(99999, false);
    }

    private static String getStartingOffsets() {
        String startingOffsets = "earliest";

        boolean calculatingStartingOffset = false;
        if (calculatingStartingOffset) {
            MessageId messageId = new BatchMessageIdImpl(39637, 0, -1, 4);

            Map<String, MessageId> jmap = new HashMap<>();
            jmap.put(TOPIC, messageId);

            scala.collection.immutable.Map<String, MessageId> map = JavaConverters.mapAsScalaMapConverter(jmap).asScala().toMap(
                    Predef.conforms()
            );

            startingOffsets = JsonUtils.topicOffsets(map);
        }
        return startingOffsets;
    }
}

[FEATURE] Bump spark to 3.2.0 with custom monitor metrics support.

Is your feature request related to a problem? Please describe.
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]

Describe the solution you'd like
A clear and concise description of what you want to happen.

Describe alternatives you've considered
A clear and concise description of any alternative solutions or features you've considered.

Additional context
Add any other context or screenshots about the feature request here.

[Bug] Same named durable subscription created when using custom subscription name via readerconfig

Describe the bug
Consider a scenario:

predefinedSubscription option is not provided.
pulsar.reader.subscriptionName option is provided

Additionally, assume #107 is fixed.

In this case, the metadata reader will either use the subscription name from readerConfig or create a subscription based on the logic here.

If it reuses the subscription name from the reader config, the logic in PulsarProvider::createSource is as follows:

  • Create a metadata reader
  • Setup cursor
    • Create a new durable subscription as provided in readerConfig
    • Get offset info and note down the latest offsets
  • Create PulsarSource object
    • Setup a reader using same subscription name as mentioned in readerConfig

This fails because, during reader creation, it observes that a durable subscription already exists (created by the metadata reader).

If the metadata reader uses different logic to figure out the subscription name, then doing any sort of custom authorization logic is tricky.

Proposal

Use a different/new config to input a custom subscription name that would be used by the metadata reader class only.

OR

Use a non-durable subscription in the metadata reader here.

[BUG] Incorrect string deserialization in pulsar-spark_2.11-2.4.5

Description

A Pulsar message read into a structured stream is incorrectly deserialized if a string field has a length over 127.

Environment

scala 2.11

spark 2.4.7

pulsar-spark_2.11 2.4.5

Protobuf message definition

message Test {
  required string query = 1;
}

Spark receiver

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object TestError {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[3]").setAppName("spark-stream-dev")
    conf.set("spark.sql.streaming.checkpointLocation", "./checkpoints")
    val spark = SparkSession.builder.config(conf).getOrCreate()
    import spark.implicits._

    val in = spark.readStream
      .format("pulsar")
      .option("service.url", "pulsar://pulsarHost:6650")
      .option("admin.url", "http://pulsarHost:8080")
      .option("topics", "persistent://sample/standalone/ns1/b")
      .load()

    val valueAsBytes = in.map { row =>
      val str = row.getAs[String]("value")
      val prettyBytes = str.getBytes().map(b => "%02X".format(b)) // pretty-print hex values
      prettyBytes.mkString(" ")
    }

    val out = valueAsBytes.writeStream
      .outputMode("append")
      .format("console")
      .start()

    val outCsv = valueAsBytes.writeStream
      .outputMode("append")
      .format("csv")
      .start("raw")


    out.awaitTermination()
    outCsv.awaitTermination()
  }
}

The query field contains a string of 127 letters (works correctly) or 128 letters (works incorrectly).

Correct behaviour - message of length 127

Sent to pulsar:
0A 7F 61 61 ... 127 occurrences of letter 'a' ... 61 61

Received using bin/pulsar-client consume -n 0 --hex -s s1 persistent://sample/standalone/ns1/b:
0A 7F 61 61 ... 127 occurrences of letter 'a' ... 61 61

Received using spark-streaming (code above):
0A 7F 61 61 ... 127 occurrences of letter 'a' ... 61 61

Incorrect behaviour - message of length 128:

Sent to pulsar:
0A 80 01 61 61 ... 128 occurrences of letter 'a' ... 61 61

Received using bin/pulsar-client consume -n 0 --hex -s s1 persistent://sample/standalone/ns1/b:
0A 80 01 61 61 ... 128 occurrences of letter 'a' ... 61 61

Received using spark-streaming:
0A EF BF BD 01 61 61 ... 128 occurrences of letter 'a' ... 61 61

Why is the 80 01 sequence read as EF BF BD 01?
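
For context (not a fix): EF BF BD is the UTF-8 encoding of the Unicode replacement character U+FFFD, and the byte 0x80 on its own is not valid UTF-8, so the observed output matches what happens when the raw protobuf bytes are decoded as a UTF-8 string and then re-encoded. A minimal Scala illustration:

import java.nio.charset.StandardCharsets

// 0A 80 01 61 ... is the start of the 128-letter protobuf payload from above.
val raw = Array(0x0A, 0x80, 0x01, 0x61).map(_.toByte)

// Decoding as UTF-8 replaces the lone 0x80 with U+FFFD; re-encoding yields EF BF BD.
val roundTripped = new String(raw, StandardCharsets.UTF_8).getBytes(StandardCharsets.UTF_8)

println(roundTripped.map(b => "%02X".format(b)).mkString(" ")) // 0A EF BF BD 01 61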

[BUG] Current Pulsar Client is not shutting down `EventLoopGroup` properly

Describe the bug

While testing our integration with the Pulsar Spark connector, we found that shutting down PulsarClientImpl does not shut down its EventLoopGroup properly (which seems to be addressed in the most recent Pulsar client, 2.10.1).

To Reproduce
Steps to reproduce the behavior:

  1. Create PulsarClient (version from "pulsar-spark-connector")
  2. Try shutting down the Pulsar client (everything will be wound down except the eventLoopGroup, which will keep the JVM from shutting down)

Expected behavior
The JVM should shut down after the PulsarClient is successfully closed.

Screenshots
NA

Additional context
NA

[BUG] java.lang.RuntimeException: Failed to create schema for topic

When I call dataset.write() to output the data to Pulsar, it throws an error saying that it failed to create the schema.

Environment:
Pulsar-2.4.1
Spark-2.4.4

java.lang.RuntimeException: Failed to create schema for persistent://public/default/als_statistics_tocheck
	at org.apache.spark.sql.pulsar.SchemaUtils$.uploadPulsarSchema(SchemaUtils.scala:104)
	at org.apache.spark.sql.pulsar.PulsarRowWriter.singleProducer$lzycompute(PulsarWriteTask.scala:140)
	at org.apache.spark.sql.pulsar.PulsarRowWriter.singleProducer(PulsarWriteTask.scala:138)
	at org.apache.spark.sql.pulsar.PulsarRowWriter.producerFlush(PulsarWriteTask.scala:210)
     RecordSchemaBuilder schemaBuilder = SchemaBuilder.record("topLevelRecord");
     schemaBuilder.field("ip").type(SchemaType.STRING);
     schemaBuilder.field("port").type(SchemaType.INT32);
     schemaBuilder.field("url_id").type(SchemaType.STRING);
     schemaBuilder.field("response_rate").type(SchemaType.DOUBLE);
     schemaBuilder.field("success_rate").type(SchemaType.DOUBLE);
     schemaBuilder.field("average_response_time").type(SchemaType.DOUBLE);
     schemaBuilder.field("average_network_time").type(SchemaType.DOUBLE);
     schemaBuilder.field("start_time").type(SchemaType.TIMESTAMP);
     schemaBuilder.field("end_time").type(SchemaType.TIMESTAMP);

     SchemaInfo  statistics2checkSchemaInfo = schemaBuilder.build(SchemaType.AVRO);

      waitToCheckDataSet.write()
                            //.mode("append")
                            .format("pulsar")
                            .option("service.url", serviceUrl)
                            .option("admin.url", adminUrl)
                            .option("topic", statistics2CheckTopic)
                            .option("pulsar.producer.sendTimeoutMs","60000")
                            //.option("avroSchema",statistics2checkSchemaInfo.getSchemaDefinition())
                            //.option("recordName","CheckDataSet")
                            //.option("recordNamespace","com.some.domain")
                            .save();

    try {
            admin.schemas().getSchemaInfo(statistics2CheckTopic);
        } catch (PulsarAdminException e) {
            if (404 == e.getStatusCode()) {
                admin.schemas().createSchema(statistics2CheckTopic,statistics2checkSchemaInfo);
            }
        }

        spark.readStream()
                .format("pulsar")
                .option("service.url", serviceUrl)
                .option("admin.url", adminUrl)
                .option("topic", statistics2CheckTopic)
                .option("startingOffsets", "earliest")
                .load()
                .withWatermark("__eventTime", "1 minute")
                .writeStream().queryName("WaitToCheckDataSet")
                .outputMode("append")
                .trigger(Trigger.ProcessingTime("1 minute"))
                .foreachBatch((dataset,batchId) -> {
                    System.out.println("------WaitToCheckDataSet-------");
                     dataset.show(false);
               }).start();

In addition, how can the options "avroSchema", "recordName", and "recordNamespace" be supported?
The schema record is always named "topLevelRecord" instead of something like com.some.domain.

http://spark.apache.org/docs/latest/sql-data-sources-avro.html
{
    "type": "record",
    "name": "topLevelRecord",
    "fields": [
      {
        "name": "ip",
        "type": [
          "string",
          "null"
        ]
      },
      {
        "name": "port",
        "type": [
          "int",
          "null"
        ]
      },
      {
        "name": "url_id",
        "type": "string"
      },
      {
        "name": "response_rate",
        "type": [
          "double",
          "null"
        ]
      },
      {
        "name": "success_rate",
        "type": [
          "double",
          "null"
        ]
      },
      {
        "name": "average_response_time",
        "type": "double"
      },
      {
        "name": "average_network_time",
        "type": "double"
      },
      {
        "name": "start_time",
        "type": [
          {
            "type": "long",
            "logicalType": "timestamp-micros"
          },
          "null"
        ]
      },
      {
        "name": "end_time",
        "type": [
          {
            "type": "long",
            "logicalType": "timestamp-micros"
          },
          "null"
        ]
      },
      {
        "name": "type",
        "type": "int"
      }
    ]
  }

Support Spark 3.0 and Pulsar 2.6+

Is your feature request related to a problem? Please describe.
The current connector cannot support Spark 3.0 because of the many changes introduced in Spark 3.0.

Describe the solution you'd like
The connector should support Spark 3.0 and Pulsar 2.6+.

Describe alternatives you've considered

Additional context

[BUG] Spark StreamNative Driver should not require Admin URL

Describe the bug
Pulsar admins do not allow consumers/producers to access the admin URL. Consumers/producers use certificates for authentication/authorization. Teams using the Spark driver cannot take advantage of the StreamNative Spark driver due to a lack of access to the admin URL.

To Reproduce
Steps to reproduce the behavior:

  1. Go to '...'
  2. Click on '....'
  3. Scroll down to '....'
  4. See error

Expected behavior
A clear and concise description of what you expected to happen.

Screenshots
If applicable, add screenshots to help explain your problem.

Additional context
Add any other context about the problem here.

[FEATURE] support pyspark

Is your feature request related to a problem? Please describe.
It seems that for now only the Java version is supported for Spark. I'm using pyspark, and when I try to use the standard Pulsar client to send data to Pulsar, it returns an error saying PicklingError: Could not serialize object: RuntimeError: Pickling of "pulsar.Producer" instances is not enabled

Describe the solution you'd like
Also support pyspark.

Pulsar Spark cannot work with Spark 2.3. Build fails.

@sijie and @yjshen:

It is not currently possible to use the pulsar-spark module with Spark 2.3.x. The build fails with the trace below.

[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/JSONParser.scala:364: value encoding is not a member of org.apache.spark.sql.catalyst.json.JSONOptions
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarContinuousReader.scala:27: object JSONOptionsInRead is not a member of package org.apache.spark.sql.catalyst.json
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarContinuousReader.scala:29: object ContinuousInputPartition is not a member of package org.apache.spark.sql.sources.v2.reader
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarContinuousReader.scala:30: object ContinuousInputPartitionReader is not a member of package org.apache.spark.sql.sources.v2.reader.streaming
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarContinuousReader.scala:49: not found: type JSONOptionsInRead
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarContinuousReader.scala:79: not found: type InputPartition
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarContinuousReader.scala:108: not found: type InputPartition
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarContinuousReader.scala:149: not found: type ContinuousInputPartition
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarContinuousReader.scala:148: not found: type JSONOptionsInRead
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarContinuousReader.scala:156: not found: type InputPartitionReader
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarContinuousReader.scala:252: not found: type ContinuousInputPartitionReader
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarContinuousReader.scala:251: not found: type JSONOptionsInRead
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarContinuousReader.scala:174: not found: type InputPartitionReader
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarContinuousReader.scala:229: not found: type JSONOptionsInRead
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarDeserializer.scala:37: object JSONOptionsInRead is not a member of package org.apache.spark.sql.catalyst.json
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarDeserializer.scala:42: not found: type JSONOptionsInRead
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarMicroBatchReader.scala:29: object JSONOptionsInRead is not a member of package org.apache.spark.sql.catalyst.json
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarMicroBatchReader.scala:31: object InputPartition is not a member of package org.apache.spark.sql.sources.v2.reader
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarMicroBatchReader.scala:44: not found: type JSONOptionsInRead
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarMicroBatchReader.scala:96: not found: type InputPartition
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarMicroBatchReader.scala:150: not found: type InputPartition
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarMicroBatchReader.scala:169: not found: type InputPartition
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarMicroBatchReader.scala:168: not found: type JSONOptionsInRead
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarMicroBatchReader.scala:172: not found: type InputPartitionReader
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarMicroBatchReader.scala:193: not found: type InputPartitionReader
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarMicroBatchReader.scala:210: not found: type InputPartitionReader
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarMicroBatchReader.scala:209: not found: type JSONOptionsInRead
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala:27: object JSONOptionsInRead is not a member of package org.apache.spark.sql.catalyst.json
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala:25: object JSONOptionsInRead is not a member of package org.apache.spark.sql.catalyst.json
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala:39: not found: type JSONOptionsInRead
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala:615: not found: type JSONOptionsInRead
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarRelation.scala:21: object JSONOptionsInRead is not a member of package org.apache.spark.sql.catalyst.json
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarRelation.scala:40: not found: type JSONOptionsInRead
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala:617: not found: type JSONOptionsInRead
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarSourceRDD.scala:27: object JSONOptionsInRead is not a member of package org.apache.spark.sql.catalyst.json
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarSourceRDD.scala:200: not found: type JSONOptionsInRead
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarSourceRDD.scala:158: not found: type JSONOptionsInRead
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/PulsarSourceRDD.scala:43: not found: type JSONOptionsInRead
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/SchemaUtils.scala:36: value minBytesForPrecision is not a member of object org.apache.spark.sql.types.Decimal
[ERROR] [Error] /home/kabeer/code/apache/pulsar-spark/src/main/scala/org/apache/spark/sql/pulsar/SchemaUtils.scala:345: not found: value minBytesForPrecision
[ERROR] 40 errors found
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  23.981 s
[INFO] Finished at: 2019-10-30T11:55:38Z
[INFO] ------------------------------------------------------------------------

To Reproduce
Steps to reproduce the behavior:

  1. Clone the pulsar-spark project
  2. Change the spark.version in pom.xml to 2.3.4 (or any other valid spark 2.3.x release)
  3. Run mvn clean package -DskipTests

Expected behavior
Build should pass.

Additional context
There are quite a few changes in files between Spark 2.3 and Spark 2.4. The most pertinent one is:

In the Spark 2.3 file -> https://github.com/apache/spark/blob/branch-2.3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala, the class JSONOptionsInRead is missing.
In Spark 2.4 -> https://github.com/apache/spark/blob/branch-2.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala, the class JSONOptionsInRead is present.

[BUG] (Spark 2.4.5) Pulsar receiver requires DataFrame creation before readStream/writeStream methods can be used

Describe the bug
The spark-submit of a spark job written with the connector fails if a DataFrame is not created prior to calling readStream and writeStream.

To Reproduce
Steps to reproduce the behavior:

  1. Write a pyspark program that streams data from pulsar using readStream and writes somewhere using writeStream, see example code below.
  2. Attempt to spark-submit using ./bin/spark-submit --master "local[*]" --packages io.streamnative.connectors:pulsar-spark-connector_${SCALA_VERSION}:${SPARK_VERSION} --repositories https://dl.bintray.com/streamnative/maven <your-script>
  3. If a createDataFrame call using the SparkSession is not executed, the job will inevitably fail with the error
py4j.protocol.Py4JJavaError: An error occurred while calling o34.load.
: java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:347)
	at scala.None$.get(Option.scala:345)
	at org.apache.spark.sql.pulsar.PulsarProvider$.org$apache$spark$sql$pulsar$PulsarProvider$$jsonOptions(PulsarProvider.scala:603)
	at org.apache.spark.sql.pulsar.PulsarProvider.createMicroBatchReader(PulsarProvider.scala:154)
	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:182)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

Expected behavior
The spark job does NOT need a DataFrame or other to be instantiated in order to run.

Example failing python code

import sys
from pyspark.sql import SparkSession

if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("""
        Usage: TestPyspark.py <service_url> <admin_url> <topics>
        """, file=sys.stderr)
        sys.exit(-1)

    serviceUrl = sys.argv[1]
    adminUrl = sys.argv[2]
    topics = sys.argv[3]

    spark = SparkSession\
        .builder\
        .appName("StructuredPulsarWordCount")\
        .getOrCreate()

    # Create DataSet representing the stream of input lines from pulsar
    lines = spark\
        .readStream\
        .format("pulsar")\
        .option("service.url", serviceUrl)\
        .option("admin.url", adminUrl)\
        .option("topics", topics)\
        .load()\
        .selectExpr("CAST(value AS STRING)")

    query = lines\
        .writeStream\
        .outputMode('append')\
        .format('console')\
        .start()

    query.awaitTermination()

Example working python code

import sys
from pyspark.sql import SparkSession

if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("""
        Usage: TestPyspark.py <service_url> <admin_url> <topics>
        """, file=sys.stderr)
        sys.exit(-1)
    serviceUrl = sys.argv[1]
    adminUrl = sys.argv[2]
    topics = sys.argv[3]

    spark = SparkSession\
        .builder\
        .appName("StructuredPulsarWordCount")\
        .getOrCreate()

    dt = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0), (5, 12.0)], ("id", "v"))

    # Create DataSet representing the stream of input lines from pulsar
    lines = spark\
        .readStream\
        .format("pulsar")\
        .option("service.url", serviceUrl)\
        .option("admin.url", adminUrl)\
        .option("topics", topics)\
        .load()\
        .selectExpr("CAST(value AS STRING)")

    query = lines\
        .writeStream\
        .outputMode('append')\
        .format('console')\
        .start()

    query.awaitTermination()

Additional context
Using Spark 2.4.5
Using Python 3.7.3

[BUG] Write data into Pulsar: Literal must have a corresponding value to string, but class String found

Describe the bug
When writing data into Pulsar using the following code:

def main(args: Array[String]): Unit = {

    val sparkSession = SparkSession.builder().appName("test-pulsar").master("local").getOrCreate()

    val startingOffsets = topicOffsets(Map("persistent://public/default/my-topic" -> MessageId.fromByteArray(Array(8,33,16,8))))

    import sparkSession.implicits._
    val ds = sparkSession.createDataset(1 to 10)
       .write
       .format("pulsar")
       .option("service.url", "pulsar://localhost:6650")
       .option("admin.url", "http://localhost:8080")
       .option("topic", "persistent://public/default/my-topic-2")
       .save()

    sparkSession.stop()
}

then I get an error:

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Literal must have a corresponding value to string, but class String found.
	at scala.Predef$.require(Predef.scala:277)
	at org.apache.spark.sql.catalyst.expressions.Literal$.validateLiteralValue(literals.scala:219)
	at org.apache.spark.sql.catalyst.expressions.Literal.<init>(literals.scala:296)
	at org.apache.spark.sql.pulsar.PulsarSinks$.$anonfun$validateQuery$2(PulsarSinks.scala:89)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.pulsar.PulsarSinks$.validateQuery(PulsarSinks.scala:83)
	at org.apache.spark.sql.pulsar.PulsarProvider.createRelation(PulsarProvider.scala:185)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
	at pulsar.exProduce$.main(exProduce.scala:26)
	at pulsar.exProduce.main(exProduce.scala)
21/08/13 15:41:12 INFO SparkContext: Invoking stop() from shutdown hook
21/08/13 15:41:12 INFO SparkUI: Stopped Spark web UI at http://172.18.21.74:4040
21/08/13 15:41:12 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!

To Reproduce
The data should be written into Pulsar, and the written data should be readable from the topic.

Expected behavior
