
spark-dynamodb's Introduction

Spark+DynamoDB

Plug-and-play implementation of an Apache Spark custom data source for AWS DynamoDB.

We published a small article about the project; check it out here: https://www.audienceproject.com/blog/tech/sparkdynamodb-using-aws-dynamodb-data-source-apache-spark/

News

  • 2021-01-28: Added option inferSchema=false which is useful when writing to a table with many columns
  • 2020-07-23: Releasing version 1.1.0 which supports Spark 3.0.0 and Scala 2.12. Future releases will no longer be compatible with Scala 2.11 and Spark 2.x.x.
  • 2020-04-28: Releasing version 1.0.4. Includes support for assuming AWS roles through custom STS endpoint (credits @jhulten).
  • 2020-04-09: We are releasing version 1.0.3 of the Spark+DynamoDB connector. Added option to delete records (thank you @rhelmstetter). Fixes (thank you @juanyunism for #46).
  • 2019-11-25: We are releasing version 1.0.0 of the Spark+DynamoDB connector, which is based on the Spark Data Source V2 API. Out-of-the-box throughput calculations, parallelism and partition planning should now be more reliable. We have also pulled out the external dependency on Guava, which was causing a lot of compatibility issues.

Features

  • Distributed, parallel scan with lazy evaluation
  • Throughput control by rate limiting on target fraction of provisioned table/index capacity
  • Schema discovery to suit your needs
    • Dynamic inference
    • Static analysis of case class
  • Column and filter pushdown
  • Global secondary index support
  • Write support

Getting The Dependency

The library is available from Maven Central. Add the dependency in SBT as "com.audienceproject" %% "spark-dynamodb" % "latest", replacing "latest" with the release you want to use.

Spark is declared in the library as a "provided" dependency, which means Spark has to be installed separately in the environment where the application runs, as is the case on AWS EMR.
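
For illustration, a minimal build.sbt matching the Spark 3.0.0 / Scala 2.12 combination mentioned in the news section might look like the following sketch (version numbers are examples; check Maven Central for the latest release):

scalaVersion := "2.12.12"

libraryDependencies ++= Seq(
  // Spark is a "provided" dependency: it comes from the cluster (e.g. AWS EMR).
  "org.apache.spark" %% "spark-sql" % "3.0.0" % "provided",
  // The connector itself, fetched from Maven Central.
  "com.audienceproject" %% "spark-dynamodb" % "1.1.0"
)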

Quick Start Guide

Scala

import com.audienceproject.spark.dynamodb.implicits._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Load a DataFrame from a Dynamo table. Only incurs the cost of a single scan for schema inference.
val dynamoDf = spark.read.dynamodb("SomeTableName") // <-- DataFrame of Row objects with inferred schema.

// Scan the table for the first 100 items (the order is arbitrary) and print them.
dynamoDf.show(100)

// Write to some other table, overwriting existing items with the same keys.
dynamoDf.write.dynamodb("SomeOtherTable")

// Case class representing the items in our table.
import com.audienceproject.spark.dynamodb.attribute
case class Vegetable (name: String, color: String, @attribute("weight_kg") weightKg: Double)

// Load a Dataset[Vegetable]. Notice the @attribute annotation on the case class - we imagine the weight attribute is named with an underscore in DynamoDB.
import org.apache.spark.sql.functions._
import spark.implicits._
val vegetableDs = spark.read.dynamodbAs[Vegetable]("VegeTable")
val avgWeightByColor = vegetableDs.groupBy($"color").agg(avg($"weightKg")) // The column is called 'weightKg' in the Dataset.

Python

# Load a DataFrame from a Dynamo table. Only incurs the cost of a single scan for schema inference.
dynamoDf = spark.read.option("tableName", "SomeTableName") \
                     .format("dynamodb") \
                     .load() # <-- DataFrame of Row objects with inferred schema.

# Scan the table for the first 100 items (the order is arbitrary) and print them.
dynamoDf.show(100)

# Write to some other table, overwriting existing items with the same keys.
dynamoDf.write.option("tableName", "SomeOtherTable") \
              .format("dynamodb") \
              .save()

Note: When running from pyspark shell, you can add the library as:

pyspark --packages com.audienceproject:spark-dynamodb_<spark-scala-version>:<version>

Parameters

The following parameters can be set as options on the Spark reader and writer objects before loading/saving.

  • region sets the region in which the DynamoDB table resides. Default is environment specific.
  • roleArn sets an IAM role to assume. This allows access to a DynamoDB table in a different account than the Spark cluster. Defaults to the standard role configuration.

The following parameters can be set as options on the Spark reader object before loading (see the sketch after this list).

  • readPartitions number of partitions to split the initial RDD into when loading the data into Spark. Defaults to the size of the DynamoDB table divided into chunks of maxPartitionBytes.
  • maxPartitionBytes the maximum size of a single input partition. Default 128 MB.
  • defaultParallelism the number of input partitions that can be read from DynamoDB simultaneously. Defaults to sparkContext.defaultParallelism.
  • targetCapacity fraction of provisioned read capacity on the table (or index) to consume for reading. Default 1 (i.e. 100% capacity).
  • stronglyConsistentReads whether or not to use strongly consistent reads. Default false.
  • bytesPerRCU number of bytes that can be read per second with a single Read Capacity Unit. Default 4000 (4 KB). This value is multiplied by two when stronglyConsistentReads=false.
  • filterPushdown whether or not to use filter pushdown to DynamoDB on scan requests. Default true.
  • throughput the desired read throughput to use. It overrides any throughput calculation made by the package. It is intended to be used with on-demand tables. Defaults to 100 for on-demand tables.
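
As a sketch of how reader options are combined (the region value is a placeholder, and the SparkSession and implicits from the quick start guide are assumed to be in scope):

import com.audienceproject.spark.dynamodb.implicits._

val throttledDf = spark.read
  .option("region", "eu-west-1")              // placeholder region
  .option("stronglyConsistentReads", "true")  // use strongly consistent reads
  .option("targetCapacity", "0.5")            // consume at most 50% of provisioned read capacity
  .dynamodb("SomeTableName")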

The following parameters can be set as options on the Spark writer object before saving (an example follows the list).

  • writeBatchSize number of items to send per call to DynamoDB BatchWriteItem. Default 25.
  • targetCapacity fraction of provisioned write capacity on the table to consume for writing or updating. Default 1 (i.e. 100% capacity).
  • update if true, items will be written using UpdateItem on keys rather than BatchWriteItem. Default false.
  • throughput the desired write throughput to use. It overrides any throughput calculation made by the package. It is intended to be used with on-demand tables. Defaults to 100 for on-demand tables.
  • inferSchema if false, the schema will not be inferred automatically - this is useful when writing to a table with many columns.
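
Similarly, a sketch for the writer, reusing the DataFrame and table name from the quick start guide (the option values are purely illustrative):

dynamoDf.write
  .option("writeBatchSize", "25")   // items per BatchWriteItem call
  .option("update", "true")         // use UpdateItem on keys instead of BatchWriteItem
  .option("inferSchema", "false")   // skip schema inference when the target table has many columns
  .dynamodb("SomeOtherTable")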

System Properties

The following Java system properties are available for configuration (a sketch of setting them follows the list).

  • aws.profile IAM profile to use for default credentials provider.
  • aws.dynamodb.region region in which to access the AWS APIs.
  • aws.dynamodb.endpoint endpoint to use for accessing the DynamoDB API.
  • aws.sts.endpoint endpoint to use for accessing the STS API when assuming the role indicated by the roleArn parameter.
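
A minimal sketch of setting these from driver-side application code before the SparkSession is created (all values below are placeholders; if the properties also need to be visible on the executors, they can instead be passed as -D flags via spark.driver.extraJavaOptions and spark.executor.extraJavaOptions):

import org.apache.spark.sql.SparkSession

// Hypothetical values; set the properties before the session (and hence the connector) is created.
System.setProperty("aws.profile", "my-profile")
System.setProperty("aws.dynamodb.region", "eu-west-1")
System.setProperty("aws.dynamodb.endpoint", "http://localhost:8000") // e.g. a local DynamoDB instance

val spark = SparkSession.builder().getOrCreate()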

Acknowledgements

Usage of parallel scan and rate limiter inspired by work in https://github.com/traviscrawford/spark-dynamodb

spark-dynamodb's People

Contributors

anish749, cosmincatalin, fogrid, htorrence, jacobfi, jhulten, johsbk, kdallmeyer-sr, lepirlouit, mh2, neuw84, rhelmstetter, tmwong2003


spark-dynamodb's Issues

Job fails on Scaling Throttle from DDB

Running a job against an on-demand table with a GSI fails with this error:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1192 in stage 1.0 failed 4 times, most recent failure: Lost task 1192.3 in stage 1.0 (TID 3214, ip-10-0-165-179.us-west-2.compute.internal, executor 176): com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: Throughput exceeds the current capacity for one or more global secondary indexes. DynamoDB is automatically scaling your index so please try again shortly. (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ThrottlingException;

Seems like this shouldn't fail the job but cause it to retry.

Infer schema in Python

I don't see an option to provide a schema in PySpark, while the same option exists in Scala. Please let me know how to provide a class/schema when reading data using PySpark.

Please support spark 3.0

When I try to write to Dynamo from a source other than Dynamo on Spark 3.0, I get the following error:

DefaultSource does not allow create table as select

Providing a region and a roleArn still uses global STS endpoint

When you pass a roleArn and a region, an AWSSecurityTokenServiceClient is built that still uses the global STS endpoint. This does not allow use in an isolated VPC outside of us-east-1.

val stsClient = AWSSecurityTokenServiceClientBuilder
  .standard()
  .withCredentials(new DefaultAWSCredentialsProviderChain)
  .withRegion(chosenRegion)
  .build()

Dynamo DB Connector configuration

Where do we specify the DynamoDB connector details, like AWS credentials, the role ARN, and other settings?
val dynamoDf = spark.read.dynamodb("SomeTableName")

19/09/18 12:07:06 INFO DefaultSource: Using Guava version 16.0.1
Exception in thread "main" java.lang.NoClassDefFoundError: com/amazonaws/services/dynamodbv2/AmazonDynamoDBClientBuilder
at com.audienceproject.spark.dynamodb.connector.DynamoConnector$$anonfun$getDynamoDBClient$2.apply(DynamoConnector.scala:49)
at com.audienceproject.spark.dynamodb.connector.DynamoConnector$$anonfun$getDynamoDBClient$2.apply(DynamoConnector.scala:52)
at scala.Option.getOrElse(Option.scala:121)

DynamoDB error: batch write request items is empty

Hi folks, I'm trying to do a fairly simple write to DynamoDB but it seems the batchWrite request contains no items. Here's the error from the live AWS API:

Caused by: com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: 1 validation error detected: Value '{my-table=[]}' at 'requestItems' failed to satisfy constraint: Map value must satisfy constraint: [Member must have length less than or equal to 25, Member must have length greater than or equal to 1] (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ValidationException; Request ID: NP921O0BVIO5OEMBTANMMJ07Q3VV4KQNSO5AEMVJF66Q9ASUAAJG)

Here's my code:

val countsDS = countsStep.run(communitiesDS)
val artists = countsDS
  .as[FeedTopItem]
  .filter(item => item._type == "artistCounts")
  .map(item => ArtistCount(s"${item.feed}:${item.key}", dateWithHour, item.value))
  .as[ArtistCount]
artists.show()
+--------------------+------------+-----+
|      feedWithArtist|dateWithHour|count|
+--------------------+------------+-----+
| x:Shanti Celeste|  2019120818|    1|
|         x:Khotin|  2019120818|    1|
|      x:Chillinit|  2019120818|    1|
|       x:Good Gas|  2019120818|    1|
|      y:Yo Trane|  2019120818|    1|
|          y:dvsn|  2019120818|    1|
|          y:Belly|  2019120818|    1|
|           y:NAV|  2019120818|    1|
+--------------------+------------+-----+
artists.write.dynamodb("mtrAggregations")

I'm producing a regular Dataset[MyCaseClass] and saving it, nothing unusual. I've also tried

artists.toDF("feedWithArtist", "dateWithHour", "count").write.dynamodb("...")

but that also doesn't work.

I'm running Spark 2.4.4, Scala 2.11, and "com.audienceproject" %% "spark-dynamodb" % "1.0.0"

How do I serialize a dynamoDB column of string set datatype?

Hi guys, thanks for creating this project, it has been of great help to me and I have enjoyed using it so far.

I have a column in my table that is of string set datatype, and it is currently being inferred as an Array[String], which gets persisted as a list of strings when written back to DynamoDB. I have tried coercing it to Set[String], but it is still being written back to DynamoDB as a list of strings. What datatype should I coerce it to in order to write the column as a string set?

Expected

  "names": {
    "SS": [
      "dummy-name"
    ]
  }

Actual

  "names": {
    "L": [
      {
        "S": "dummy-name"
      },
    ]
  }

EMR: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction0$mcI$sp

Hi All,
I'm a bit new to this topic, so apologies if this is caused by something I did not do right:

EMR Cluster Spark 2.4.4
Downloaded and bootstrapped the latest jar from Maven; I'm using JupyterLab with Livy.

The issue occurs with both Scala and PySpark code.
Could you please advise/resolve this?

Thanks in advance.

Code:
import com.audienceproject.spark.dynamodb.implicits._
import org.apache.spark.sql.SparkSession

//val spark = SparkSession.builder().getOrCreate()

// Load a DataFrame from a Dynamo table. Only incurs the cost of a single scan for schema inference.
val dynamoDf = spark.read.dynamodb("company_data_test") // <-- DataFrame of Row objects with inferred schema.

// Scan the table for the first 100 items (the order is arbitrary) and print them.
dynamoDf.show(100)

Error:
java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction0$mcI$sp
at com.audienceproject.spark.dynamodb.datasource.DefaultSource.createReader(DefaultSource.scala:47)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$SourceHelpers.createReader(DataSourceV2Relation.scala:161)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:178)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:204)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
at com.audienceproject.spark.dynamodb.implicits$DynamoDBDataFrameReader.dynamodb(implicits.scala:37)
... 56 elided
Caused by: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction0$mcI$sp
... 62 more
Caused by: java.lang.ClassNotFoundException: scala.runtime.java8.JFunction0$mcI$sp
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
... 62 more

Code:
dynamoDf = spark.read.option("tableName", "company_data_test") \
    .format("dynamodb") \
    .load() # <-- DataFrame of Row objects with inferred schema.

# Scan the table for the first 100 items (the order is arbitrary) and print them.
dynamoDf.show(100)

Error:
An error occurred while calling o88.load.
: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction0$mcI$sp
at com.audienceproject.spark.dynamodb.datasource.DefaultSource.createReader(DefaultSource.scala:47)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$SourceHelpers.createReader(DataSourceV2Relation.scala:161)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:178)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:204)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction0$mcI$sp
... 16 more
Caused by: java.lang.ClassNotFoundException: scala.runtime.java8.JFunction0$mcI$sp
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
... 16 more

Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 172, in load
return self._df(self._jreader.load())
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in call
answer, self.gateway_client, self.target_id, self.name)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o88.load.
: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction0$mcI$sp
at com.audienceproject.spark.dynamodb.datasource.DefaultSource.createReader(DefaultSource.scala:47)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$SourceHelpers.createReader(DataSourceV2Relation.scala:161)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:178)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:204)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction0$mcI$sp
... 16 more
Caused by: java.lang.ClassNotFoundException: scala.runtime.java8.JFunction0$mcI$sp
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
... 16 more

Validation of a Write

Hey,
Is there some way to know if some of the items weren't written, for example because of throttling?
If not, it might be a good enhancement idea.

Can't change the region

Hi!
I might be missing something, but it seems to me that it's not possible(?) to use the library in regions other than us-east-1, at least in Linux environments.
aws.dynamodb.region is not a valid identifier for an environment variable, at least in bash. A simple fix could be just changing that to AWS_DYNAMODB_REGION?
This the line I'm referring to:

val chosenRegion = region.getOrElse(sys.env.getOrElse("aws.dynamodb.region", "us-east-1"))

Stop using guava

Or update to a newer version; there are too many dependency issues.

targetCapacity when executors != readPartitions

If readPartitions is set, the targetCapacity seems to be based solely on that and not on the actual number of executors. Is this correct? If so, it's worth noting that targetCapacity may not be accurate if you set readPartitions and use a different number of executor cores.

Adding support for Binary Colums

Hello,

Currently the library does not support binary types. I have been trying for a while to add them but without too much luck.

My TypeConversion object:

/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
  *
  * Copyright © 2018 AudienceProject. All rights reserved.
  */
package com.audienceproject.spark.dynamodb.rdd

import com.amazonaws.services.dynamodbv2.document.Item
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

import scala.collection.JavaConverters._

private[dynamodb] object TypeConversion {

    def apply(attrName: String, sparkType: DataType): Item => Any =

        sparkType match {
            case BooleanType => nullableGet(_.getBOOL)(attrName)
            case StringType => nullableGet(_.getString)(attrName)
            case IntegerType => nullableGet(_.getInt)(attrName)
            case LongType => nullableGet(_.getLong)(attrName)
            case DoubleType => nullableGet(_.getDouble)(attrName)
            case FloatType => nullableGet(_.getFloat)(attrName)
            case DecimalType() => nullableGet(_.getNumber)(attrName)
            case BinaryType => nullableGet(_.getBinary)(attrName)
            case ArrayType(innerType, _) =>
                nullableGet(_.getList)(attrName).andThen(extractArray(convertValue(innerType)))
            case MapType(keyType, valueType, _) =>
                if (keyType != StringType) throw new IllegalArgumentException(s"Invalid Map key type '${keyType.typeName}'. DynamoDB only supports String as Map key type.")
                nullableGet(_.getRawMap)(attrName).andThen(extractMap(convertValue(valueType)))
            case StructType(fields) =>
                val nestedConversions = fields.collect({ case StructField(name, dataType, _, _) => name -> convertValue(dataType) })
                nullableGet(_.getRawMap)(attrName).andThen(extractStruct(nestedConversions))
            case _ => throw new IllegalArgumentException(s"Spark DataType '${sparkType.typeName}' could not be mapped to a corresponding DynamoDB data type.")
        }

    private def convertValue(sparkType: DataType): Any => Any =

        sparkType match {
            case IntegerType => nullableConvert(_.intValue())
            case LongType => nullableConvert(_.longValue())
            case DoubleType => nullableConvert(_.doubleValue())
            case FloatType => nullableConvert(_.floatValue())
            case DecimalType() => nullableConvert(identity)
            case BinaryType => {
                case byteArray : Array[Byte] => byteArray
                case _ => null
            }
            case ArrayType(innerType, _) => extractArray(convertValue(innerType))
            case MapType(keyType, valueType, _) =>
                if (keyType != StringType) throw new IllegalArgumentException(s"Invalid Map key type '${keyType.typeName}'. DynamoDB only supports String as Map key type.")
                extractMap(convertValue(valueType))
            case StructType(fields) =>
                val nestedConversions = fields.collect({ case StructField(name, dataType, _, _) => name -> convertValue(dataType) })
                extractStruct(nestedConversions)
            case BooleanType => {
                case boolean: Boolean => boolean
                case _ => null
            }
            case StringType => {
                case string: String => string
                case _ => null
            }
            case _ => throw new IllegalArgumentException(s"Spark DataType '${sparkType.typeName}' could not be mapped to a corresponding DynamoDB data type.")
        }

    private def nullableGet(getter: Item => String => Any)(attrName: String): Item => Any = {
        case item if item.hasAttribute(attrName) => try getter(item)(attrName) catch {
            case _: NumberFormatException => null
        }
        case _ => null
    }

    private def nullableConvert(converter: java.math.BigDecimal => Any): Any => Any = {
        case item: java.math.BigDecimal => converter(item)
        case _ => null
    }

    private def extractArray(converter: Any => Any): Any => Any = {
        case list: java.util.List[_] => list.asScala.map(converter)
        case set: java.util.Set[_] => set.asScala.map(converter).toSeq
        case _ => null
    }

    private def extractMap(converter: Any => Any): Any => Any = {
        case map: java.util.Map[_, _] => map.asScala.mapValues(converter)
        case _ => null
    }

    private def extractStruct(conversions: Seq[(String, Any => Any)]): Any => Any = {
        case map: java.util.Map[_, _] => Row.fromSeq(conversions.map({
            case (name, conv) => conv(map.get(name))
        }))
        case _ => null
    }

}

However, with this approach I get an Amazon exception suggesting that Spark is trying to convert the binary column to a String.

Caused by: com.amazonaws.services.dynamodbv2.document.IncompatibleTypeException: Cannot convert class [B into a string

Any hints?

Bests,

Write binary column types as output

I see this error when trying to write out a binary column type.

19/11/26 19:19:48 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 2, 192.168.208.164, executor 1): scala.MatchError: BinaryType (of class org.apache.spark.sql.types.BinaryType$)
	at com.audienceproject.spark.dynamodb.connector.TableConnector.$anonfun$updateItems$5(TableConnector.scala:166)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at com.audienceproject.spark.dynamodb.connector.TableConnector.$anonfun$updateItems$4(TableConnector.scala:164)
	at com.audienceproject.spark.dynamodb.connector.TableConnector.$anonfun$updateItems$4$adapted(TableConnector.scala:161)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at com.audienceproject.spark.dynamodb.connector.TableConnector.updateItems(TableConnector.scala:161)
	at com.audienceproject.spark.dynamodb.rdd.DynamoWriteRelation.$anonfun$update$1(DynamoWriteRelation.scala:51)
	at com.audienceproject.spark.dynamodb.rdd.DynamoWriteRelation.$anonfun$update$1$adapted(DynamoWriteRelation.scala:51)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:935)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:935)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

java.lang.OutOfMemoryError when inserting dataframe to dynamo db

Hello,

I am using spark-dynamodb to insert a dataframe into a DynamoDB table. The lib is working fine on small dataframes, but when I try to insert a huge dataframe (65 million rows), I hit the following exception:
java.lang.OutOfMemoryError at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123) at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117) at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41) at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877) at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) at org.apache.spark.SparkContext.clean(SparkContext.scala:2287) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840) at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:389) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116) at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:88) at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:124) at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:115) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52) at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:115) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116) at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:252) at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:141) at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:386) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116) at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:88) at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:124) at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:115) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52) at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:115) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116) at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:95) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92) at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2581) at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2578) at com.audienceproject.spark.dynamodb.rdd.DynamoWriteRelation.<init>(DynamoWriteRelation.scala:36) at com.audienceproject.spark.dynamodb.DefaultSource.createRelation(DefaultSource.scala:57) at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:472) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56) at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233) at com.audienceproject.spark.dynamodb.implicits$DynamoDBDataFrameWriter.dynamodb(implicits.scala:73) at io.voodoo.ads.data.DataIndexer$.insertIndexes(DataIndexer.scala:74) at io.voodoo.ads.data.DataIndexer$.indexData(DataIndexer.scala:23) at io.voodoo.ads.Application$.main(Application.scala:27) at io.voodoo.ads.Application.main(Application.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635)

I am running this code on EMR with a total cluster memory of 2 TB; my executors and the driver have about 200 GB of memory each. Do you have any idea why I am hitting a memory issue even though my configuration has enough memory?

Thanks

Capacity calculations are made eagerly

The connector computes the capacity to use for writing eagerly, not at the time when data starts being written. This can be problematic in two cases:

  • If the capacity is modified in the interval between when the Spark computation started and when the actual writing to DynamoDB starts.
  • If the Spark cluster is auto-scaling, meaning that more workers are added after the Spark execution starts. All of the new workers will write to DynamoDB using the capacity calculations made with a different number of executors.

Very bad read performance for pretty small DynamoDB table

I want to read a very small DynamoDB table (about 6,500 elements, 200 KB in total) into my Spark Structured Streaming job every micro-batch. I use PySpark with Spark version 2.4.4, spark-dynamodb version 1.0.4. The DynamoDB table has a provisioned read capacity of 5.

My code looks as follows:

spark.read.format("dynamodb") \
  .option("region", "eu-central-1") \
  .option("tableName", my_table) \
  .load() \
  .filter(<some-easy-filtering-on-one-boolean-column>) \
  .select(<some-column-selects>)

I faced very slow read performance, where it takes multiple seconds, up to a few minutes, to read those few elements from Dynamo:
[screenshot]

I also noticed that only a small portion of the provisioned read capacity is used for every read:
[screenshot]

It seems to be random how much read capacity is used; sometimes even less is consumed. But anyway, even with a read capacity of 1 or so, it should be much faster to read ~6,500 elements from a very small DynamoDB table.

I also tried some configurations like:

  • .option("filterPushdown", False)
  • .option("readPartitions", <1, 2, 4, 8, ...>)
  • .option("targetCapacity", 1)

with no effect at all. I noticed that with readPartitions of e.g. 8 it's a little bit faster (about 20 seconds), but not fast enough to my understanding. And I think that such a small number of elements should be readable with one partition in a feasible amount of time.

Any ideas what I'm doing wrong? Any advice? Thanks in advance!

Question on throughput parameter

So far this code is working great for me, thank you very much.

I am setting throughput to 25000, but I am getting a throttling error, and DynamoDB is hovering around 40000 WCU in the metrics tab, sometimes reaching 55000 WCU. Is there any other parameter that throughput depends on?

final.repartition(300).write.option("tableName", table).option("writeBatchSize",25).option("throughput",25000)

com.amazonaws.services.dynamodbv2.model.RequestLimitExceededException: Throughput exceeds the current throughput limit for your account. Please contact AWS Support at https://aws.amazon.com/support request a limit increase (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: RequestLimitExceeded; Request ID: L5VFK8QJA1E2J5GG58S26K1VGRVV4KQNSO5AEMVJF66Q9ASUAAJG)

java.lang.NoSuchMethodError

I'm trying to get data from S3 to DynamoDB but encounter this exception when I submit a Spark job to a cluster (Spark 2.4.4 without Hadoop + Hadoop 3.2.1).

This is the stack trace:

Exception in thread "main" java.lang.NoSuchMethodError: com.amazonaws.transform.JsonUnmarshallerContext.getCurrentToken()Lcom/amazonaws/thirdparty/jackson/core/JsonToken;
at com.amazonaws.services.dynamodbv2.model.transform.DescribeTableResultJsonUnmarshaller.unmarshall(DescribeTableResultJsonUnmarshaller.java:39)
at com.amazonaws.services.dynamodbv2.model.transform.DescribeTableResultJsonUnmarshaller.unmarshall(DescribeTableResultJsonUnmarshaller.java:29)
at com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:118)
at com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:43)
at com.amazonaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:69)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1627)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1336)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.doInvoke(AmazonDynamoDBClient.java:3443)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:3419)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.executeDescribeTable(AmazonDynamoDBClient.java:1660)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.describeTable(AmazonDynamoDBClient.java:1635)
at com.amazonaws.services.dynamodbv2.document.Table.describe(Table.java:137)
at com.audienceproject.spark.dynamodb.connector.TableConnector.<init>(TableConnector.scala:47)
at com.audienceproject.spark.dynamodb.datasource.DynamoDataSourceWriter.<init>(DynamoDataSourceWriter.scala:32)
at com.audienceproject.spark.dynamodb.datasource.DefaultSource.createWriter(DefaultSource.scala:57)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:255)
at example.S3$.main(S3.scala:102)
at example.S3.main(S3.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I tried various things with excluding and shading different libraries, but nothing works.

My build.sbt:

ThisBuild / scalaVersion     := "2.11.8"
ThisBuild / version          := "0.1.0-SNAPSHOT"
ThisBuild / organization     := "com.example"
ThisBuild / organizationName := "example"

lazy val root = (project in file("."))
  .settings(
    name := "ec",
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql" % "2.4.4",  
      "com.amazonaws" % "aws-java-sdk-s3" % "1.11.678",
      "org.apache.hadoop" % "hadoop-common" % "3.2.1",
      "com.amazonaws" % "aws-java-sdk-dynamodb" % "1.11.678",
      "com.audienceproject" %% "spark-dynamodb" % "1.0.2"
    )
  )

Could you help please?

Error while connecting to Dynamodb

Hi
We are getting the below error when trying to read from DynamoDB, using the below configuration:

  1. Scala version : 2.11.12
  2. Spark version : 2.4.4
  3. Hadoop version : 2.8.5
  4. com.audienceproject:spark-dynamodb_2.12, version: 1.0.1

//Code snippet
val conf = new SparkConf()
.set("spark.default.parallelism", "4")

val sparkSession = SparkSession
  .builder()
  .master("yarn")
  .appName("ApplicationName")
  .config(conf)
  .getOrCreate()


val par = sparkSession.sparkContext.defaultParallelism
println("Parallelism " + par)

import com.audienceproject.spark.dynamodb.implicits._

val df = sparkSession.read.dynamodb("Table_Name") // Failing at this read 

//

java.lang.NoSuchMethodError: scala.Some.value()Ljava/lang/Object;
at com.audienceproject.spark.dynamodb.datasource.DefaultSource.getDefaultParallelism(DefaultSource.scala:65)
at com.audienceproject.spark.dynamodb.datasource.DefaultSource.$anonfun$createReader$4(DefaultSource.scala:47)
at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.java:23)
at scala.Option.getOrElse(Option.scala:121)
at com.audienceproject.spark.dynamodb.datasource.DefaultSource.createReader(DefaultSource.scala:47)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$SourceHelpers.createReader(DataSourceV2Relation.scala:161)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:178)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:204)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
at com.audienceproject.spark.dynamodb.implicits$DynamoDBDataFrameReader.dynamodb(implicits.scala:37)

Please suggest.

Thanks

Issues with Guava on EMR?

I'm getting java.lang.NoSuchMethodError: com.google.common.util.concurrent.RateLimiter.acquire(I)D errors
when running .write.dynamodb() on EMR. Running locally I'm not having any trouble.
I presume that is coming from Guava library.
I noticed this issue has been escalated and closed in the past, but I'm having issues with it still.
I've tried different versions (0.4.1 and 0.3.6) as well as different EMR releases (5.22.0 and 5.20.0); the result is always the same. I don't think I'm pulling in Guava from anywhere else, as by now I've only left "com.audienceproject" %% "spark-dynamodb" % "0.4.1" in my dependencies.
May I ask what is the target/intended EMR image version for the latest release of the lib?

Error When Running on an EMR Cluster

Hello, I'm trying to run a pyspark script which writes to Dynamo (using this connector) on an EMR cluster. My spark-submit command is as follows:
spark-submit --packages com.audienceproject:spark-dynamodb_2.11:1.0.0 s3://snaptravel-data/pyspark_scripts/process_rapid_data.py

However I get the following error:

Traceback (most recent call last): File "/mnt/tmp/spark-7f98fbab-0dcf-4eb2-b220-0a83903bcb4f/process_rapid_data.py", line 102, in <module> .format("dynamodb") \ File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 736, in save File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__ File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o90.save. : org.apache.spark.SparkException: Writing job aborted. at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:260) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, ip-172-31-27-184.ec2.internal, executor 1): **java.io.InvalidClassException: com.audienceproject.spark.dynamodb.datasource.DynamoWriterFactory; local class incompatible: stream classdesc serialVersionUID = 5047852721666334302, local class serialVersionUID = -29859842707073509** at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1884) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1750) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2041) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

The actual error seems to be:

java.io.InvalidClassException: com.audienceproject.spark.dynamodb.datasource.DynamoWriterFactory; local class incompatible: stream classdesc serialVersionUID = 5047852721666334302, local class serialVersionUID = -29859842707073509

I am currently running on emr version 5.27.0.

Could you please let me know the correct EMR image to be used with the latest version of this connector?

Performance issue writing in DynamoDB

Hi, I have a performance issue writing to an "on demand" scaling DynamoDB table with the audienceproject connector.

With a cluster of 1 m5.xlarge master and 2 m5.xlarge core nodes, I am stuck at 300 WCU on Dynamo, which is pretty slow.

I tried a lot of different Spark configurations but it doesn't change anything. The only way to improve my write capacity is to increase my instance type, to m5.4xlarge for example.

I am running a simple ETL Spark job: reading from S3, a bit of processing, and writing data to DynamoDB.
I'm using version 0.4.0 of the connector and emr-5.20.0 for the EMR release.

Many thanks for your help

Filter Pushdown

I can't get filter pushdown to work.
The table SSEmployeeCountUnion has about 20 million rows. CompanyId is the partition key and RecordHash is the sort key. When I run

  val unionDataset = spark.read.dynamodbAs[CountRecordEx]("SSEmployeeCountUnion")
  unionDataset.filter($"CompanyId" === 209963).foreach(x => println(x))

I expect the result, which is 19 records, to be returned immediately, but it looks like a full scan is being performed (it runs for a few minutes until I abort the job).

I also tried setting filterPushdown explicitly, but it also doesn't work:

  val unionDataset = spark.read.option("filterPushdown", "true").dynamodbAs[CountRecordEx]("SSEmployeeCountUnion")
  unionDataset.filter($"CompanyId" === 209963).foreach(x => println(x))

Could you help?

Add git tags

Hi all,

First of all, thanks for this project! It made my life really easy when exporting data from DynamoDB.

I would just like to know if you would mind adding git tags to the repository every time you release a new version. This would be really helpful for navigating the source and correlating possible issues with the version we have running in production.

sbt package fails with unresolved dependency: com.jsuereth#sbt-pgp;1.1.1

Running sbt package results in:

[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn] :: UNRESOLVED DEPENDENCIES ::
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn] :: com.jsuereth#sbt-pgp;1.1.1: not found
[warn] :: com.typesafe.sbteclipse#sbteclipse-plugin;5.2.4: not found
[warn] :: com.dwijnand#sbt-compat;1.2.6: not found
[warn] ::::::::::::::::::::::::::::::::::::::::::::::

How can this be compiled/built?

unresolved dependency error

Spawned EMR with Spark 2.4.4 on Hadoop 2.8.5, cloned the repo and ran sbt package.

[hadoop@ip-xxxxxx spark-dynamodb]$ pyspark --packages com.audienceproject:spark-dynamodb_2.11.12:1.0.3
Python 2.7.16 (default, Oct 14 2019, 21:26:56) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Ivy Default Cache set to: /home/hadoop/.ivy2/cache
The jars for the packages stored in: /home/hadoop/.ivy2/jars
:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.audienceproject#spark-dynamodb_2.11.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6748f4f1-9ccf-4ef2-9556-6e70ac4121b1;1.0
	confs: [default]
:: resolution report :: resolve 780ms :: artifacts dl 0ms
	:: modules in use:
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   0   |   0   |
	---------------------------------------------------------------------
:: problems summary ::
:::: WARNINGS
		module not found: com.audienceproject#spark-dynamodb_2.11.12;1.0.3
	==== local-m2-cache: tried
	  file:/home/hadoop/.m2/repository/com/audienceproject/spark-dynamodb_2.11.12/1.0.3/spark-dynamodb_2.11.12-1.0.3.pom
	  -- artifact com.audienceproject#spark-dynamodb_2.11.12;1.0.3!spark-dynamodb_2.11.12.jar:
	  file:/home/hadoop/.m2/repository/com/audienceproject/spark-dynamodb_2.11.12/1.0.3/spark-dynamodb_2.11.12-1.0.3.jar
	==== local-ivy-cache: tried
	  /home/hadoop/.ivy2/local/com.audienceproject/spark-dynamodb_2.11.12/1.0.3/ivys/ivy.xml
	  -- artifact com.audienceproject#spark-dynamodb_2.11.12;1.0.3!spark-dynamodb_2.11.12.jar:
	  /home/hadoop/.ivy2/local/com.audienceproject/spark-dynamodb_2.11.12/1.0.3/jars/spark-dynamodb_2.11.12.jar
	==== central: tried
	  https://repo1.maven.org/maven2/com/audienceproject/spark-dynamodb_2.11.12/1.0.3/spark-dynamodb_2.11.12-1.0.3.pom
	  -- artifact com.audienceproject#spark-dynamodb_2.11.12;1.0.3!spark-dynamodb_2.11.12.jar:
	  https://repo1.maven.org/maven2/com/audienceproject/spark-dynamodb_2.11.12/1.0.3/spark-dynamodb_2.11.12-1.0.3.jar
	==== spark-packages: tried
	  https://dl.bintray.com/spark-packages/maven/com/audienceproject/spark-dynamodb_2.11.12/1.0.3/spark-dynamodb_2.11.12-1.0.3.pom
	  -- artifact com.audienceproject#spark-dynamodb_2.11.12;1.0.3!spark-dynamodb_2.11.12.jar:
	  https://dl.bintray.com/spark-packages/maven/com/audienceproject/spark-dynamodb_2.11.12/1.0.3/spark-dynamodb_2.11.12-1.0.3.jar
		::::::::::::::::::::::::::::::::::::::::::::::
		::          UNRESOLVED DEPENDENCIES         ::
		::::::::::::::::::::::::::::::::::::::::::::::
		:: com.audienceproject#spark-dynamodb_2.11.12;1.0.3: not found
		:::::::::::::::::::::::::::::::::::::::::::::
:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
Exception in thread "main" java.lang.RuntimeException: [unresolved dependency: com.audienceproject#spark-dynamodb_2.11.12;1.0.3: not found]
	at org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1310)
	at org.apache.spark.deploy.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:54)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:304)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:782)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/shell.py", line 38, in <module>
    SparkContext._ensure_initialized()
  File "/usr/lib/spark/python/pyspark/context.py", line 324, in _ensure_initialized
    SparkContext._gateway = gateway or launch_gateway(conf)
  File "/usr/lib/spark/python/pyspark/java_gateway.py", line 46, in launch_gateway
    return _launch_gateway(conf)
  File "/usr/lib/spark/python/pyspark/java_gateway.py", line 108, in _launch_gateway
    raise Exception("Java gateway process exited before sending its port number")
Exception: Java gateway process exited before sending its port number
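Worth noting: the artifact suffix in the coordinates above is the full Scala compiler version (2.11.12), but Maven Central artifacts are published under the binary version (2.11). Assuming the goal was version 1.0.3 from Maven Central, the invocation would presumably look like:

pyspark --packages com.audienceproject:spark-dynamodb_2.11:1.0.3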

Input type class org.apache.spark.sql.types.Decimal is not currently supported

Hi,

Is the Decimal type not supported? Do I have to convert it explicitly?
I am getting the error below while trying to write a table containing a Decimal column to DynamoDB:
Caused by: java.lang.UnsupportedOperationException: Input type class org.apache.spark.sql.types.Decimal is not currently supported
at com.amazonaws.services.dynamodbv2.document.Item.with(Item.java:1081)
at com.audienceproject.spark.dynamodb.connector.TableConnector$$anonfun$putItems$1$$anonfun$apply$7.apply(TableConnector.scala:134)
at com.audienceproject.spark.dynamodb.connector.TableConnector$$anonfun$putItems$1$$anonfun$apply$7.apply(TableConnector.scala:132)
at scala.collection.immutable.List.foreach(List.scala:392)
at com.audienceproject.spark.dynamodb.connector.TableConnector$$anonfun$putItems$1.apply(TableConnector.scala:132)
at com.audienceproject.spark.dynamodb.connector.TableConnector$$anonfun$putItems$1.apply(TableConnector.scala:118)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at com.audienceproject.spark.dynamodb.connector.TableConnector.putItems(TableConnector.scala:118)
at com.audienceproject.spark.dynamodb.datasource.DynamoBatchWriter.flush(DynamoBatchWriter.scala:56)
at com.audienceproject.spark.dynamodb.datasource.DynamoBatchWriter.commit(DynamoBatchWriter.scala:48)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:127)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
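A minimal workaround sketch, assuming it is acceptable to trade exact decimal precision for a double representation before writing; the helper below is generic, and the usage line (with a hypothetical table name) is commented out:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DecimalType

// Cast every DecimalType column to Double so the connector sees a supported numeric type.
def castDecimalsToDouble(df: DataFrame): DataFrame =
  df.schema.fields.foldLeft(df) { (acc, field) =>
    field.dataType match {
      case _: DecimalType => acc.withColumn(field.name, col(field.name).cast("double"))
      case _              => acc
    }
  }

// Usage (hypothetical table name):
// castDecimalsToDouble(someDf).write.dynamodb("SomeTable")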

Implement exponential backoff for the retry logic

I noticed that when the actual throughput exceeds DynamoDB's provisioned throughput and requests start failing, DynamoDB starts throttling a lot of requests from the library.

I dug a little into the writer's code and noticed that the current logic is to retry immediately: https://github.com/audienceproject/spark-dynamodb/blob/master/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala#L242

Would it be possible to implement exponential backoff as suggested by AWS's DynamoDB API?
https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html

If DynamoDB returns any unprocessed items, you should retry the batch operation on those items. However, we strongly recommend that you use an exponential backoff algorithm. If you retry the batch operation immediately, the underlying read or write requests can still fail due to throttling on the individual tables. If you delay the batch operation using exponential backoff, the individual requests in the batch are much more likely to succeed.
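Purely for illustration, here is a minimal sketch of exponential backoff with jitter; it assumes the caller passes a function that performs one batch-write attempt and returns the number of unprocessed items remaining, and it is not the connector's actual code:

import scala.annotation.tailrec
import scala.util.Random

object BackoffSketch {
  // Retries while items remain unprocessed, sleeping baseDelayMs * 2^attempt
  // (capped at 20 seconds) plus a little random jitter between attempts.
  @tailrec
  def retryWithBackoff(writeBatch: () => Int,
                       attempt: Int = 0,
                       maxAttempts: Int = 10,
                       baseDelayMs: Long = 50L): Unit = {
    val unprocessed = writeBatch()
    if (unprocessed > 0 && attempt < maxAttempts) {
      val delay = math.min(baseDelayMs << attempt, 20000L)
      Thread.sleep(delay + Random.nextInt(100))
      retryWithBackoff(writeBatch, attempt + 1, maxAttempts, baseDelayMs)
    }
  }
}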

Understanding throughput and rate limiting

When writing with the throughput parameter, how is the throughput value interpreted?
For example, when setting it to 100, is the limit 100 writes per second or a maximum of 100 concurrent writes? Also, as putBatchItem is used by default, is it 100 putBatchItem calls or 100 records?
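Not an authoritative answer, but for intuition: the connector's rate limiting (a RateLimiter import shows up in TableConnector, per another issue on this page) works in permits-per-second terms rather than as a cap on concurrent writes. A minimal sketch of those semantics follows, with made-up numbers and a placeholder write; whether a permit corresponds to a record, a capacity unit, or a batch call is exactly what this issue is asking.

import com.google.common.util.concurrent.RateLimiter

// Illustration of per-second rate limiting semantics (not the connector's code):
// a limiter created with N permits per second blocks acquire() so that, on
// average, at most N permits are handed out each second.
val limiter = RateLimiter.create(100.0) // 100 permits per second

def writeBatchSketch(items: Seq[String]): Unit = {
  limiter.acquire(items.size) // block until `items.size` permits are available
  // ... issue the BatchWriteItem call here ...
}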

IllegalArgumentException: rate must be positive

Issue on table in billing mode PAY_PER_REQUEST (on demand)

The rate cannot be computed because an on-demand table reports no provisioned throughput and no read/write capacity units.

// Rate limit calculation.
val tableSize = desc.getTableSizeBytes
val avgItemSize = tableSize.toDouble / desc.getItemCount
val readCapacity = desc.getProvisionedThroughput.getReadCapacityUnits * targetCapacity
val writeCapacity = desc.getProvisionedThroughput.getWriteCapacityUnits * targetCapacity

Requests will not be throttled in that case.
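A possible workaround sketch until on-demand billing is handled explicitly: pass an explicit throughput so the rate calculation never sees the zero capacity reported by a PAY_PER_REQUEST table. The "throughput" option name is taken from other reports on this page, and the value below is made up.

import org.apache.spark.sql.SparkSession
import com.audienceproject.spark.dynamodb.implicits._

val spark = SparkSession.builder().getOrCreate()

// Hypothetical table name and throughput value; tune to your workload.
val onDemandDf = spark.read
  .option("throughput", "1000") // pretend the table has 1000 capacity units
  .dynamodb("SomeOnDemandTable")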

No data retrieved from table

I am having some issues getting data from a DynamoDB table. There are no errors, but when I print the schema, the only result is root. Doing a show is similar, with

++
||
++
++

On a whim, I tried retrieving another of our tables. Everything was perfect, so I tried all 9 of our organization's tables. Two displayed the symptoms detailed above (no rows, no columns) while the rest had no problems. I have tried to compare and contrast the tables, but I see no patterns.

At this point, I'm not certain how to debug this issue. What suggestions do you have?

WCUs constantly dropping after a while

I am writing some data from an S3 file to DynamoDB, and I believe the write limit should not change after the Spark job makes the connection and starts writing to DynamoDB. But in my case the write limit starts dropping after a certain time. I have tried running the job multiple times and notice the same behavior.

Here is the cloudwatch metric for the WCU in the table:
[Screenshot: CloudWatch graph of consumed WCUs for the table]

I have tried setting a constant throughput using the "throughput" parameter and also tried reducing the number of worker nodes on my Spark cluster to just two. I still see the same behavior in both cases. Does the write throughput dynamically change during the write?

java.lang.NoClassDefFoundError: com/audienceproject/spark/dynamodb/implicits$

(Spark Streaming newbie here, sorry in advance if this is something obvious, or not directly caused by spark-dynamodb)

I'm trying to write to DynamoDB from DataStreamWriter.foreachBatch, which fails with the exception in the title (full stack trace below). The Spark code itself seems fine, because the console output is correct.

Running the job locally on macOS, using spark-submit --class <class> --master local --deploy-mode client <jar> <args>

source code (shortened for brevity):

...
import com.audienceproject.spark.dynamodb.implicits._
...

object g1q1 {

  def main(args: Array[String]): Unit = {
    ...
    val sparkSession = SparkSession.builder
      .appName("t2g1q1")
      .getOrCreate()

    sparkSession.conf.set("spark.sql.streaming.checkpointLocation", data_tmp)

    val reader = sparkSession.readStream
      ...
      .csv(data_in)

    val writer = reader
      ...
      .writeStream
      .format("console")
      .outputMode(OutputMode.Complete())

    val status = writer.start()

    writer.foreachBatch({ (batchDF: DataFrame, batchId: Long) =>
      batchDF.write.dynamodb(dynamo_table)
    }).start()

    status.awaitTermination()
  }
}

build.sbt:

name := "flightdata_spark"

version := "0.1"

scalaVersion := "2.11.12"

val sparkVersion = "2.4.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "com.audienceproject" %% "spark-dynamodb" % "0.4.1"
)
$ spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Scala version 2.11.12, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_202

Full stack trace:

2019-04-04 19:22:36 ERROR MicroBatchExecution:91 - Query [id = 5a71e6ca-1448-4169-b3d1-b4b6ef91211f, runId = 086a09bd-8f78-4564-97d2-1ff604401ef8] terminated with error
java.lang.NoClassDefFoundError: com/audienceproject/spark/dynamodb/implicits$
        at g1q1$$anonfun$main$1.apply(g1q1.scala:43)
        at g1q1$$anonfun$main$1.apply(g1q1.scala:42)
        at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:534)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:532)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:531)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.ClassNotFoundException: com.audienceproject.spark.dynamodb.implicits$
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 21 more
Exception in thread "stream execution thread for [id = 5a71e6ca-1448-4169-b3d1-b4b6ef91211f, runId = 086a09bd-8f78-4564-97d2-1ff604401ef8]" java.lang.NoClassDefFoundError: com/audienceproject/spark/dynamodb/implicits$
        at g1q1$$anonfun$main$1.apply(g1q1.scala:43)
        at g1q1$$anonfun$main$1.apply(g1q1.scala:42)
        at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:534)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:532)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:531)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.ClassNotFoundException: com.audienceproject.spark.dynamodb.implicits$
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 21 more

Thanks so much in advance to the people looking at this!

java.lang.IllegalArgumentException: Spark DataType 'decimal(19,0)' could not be mapped to a corresponding DynamoDB data type.

I get this error:

java.lang.IllegalArgumentException: Spark DataType 'decimal(19,0)' could not be mapped to a corresponding DynamoDB data type.
	at com.audienceproject.spark.dynamodb.rdd.TypeConversion$.apply(TypeConversion.scala:48)
	at com.audienceproject.spark.dynamodb.rdd.ScanPartition$$anonfun$com$audienceproject$spark$dynamodb$rdd$ScanPartition$$typeConversions$1.applyOrElse(ScanPartition.scala:42)
	at com.audienceproject.spark.dynamodb.rdd.ScanPartition$$anonfun$com$audienceproject$spark$dynamodb$rdd$ScanPartition$$typeConversions$1.applyOrElse(ScanPartition.scala:41)

I couldn't say which data in my table causes this issue.
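In case it helps narrow things down, a hedged sketch assuming the decimal(19,0) column comes from a case class or explicit schema rather than from inference: declaring the attribute as Long or Double keeps DecimalType out of the schema entirely (names below are hypothetical).

import org.apache.spark.sql.SparkSession
import com.audienceproject.spark.dynamodb.implicits._

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Hypothetical model: `orderId` was previously declared as a BigDecimal (-> decimal(19,0)).
case class Order(orderId: Long, amount: Double)

val orders = spark.read.dynamodbAs[Order]("Orders")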

NPE when upgrading Spark-DynamoDB to 1.0.0

Hi, we are using your project (many thanks, by the way) and we wanted to upgrade the connector to version 1.0.0.
Because of Guava we weren't able to test the connector locally (conflicts between our Guava version and yours), so 1.0.0 is great news for us.
Sadly, when I update the version the code no longer works; I get an NPE.

java.lang.NullPointerException: null
	at org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:34)
	at com.audienceproject.spark.dynamodb.catalyst.JavaConverter$.convertRowValue(JavaConverter.scala:39)
	at com.audienceproject.spark.dynamodb.catalyst.JavaConverter$$anonfun$5.apply(JavaConverter.scala:71)
	at com.audienceproject.spark.dynamodb.catalyst.JavaConverter$$anonfun$5.apply(JavaConverter.scala:70)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.Range.foreach(Range.scala:160)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at com.audienceproject.spark.dynamodb.catalyst.JavaConverter$.convertStruct(JavaConverter.scala:70)
	at com.audienceproject.spark.dynamodb.catalyst.JavaConverter$.convertRowValue(JavaConverter.scala:38)
	at com.audienceproject.spark.dynamodb.connector.TableConnector$$anonfun$putItems$1$$anonfun$apply$7.apply(TableConnector.scala:134)
	at com.audienceproject.spark.dynamodb.connector.TableConnector$$anonfun$putItems$1$$anonfun$apply$7.apply(TableConnector.scala:132)
	at scala.collection.immutable.List.foreach(List.scala:392)

The data and the code are exactly the same; the only thing that has changed is the connector version.
Our data is JSON-based; some fields may be missing, but we replace any null column with a null object. For example:

name | info
foo  | {"age": 18, "address": "..."}
bar  | null

becomes

name | info
foo  | {"age": 18, "address": "..."}
bar  | {"age": null, "address": null}

If you have any clue about how we can make 1.0.0 work for our case, that would be great :)
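Not a fix for the regression itself, but a hedged workaround sketch: keep the nested attribute as a true null instead of a struct whose fields are all null, so the converter never tries to read a string out of a null field (column and field names are hypothetical).

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, when}

// If every field of `info` is null, write the whole attribute as a true null
// instead of a struct of nulls (hypothetical column and field names).
def nullifyEmptyInfo(df: DataFrame): DataFrame =
  df.withColumn(
    "info",
    when(col("info.age").isNull && col("info.address").isNull, lit(null))
      .otherwise(col("info"))
  )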

java.lang.NoClassDefFoundError: org/spark_project/guava/util/concurrent/RateLimiter

I'm using spark-dynamodb_2.11:0.2.2 from https://oss.sonatype.org
I get a NoClassDefFoundError for the RateLimiter class when I write to a Dynamo table as below,

newItemsDs.write.dynamodb("table-name")

I tried using the jar from https://mvnrepository.com/artifact/com.google.guava/guava/26.0-jre since I cannot find the org.spark_project.guava package used by the table connector.

TableConnector.scala:30:import org.spark_project.guava.util.concurrent.RateLimiter

On demand capacity

When reading/writing to Dynamo tables in on-demand capacity mode, the connector uses the default value (100L) instead of treating capacity as unlimited. To support the scale we operate at, we had to switch the table configuration to a high provisioned capacity in order to achieve better performance than a maximum of 100 reads on average. In the long term we would like to return to on-demand usage so capacity is effectively unlimited.

PySpark example is missing ".save()" at the end

This is a very small adjustment to the README. Instead of:

dynamoDf.write.format("com.audienceproject.spark.dynamodb") \
              .option("tableName", "SomeOtherTable")

there should be

dynamoDf.write.format("com.audienceproject.spark.dynamodb") \
              .option("tableName", "SomeOtherTable") \
              .save()

Otherwise nothing is written to DynamoDB.

Issues inferring schema

I have an issue with schema inference:
A. Scenario:

  1. Create a DynamoDB table with primary key "alpha" of type String
  2. Add some items with alpha values: 1177410, 1177411, 1177412, 1177413

B. Do an extract with the lib.
It will give you this as a result:

++
||
++
||
||
||
||
||
||
++

Add an item with an alpha value of 1177414

and you will get this as the result:

+--------+
|   alpha|
+--------+
| 1177414|
| 1177410|
|10000000|
| 1000000|
| 1177413|
| 1177412|
| 1177411|
+--------+

Could someone explain this to me?

Write nested Object in DynamoDB

When I try to write a complex Dataset (based on a case class data model) to DynamoDB with a nested sub-object like this (just an example):

{
  "HashKey": 1,
  "MyObject": [
    {
      "ProductId": 100,
      "Quantity": 50
    }
  ]
}

I receive this error message from DynamoDB: "AmazonDynamoDBException: One or more parameter values were invalid: An AttributeValue may not contain an empty string", which is strange because there is obviously no empty string.

Is it a bug? Or does the connector only handle flat objects?

java.math.BigDecimal cannot be cast to java.util.Map when inferring schema

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 11, ip-10-44-5-206.ec2.internal, executor 2): java.lang.ClassCastException: java.math.BigDecimal cannot be cast to java.util.Map
	at com.amazonaws.services.dynamodbv2.document.Item.getRawMap(Item.java:891)
	at com.audienceproject.spark.dynamodb.datasource.TypeConversion$$anonfun$apply$21$$anonfun$apply$22.apply(TypeConversion.scala:51)
	at com.audienceproject.spark.dynamodb.datasource.TypeConversion$$anonfun$apply$21$$anonfun$apply$22.apply(TypeConversion.scala:51)
	at com.audienceproject.spark.dynamodb.datasource.TypeConversion$$anonfun$nullableGet$1.apply(TypeConversion.scala:88)
	at com.audienceproject.spark.dynamodb.datasource.TypeConversion$$anonfun$nullableGet$1.apply(TypeConversion.scala:87)
	at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:52)
	at com.audienceproject.spark.dynamodb.datasource.ScanPartition$$anonfun$com$audienceproject$spark$dynamodb$datasource$ScanPartition$$itemToRow$1.apply(ScanPartition.scala:100)
	at com.audienceproject.spark.dynamodb.datasource.ScanPartition$$anonfun$com$audienceproject$spark$dynamodb$datasource$ScanPartition$$itemToRow$1.apply(ScanPartition.scala:100)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:296)
	at com.audienceproject.spark.dynamodb.datasource.ScanPartition.com$audienceproject$spark$dynamodb$datasource$ScanPartition$$itemToRow(ScanPartition.scala:100)
	at com.audienceproject.spark.dynamodb.datasource.ScanPartition$PartitionReader$$anonfun$nextPage$2.apply(ScanPartition.scala:94)
	at com.audienceproject.spark.dynamodb.datasource.ScanPartition$PartitionReader$$anonfun$nextPage$2.apply(ScanPartition.scala:94)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at com.audienceproject.spark.dynamodb.datasource.ScanPartition$PartitionReader.get(ScanPartition.scala:82)
	at com.audienceproject.spark.dynamodb.datasource.ScanPartition$PartitionReader.get(ScanPartition.scala:59)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.next(DataSourceRDD.scala:59)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:291)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:283)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2041)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2029)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2028)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2028)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2262)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2211)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2200)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:401)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:751)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:710)
  ... 54 elided
Caused by: java.lang.ClassCastException: java.math.BigDecimal cannot be cast to java.util.Map
  at com.amazonaws.services.dynamodbv2.document.Item.getRawMap(Item.java:891)
  at com.audienceproject.spark.dynamodb.datasource.TypeConversion$$anonfun$apply$21$$anonfun$apply$22.apply(TypeConversion.scala:51)
  at com.audienceproject.spark.dynamodb.datasource.TypeConversion$$anonfun$apply$21$$anonfun$apply$22.apply(TypeConversion.scala:51)
  at com.audienceproject.spark.dynamodb.datasource.TypeConversion$$anonfun$nullableGet$1.apply(TypeConversion.scala:88)
  at com.audienceproject.spark.dynamodb.datasource.TypeConversion$$anonfun$nullableGet$1.apply(TypeConversion.scala:87)
  at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:52)
  at com.audienceproject.spark.dynamodb.datasource.ScanPartition$$anonfun$com$audienceproject$spark$dynamodb$datasource$ScanPartition$$itemToRow$1.apply(ScanPartition.scala:100)
  at com.audienceproject.spark.dynamodb.datasource.ScanPartition$$anonfun$com$audienceproject$spark$dynamodb$datasource$ScanPartition$$itemToRow$1.apply(ScanPartition.scala:100)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:296)
  at com.audienceproject.spark.dynamodb.datasource.ScanPartition.com$audienceproject$spark$dynamodb$datasource$ScanPartition$$itemToRow(ScanPartition.scala:100)
  at com.audienceproject.spark.dynamodb.datasource.ScanPartition$PartitionReader$$anonfun$nextPage$2.apply(ScanPartition.scala:94)
  at com.audienceproject.spark.dynamodb.datasource.ScanPartition$PartitionReader$$anonfun$nextPage$2.apply(ScanPartition.scala:94)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
  at com.audienceproject.spark.dynamodb.datasource.ScanPartition$PartitionReader.get(ScanPartition.scala:82)
  at com.audienceproject.spark.dynamodb.datasource.ScanPartition$PartitionReader.get(ScanPartition.scala:59)
  at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.next(DataSourceRDD.scala:59)
  at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:291)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:283)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:123)
  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
  ... 3 more

V0.4.0 tanks write performance

Issue: Upgrading to v0.4.0 seems to kill any previous parallelism benefits; it only really writes ~25 records at a time.

[Screenshot: DynamoDB console graph of consumed write capacity]

The above screenshot (from the DynamoDB console) illustrates this. There's a slight bump in the consumed write capacity right before 16:00; this is using v0.4.0. The spike a few minutes after that is the exact same job, using v0.3.6.

Expected behavior: Parallelism on writes should be the same as in v0.3.6.

Schema inference on tables with too many attributes

if (typeSeq.size > 100) throw new RuntimeException("Schema inference not possible, too many attributes in table.")

The number here is pretty arbitrary, and it's not due to a limitation with Spark. The real issue is that if there are too many columns in the schema, the column pushdown to DynamoDB fails due to the projection expression exceeding the maximum length in the DynamoDB API.

A better solution would be preferable.
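Until a better fix lands, a hedged workaround sketch: skip dynamic inference on very wide tables by reading through a case class that declares only the attributes you actually need, which also keeps the projection expression short (attribute names below are hypothetical).

import org.apache.spark.sql.SparkSession
import com.audienceproject.spark.dynamodb.implicits._

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Only the attributes declared here are projected, regardless of how many
// attributes the table has (names are hypothetical).
case class UserSlice(userId: String, country: String, lastSeen: Long)

val users = spark.read.dynamodbAs[UserSlice]("WideUserTable")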

Exceeding throughput when table has GSIs

I have a table with two GSIs. The table and the GSIs all have a read and write throughput of 20,000. When not specifying the throughput, writing to the table results in the following error:

Caused by: com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException: The level of configured provisioned throughput for one or more global secondary indexes of the table was exceeded. Consider increasing your provisioning level for the under-provisioned global secondary indexes with the UpdateTable API

java.lang.ClassNotFoundException: Failed to find data source: dynamodb. Please find packages at http://spark.apache.org/third-party-projects.html

I am getting the following error.

Spark version is 2.4.2.

Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_252

spark-submit --packages com.audienceproject:spark-dynamodb_2.11:0.4.1 p.py

The error is the following; any help is much appreciated.

An error occurred while calling o89.save.
: java.lang.ClassNotFoundException: Failed to find data source: dynamodb. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:245)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: dynamodb.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
... 12 more

Query Functionality

In the article published about the project, it was said that "future improvements could see the query operation be given a part to play as well".

Is adding query functionality to this project on the roadmap for the near future?
Thanks
