Code Monkey home page Code Monkey logo

pegasus-java-client's People

Contributors

acelyc111 avatar cauchy1988 avatar dependabot[bot] avatar foreverneverer avatar hycdong avatar levy5307 avatar qinzuoyan avatar shengofsun avatar totalo avatar zhongchaoqiang avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

pegasus-java-client's Issues

pegasus client logs need to be improved

There are some unreasonable aspects in the client logs, We want to optimize the logs to improve the performance、the efficiency of troubleshooting and so on. Here are some possible optimization points:

  • Pegasus client use the log4j dependency with poor performance. This problem is being solved in #70
  • Pegasus client exception log information is not detailed, for example the log of operation timeout:
    com.xiaomi.infra.pegasus.client.PException: 1.12-SNAPSHOT: com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_TIMEOUT: [table=table_name, timeout=200ms] Timeout on Future await: null
    from above:
    • We can't get the meta_server infomation: add it.
    • We can't get the key or value information: add it, but it is necessary to consider the display problem caused by too long(such as 65536) key or value. So we can choose display the first n(such as 100) characters of the key or value.
  • Based on practical experience, the client log usually mixs in user business log which is not conducive to troubleshooting, even that, user may close the client log. So we need control log behavior. For example, log to a special folder defaultly.

You can also offer some other good advice and list it in here:

  • problem 1
  • problem 2
    .......

query meta when an amount of ERR_TIMEOUT occurred but no ERR_SESSION_RESET

Reproduction

  1. All nodes are healthy at start.
>>> nodes -d
address               status              replica_count       primary_count       secondary_count     
172.21.0.21:34801     ALIVE               3                   1                   2                   
172.21.0.22:34801     ALIVE               5                   2                   3                   
172.21.0.23:34801     ALIVE               4                   2                   2                   
172.21.0.24:34801     ALIVE               5                   1                   4                   
172.21.0.25:34801     ALIVE               5                   2                   3                 
  1. Run ycsb.
./bin/ycsb load pegasus -s -P workloads/workload_pegasus -p "pegasus.config=file://./pegasus/conf/pegasus.properties" > outputLoad.txt
  1. Partition replica1(172.21.0.21) with rest of the nodes and ycsb client (packet loss rate is 100%)
docker run -it --rm -v /var/run/docker.sock:/var/run/docker.sock gaiaadm/pumba netem --duration 1h --tc-image gaiadocker/iproute2 loss --percent 100 pegasus_replica1_1
  1. After a while this node becomes unhealthy and kicked off by meta
>>> nodes -d
address               status              replica_count       primary_count       secondary_count     
172.21.0.21:34801     UNALIVE             0                   0                   0                   
172.21.0.22:34801     ALIVE               5                   2                   3                   
172.21.0.23:34801     ALIVE               4                   2                   2                   
172.21.0.24:34801     ALIVE               5                   2                   3                   
172.21.0.25:34801     ALIVE               5                   2                   3                   

total_node_count   : 5
alive_node_count   : 4
unalive_node_count : 1
  1. However for a long period the java client is still unconscious of the fail-over, so it retries until TCP's max retries time (usually 15min) reaches and finally gets ERR_SESSION_RESET error, which informs the client to retrieve the latest route table through meta.
2019-01-16 15:40:10:104 20 sec: 50889 operations; 1531.1 current ops/sec; est completion in 10 hours 54 minutes [INSERT: Count=15308, Max=3016703, Min=192, Avg=600.1, 90=552, 99=658, 99.9=4387, 99.99=254079] 
Retrying insertion, retry count: 1
2019-01-16 15:40:20:104 30 sec: 50892 operations; 0.3 current ops/sec; est completion in 16 hours 21 minutes [INSERT: Count=3, Max=3026943, Min=1519, Avg=1009738.67, 90=3026943, 99=3026943, 99.9=3026943, 99.99=3026943] [INSERT-FAILED: Count=1, Max=5009407, Min=5005312, Avg=5007360, 90=5009407, 99=5009407, 99.9=5009407, 99.99=5009407] 
Retrying insertion, retry count: 2

...

2019-01-16 15:55:40:104 950 sec: 50892 operations; 0 current ops/sec; est completion in 21 days 14 hours [INSERT: Count=0, Max=0, Min=9223372036854775807, Avg=NaN, 90=0, 99=0, 99.9=0, 99.99=0] [INSERT-FAILED: Count=1, Max=5001215, Min=4997120, Avg=4999168, 90=5001215, 99=5001215, 99.9=5001215, 99.99=5001215] 
Retrying insertion, retry count: 117
2019-01-16 15:55:50:103 960 sec: 57665 operations; 677.3 current ops/sec; est completion in 19 days 6 hours [INSERT: Count=6773, Max=1847295, Min=194, Avg=557.27, 90=333, 99=905, 99.9=3303, 99.99=17455] [INSERT-FAILED: Count=1, Max=5001215, Min=4997120, Avg=4999168, 90=5001215, 99=5001215, 99.9=5001215, 99.99=5001215] 
2019-01-16 15:56:00:103 970 sec: 78438 operations; 2077.3 current ops/sec; est completion in 14 days 7 hours [INSERT: Count=20773, Max=59487, Min=205, Avg=477.57, 90=580, 99=1158, 99.9=3153, 99.99=14071] [INSERT-FAILED: Count=0, Max=0, Min=9223372036854775807, Avg=NaN, 90=0, 99=0, 99.9=0, 99.99=0] 
2019-01-16 15:56:10:103 980 sec: 99983 operations; 2154.5 current ops/sec; est completion in 11 days 7 hours [INSERT: Count=21545, Max=13647, Min=352, Avg=460.73, 90=509, 99=917, 99.9=2807, 99.99=6699] [INSERT-FAILED: Count=0, Max=0, Min=9223372036854775807, Avg=NaN, 90=0, 99=0, 99.9=0, 99.99=0] 
2019-01-16 15:56:20:103 990 sec: 126495 operations; 2651.2 current ops/sec; est completion in 9 days 1 hours [INSERT: Count=26512, Max=30943, Min=250, Avg=374.47, 90=519, 99=777, 99.9=3103, 99.99=8051] [INSERT-FAILED: Count=0, Max=0, Min=9223372036854775807, Avg=NaN, 90=0, 99=0, 99.9=0, 99.99=0] 

bug: unable to reconnect a replica session

Version

1.11.9-thrift-0.11.0-inlined

Bug Analysis

12/15 08:12:36.048 [nioEventLoopGroup-9-10] [WARN ] c.x.i.p.r.a.ReplicaSession rpc_address(x.x.x.x:33801): actively close the session because it's not responding for 10 seconds
12/15 08:12:36.048 [nioEventLoopGroup-9-10] [INFO ] c.x.i.p.r.a.ReplicaSession channel rpc_address(x.x.x.x:33801) not connected, skip the close
12/15 08:12:46.150 [nioEventLoopGroup-9-35] [WARN ] c.x.i.p.r.a.TableHandler offline_idm: replica server(rpc_address(x.x.x.x:33801)) doesn't serve gpid(gpid(2.43)), operator(com.xiaomi.infra.pegasus.operator.rrdb_multi_get_operator@1755b308), try(1), error_code(ERR_SESSION_RESET), need query meta

After 10sec of continuous timeouts, the session was actively closed, but when it found itself not connected already, it skipped to close.

12/15 08:12:46.153: After 3ms, it queried meta for configuration. However timeouts still, the above process repeated, which means the reconnect somehow failed.

12/15 08:12:46.218 [nioEventLoopGroup-9-3] [WARN ] c.x.i.p.r.a.TableHandler offline_idm: replica server(rpc_address(x.x.x.x:33801)) rpc timeout for gpid(gpid(2.13)), operator(com.xiaomi.infra.pegasus.operator.rrdb_multi_get_operator@21c63849), try(1), error_code(ERR_TIMEOUT), not retry
com.xiaomi.infra.pegasus.client.PException: 1.11.9-thrift-0.11.0-inlined: com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_TIMEOUT: [table=offline_idm, timeout=200ms] Timeout on Future await: null

The pegasus cluster was stable, the table configuration was unchanged.

  public int asyncSend(client_operator op, Runnable callbackFunc, long timeoutInMilliseconds) {
   ...
      synchronized (pendingSend) {
        cache = fields;
        if (cache.state == ConnState.CONNECTED) {
          write(entry, cache);
        } else {
          pendingSend.offer(entry);
          if (cache.state == ConnState.DISCONNECTED) {
            cache = new VolatileFields();
            cache.state = ConnState.CONNECTING;
            fields = cache;
            needConnect = true;
          }
        }
      }
      if (needConnect) {
        doConnect();
      }

Apparently, if the connection state is DISCONNECTED, doConnect will definitely be called (because needConnect must be turned true). However, we couldn't find any logs indicating that Bootstrap.connect had completed.

  ChannelFuture doConnect() {
    try {
      // we will receive the channel connect event in DefaultHandler.ChannelActive
      return boot.connect(address.get_ip(), address.get_port())
          .addListener(
              new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                  if (channelFuture.isSuccess()) {
                    logger.info(
                        "{}: start to async connect to target, wait channel to active", name());
                  } else {
                    logger.warn(
                        "{}: try to connect to target failed: ", name(), channelFuture.cause());
                    markSessionDisconnect();
                  }
                }
              });
    } catch (UnknownHostException ex) {
      logger.error("invalid address: {}", address.toString());
      assert false;
      return null; // unreachable
    }
  }

We can infer that this session was CONNECTING or DISCONNECTED at that time, but somehow the connect call blocked and couldn't accomplish.

    boot.group(rpcGroup)
        .channel(ClusterManager.getSocketChannelClass())
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_KEEPALIVE, true)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, socketTimeout)
        .handler(
            new ChannelInitializer<SocketChannel>() {
              @Override
              public void initChannel(SocketChannel ch) {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast("ThriftEncoder", new ThriftFrameEncoder());
                pipeline.addLast("ThriftDecoder", new ThriftFrameDecoder(this_));
                pipeline.addLast("ClientHandler", new ReplicaSession.DefaultHandler());
              }
            });

Normally the call wouldn't be blocked even if the host is unreachable, because we have set the CONNECT_TIMEOUT_MILLIS, which guarantees to end the connection when it times out.

fix extreme deadlock

遇到的问题描述:
1、我们在恶劣的生产机器环境下发现pegasus-java-client可能会发生死锁现象
2、因为我们应用一次取的数据很多,所以我们初始化客户端的时候配置的超时时间很大与sessionResetTimeWindowMs恰好相同
3、于是在生产环境因为各种原因读取发生超时,导致了以下调用逻辑而发生了死锁, 这里为了简化逻辑描述,假设我们刚开始配置了两个异步nio线程, 即 ’asyncWorkers' 设置成2, 这两个线程分别记做 线程-1, 线程-2 ; 然后我们有3个replicaserver,java client初始化的时候 产生的ReplicaSession结构体和replicaserver 1:1 对应, 这里记做 Replicasession1 , ReplicaSession2, ReplicaSession3; 并且每个 ReplicaSession 在连接对应的replicaserver时,内部的nettychannel会绑定一个nio线程, 这里也为了描述方便,设定对应绑定的关系如下: ReplicaSession1---线程1, ReplicaSession2---线程2, ReplicaSession3---线程1; 发生死锁的流程图如下:
image
4、 如上图, 发生死锁的原因是因为, ReplicaSession.addTimer 的超时流程 会被随机调度到 线程1 或者是 线程2; 然后发生超时后 tryNotifyFailureWithSeqID 方法 会调用closeSession函数来关闭session,但是关闭session的同步操作要在对应的Replicasession里的nettychannel绑定好的线程内部执行的; 于是就可能发生图上的交叉情况 发生死锁;
5、 这个死锁的概率 常规情况应该是概率很小的, 和我司的极限使用 情况有关系
6、给出的修复方法 是在初始化ReplicaSession的时候 就将 超时调度的线程 和 nettychannel线程强制设置成1个, 那么就不会发生上图交叉的情况 而发生死锁了
代码见 同名 pull request

Double-Checked Locking

PegasusTable table = tableMap.get(tableName);
if (table == null) {
synchronized (tableMapLock) {
table = tableMap.get(tableName);
if (table == null) {
try {
TableOptions options = new TableOptions(new PegasusHasher(), backupRequestDelayMs);
Table internalTable = cluster.openTable(tableName, options);
table = new PegasusTable(this, internalTable);
} catch (Throwable e) {
throw new PException(e);
}
tableMap.put(tableName, table);

Double-Checked Locking is widely cited and used as an efficient method for implementing lazy initialization in a multithreaded environment.
Unfortunately, it will not work reliably in a platform independent way when implemented in Java, without additional synchronization.
Declare a volatile local variable table offers a much more elegant solution

bug: client can't recover from a replica-server failure.

  • 2019/9/11 17:00. Our SRE stopped one instance of replica-server in our staging environment
    trying to simulate the problem java-client can't recover.

  • 2019/9/11 17:00. Some of our clients recovered right away while replica-server restarted, but some couldn't reconnect and kept retrying with ERR_TIMEOUT error.

2019-09-11 17:12:48,659 ERROR [nioEventLoopGroup-4-1] [com.xiaomi.message.mixin.fsm.ruleset.SaveAckRuleSet$3$1.operationComplete(SaveAckRuleSet.java:169)] - SaveB2CAckInfo fail, appId=2882303761517479657 msgId=sms-1c33f956-7dad-4327-8986-aa400cb4a8b2, ex:
com.xiaomi.infra.pegasus.client.PException: com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_TIMEOUT
	at com.xiaomi.infra.pegasus.client.PegasusTable$8.onCompletion(PegasusTable.java:376)
	at com.xiaomi.infra.pegasus.rpc.async.ClientRequestRound.thisRoundCompletion(ClientRequestRound.java:51)
	at com.xiaomi.infra.pegasus.rpc.async.TableHandler.tryDelayCall(TableHandler.java:314)
	at com.xiaomi.infra.pegasus.rpc.async.TableHandler.onRpcReply(TableHandler.java:295)
	at com.xiaomi.infra.pegasus.rpc.async.TableHandler$3.run(TableHandler.java:326)
	at com.xiaomi.infra.pegasus.rpc.async.ReplicaSession.tryNotifyWithSequenceID(ReplicaSession.java:226)
	at com.xiaomi.infra.pegasus.rpc.async.ReplicaSession.access$300(ReplicaSession.java:32)
	at com.xiaomi.infra.pegasus.rpc.async.ReplicaSession$4.run(ReplicaSession.java:270)
	at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
	at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:123)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:380)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_TIMEOUT
	... 15 more
  • 2019-09-11 17:12:48, the errors remained, we stopped the test.

Client Version

1.11.5-thrift-0.11.0-inlined-release

feature: backup request enhancement

Pegasus release backup request in 2.0.0, sometimes backup request will make cluser load heavier, client should support a throttling plan to restrict it.

feature: add a app create interface to the java client

[what i need?]
As mentioned in the pegasus issue : apache/incubator-pegasus#924
we wanna to create new app in container circumstances such as docker in k8s, but it is not convenient and comfortable to add the "run.sh" and "all the executed pegasus bin files" to the docker which running business code, so we think that pegasus need an app create interface

[what i will do]
i will add two new interface to the java client which are 'createApp' and 'isAppHealthy';

  1. The 'createApp' interface‘s logic is almost the same with the 'replication_ddl_client.h'
  2. The reason why 'isAppHealthy' interface is needed is that we can not make sure that the newly created app is fully healthy after the successful calling of 'createApp' interface;users need this interface to ensure each partition of the newly created app has enough replicas;

[how can i do]
I will reuse the 'MetaSession' class which already in the java client to send the ’RPC_CM_CREATE_APP‘ rpc to the server side, And considering the two added interface has 'timeoutMs' parameter, I refactor this class code simply;

bug: batchMultiGet may not fetch all data required by the input parameter

问题发生的原因如下:

'batchMultiGet' 接口,签名: public void batchMultiGet(List<Pair<byte[], List<byte[]>>> keys, List values, int timeout);

如上, 它的keys参数的每一个元素的类型是Pair<byte[], List<byte[]>>,代表了一个hashkey下多个sortedkey

这个接口会把每一个Pair<byte[], List<byte[]>>都单独向服务端异步发送一个 MULTI_GET Rpc请求, 但是发送Rpc请求的请求结构体中默默加入了两个限制:(1) sortedKey的条数限制 :目前写死的值是100 (2) 获取的value的总的bytes的限制 : 目前写死的是1000000 byte

服务端在收到这个MULTI_GET Rpc请求后,在组回复包时,会一边挨个扫得到的每条数据结果,一边对已获得数据的条数和数据的总大小进行统计,一旦发现 :(1) 已组包得数据的条数大于等于100 (2) 已组包的数据bytes总量大于等于1000000byte 就不继续将后续的数据的结果放入回复结构体了, 但是它会设置response.error=7(incomplete) 用来告诉客户端自己没有取全

客户端这边收到了回复请求,将结果塞入了MultiGetResult这个结构体中, 从这个结构体里 的allFetched字段也能知道此次MULTI_GET的rpc请求结果没有取全

但是 ’batchMultiGet‘ 在异步拿到所有MULTI_GET rpc结果后,组装最终的输出参数时直接忽视了MultiGetResult中的allFetched,它直接把能够取得的结果塞入输出参数中

综上所述, 这导致了接口使用者在使用’batchMultiGet'接口时 一旦某个Pair<byte[], List<byte[]>>中sortedkey数量超过100 或者 在服务端里保存的value总数据量超过1000000byte, 接口取到的数据 就是不全的 且 接口使用者完全无感知问题已出现

解决办法:

(1)在输出参数类型 ’HashKeyData' 这个结构体中, 加入一个字段‘allFetched’, 默认值为True; 在batchMultiGet 接口应用场景中,HashKeyData.allFetched直接拷贝MultiGetResult.allFetched的值; 这样接口使用者有办法知道自己获取的数据是否完整;

(2) 在原生客户端初始化的参数里 加上 设置这个 “100”条和“1000000byte”的配置

remove log4j from dependencies.

I found this java client depends on log4j:

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.2</version>
    </dependency>

it's not good to bind a slf4j implementation like log4j in the library, many applications use other slf4j bindings like logback or log4j2.

Double-Checked Locking

if (table == null) {
synchronized (tableMapLock) {
table = tableMap.get(tableName);
if (table == null) {

Double-Checked Locking is widely cited and used as an efficient method for implementing lazy initialization in a multithreaded environment.
Unfortunately, it will not work reliably in a platform independent way when implemented in Java, without additional synchronization.
declares the table field volatile offers a much more elegant solution

see #111

Client API need be refactored and enhanced

Now the Java client API has follow problem:

  • redundant: it contain such as batchGet, batchGet2, batchMultiGet and so on, we expect it can be simply and clear
  • deficient: it just offer based set/get/del, but no auto-retry, auto-compressed, we expect it can have more auto function.
  • no support server-duplication: the latest Pegasus-Server offer duplication cluster, but the client can't support visit two cluster meanwhile.

Potential Roadmap:

  • Simply API #122 #124

  • Support auto-compress #123 #126

  • Refactor backup-request using interceptor #123 #125

  • Support auto-retry

  • Support RateLimiter

  • Support auto-serialize

  • Support dualClient to connect two cluster

refactor and simplify the API

TODO(EN)

相关Issue:#121

总体原则:简单可扩展

  • 接口的数量要尽量少:繁杂的API接口不利于用户区分使用和开发人员后期维护
  • 可扩展性强:功能的添加和变更不应改动API接口,避免用户升级客户端后,出现兼容性问题
  • 风格应趋于统一化:所有的API接口应有统一风格的参数传递和返回类型

当前问题

目前接口的主要问题即接口繁杂,而纠其根源是扩展性差导致的,具体表现为

  • 过多的重载:为了扩展同一类型请求的不同行为,当前往往使用参数重载的方式。每当添加新参数或新功能,就需增加重载方法。这将引起接口数量持续膨胀。典型为每个写接口都被重载了两个版本:

    public void set(byte[] hashKey, byte[] sortKey, byte[] value, int ttlSeconds, int timeout /*ms*/)
        throws PException;
    public void set(byte[] hashKey, byte[] sortKey, byte[] value, int timeout /*ms*/)
        throws PException;
  • 多条数据批量读写:几乎每个基础接口(set/get/delete...)都被封装了两个batch方法:batchXXX和batchXXX2,而实际上每个batch方法的代码往往是重复的。例如:

     public void batchSet(List<SetItem> items, int timeout /*ms*/) throws PException;
     public int batchSet2(List<SetItem> items, List<PException> results, int timeout /*ms*/)
        throws PException;
    
     public void batchMultiDel(List<Pair<byte[], List<byte[]>>> keys, int timeout /*ms*/)
         throws PException;
     public int batchMultiDel2(
         List<Pair<byte[], List<byte[]>>> keys, List<PException> results, int timeout /*ms*/)
         throws PException;
  • 返回结果不统一:目前查询的返回结果包括value,pair<hashKey,value>, pair<sortKey, value>,一方面用户需要区分获取的数据含义,另一方面如果获取数据包含额外信息(如timestamp),则根本无法扩展

解决方案

  • 封装:对请求参数和返回结果都进行封装,如果需要变更参数或者增加参数,只需改动封装对象,而不必添加重载接口,例如:

    public class Set implements Serializable {
        public byte[] hashKey;
        public byte[] sortKey;
        public byte[] value;
        public int ttlSeconds; // 0 means no ttl
    }
    
    //仅保留一个接口
    public void set(Set set, int timeout)

    但是,将请求和返回结果进行封装的重构会导致现有接口被废弃,受影响的接口较多,该方案暂时搁置#124

  • Batch接口重构:当前每个基础接口都封装了对应的batch接口,这使API的数量大大增加。batch操作应该设计可扩展的Batch抽象类,使得任何single操作都可以扩展为batch操作,从而降低冗余的batch接口数量:

    // 所有的批量操作都可以用Batch实现,如batchSet,可以让Request=Set,
    // Response=SetResult。理论上任何RPC都可以通过Batch类实现批处理。
    class Batch<Request, Response> {
      void commit(List<Request> resuests, List<Response> responses){}
    }

详细设计

Batch接口重构

我们支持以下操作进行批量处理,它们目前在 PegasusClientInterface/PegasusTableInterface 都有对应的 batchXXX 接口:

  1. get,multiGet
  2. del,multiDel,set,multiSet

那么下面给出具体重构方式:

public abstract class Batch<Request, Response> {

    final PegasusTableInterface table;
    final int timeout;

    FutureGroup<Response> futureGroup;

    public Batch(PegasusTableInterface table, int timeout) {
        this.table = table;
        this.timeout = timeout;
    }

    //无返回数据,任意请求失败,则抛出异常,适用于写操作
    public void commit(List<Request> requests) throws PException {
        assert (!requests.isEmpty());
        asyncCommit(requests).waitAllCompleteOrOneFail(null, timeout);
    }

    //有返回数据,且仅需要缓存结果值,任意请求失败,则抛出异常,已经成功的存放在responses中,适用于读操作
    public void commit(List<Request> requests, List<Response> responses) throws PException {
        assert (!requests.isEmpty());
        asyncCommit(requests).waitAllCompleteOrOneFail(responses, timeout);
    }

    //有返回数据,需要获取所有请求结果值和异常值,失败的请求把异常和结果存放在responses,适用于读写操作
    public void commitWaitAllComplete(List<Request> requests, List<Pair<PException,Response>> responses) throws PException {
        assert (!requests.isEmpty());
        asyncCommit(requests).waitAllcomplete(responses, timeout);
    }

    private FutureGroup<Response> asyncCommit(List<Request> requests){
        futureGroup = new FutureGroup<>(requests.size());
        for (Request request : requests) {
            futureGroup.add(asyncCommit(request));
        }
        return futureGroup;
    }

    //抽象方法,使用任意基础操作(set, get)实现该方法,则该基础方法即可扩展为batch操作
    abstract Future<Response> asyncCommit(Request request);
}

Example

Get操作基于Batch类重构的结果:

public class Get  {
    public byte[] hashKey;
    public byte[] sortKey;

    public Get(byte[] hashKey) {
        this.hashKey = hashKey;
    }

    public Get(byte[] hashKey, byte[] sortKey) {
        this.hashKey = hashKey;
        this.sortKey = sortKey;
    }
}

public class BatchGet extends Batch<Get, byte[]> {
    public BatchGet(PegasusTableInterface table, int timeout) {
        super(table, timeout);
    }

    @Override
    Future<byte[]> asyncCommit(Get get) {
        return table.asyncGet(get.hashKey, get.sortKey, timeout);
    }
}

Set操作基于Batch类重构的结果:

public class Set {
    public byte[] hashKey;
    public byte[] sortKey;
    public byte[] value;
    public int ttlSeconds; // 0 means no ttl

    public Set(byte[] hashKey, byte[] sortKey, byte[] value) {
        this(hashKey, sortKey, value, 0);
    }

    public Set(byte[] hashKey, byte[] sortKey, byte[] value, int ttlSeconds) {
        assert (value != null && ttlSeconds >= 0);
        this.hashKey = hashKey;
        this.sortKey = sortKey;
        this.value = value;
        this.ttlSeconds = ttlSeconds;
    }
}

public class BatchSet extends Batch<Set, Void> {
    public BatchSet(PegasusTableInterface table, int timeout) {
        super(table, timeout);
    }

    @Override
    public Future<Void> asyncCommit(Set set) {
        return table.asyncSet(set.hashKey, set.sortKey, set.value, timeout);
    }
}

新的Batch接口使用示例:

public class PegasusTest {

    public static void main(String[] args) throws PException {
        PegasusTableInterface table = PegasusClientFactory.getSingletonClient().openTable("temp");
        Batch<Get,byte[]> batch = new BatchGet(table, 1000);
        List<Get> requests = new ArrayList<>();
        List<byte[]> responses = new ArrayList<>();
        requests.add(new Get("hashKey1".getBytes(),"sortKey1".getBytes()));
        requests.add(new Get("hashKey2".getBytes(),"sortKey2".getBytes()));
        batch.commit(requests, responses);
    }
}

用户也可以自定义实现batch操作(仅需使用asyncXXX实现asyncCommit):

public class PegasusTest {

    public static void main(String[] args) throws PException {
        PegasusTableInterface table = PegasusClientFactory.getSingletonClient().openTable("temp");
        
        Batch<MultiGet, MultiGetResult> multiGetbatch =  new Batch<MultiGet, MultiGetResult>(table,1000) {
            @Override
            public Future<MultiGetResult> asyncCommit(MultiGet multiGet) {
                return table.asyncMultiGet(multiGet.hashKey, multiGet.sortKeys,timeout);
            }
        };

        List<MultiGet> multiGetsRequests = new ArrayList<>();
        List<MultiGetResult> multiGetsResponses = new ArrayList<>();
        multiGetbatch.commit(multiGetsRequests, multiGetsResponses);
    }
}

使用该方案后,包括checkAndSet等所有async接口都可以方便的扩展为batch操作。相关PR: #129

参考

[1] 良好的RPC接口设计,需要注意这些方面
[2] 主流的数据库接口:Hbase-ClientAmazonDynamoDB-Client

support option to enable compression and hide details of compression from users

This issue is based on #121 and is the details of Support auto-compress.

This feature is simple:

class ZstdWrapper {
        // .......

public static byte[] tryDecompress(byte[] src) {
    byte[] decompressedValue;
    try {
      decompressedValue = decompress(src);
    } catch (PException e) {
      // decompress fail
      decompressedValue = src;
    }
    return decompressedValue;
  }
}


class PegasusTable {
//set
value = autoCompress? ZstdWrapper.compress(value) : value
//get
byte[] value = autoCompress? ZstdWrapper.tryDecompress(value) : value;
}

unit test broken

commit ffa86ba
break unit test in TestScan by code in pegasus/rpc/async/ReplicaSessionTest.java.

fill in partition_hash in thrift header

Thrift header has a reserved filed called 'partition_hash' , default value is 0, server doesn't use this filed currently. However, it will be used in server supporting partition split, so we should fill in correct value of it.

support backup request on client side

The proposal resides in apache/incubator-pegasus#251.

一、背景

Backup request功能可以优化业务在服务抖动时读延迟的长尾问题,适合对一致性要求低的用户。

二、设计

  1. 增加一个配置项,让用户选择是否启用backup request功能。
  2. 在ReplicaConfiguration中添加Map<rpc_address, ReplicaSession> allSessions成员,保存所有的session。
  3. 在TableHandler::initTableConfiguration中:
    1. 如果没有开启backup request,则只创建primary副本的session连接放入allSessions。
    2. 如果开启了backup reqeust,secondary也同样要创建连接, 将所有创建的session存入allSessions。
  4. 在TableHandler::call中:
    1. 如果是写操作,则要沿用已有逻辑,将请求发往primary。
    2. 如果是读操作,则向allSessions中的所有session都发送请求, 只接收最快的response,其他都忽略(backup reqeust未开启时,allSessions中只有primary session,也相当于只向primary发送)。
      1. 增加is_success(boolean类型), 初始状态下为false,表示请求还没有成功返回,并将其传递给TableHandler::onRpcReply。
      2. 如果isEmpty(),说明还没有和该partition建立好连接,则tryQueryMeta并重新建立连接。
  5. 在TableHandler::onRpcReply中,处理请求response:
    1. 如果is_success=true,则说明之前已经有response返回了,则直接忽略。
    2. 如果is_success=false, 说明该response是第一个返回的, 则按照之前的逻辑处理该请求,处理完之后设置is_success=true。

备注:

  • 需要对is_success的处理进行加锁。
  • 需要对client_operator中添加一个字段,标记是读操作还是写操作。
  • 在4和5中,对于是否开启了backup request是透明的,无需添加额外逻辑判断。只需要在3中创建session时做一下判断。

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.