import boto3


def main():
    # 1 - Create Client
    # DynamoDB Local accepts any credentials, so dummy values are fine here.
    ddb = boto3.resource('dynamodb',
                         endpoint_url='http://localhost:8000',
                         region_name='dummy',
                         aws_access_key_id='dummy',
                         aws_secret_access_key='dummy')

    # 2 - Create the Table
    table = ddb.create_table(
        TableName='mytest',
        # Attribute definitions: only the key attributes need to be declared.
        AttributeDefinitions=[
            {
                'AttributeName': 'TransactionId',
                'AttributeType': 'S'
            },
        ],
        KeySchema=[
            {
                'AttributeName': 'TransactionId',
                'KeyType': 'HASH'
            }
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 10,
            'WriteCapacityUnits': 10
        })
    table.wait_until_exists()
    print('Successfully created Table')

    # 3 - Insert Data
    item = {
        'TransactionId': '9a0',
        'number_col': 213,
    }
    table.put_item(Item=item)
    print('Successfully put item')

    # 4 - Scan Table
    scan_response = table.scan()
    for record in scan_response['Items']:
        print(record)


if __name__ == '__main__':
    main()
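Running the script a second time against the same DynamoDB Local instance will fail at create_table because mytest already exists. A minimal cleanup sketch under that assumption, reusing the same local endpoint and dummy credentials, that drops the table between runs:

import boto3

ddb = boto3.resource('dynamodb',
                     endpoint_url='http://localhost:8000',
                     region_name='dummy',
                     aws_access_key_id='dummy',
                     aws_secret_access_key='dummy')

# Drop the demo table and block until DynamoDB Local reports it is gone.
table = ddb.Table('mytest')
table.delete()
table.wait_until_not_exists()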
# current configuration version, do not modify this value.
conf.version = 3
# id of this sync task; also used as the prefix of the checkpoint db name ("$id-checkpoint").
id = nimo-shake
# log file name (e.g. dynamo-shake.log). all logs are printed to stdout if log.file is empty.
log.file =
# log level: "none", "error", "warn", "info", "debug". default is "info".
log.level = info
# log buffer. enabling it improves performance, but the last few log entries may be lost on exit;
# disabling it guarantees every entry is flushed, at the cost of performance.
log.buffer = false
# pprof port. not used currently.
system_profile = 9330
# restful ports for full and incremental sync monitoring; internal metrics can be
# inspected with curl. see the wiki for details.
full_sync.http_port = 9341
incr_sync.http_port = 9340
# sync mode. currently only "full" is supported.
# all: full sync followed by incremental sync.
# full: full sync only.
# incr: incremental sync only.
sync_mode = full
# source dynamodb account configuration. source.session_token and source.region
# may be left empty if not applicable.
source.access_key_id = dummy
source.secret_access_key = dummy
source.session_token =
source.region = dummy
# if the source is an endpoint (e.g. DynamoDB Local), configure this parameter;
# when it is set, the source parameters above do not take effect.
# e.g. http://100.123.124.125:1010
source.endpoint_url = http://192.168.23.113:8000
# source.endpoint_url =
# max retries within a session once a request fails.
source.session.max_retries = 3
# session timeout, 0 means disable. unit: ms.
source.session.timeout = 3000
# table-level black/white list filter, entries split by semicolon(;). at most one of these two parameters may be given.
# if filter.collection.black is not empty, the given tables are filtered out while all other tables pass.
# if filter.collection.white is not empty, the given tables pass while all other tables are filtered out.
# all tables pass if neither condition is given.
# E.g., "filter.collection.white = c1;c2" means only c1 and c2 pass while the others are filtered out.
filter.collection.white =
filter.collection.black =
# qps limit for each table.
# in the full sync stage, the Scan call is used; this parameter limits the number of Scan calls per second.
qps.full = 1000
# the maximum number of items returned in one Scan call. default is 128.
qps.full.batch_num = 128
# in the incremental sync stage, the GetRecords call is used; this parameter limits the number of GetRecords calls per second.
qps.incr = 1000
# the maximum number of records returned in one GetRecords call. default is 128.
qps.incr.batch_num = 128
# target configuration. currently supports mongodb and aliyun_dynamo_proxy.
target.type = mongodb
# target address. for mongodb, configure the target mongodb connection string,
# e.g., mongodb://username:[email protected]:3791,10.1.1.2:3792;
# otherwise configure the aliyun dynamodb proxy connection string, e.g., http://dds-xxxx:3717
target.address = mongodb://root:[email protected]:27017/?authSource=admin
# target mongodb type: replica for a replica set, sharding for a sharded cluster.
target.mongodb.type = replica
# what to do when the target mongodb already has a table with the same name.
# "drop" means drop the existing table before syncing.
# "rename" means rename the existing table with a timestamp suffix, e.g., c1 -> c1.2019-07-01Z12:10:11.
# "rename" is only supported when target.type = mongodb. leave empty to do nothing.
target.db.exist = drop
# only sync the schema without any data.
sync_schema_only = false
# full sync configuration.
# table-level concurrency: how many tables are synced at the same time.
full.concurrency = 4
# how many reading threads work within one table, i.e. how many threads read the source
# concurrently; corresponds to TotalSegments of the Scan API.
full.read.concurrency = 1
# how many writing threads work within one table, i.e. how many threads write to the target concurrently.
full.document.concurrency = 4
# how many documents are batched in one writing request. if the target speaks the DynamoDB protocol, the maximum is 25.
full.document.write.batch = 25
# the number of parser threads per table that convert the dynamodb protocol to the target protocol.
full.document.parser = 2
# whether to sync user-created indexes. an index on the primary key is always created by default.
full.enable_index.user = true
# full sync: change insert to update when a duplicate key error is found on the target.
full.executor.insert_on_dup_update = true
# incremental sync configuration.
# incremental sync concurrency: the maximum number of shards fetched at the same time.
increase.concurrency = 16
# incremental sync: change insert to update when the target already contains the same key.
increase.executor.insert_on_dup_update = true
# incremental sync: change update to upsert when the key does not exist on the target.
increase.executor.upsert = true
# checkpoint storage type: mongodb (target.type must be mongodb) or file (written to local disk).
checkpoint.type = file
# checkpoint storage address. for mongodb, the address of the mongodb holding the checkpoints,
# defaulting to the target mongodb if left empty. for file, a relative path (e.g. checkpoint),
# defaulting to "checkpoint" if left empty.
checkpoint.address =
# name of the db used to store checkpoints in the target mongodb, "$id-checkpoint" by default.
checkpoint.db =
# convert.type = raw
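After nimo-shake completes a full sync with the configuration above, the result can be spot-checked on the target MongoDB. A minimal verification sketch with pymongo; the connection string comes from target.address, while the database and collection names (assumed here to follow the source table name mytest) are illustrative assumptions, since the actual layout depends on how nimo-shake maps tables:

from pymongo import MongoClient

# Connection string taken from target.address above.
client = MongoClient('mongodb://root:[email protected]:27017/?authSource=admin')

# Assumption for illustration only: the synced table lands in a database and
# collection named after the source table "mytest"; adjust to the actual layout.
for doc in client['mytest']['mytest'].find():
    print(doc)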