
canal's Introduction


OceanBase Database is a distributed relational database developed entirely by Ant Group. It is built on clusters of commodity servers. Based on the Paxos protocol and its distributed architecture, OceanBase Database provides high availability and linear scalability, and it does not depend on specific hardware architectures.

Key features

  • Transparent Scalability: a single cluster can scale to 1,500 nodes, petabytes of data, and a trillion rows of records.
  • Ultra-fast Performance: TPC-C 707 million tpmC and TPC-H 15.26 million QphH @30000GB.
  • Cost Efficiency: saves 70%–90% of storage costs.
  • Real-time Analytics: supports HTAP without additional cost.
  • Continuous Availability: RPO = 0 (zero data loss) and RTO < 8s (recovery time).
  • MySQL Compatible: easy migration from MySQL databases.

See also key features for more details.

Quick start

See also Quick experience or Quick Start (Simplified Chinese) for more details.

🔥 Start with all-in-one

You can quickly deploy a standalone OceanBase Database for evaluation with the following commands:

Note: Linux only

# download and install all-in-one package (internet connection is required)
bash -c "$(curl -s https://obbusiness-private.oss-cn-shanghai.aliyuncs.com/download-center/opensource/oceanbase-all-in-one/installer.sh)"
source ~/.oceanbase-all-in-one/bin/env.sh

# quickly deploy OceanBase database
obd demo
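
After the demo cluster starts, you can connect to it to confirm it is up. A minimal sketch, assuming the default demo port 2881 and the obclient client installed by the all-in-one package:

# connect to the sys tenant of the demo deployment (assumes 'obd demo' defaults)
obclient -h127.0.0.1 -P2881 -uroot@sys -Doceanbase -A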

🐳 Start with docker

Note: We provide images on Docker Hub, quay.io, and ghcr.io. If you have trouble pulling images from Docker Hub, try one of the other two registries.

  1. Start an OceanBase Database instance:

    # Deploy a mini standalone instance.
    docker run -p 2881:2881 --name oceanbase-ce -e MODE=mini -d oceanbase/oceanbase-ce
    
    # Deploy a mini standalone instance using image from quay.io.
    # docker run -p 2881:2881 --name oceanbase-ce -e MODE=mini -d quay.io/oceanbase/oceanbase-ce
    
    # Deploy a mini standalone instance using image from ghcr.io.
    # docker run -p 2881:2881 --name oceanbase-ce -e MODE=mini -d ghcr.io/oceanbase/oceanbase-ce
  2. Connect to the OceanBase Database instance:

    docker exec -it oceanbase-ce obclient -h127.0.0.1 -P2881 -uroot # Connect to the root user of the sys tenant.
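
To confirm the instance is serving queries, you can run a statement through the same client. A sketch, assuming the container name and port mapping from step 1:

    docker exec -it oceanbase-ce obclient -h127.0.0.1 -P2881 -uroot -e "SELECT VERSION();"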

See also Docker Readme for more details.

☸️ Start with Kubernetes

You can quickly deploy and manage OceanBase Database instances in a Kubernetes cluster with ob-operator. Refer to the document Quick Start for ob-operator for details.
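
A typical installation goes through Helm. A minimal sketch based on the ob-operator quick start; the chart repository URL and the oceanbase-system namespace are assumptions taken from that guide and may change between releases:

# install ob-operator into its own namespace (assumed chart repo URL)
helm repo add ob-operator https://oceanbase.github.io/ob-operator/
helm repo update
helm install ob-operator ob-operator/ob-operator --namespace=oceanbase-system --create-namespace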

👨‍💻 Start developing

See the OceanBase Developer Document to learn how to compile and deploy an observer manually.

Roadmap

For future plans, see Product Iteration Progress. See also OceanBase Roadmap for more details.

Case study

OceanBase has served more than 1,000 customers across industries, including financial services, telecom, retail, Internet, and more, helping them upgrade their databases.

See also success stories and Who is using OceanBase for more details.

System architecture

Introduction to system architecture

Contributing

Contributions are highly appreciated. Read the development guide to get started.

License

OceanBase Database is licensed under the Mulan Public License, Version 2. See the LICENSE file for more info.

Community

Join the OceanBase community via Slack, DingTalk, or WeChat.


canal's Issues

Canal startup fails with this error

Question

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2021-10-21 15:42:52.717 [main] ERROR org.springframework.boot.SpringApplication - Application run failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'curatorClient': Invocation of init method failed; nested exception is java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor()Lcom/google/common/util/concurrent/ListeningExecutorService;
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:138) ~[spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:422) ~[spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1694) ~[spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:579) ~[spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:501) ~[spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:317) ~[spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:228) ~[spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:315) ~[spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199) ~[spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:760) ~[spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:869) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:550) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:759) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:395) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:327) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication.main(CanalAdapterApplication.java:19) [client-adapter.launcher-1.1.6-SNAPSHOT.jar:na]
Caused by: java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor()Lcom/google/common/util/concurrent/ListeningExecutorService;
at org.apache.curator.framework.listen.ListenerContainer.addListener(ListenerContainer.java:41) ~[curator-framework-2.11.0.jar:na]
at org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:257) ~[curator-framework-2.11.0.jar:na]
at com.alibaba.otter.canal.adapter.launcher.config.CuratorClient.init(CuratorClient.java:36) ~[client-adapter.launcher-1.1.6-SNAPSHOT.jar:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_242]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_242]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_242]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_242]
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:365) ~[spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:308) ~[spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:135) ~[spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
... 16 common frames omitted
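
MoreExecutors.sameThreadExecutor() was removed in Guava 21, while Curator 2.x still calls it, so this NoSuchMethodError usually means a newer Guava jar on the classpath is shadowing the one canal expects. A quick way to see which Guava version is resolved, assuming the adapter is built with Maven:

# print the resolved guava dependency (standard maven-dependency-plugin usage)
mvn dependency:tree -Dincludes=com.google.guava:guava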

OceanBase Community Edition 4.3: error when using ob-canal

2024-04-30 11:17:06.236 [Thread-10] ERROR c.a.o.c.p.inbound.oceanbase.logproxy.LogProxyConnection - OceanBase LogProxyClient listener error :
com.oceanbase.clogproxy.client.exception.LogProxyClientException: Unsupported protocol version: 18944
at com.oceanbase.clogproxy.client.connection.ClientHandler.checkHeader(ClientHandler.java:256)
at com.oceanbase.clogproxy.client.connection.ClientHandler.handleHeader(ClientHandler.java:174)
at com.oceanbase.clogproxy.client.connection.ClientHandler.channelRead(ClientHandler.java:141)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.base/java.lang.Thread.run(Thread.java:833)

Canal Server delivering to RocketMQ silently stops picking up log changes after running for 2-3 days, although the process keeps running

canal.properties

#################################################
######### common argument #############
#################################################

# tcp bind ip

canal.ip = 192.168.14.99

# register ip to zookeeper

canal.register.ip = 192.168.14.99
canal.port = 11111
canal.metrics.pull.port = 11112

# canal instance user/passwd

canal.user = canal

canal.passwd =

# canal admin config

#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd =

# admin auto register

#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =

canal.zkServers = 192.168.14.99:12181

# flush data to zk

canal.zookeeper.flush.period = 1000
canal.withoutNetty = false

# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ

canal.serverMode = rocketMQ

# flush meta cursor/parse position to file

canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000

# memory store RingBuffer size, should be Math.pow(2,n)

canal.instance.memory.buffer.size = 1024

# memory store RingBuffer used memory unit size, default 1kb

canal.instance.memory.buffer.memunit = 3500

# memory store batch get mode: MEMSIZE or ITEMSIZE

canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

# detecting config

canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# maximum supported transaction size; transactions larger than this are split into multiple deliveries

canal.instance.transaction.size = 1024

# how many seconds to fall back when connecting to a new mysql master

canal.instance.fallbackIntervalInSeconds = 60

# network config

canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config

canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = true
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = true
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = true

# binlog format/image check

canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation

canal.instance.get.ddl.isolation = false

# parallel parser config

canal.instance.parser.parallel = true

# concurrent thread number, default 60% of available processors; suggested not to exceed Runtime.getRuntime().availableProcessors()

#canal.instance.parser.parallelThreadSize = 16

# disruptor ringbuffer size, must be power of 2

canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info

canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal

# dump snapshot interval, default 24 hours

canal.instance.tsdb.snapshot.interval = 24

# purge snapshot expire, default 360 hours (15 days)

canal.instance.tsdb.snapshot.expire = 360

#################################################
######### destinations #############
#################################################
canal.destinations = real_time

# conf root dir

canal.conf.dir = ../conf

# auto scan instance dir add/remove and start/stop instance

canal.auto.scan = true
canal.auto.scan.interval = 5

# set this value to 'true' to skip to the latest position when the binlog position is not found.
# WARN: keep 'false' in production unless you know what you are doing.

canal.auto.reset.latest.pos.mode = false

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/ob-file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### MQ Properties #############
##################################################

# aliyun ak/sk, support rds/mq

canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=

canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100

# set this value to "cloud" if you want to enable the message trace feature in aliyun.

canal.mq.accessChannel = local

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = ../conf/kerberos/krb5.conf
kafka.kerberos.jaas.file = ../conf/kerberos/jaas.conf

# sasl demo

kafka.sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \n username="alice" \npassword="alice-secret";

kafka.sasl.mechanism = SCRAM-SHA-512

kafka.security.protocol = SASL_PLAINTEXT

##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = CANAL-SYNC-GROUP
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 192.168.14.96:9876;192.168.14.97:9876;192.168.14.99:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =

##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.queue =
rabbitmq.routingKey =
rabbitmq.deliveryMode =

##################################################
######### Pulsar #############
##################################################
pulsarmq.serverUrl =
pulsarmq.roleToken =
pulsarmq.topicTenantPrefix =

instance.properties

# ob server info

canal.instance.oceanbase.rsList=192.168.14.99:2882:2881
canal.instance.oceanbase.username=datax@fuel_tenant
canal.instance.oceanbase.password=xxxxxxxxxx
canal.instance.oceanbase.startTimestamp=0
canal.instance.oceanbase.clusterUrl=
canal.instance.oceanbase.timezone=+8:00
canal.instance.oceanbase.workingMode=storage

# set extraConfigs for libobcdc, format {'key1': 'value1', 'key2': 'value2'}

canal.instance.oceanbase.obcdc.extraConfigs={'store_service_path': './storage'}

# ob log proxy info

canal.instance.oceanbase.logproxy.address=192.168.14.99:2983
canal.instance.oceanbase.logproxy.sslEnabled=false
#canal.instance.oceanbase.logproxy.serverCert=../conf/${canal.instance.destination:}/ca.crt
#canal.instance.oceanbase.logproxy.clientCert=../conf/${canal.instance.destination:}/client.crt
#canal.instance.oceanbase.logproxy.clientKey=../conf/${canal.instance.destination:}/client.key
#canal.instance.oceanbase.logproxy.clientId=

# tenant name

canal.instance.oceanbase.tenant=fuel_tenant

# exclude tenant name in target schema name

canal.instance.parser.excludeTenantInDbName=true

# table regex, format: [tenant].[database].[table]

canal.instance.filter.regex=fuel_tenant.*.*
#canal.instance.filter.regex=rl.fuel_system.sys_dept,rl.fuel_system.sys_enterprise,rl.fuel_system.sys_menu,rl.fuel_system.sys_role,rl.fuel_system.sys_post,rl.fuel_user.sys_user

# table black regex

canal.instance.filter.black.regex=fuel_tenant.fuel_report_data.*,fuel_tenant.fuel_lims.*,fuel_tenant.xxl_job.*,fuel_tenant.fuel_supervise.*
#canal.instance.filter.black.regex=fuel_tenant.fuel_report_data.lims_result

# table field filter (format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)

#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch

# table field black filter (format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)

#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config

canal.mq.topic=real-time-sync-data
canal.mq.canalBatchSize=50

# dynamic topic route by schema or table regex

#canal.mq.dynamicTopic=mytest1.user,mytest2\..*,.*\..*
#canal.mq.partition=6

# hash partition config

canal.mq.partitionsNum=3
canal.mq.partitionHash=.*\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################

support oceanbase ddl

At present, obcanal's DDL parsing appears to have problems: druid does not support parsing some of OceanBase's more complex syntax.

For example, the following statement is MySQL-compatible and syncs to Kafka through obcanal without problems:

CREATE TABLE `sqlmonitorexob_sql_tf31` (
  `svr_ip` varchar(46) NOT NULL,
  `svr_port` bigint(20) NOT NULL,
  `tenant_id` bigint(20) NOT NULL,
  `request_id` bigint(20) NOT NULL,
  `trace_id` varchar(128) NOT NULL,
  `client_ip` varchar(46) NOT NULL,
  `client_port` bigint(20) NOT NULL,
  `tenant_name` varchar(64) NOT NULL,
  `effective_tenant_id` bigint(20) NOT NULL,
  `user_id` bigint(20) NOT NULL,
  `user_name` varchar(64) NOT NULL,
  `db_id` bigint(20) unsigned NOT NULL,
  `wait_time_micro` bigint(20) NOT NULL,
  `total_wait_time_micro` bigint(20) NOT NULL,
  `total_waits` bigint(20) NOT NULL,
  `rpc_count` bigint(20) NOT NULL,
  `plan_type` bigint(20) NOT NULL,
  `is_inner_sql` tinyint(4) NOT NULL,
  `is_executor_rpc` tinyint(4) NOT NULL,
  `is_hit_plan` tinyint(4) NOT NULL,
  `request_time` bigint(20) NOT NULL,
  `elapsed_time` bigint(20) NOT NULL,
  `net_time` bigint(20) NOT NULL,
  `net_wait_time` bigint(20) NOT NULL,
  `queue_time` bigint(20) NOT NULL,
  `decode_time` bigint(20) NOT NULL,
  `get_plan_time` bigint(20) NOT NULL,
  `execute_time` bigint(20) NOT NULL,
  `application_wait_time` bigint(20) unsigned NOT NULL,
  `concurrency_wait_time` bigint(20) unsigned NOT NULL,
  `user_io_wait_time` bigint(20) unsigned NOT NULL,
  `schedule_time` bigint(20) unsigned NOT NULL,
  `row_cache_hit` bigint(20) NOT NULL,
  `bloom_filter_cache_hit` bigint(20) NOT NULL,
  `block_cache_hit` bigint(20) NOT NULL,
  `block_index_cache_hit` bigint(20) NOT NULL,
  `disk_reads` bigint(20) NOT NULL,
  `execution_id` bigint(20) NOT NULL,
  `session_id` bigint(20) unsigned NOT NULL,
  `retry_cnt` bigint(20) NOT NULL,
  `table_scan` tinyint(4) NOT NULL,
  `consistency_level` bigint(20) NOT NULL,
  `memstore_read_row_count` bigint(20) NOT NULL,
  `ssstore_read_row_count` bigint(20) NOT NULL,
  `request_memory_used` bigint(20) NOT NULL,
  `expected_worker_count` bigint(20) NOT NULL,
  `used_worker_count` bigint(20) NOT NULL,
  `sched_info` varchar(16384) DEFAULT NULL,
  `fuse_row_cache_hit` bigint(20) NOT NULL,
  `user_client_ip` varchar(46) NOT NULL,
  `ps_stmt_id` bigint(20) NOT NULL,
  `transaction_hash` bigint(20) unsigned NOT NULL,
  `request_type` bigint(20) NOT NULL,
  `is_batched_multi_stmt` tinyint(4) NOT NULL,
  `ob_trace_info` varchar(4096) NOT NULL,
  `plan_hash` bigint(20) unsigned NOT NULL,
  `user_group` bigint(20) DEFAULT NULL,
  `lock_for_read_time` bigint(20) NOT NULL,
  `wait_trx_migrate_time` bigint(20) NOT NULL,
  PRIMARY KEY (`svr_ip`, `svr_port`, `tenant_id`, `request_id`),
  KEY `all_virtual_sql_audit_i1` (`tenant_id`, `request_id`)
)partition by key(svr_ip, svr_port) partitions 18;

The following statement uses OceanBase-specific syntax that is not MySQL-compatible, and obcanal cannot parse it correctly:

CREATE TABLE `sqlmonitorexob_sql_tf31` (
  `svr_ip` varchar(46) NOT NULL,
  `svr_port` bigint(20) NOT NULL,
  `tenant_id` bigint(20) NOT NULL,
  `request_id` bigint(20) NOT NULL,
  `trace_id` varchar(128) NOT NULL,
  `client_ip` varchar(46) NOT NULL,
  `client_port` bigint(20) NOT NULL,
  `tenant_name` varchar(64) NOT NULL,
  `effective_tenant_id` bigint(20) NOT NULL,
  `user_id` bigint(20) NOT NULL,
  `user_name` varchar(64) NOT NULL,
  `db_id` bigint(20) unsigned NOT NULL,
  `wait_time_micro` bigint(20) NOT NULL,
  `total_wait_time_micro` bigint(20) NOT NULL,
  `total_waits` bigint(20) NOT NULL,
  `rpc_count` bigint(20) NOT NULL,
  `plan_type` bigint(20) NOT NULL,
  `is_inner_sql` tinyint(4) NOT NULL,
  `is_executor_rpc` tinyint(4) NOT NULL,
  `is_hit_plan` tinyint(4) NOT NULL,
  `request_time` bigint(20) NOT NULL,
  `elapsed_time` bigint(20) NOT NULL,
  `net_time` bigint(20) NOT NULL,
  `net_wait_time` bigint(20) NOT NULL,
  `queue_time` bigint(20) NOT NULL,
  `decode_time` bigint(20) NOT NULL,
  `get_plan_time` bigint(20) NOT NULL,
  `execute_time` bigint(20) NOT NULL,
  `application_wait_time` bigint(20) unsigned NOT NULL,
  `concurrency_wait_time` bigint(20) unsigned NOT NULL,
  `user_io_wait_time` bigint(20) unsigned NOT NULL,
  `schedule_time` bigint(20) unsigned NOT NULL,
  `row_cache_hit` bigint(20) NOT NULL,
  `bloom_filter_cache_hit` bigint(20) NOT NULL,
  `block_cache_hit` bigint(20) NOT NULL,
  `block_index_cache_hit` bigint(20) NOT NULL,
  `disk_reads` bigint(20) NOT NULL,
  `execution_id` bigint(20) NOT NULL,
  `session_id` bigint(20) unsigned NOT NULL,
  `retry_cnt` bigint(20) NOT NULL,
  `table_scan` tinyint(4) NOT NULL,
  `consistency_level` bigint(20) NOT NULL,
  `memstore_read_row_count` bigint(20) NOT NULL,
  `ssstore_read_row_count` bigint(20) NOT NULL,
  `request_memory_used` bigint(20) NOT NULL,
  `expected_worker_count` bigint(20) NOT NULL,
  `used_worker_count` bigint(20) NOT NULL,
  `sched_info` varchar(16384) DEFAULT NULL,
  `fuse_row_cache_hit` bigint(20) NOT NULL,
  `user_client_ip` varchar(46) NOT NULL,
  `ps_stmt_id` bigint(20) NOT NULL,
  `transaction_hash` bigint(20) unsigned NOT NULL,
  `request_type` bigint(20) NOT NULL,
  `is_batched_multi_stmt` tinyint(4) NOT NULL,
  `ob_trace_info` varchar(4096) NOT NULL,
  `plan_hash` bigint(20) unsigned NOT NULL,
  `user_group` bigint(20) DEFAULT NULL,
  `lock_for_read_time` bigint(20) NOT NULL,
  `wait_trx_migrate_time` bigint(20) NOT NULL,
  PRIMARY KEY (`svr_ip`, `svr_port`, `tenant_id`, `request_id`),
  KEY `all_virtual_sql_audit_i1` (`tenant_id`, `request_id`) BLOCK_SIZE 16384 LOCAL
) DEFAULT CHARSET = utf8mb4 ROW_FORMAT = COMPACT COMPRESSION = 'zstd_1.3.8' REPLICA_NUM = 1 BLOCK_SIZE = 16384 USE_BLOOM_FILTER = FALSE TABLET_SIZE = 134217728 PCTFREE = 0
 partition by key(svr_ip, svr_port)
(partition p0,
partition p1,
partition p2,
partition p3,
partition p4,
partition p5,
partition p6,
partition p7,
partition p8,
partition p9,
partition p10,
partition p11,
partition p12,
partition p13,
partition p14,
partition p15,
partition p16,
partition p17)

otter connecting to OceanBase as the target

Could someone please advise: with otter writing to OceanBase as the target, connecting with the username test@test#testcluster reports an error. How should this be handled?
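
For reference, user@tenant#cluster is the format used when connecting through obproxy, and the # character usually needs quoting on a shell command line. A sketch with a placeholder host and the default obproxy port:

# quote the username so the shell does not treat '#' as a comment start
obclient -h127.0.0.1 -P2883 -u'test@test#testcluster' -p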

Exception when syncing from OceanBase to MySQL: unknow java.sql.Types - 0

The exception is as follows:

pid:1 nid:1 exception:setl:com.alibaba.otter.node.etl.load.exception.LoadException: java.util.concurrent.ExecutionException: com.alibaba.otter.node.etl.load.exception.LoadException: com.alibaba.otter.node.etl.load.exception.LoadException: com.alibaba.otter.node.etl.load.exception.LoadException: java.lang.IllegalArgumentException: unknow java.sql.Types - 0
at com.alibaba.otter.node.etl.common.db.utils.SqlUtils.stringToSqlValue(SqlUtils.java:145)
at com.alibaba.otter.node.etl.load.loader.db.DbLoadAction$DbLoadWorker.doPreparedStatement(DbLoadAction.java:779)
at com.alibaba.otter.node.etl.load.loader.db.DbLoadAction$DbLoadWorker.access$800(DbLoadAction.java:523)
at com.alibaba.otter.node.etl.load.loader.db.DbLoadAction$DbLoadWorker$2$1.setValues(DbLoadAction.java:634)
at org.springframework.jdbc.core.JdbcTemplate$2.doInPreparedStatement(JdbcTemplate.java:816)
at org.springframework.jdbc.core.JdbcTemplate$2.doInPreparedStatement(JdbcTemplate.java:1)
at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:587)
at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:812)
at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:868)
at com.alibaba.otter.node.etl.load.loader.db.DbLoadAction$DbLoadWorker$2.doInTransaction(DbLoadAction.java:631)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:130)
at com.alibaba.otter.node.etl.load.loader.db.DbLoadAction$DbLoadWorker.doCall(DbLoadAction.java:623)
at com.alibaba.otter.node.etl.load.loader.db.DbLoadAction$DbLoadWorker.call(DbLoadAction.java:551)
at com.alibaba.otter.node.etl.load.loader.db.DbLoadAction.doTwoPhase(DbLoadAction.java:462)
at com.alibaba.otter.node.etl.load.loader.db.DbLoadAction.doLoad(DbLoadAction.java:275)
at com.alibaba.otter.node.etl.load.loader.db.DbLoadAction.load(DbLoadAction.java:161)
at com.alibaba.otter.node.etl.load.loader.db.DbLoadAction$$FastClassByCGLIB$$d932a4cb.invoke()
at net.sf.cglib.proxy.MethodProxy.invoke(MethodProxy.java:191)
at org.springframework.aop.framework.Cglib2AopProxy$DynamicAdvisedInterceptor.intercept(Cglib2AopProxy.java:618)
at com.alibaba.otter.node.etl.load.loader.db.DbLoadAction$$EnhancerByCGLIB$$80fd23c2.load()
at com.alibaba.otter.node.etl.load.loader.db.DataBatchLoader$2.call(DataBatchLoader.java:198)
at com.alibaba.otter.node.etl.load.loader.db.DataBatchLoader$2.call(DataBatchLoader.java:189)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

canal-for-ob database/table-level filtering does not support multiple tables

  • I have searched the issues of this repository and believe that this is not a duplicate.
  • I have checked the FAQ of this repository and believe that this is not a duplicate.

Environment

CDC sync of OceanBase incremental data to MQ

  • canal version
    1.1.6-alpha 1.1.7-alpha
  • ob version
    oceanbase-ce-4.2.1.3
    oblogproxy-2.0.0-101000012023121819

Issue Description

In /Canal_Home/canal-for-ob/conf/example/instance.properties, setting canal.instance.filter.regex to a single table [tenant].[database].[table] starts and processes normally; setting it to [tenant].[database].[table1],[tenant].[database].[table2] makes the startup log report an error, and the filter cannot be applied to oblogproxy.
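
From the description, the working and failing settings look like this (a sketch reconstructing the reporter's configuration; tenant, database, and table names are placeholders):

# a single table starts normally
canal.instance.filter.regex=tenant1.db1.table1
# a comma-separated list makes LogProxy reject the handshake ("Failed to parse configuration")
canal.instance.filter.regex=tenant1.db1.table1,tenant1.db1.table2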

Exception trace

2024-01-15 09:58:33.366 [log-proxy-client-worker-1-thread-1] INFO  com.oceanbase.clogproxy.client.connection.ClientHandler - ClientId: 192.168.100.157_1213369_1705283913: rootserver_list=192.168.100.157:2882:2881, cluster_user=root@obmysql#obce-single, cluster_password=******, tb_white_list=obmysql.meta.peri_item,obmysql.meta.catalog, tb_black_list=|, start_timestamp=0, timezone=+8:00, working_mode=storage connecting LogProxy: 192.168.100.157:2983
2024-01-15 09:58:33.367 [Thread-13] INFO  com.oceanbase.clogproxy.client.connection.ClientStream - Reconnect successfully
2024-01-15 09:58:33.571 [log-proxy-client-worker-1-thread-1] ERROR com.oceanbase.clogproxy.client.connection.ClientHandler - LogProxy refused handshake request: code: 502
message: "Failed to parse configuration"

2024-01-15 09:58:33.572 [log-proxy-client-worker-1-thread-1] ERROR com.oceanbase.clogproxy.client.connection.ClientHandler - Exception occurred ClientId: 192.168.100.157_1213369_1705283913: rootserver_list=192.168.100.157:2882:2881, cluster_user=root@obmysql#obce-single, cluster_password=******, tb_white_list=obmysql.meta.peri_item,obmysql.meta.catalog, tb_black_list=|, start_timestamp=0, timezone=+8:00, working_mode=storage, with LogProxy: 192.168.100.157:2983
com.oceanbase.clogproxy.client.exception.LogProxyClientException: LogProxy refused handshake request: code: 502
message: "Failed to parse configuration"

	at com.oceanbase.clogproxy.client.connection.ClientHandler.handleErrorResponse(ClientHandler.java:217)
	at com.oceanbase.clogproxy.client.connection.ClientHandler.channelRead(ClientHandler.java:147)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at java.lang.Thread.run(Thread.java:750)
