cloudspannerecosystem / dynamodb-adapter
License: Apache License 2.0
Is it intentional to read using a 10-second exact staleness? While this could lead to lower read latency, it will by definition read data that is 10 seconds old. The default would be to use a strong read, which is guaranteed to return all changes that have been committed and is also one of the USPs of Spanner compared to storage systems that use eventual consistency. Another option here could be to use a max staleness of 10 seconds instead of exact staleness, and let Spanner choose the optimal read timestamp.
Originally posted by @olavloite in #3 (comment)
This should strip away the length of the column for variable-length columns (STRING and BYTES). That would make it possible to use columns with a different length than only STRING(MAX) and BYTES(MAX).
(And yes, I know it could be easily circumvented by adding the column to the dynamodb_adapter_table_ddl as STRING(MAX), even if the column is for example STRING(100), but that will probably confuse users.)
Originally posted by @olavloite in #3 (comment)
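A minimal sketch of the length-stripping idea from the comment above, assuming the column type arrives as a plain string such as STRING(100). The helper name baseColumnType is hypothetical and is not taken from the adapter's code; nested types such as ARRAY<STRING(100)> are deliberately not handled here.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// lengthSuffix matches a trailing length such as "(100)" or "(MAX)".
var lengthSuffix = regexp.MustCompile(`\(\s*(\d+|MAX)\s*\)$`)

// baseColumnType is a hypothetical helper (not part of the adapter).
// It upper-cases the type and drops the length from STRING(n) and
// BYTES(n), so that STRING(100) and STRING(MAX) both map to STRING.
// Other types are returned unchanged apart from the upper-casing.
func baseColumnType(spannerType string) string {
	t := strings.ToUpper(strings.TrimSpace(spannerType))
	if strings.HasPrefix(t, "STRING") || strings.HasPrefix(t, "BYTES") {
		return lengthSuffix.ReplaceAllString(t, "")
	}
	return t
}

func main() {
	for _, t := range []string{"STRING(MAX)", "STRING(100)", "BYTES(16)", "INT64"} {
		fmt.Printf("%s -> %s\n", t, baseColumnType(t))
	}
}
```

With a normalization step like this, the type comparison would only need to know about STRING and BYTES, regardless of the declared length.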
In storage/spanner.go, queryHash is populated but doesn't appear to be used anywhere. Can this be removed?
This contains the baseline code to perform operations on Spanner with DynamoDB queries.
If we publish two stream objects to two different topics, they should be able to run in parallel, right? However, the current code only allows a single goroutine to publish a message. We can change this to:
mux.Lock()
topic, ok := mClients[topicName]
if !ok {
	topic = pubsubClient.c.
		TopicInProject(topicName, config.ConfigurationMap.GOOGLE_PROJECT_ID)
	mClients[topicName] = topic
}
mux.Unlock()
message := &pubsub.Message{}
message.Data, _ = json.Marshal(streamObj)
_, err := topic.Publish(context.Background(), message).Get(ctx)
if err != nil {
	logger.LogError(err)
}
topic.Publish is a thread-safe method: https://github.com/googleapis/google-cloud-go/blob/pubsub/v1.8.2/pubsub/topic.go#L434-L435 (t.scheduler also has an internal sync.Mutex).
Originally posted by @hengfengli in #3 (comment)
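The locking pattern suggested above can be sketched with the standard library only. This is an illustration under stated assumptions: fakeTopic stands in for the real Pub/Sub topic type, and topicCache, get, and publishAll are hypothetical names. The point it shows is that the mutex guards only the map lookup, so publishes to different topics are not serialized behind one lock.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeTopic is a hypothetical stand-in for a Pub/Sub topic whose
// Publish method is safe for concurrent use.
type fakeTopic struct {
	mu        sync.Mutex
	published int
}

// Publish records one message; it does its own internal locking.
func (t *fakeTopic) Publish(data []byte) {
	t.mu.Lock()
	t.published++
	t.mu.Unlock()
}

// topicCache lazily creates one topic handle per topic name.
type topicCache struct {
	mu     sync.Mutex
	topics map[string]*fakeTopic
}

func newTopicCache() *topicCache {
	return &topicCache{topics: make(map[string]*fakeTopic)}
}

// get holds the cache lock only while reading or filling the map.
func (c *topicCache) get(name string) *fakeTopic {
	c.mu.Lock()
	defer c.mu.Unlock()
	t, ok := c.topics[name]
	if !ok {
		t = &fakeTopic{}
		c.topics[name] = t
	}
	return t
}

// publishAll publishes perTopic messages to every topic concurrently.
// The publish call itself runs outside the cache lock.
func publishAll(c *topicCache, names []string, perTopic int) {
	var wg sync.WaitGroup
	for _, name := range names {
		for i := 0; i < perTopic; i++ {
			wg.Add(1)
			go func(n string) {
				defer wg.Done()
				c.get(n).Publish([]byte("event"))
			}(name)
		}
	}
	wg.Wait()
}

func main() {
	c := newTopicCache()
	publishAll(c, []string{"orders", "customers"}, 50)
	fmt.Println(c.get("orders").published, c.get("customers").published)
}
```

Keeping the critical section this small only works because the publish call is itself safe for concurrent use, which is what the linked topic.go lines indicate for the real client.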
Currently the adapter requires that a session be created for each operation that needs to be performed (GetItem, Query, PutItem, etc.).
It would be better to use the X-Amz-Target HTTP header field in the / endpoint of the adapter to determine the operation being requested and route appropriately. For example, X-Amz-Target: DynamoDB_20120810.Query could be handled by / and routed to the query handler in the adapter.
Originally posted by @bgood in #27 (comment)
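A rough sketch of header-based routing as proposed above, using only net/http. The handler bodies, the handlers map, and the routeByTarget and dispatch names are hypothetical placeholders and not the adapter's actual routing code; the header name and the DynamoDB_20120810 prefix come from the example in the comment.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// targetPrefix is the API version prefix used in the X-Amz-Target header.
const targetPrefix = "DynamoDB_20120810."

// handlers maps an operation name to a placeholder handler.
// A real adapter would call its existing per-operation handlers here.
var handlers = map[string]http.HandlerFunc{
	"GetItem": func(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, "GetItem handler") },
	"PutItem": func(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, "PutItem handler") },
	"Query":   func(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, "Query handler") },
}

// routeByTarget serves "/" and picks the handler from X-Amz-Target.
func routeByTarget(w http.ResponseWriter, r *http.Request) {
	target := r.Header.Get("X-Amz-Target")
	h, ok := handlers[strings.TrimPrefix(target, targetPrefix)]
	if !strings.HasPrefix(target, targetPrefix) || !ok {
		http.Error(w, "unsupported X-Amz-Target: "+target, http.StatusBadRequest)
		return
	}
	h(w, r)
}

// dispatch sends an in-memory POST / with the given target header and
// returns the status code and response body. It exists only for the demo.
func dispatch(target string) (int, string) {
	req := httptest.NewRequest(http.MethodPost, "/", strings.NewReader("{}"))
	req.Header.Set("X-Amz-Target", target)
	rec := httptest.NewRecorder()
	routeByTarget(rec, req)
	return rec.Code, rec.Body.String()
}

func main() {
	code, body := dispatch("DynamoDB_20120810.Query")
	fmt.Println(code, body)
	code, _ = dispatch("DynamoDB_20120810.Unknown")
	fmt.Println(code)
}
```

With a single entry point like this, a client could be configured with one endpoint URL and would not need a separate session per operation.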
To keep the markdown clean it would be good to add a markdown linter to the CircleCI process.
When running an UpdateItem operation, UpdateExpression requires an action keyword such as "set", as shown below:
UpdateExpression: aws.String("SET number_of_items = :number_of_items").
With DynamoDB, both "set" and "SET" work, and the adapter should behave the same way with Spanner.
With Spanner, the update operation is ignored when we use "set" but succeeds when we use "SET".
func updateCustomerDetails(svc *dynamodb.DynamoDB) {
	_, err := svc.UpdateItem(&dynamodb.UpdateItemInput{
		TableName: aws.String("Customer_Order"),
		Key: map[string]*dynamodb.AttributeValue{
			"PK": {
				S: aws.String("CUST#0000000000"),
			},
			"SK": {
				S: aws.String("EMAIL#[email protected]"),
			},
		},
		ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
			":number_of_items": {
				S: aws.String("6"),
			},
		},
		ReturnValues:     aws.String("UPDATED_NEW"),
		UpdateExpression: aws.String("set number_of_items = :number_of_items"),
	})
	if err != nil {
		fmt.Println(err.Error())
	}
	getCustomerContactDetails(svc, "CUST#0000000000", "EMAIL#[email protected]")
}
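One way the keyword could be matched regardless of case is sketched below. The helper splitUpdateAction is hypothetical and does not reflect how the adapter actually parses update expressions; it only looks at the leading keyword of a single-clause expression and leaves the rest of the expression untouched.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// updateActions lists the DynamoDB update expression action keywords.
var updateActions = []string{"SET", "REMOVE", "ADD", "DELETE"}

// splitUpdateAction is a hypothetical helper. It returns the leading
// action keyword in upper case plus the remainder of the expression,
// so "set a = :a" and "SET a = :a" are treated the same way.
func splitUpdateAction(expr string) (action, rest string, ok bool) {
	expr = strings.TrimSpace(expr)
	i := strings.IndexFunc(expr, unicode.IsSpace)
	if i < 0 {
		return "", "", false
	}
	keyword := strings.ToUpper(expr[:i])
	for _, a := range updateActions {
		if keyword == a {
			return a, strings.TrimSpace(expr[i:]), true
		}
	}
	return "", "", false
}

func main() {
	for _, expr := range []string{
		"SET number_of_items = :number_of_items",
		"set number_of_items = :number_of_items",
	} {
		action, rest, ok := splitUpdateAction(expr)
		fmt.Println(action, "|", rest, "|", ok)
	}
}
```

Both inputs yield the same SET action, which is the behavior the issue asks for.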
Query operations failing with JSON serialization error.
SerializationError: failed decoding JSON RPC response
status code: 200, request id:
caused by: JSON value is not a list (map[string]interface {}{"L":[]interface {}{map[string]interface {}{"PK":map[string]interface {}{"S":"CUST#0000000000"}, "SK":map[string]interface {}{"S":"ORDER#ej68vuldzgps"}, "customer_id":map[string]interface {}{"S":"0000000000"}, "order_id":map[string]interface {}{"S":"ej68vuldzgps"}, "order_status":map[string]interface {}{"S":"PROCESSING"}, "order_ts":map[string]interface {}{"S":"2021-05-14T12:07:00.000000"}}}})
No record found
panic: runtime error: index out of range [0] with length 0
goroutine 1 [running]:
main.getCustomerOrderDetails(0xc0000b2340)
projects/forks/dynamodb-adapter/examples/golang/main.go:115 +0x6b3
main.main()
projects/forks/dynamodb-adapter/examples/golang/main.go:52 +0x1c5
Originally posted by @bgood in #27 (comment)
To help new users adopt the adapter we should have an example Java application. It should include all supported operations.
Integration tests should be passing on the baseline branch when run through CircleCI.
The following is produced when trying to run the integration tests through CircleCI:
#!/bin/bash -eo pipefail
cat <<EOF > $HOME/config-files/staging/config.json
{
"GoogleProjectID": "$SPANNER_PROJECT",
"SpannerDb": "dynamodb-adapter-int",
"QueryLimit": 5000
}
EOF
cat <<EOF > $HOME/config-files/staging/spanner.json
{
"dynamodb_adapter_table_ddl": "dynamodb-adapter",
"dynamodb_adapter_config_manager": "dynamodb-adapter",
"department": "dynamodb-adapter",
"employee": "dynamodb-adapter"
}
EOF
cat <<EOF > $HOME/config-files/staging/tables.json
{
"employee":{
"partitionKey":"emp_id",
"sortKey": "",
"attributeTypes": {
"emp_id": "N",
"first_name":"S",
"last_name":"S",
"address":"S",
"age":"N"
},
"indices": {}
},
"department":{
"partitionKey":"d_id",
"sortKey": "",
"attributeTypes": {
"d_id": "N",
"d_name":"S",
"d_specialization":"S"
},
"indices": {}
}
}
EOF
go run integrationtest/setup.go setup
go test integrationtest/api_test.go
go run integrationtest/setup.go teardown
/bin/bash: /home/circleci/config-files/staging/config.json: No such file or directory
Exited with code exit status 1
CircleCI received exit code 1
Using the adapter with the DynamoDB Java client library did not work for me. Using it with the DynamoDB Go client library did, but only if I manually set the entire endpoint, including the operation that I want to execute, when creating a session. That surprised me (see example below). I think this code base would benefit from a couple of simple examples of how it could be used. Since it is intended as a kind of drop-in replacement for a DynamoDB backend, I would expect to be able to use it with a client library, and not have to create and submit the JSON messages manually.
My test client currently looks like this:
package main
import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
)
func main() {
// Create DynamoDB client for GetItem.
svc := dynamodb.New(createGetItemSession())
result, err := svc.GetItem(&dynamodb.GetItemInput{
TableName: aws.String("DynamoDbTest"),
Key: map[string]*dynamodb.AttributeValue{
"ColFloat64": {
N: aws.String("2.1440135556938245e-06"),
},
},
})
if err != nil {
fmt.Println(err.Error())
return
}
fmt.Printf("%s\n", result.String())
// Test PutItem.
type Rec struct {
ColFloat64 float64
ColInt64 int64
ColString string
ColBytes []byte
ColBool bool
}
item := Rec{
ColFloat64: 3.14,
ColInt64: 1000,
ColString: "test",
ColBytes: []byte{1,2,3},
ColBool: true,
}
av, err := dynamodbattribute.MarshalMap(item)
if err != nil {
fmt.Println("Got error marshalling new record:")
fmt.Println(err.Error())
return
}
input := &dynamodb.PutItemInput{
Item: av,
TableName: aws.String("DynamoDbTest"),
}
// Create DynamoDB client for PutItem.
svc = dynamodb.New(createPutItemSession())
_, err = svc.PutItem(input)
if err != nil {
fmt.Println("Got error calling PutItem:")
fmt.Println(err.Error())
return
}
}
var creds = credentials.NewStaticCredentials("secret", "even-more-secret", "")
var region = "eu-west-1"
func createSession(url string) *session.Session {
return session.Must(session.NewSessionWithOptions(session.Options{
Config: aws.Config{
Credentials: creds,
Region: &region,
Endpoint: aws.String(url),
},
}))
}
func createGetItemSession() *session.Session {
return createSession("http://localhost:9050/v1/GetItem")
}
func createPutItemSession() *session.Session {
return createSession("http://localhost:9050/v1/PutItem")
}
Carried over from PR #3 #3 (review)
To help new users adopt the adapter we should have an example Python application. It should include all supported operations.
There are instances where ReadWriteTransactions are being used although they could simply be mutations. Using mutations would save reading the row before updating.
#3 (comment)
#3 (comment)
#3 (comment)
#3 (comment)
#3 (comment)
#3 (comment)
#3 (comment)
This file could really use some test cases. This is where most of the important stuff is happening: the translation from DynamoDB to Spanner and back. Having test cases for it also means that we automatically have a sort of documentation of what is supported and what is not.
Originally posted by @olavloite in #3 (comment)
Would it be possible to somehow add a mapping somewhere, or another configuration option, to indicate whether the adapter should use INT64 or FLOAT64 here? The reason is that this prevents an INT64 column from being used as part of the primary key of a table, while that is a very common data type to use in the primary key.
Using FLOAT64 as part of a primary key is possible, but I would recommend against it if the actual number is (supposed to be) an integer, as there could be subtle rounding problems.
Consider for example the following statement:
SELECT CAST(9007199254740993 as FLOAT64);
This will not return 9007199254740993 but 9007199254740992 when you execute it in Cloud Spanner. (Note: This is not specific to Cloud Spanner, but a general limitation of floating point numbers.)
Another possibility could be to map numeric values in DynamoDB to the NUMERIC data type in Cloud Spanner.
Originally posted by @olavloite in #3 (comment)
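The rounding problem described above is not specific to SQL and can be reproduced in Go with the same value. The helper names roundTrip and survivesFloat64 are made up for this illustration.

```go
package main

import "fmt"

// roundTrip converts an integer to float64 and back, which is what
// effectively happens when an integer key is stored as FLOAT64.
func roundTrip(n int64) int64 {
	return int64(float64(n))
}

// survivesFloat64 reports whether n is exactly representable as a float64.
func survivesFloat64(n int64) bool {
	return roundTrip(n) == n
}

func main() {
	for _, n := range []int64{9007199254740992, 9007199254740993} {
		fmt.Printf("%d -> %d (exact: %t)\n", n, roundTrip(n), survivesFloat64(n))
	}
	// Output:
	// 9007199254740992 -> 9007199254740992 (exact: true)
	// 9007199254740993 -> 9007199254740992 (exact: false)
}
```

Two distinct integer keys above 2^53 can therefore collapse into the same FLOAT64 value, which is why an INT64 or NUMERIC mapping is safer for primary keys.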
As far as I can tell, the current version only supports the following DynamoDB data types:
Could that be documented here?
Originally posted by @olavloite in #3 (comment)
This will currently only return data as type BOOL, S (String) and N (Number). Would it be an idea to:
Originally posted by @olavloite in #3 (comment)
Currently the project is using the base Golang linter golint. An initial run of golangci-lint found many more errors that could be fixed, creating a cleaner codebase.
The adapter supports only a subset of operations; they should be documented in the README.
The function changeTableNameForSP appears in multiple places. Since it is the same everywhere, it would be ideal if it were moved to a utils package or some other central location.
config/config.go
integrationtest/setup.go
service/services/services.go
storage/spanner.go
Why is this called parseRowForNull and not just parseRow?
Originally posted by @olavloite in #3 (comment)
The example Golang application only has read operations; it should include all supported operations.
{
UnprocessedItems: {
}
}
The BatchWriteItem operation hangs for a minute and then throws the following error. However, the operation succeeds and the entry is added to the table.
SerializationError: failed decoding JSON RPC response
status code: 200, request id:
caused by: JSON value is not a structure ([]interface {}{})
func addNewCustomerBatch(svc *dynamodb.DynamoDB) {
fmt.Println("Running a BatchWriteItem operation to add customers")
result, err := svc.BatchWriteItem(&dynamodb.BatchWriteItemInput{
RequestItems: map[string][]*dynamodb.WriteRequest{
"Customer_Order": {
{
PutRequest: &dynamodb.PutRequest{
Item: map[string]*dynamodb.AttributeValue{
"PK": {
S: aws.String("CUST#0070070070"),
},
"SK": {
S: aws.String("EMAL#[email protected]"),
},
"customer_fname": {
S: aws.String("James"),
},
"customer_lname": {
S: aws.String("Bond"),
},
"customer_email": {
S: aws.String("[email protected]"),
},
"customer_id": {
S: aws.String("0070070070"),
},
"customer_addresses": {
S: aws.String("{Shipping: Casino Royal, Las Vegas, NY}"),
},
},
},
},
},
},
})
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case dynamodb.ErrCodeProvisionedThroughputExceededException:
fmt.Println("1")
fmt.Println(dynamodb.ErrCodeProvisionedThroughputExceededException, aerr.Error())
case dynamodb.ErrCodeResourceNotFoundException:
fmt.Println("2")
fmt.Println(dynamodb.ErrCodeResourceNotFoundException, aerr.Error())
case dynamodb.ErrCodeItemCollectionSizeLimitExceededException:
fmt.Println("3")
fmt.Println(dynamodb.ErrCodeItemCollectionSizeLimitExceededException, aerr.Error())
case dynamodb.ErrCodeRequestLimitExceeded:
fmt.Println("4")
fmt.Println(dynamodb.ErrCodeRequestLimitExceeded, aerr.Error())
case dynamodb.ErrCodeInternalServerError:
fmt.Println("5")
fmt.Println(dynamodb.ErrCodeInternalServerError, aerr.Error())
default:
fmt.Println("6")
fmt.Println(aerr.Error())
}
} else {
fmt.Println(err.Error())
}
return
}
fmt.Println(result)
}
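The message "JSON value is not a structure ([]interface {}{})" suggests that the client received an empty JSON array where it expected an object. In the DynamoDB API, UnprocessedItems in a BatchWriteItem response is a map from table name to a list of write requests, so an empty result is {} and not []. Whether this is the actual cause in the adapter is an assumption; the sketch below only shows how an empty response of the expected shape could be produced in Go. The type batchWriteResponse and the function emptyBatchWriteResponse are hypothetical, and write requests are left as raw JSON for brevity.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// batchWriteResponse is a simplified, hypothetical response type.
// UnprocessedItems is keyed by table name.
type batchWriteResponse struct {
	UnprocessedItems map[string][]json.RawMessage `json:"UnprocessedItems"`
}

// emptyBatchWriteResponse encodes a response with no unprocessed items.
// The map is initialized explicitly: a nil map would encode as null,
// and an empty slice would encode as [], neither of which is an object.
func emptyBatchWriteResponse() ([]byte, error) {
	resp := batchWriteResponse{
		UnprocessedItems: map[string][]json.RawMessage{},
	}
	return json.Marshal(resp)
}

func main() {
	b, err := emptyBatchWriteResponse()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(string(b)) // prints {"UnprocessedItems":{}}
}
```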