你好,我们在用 canal-go 来消费多个 Destination 的 binlog 做数据同步,但是发现多个实例中只有一个能够竞争到并且连接到 canal-server。
const (
	// runningFlag and notRunningFlag are single-byte state markers. Going by
	// their names they separate a consumer that is running from one that is
	// not; their use sites are outside this block, so confirm against them.
	//
	// The two values must differ. Both used to be byte(0), which made the
	// two states indistinguishable to anything comparing against them.
	// notRunningFlag keeps the old value 0; only runningFlag changes.
	runningFlag    = byte(1)
	notRunningFlag = byte(0)

	// path is the base path that NewClusterCanalConnector joins with the
	// destination name to form "<path>/<destination>".
	path = "/canal-consumer"
)
// NewClusterCanalConnector builds a ClusterCanalConnector for one canal
// destination.
//
// The destination name is appended to the package-level base path to form a
// per-destination path ("<path>/<destination>"). That path is handed first to
// checkRootPath and then to createEphemeralSequence together with
// canalNode.zkClient; the sequence string returned by the latter is stored in
// the connector. (Judging by the helper names this ensures the parent node
// exists and registers this consumer beneath it — confirm against the
// helpers.)
//
// username, password, destination, soTimeOut and idleTimeOut are copied into
// the connector unchanged.
//
// It returns an error when canalNode is nil, or when either helper fails.
// Helper errors are wrapped with %w, so errors.Is / errors.As still match the
// underlying cause. On error the returned connector is nil.
func NewClusterCanalConnector(canalNode *CanalClusterNode, username string, password string, destination string,
	soTimeOut int32, idleTimeOut int32) (*ClusterCanalConnector, error) {
	// Guard against a nil node: canalNode.zkClient below would otherwise
	// panic instead of reporting a usable error to the caller.
	if canalNode == nil {
		return nil, fmt.Errorf("new cluster canal connector for destination %q: canalNode is nil", destination)
	}

	// Each destination gets its own sub-path under the shared base path.
	destinationPath := fmt.Sprintf("%s/%s", path, destination)

	if err := checkRootPath(canalNode.zkClient, destinationPath); err != nil {
		return nil, fmt.Errorf("checking root path %q: %w", destinationPath, err)
	}

	currentSequence, err := createEphemeralSequence(canalNode.zkClient, destinationPath)
	if err != nil {
		return nil, fmt.Errorf("creating ephemeral sequence under %q: %w", destinationPath, err)
	}

	// RetryTimes and zkVersion are intentionally left at their zero values.
	return &ClusterCanalConnector{
		canalNode:       canalNode,
		username:        username,
		password:        password,
		destination:     destination,
		soTimeOut:       soTimeOut,
		idleTimeOut:     idleTimeOut,
		currentSequence: currentSequence,
		Path:            destinationPath,
	}, nil
}