Code Monkey home page Code Monkey logo

blog's Introduction

Blog

Just use the issue of the repo as a blog

blog's People

Contributors

dubin555 avatar

Watchers

James Cloos avatar  avatar

blog's Issues

airflow 0.1版本源码走读

背景

很早以前就看过airflow的代码, 由于公司提供的离线平台功能少, 部分任务依赖的解决方案其实是参照airflow的, 当时就粗略读过airflow 0.1版的代码, 最近又重新看了看, 这里记录一下。

开始

这里我主要关心4个问题

  • job间的依赖
  • 外部依赖
  • 重试
  • 数据源

首先看到airflow的run方法, 跟进去, 里面TaskInstance的run方法,TaskInstance的属性

    __tablename__ = "task_instance"

    task_id = Column(String(ID_LEN), primary_key=True)
    dag_id = Column(String(ID_LEN), primary_key=True)
    execution_date = Column(DateTime, primary_key=True)
    start_date = Column(DateTime)
    end_date = Column(DateTime)
    duration = Column(Integer)
    state = Column(String(20))
    try_number = Column(Integer)
    hostname = Column(String(1000))
    unixname = Column(String(1000)) 

基本是表达当前任务的状态。
TI的run方法有点长, 基本是各种判断,最后走到 task_copy.execute(self.execution_date) , 里面是 各个operator的execute,这里对于普通的operator来说, 只要执行就可以了, 看到这里就可以回答第四个问题, 对于0.1版本来说, MySQL, Hive operator会把连接信息存到DB里, 这里利用Python得到MySQL或者Hive的记录。
对于第二个问题, 外部依赖,可以看到这样的代码

class BaseSensorOperator(BaseOperator):

   @apply_defaults
   def __init__(
           self,
           poke_interval=60,
           timeout=60*60*24*7,
           *args, **kwargs):
       super(BaseSensorOperator, self).__init__(*args, **kwargs)
       self.poke_interval = poke_interval
       self.timeout = timeout

       # Since a sensor pokes in a loop, no need for higher level retries
       self.retries = 0

   def poke(self):
       '''
       Function that the sensors defined while deriving this class should
       override.
       '''
       raise Exception('Override me.')

   def execute(self, execution_date):
       started_at = datetime.now()
       while not self.poke():
           sleep(self.poke_interval)
           if (datetime.now() - started_at).seconds > self.timeout:
               raise Exception('Snap. Time is OUT.')
       logging.info("Success criteria met. Exiting.")

如果poke返回false的话, 会等待直到poke返回true。
以上是对于单个dag里的单个task的执行流程。

那如果是调度任务呢, 需要一直运行的?
这里是一个MasterJob, 会进到_execute方法,里面大概是

while True:
    for dag in dags:
        for task in dag.tasks:
            if task.is_runnable():
                execute()....

这里就可以回答其余的问题, job间依赖和重试怎么解决

    def is_runnable(self):
        """
        Returns a boolean on whether the task instance has met all dependencies
        and is ready to run. It considers the task's state, the state
        of its dependencies, depends_on_past and makes sure the execution
        isn't in the future.
        """
        if self.execution_date > datetime.now() - self.task.schedule_interval:
            return False
        elif self.state == State.UP_FOR_RETRY and not self.ready_for_retry():
            return False
        elif self.state in State.runnable() and self.are_dependencies_met():
            return True
        else:
            return False

会检查DB中的上游task是不是已经完成, 还有执行的合法与否。

结束。

Monitoring vs Observability

微信公众号:进击的大数据
关注大数据方面技术。问题或建议,请公众号留言。

Monitoring vs Observability

Monitoring 指的是监控, 重点强调系统是不是正在工作。
Observability 更强调可观测, 例如某一个组件或者servie挂了, 能不能追踪到根源。

本文的起因也是因为看到了一个不错的视频,这里主要是照顾下没法fq的同志, 文章会对视频里的内容有小部分的不同, 有些是自己的理解。

能fq的同志可以看下原版的视频。
地址:https://www.youtube.com/watch?v=ACL_YVPD3gw

主要分为四个部分

  • HealthCheck
  • Metrics
  • Logging
  • Tracing

HealchCheck

健康监控主要的目的在于:

  • 是否工作
  • 可不可以完成任务
  • 负载如何

完成healthcheck的方式有很多种,可以是广播模式,例如一个负责监控的service定期向所有的agent发请求,得到所有agent的health情况,再做进一步汇总,也可以是Register模式,例如利用Zookeeper或者etcd来检测健康状况。

这两种我都做过,区别在于:

  • 广播模式的情况下,每个需要上报的agent可以做到轻量,而监控方需要自己调配监控的频率,数据的存储(例如需要监控的机器, service,以及agent的返回结果)等等。
  • Register模式下, 以ZooKeeper为例, 每个ZooKeeper的客户端都可以成为监控方, 同时只需要知道ZooKeeper地址就可以, 这一点针对于可能的扩容缩容帮助比较大。
  • 总结下来就是如果不需要agent之间的协同工作, 只是用于HealthCheck的话, 广播模式就足够了。

一个典型的healthcheck响应可以包括

GET http://1.1.1.1:8080/health
200 OK
{
    "service": "registration-service",
    "healthy":true,
    "workload":{"healthy":true},
    "dependencies":[
        {"name":"cassandra","healthy":true},
        {"name":"billing-service","healthy":true}
    ]
}

Metrics

metric的scope可以是从底层到更上层的:
System metrics(CPU, memory) -> Application metrics(Error rates) -> Business metrics
metric收集:
Metrics --> Metrics collector && Metrics query engine <-- Dashboard, Alert
开源的方案可以参照Prometheus & Grafana
这里有两点需要说明一下:

  • 不是每一个metric都要alert, 例如网络偶尔的异常
  • 粒度太细, 例如针对每个customerId, 可以理解很多业务为了尽快上线, 把逻辑放到这里来做, 不过针对特定业务的告警, 应该放到更细化的实时处理里面来做, 之后会单独写一篇, 并配代码。

Logging

  • 集中
  • 可搜索
  • 串联
  • 结构化
    例如:
ERROR [SVC=A][trace=a1b2c3] Failed to process order cause:xxxx
ERROR [SVC=F][trace=a1b2c3] Failed to process order cause:xxxx
ERROR [SVC=G][trace=a1b2c3] Failed to process order cause:xxxx
ERROR [SVC=B][trace=a1b2c3] Failed to process order cause:DB timeout

结构化的日志才能有威力, 例如某个版本的代码, 或者某个数据中心挂了导致的metric下降, 总体来说体现不出来, 要具体的细化才能反应出来

Trace

尾声

  • 少侵入, 在业务发展的初期, 尽量在少侵入的条件下, 来完成。
  • 傻瓜化, 术业有专攻, 每个人的知识面不一样, 百分之80的人也许只用百分之20的基本功能, 这些基本功能让别人一次跑通。
  • 可视化, 将来的决策可能依赖于这些这些dashboard, 做决策的人可能是老板, 也可能是运维, 也可能是运营, 也可能是开发。

给Structured Streaming添加Sink的方式

背景:

在数据平台部门中, 当需求井喷了之后, 对每个需求做定制化的编码已经不现实了。这时候一般都需要与业务部门合作,这时最好有一个可视化的开发UI, 后端有个service层, 用户所有对数据处理的逻辑通过配置文件,或者纯SQL来表达。这时用户有看实时结果的需求来验证代码或者配置的正确性。

已有的方案及不足:

在其他博客有看到通过websocket来拿当前的spark日志, 包括spark运行时的日志, 这样就可以把streaming的sink改为console模式来获得一部分的日志。
不足之处在于, console模式获得的日志其实是dataframe.show()的结果, 对于前端的交互可视化会比较差。那可不可以做到运行时获得一部分数据并以类似json的方式传回来

源码

先看下console的实现方式,从start方法入手:

def start(): StreamingQuery = {
     ...
    if (source == "memory") {
    ...
    } else if (source == "foreach") {
    ...  
    } else {
      val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
      val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
      val sink = ds.newInstance() match {
        case w: StreamWriteSupport if !disabledSources.contains(w.getClass.getCanonicalName) => w
        case _ =>
          val ds = DataSource(
            df.sparkSession,
            className = source,
            options = extraOptions.toMap,
            partitionColumns = normalizedParCols.getOrElse(Nil))
          ds.createSink(outputMode)
      }
...

其中有个lookupDataSource方法,

       case sources =>
          // There are multiple registered aliases for the input. If there is single datasource
          // that has "org.apache.spark" package in the prefix, we use it considering it is an
          // internal datasource within Spark.
          val sourceNames = sources.map(_.getClass.getName)
          val internalSources = sources.filter(_.getClass.getName.startsWith("org.apache.spark"))
          if (internalSources.size == 1) {
            logWarning(s"Multiple sources found for $provider1 (${sourceNames.mkString(", ")}), " +
              s"defaulting to the internal datasource (${internalSources.head.getClass.getName}).")
            internalSources.head.getClass
          } else {
            throw new AnalysisException(s"Multiple sources found for $provider1 " +
              s"(${sourceNames.mkString(", ")}), please specify the fully qualified class name.")
          }

看来类名要以org.apache.spark开头才行, 按console的方式, 实现一下自己的Debug sink,

class DebugWriter(schema: StructType, options: DataSourceOptions) extends StreamWriter {

  assert(SparkSession.getActiveSession.isDefined)
  protected val spark = SparkSession.getActiveSession.get

  implicit val formats = org.json4s.DefaultFormats

  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {

    printRows(messages, schema, s"Batch: $epochId")
  }

  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}

  override def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory

  protected def printRows(
                           commitMessages: Array[WriterCommitMessage],
                           schema: StructType,
                           printMessage: String): Unit = {
    val rows = commitMessages.collect {
      case PackedRowCommitMessage(rs) => rs
    }.flatten


    val sample = spark
      .createDataFrame(rows.take(10).toList.asJava, schema)
      .toJSON
      .collect()

    val message = Serialization.writePretty(Map("message" -> sample))

    // scalastyle:off println
    println("-------------------------------------------")
    println(message)
    println("-------------------------------------------")
    // scalastyle:off println
  }
}

checkpoint 的地址必须要指定, 因为源代码中可以看到useTempCheckpointLocation = source == "console", 随机一个/tmp下的目录即可
这样json格式的debug用的部分dataframe就可以打印出来了, 我们可以上传到redis或者kafka用于前端画图, 方便用户debug, 查看列类型等等。

topK问题

topK问题

静态数据

如果是静态数据, 例如要算一个List的topK, 最简单的方法就是用优先队列, 这里用PriorityQueue代替, 如果是topK大, 那就是最小堆来解决

for (int i: list) {
    if (queue.size()  < k) {
        queue.offer(i);
    } else {
        if (i > queue.peek()) {
            queue.poll();
            queue.offer(i);
       }
    }
}

动态数据

如果问题变成了, 一个动态的字符串流, 需要计算实时的当前word count的topK,该怎么实现呢?

最简单的方案(基于内存)

实现

string stream --> hashmap --> priorityqueue

结论

受限于单机内存
没有持久化

基于数据库

实现

对于每一个word, 都有一个对应的count, 当接到一个word时, 相应的count自增

结论

对于QPS不高的应用, 足够了

数据库方案升级

Master --> hash --> Slave, 将word count表进行水平拆分, 对于client想获得topK时, 要merge各个分表的topK

结论

QPS取决于各个分表的读写QPS

HashMap的升级方案

针对于单机HashMap方案受限于单机内存, 可进行如下的改进:
Client --> put word: Master --> hash --> slave --> update slave local hashmap
Client --> get topK: Merge every local hashmap to get topK

结论

针对于单机hashmap不能扩容的问题, 改进成根据hash%(N台slave)的方式。
仍然有容错的问题

HashMap方案的容错

一般实时的topK问题, 是没有强一致的需求的,并且K一般都是可变的,有可能是3, 有可能是5, 这时可以有一些改进。
首先针对K可变的问题, 将hashmap + priorityqueue 方案换成treemap实现即可。
针对容错的问题, 可以将内存的实现替换成cache --> async task --> storage.

结论

针对于没有强一致的需求, 可定期将内存数据同步到持久化存储, 这个思路跟接下来的checkpoint类似。

容错

如果遇到slave下线的情况, 这里有两个思路

  1. 一致性hash, 这样就要求有一个强一致的meta data服务来存储各个slave的状态, 这里就需要slave的数据是全局共享的或者是多replica的。
  2. 周期性的checkpoint, 全局共享的数据存储, 如果是Flink方式来实现, 这是一个可行的思路。
    data --> key by --> task manager --> update local hashmap --> update local priorityqueue --> sink the priorityqueue data

结论

这里讨论了两个容错的方案, 从实现的角度难度来说, 利用现有开源方案是更好的选择, 例如方案

follow up

如果是大部分的word, 都只出现1次或者很少, 要怎么优化
可以通过Bloom Filter先过滤一部分, 这样会有一部分误判的概率, 不过不影响最终结果。

带有ttl功能的HashMap

如何设计一个带有ttl功能的HashMap

背景

如何设计一个带有ttl功能的HashMap, 例如一个ttl为5s的HashMap, 当put(key, value) 之后, 5秒内, get(key)为value, 5秒后,get(key) 为空, 其中ttl的时间不是那个严格, 主要是为了限制HashMap所用的内存不是无限增长。

思路

按照环形时间轮的思路, 将一个HashMap,分成12个HashMap, curBucket指向当前的桶。
当put元素时, 根据当前的seconds%12, 将元素插入到指定的HashMap当中,同时更新当前的curBucket。
当get元素时, 查找curBucket及其之前的一个bucket。
在此基础上, 异步删除过期数据(不属于curBucket和之前一个bucket的数据)

代码

import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TtlMap<K, V> {
    private final int N; // count of bucket
    private final int ttl;
    private volatile int curBucket = -1;
    private volatile int prevBucket = -1;
    private final List<Map<K, V>> maps;
    private final ScheduledExecutorService cleanService;
    public TtlMap(int ttl) {
        this.N = 60 / ttl;
        this.ttl = ttl;
        this.maps = new ArrayList<>(this.N);
        for (int i = 0; i < N; i++) {
            maps.add(i, null);
        }
        cleanService = Executors.newSingleThreadScheduledExecutor();
        cleanService.scheduleAtFixedRate(
                () -> cleanOldData()
                , 0, ttl, TimeUnit.SECONDS);
    }

    public void put(K key, V value) {
        long seconds = System.currentTimeMillis()/1000;
        int bucket = (int) seconds % N;
        if (bucket != curBucket) {
            // move forward the curBucket
            curBucket = bucket;
            prevBucket = getPrev(curBucket);
        }
        if (maps.get(curBucket) == null) {
            maps.add(curBucket, new HashMap<>());
        }
        maps.get(curBucket).put(key, value);
    }

    public V get(K key) {
        if (curBucket == -1 || prevBucket == -1) return null;
        if (maps.get(curBucket) != null && maps.get(curBucket).containsKey(key)) {
            return maps.get(curBucket).get(key);
        }
        if (maps.get(prevBucket) != null && maps.get(prevBucket).containsKey(key)) {
            return maps.get(prevBucket).get(key);
        }
        return null;
    }

    private int getPrev(int i) {
        return i != 0? i - 1: N - 1;
    }

    private void cleanOldData() {
        for (int i = 0; i < N; i++) {
            // bypass the current and prev bucket
            if (i == curBucket || i == prevBucket) continue;
            this.maps.set(i, null);
        }
    }

    public void stop() {
        cleanService.shutdown();
        for (int i = 0; i < N; i++) {
            // help GC
            maps.set(i, null);
        }
    }

    public static void main(String[] args) throws Exception {
        TtlMap<String, String> map = new TtlMap<>(5);
        map.put("key1", "value1");
        map.put("key2", "value2");
        System.out.println("0 seconds, get the key1 and key2");
        System.out.println("key1 --> " + map.get("key1") + "\tkey2--> " + map.get("key2"));
        Thread.sleep(5000);
        System.out.println("5 seconds, update key2");
        map.put("key2", "value2");
        Thread.sleep(2000);
        System.out.println("7 seconds, get the key1 and key2");
        System.out.println("key1 --> " + map.get("key1") + "\tkey2--> " + map.get("key2"));
        map.put("key3", "value3");
        Thread.sleep(5000);
        System.out.println("12 seconds");
        System.out.println("key1 --> " + map.get("key1") + "\tkey2--> " + map.get("key2"));
        map.stop();
    }
}

结果

0 seconds, get the key1 and key2
key1 --> value1	key2--> value2
5 seconds, update key2
7 seconds, get the key1 and key2
key1 --> null	key2--> value2
12 seconds
key1 --> null	key2--> null

用循环不变式证明一个Spark处理数据的正确性问题

背景:

日志有相应的timestamp_date和arrive_date, timestamp_date表示该记录发生在哪一天, arrive_date表示日志到达日期, 目前每天的到达记录统一放在一张表, 以日期结尾, 例如, XXX_2018-07-01的表中包含了所有arrive_date是2018-07-01的日期, 但因为数据延迟的原因, 其中包含了很多timestamp是其他天的数据,例如2018-06-XX的数据。 我们处理时间用process_date来表示, 目前数据是T+1的, 即今天处理昨天的全量数据。
处理的日志需要做些聚合操作, 举例,{“timestamp”:”2018-06-30”, “name”:”db”,”age”:29, “category”:”Genius”, “arrive_date”:”2018-07-01"} 这样的一条日志, 会落在XXX_2018-07-01的表中, 如果希望统计的是对category进行聚合操作得到avg(age), 我们希望得到的相应地聚合结果的primary key包含timestamp, arrive_date和category, 增量的落到DB当中。 当数据延迟出现时, 也不会对记录造成影响, 会增量的补数据进去。
那怎么证明这样做是对的呢? 用循环不变式来证明一下。证明这样做是没错的, 即证明每条日志都不会对其他同类别的日志产生影响, 每当同类别的其他日志出现的时候, 该日志都会参与计算, 并覆盖最终的结果。

证明开始:

初始化:

针对任一条日志, 当处理系统(例如Spark), 未读取该条记录的时候(若实时读取, 有可能该记录还未写入), 该条记录不会影响现有结果(会缺数据, 但不会导致错误)。

保持:

当该条日志到达时, 该条日志与其他同类别日志都参与了计算,因为相同的arrive_date的数据都在一张表中,当process_date = arrive_date + 1时, 该表是完整的, 并覆盖了最终结果(靠primary key或者rowkey保证)。

终止:

当process_date > arrive_date + 1 时, 即数据到达的时间的数据表已经处理过了,新到的数据的arrive_date > 该条日志的arrive_date, 即跟该条日志同类别的数据不会再到达。

线上ES写入超时问题排查

线上ES写入超时问题排查

背景

先说下场景, 我们有实时数据需要用Structured Streaming 写入ElasticSearch, 写入峰值速度大概10,000条/s,某晚开始, 出现大量Request timeout报警。

使用方式

  • bulk actions 3000
  • bulk size 15MB
  • bulk concurrent request 0
  • BackoffPolicy.exponentialBackoff

排查

  1. 首先通过Spark UI,观察到当前批次, 已经运行了9个小时, 接着看到堆栈卡在了获得锁这里
        class Flush implements Runnable {
    
            @Override
            public void run() {
                synchronized (BulkProcessor.this) {
                    if (closed) {
                        return;
                    }
                    if (bulkRequest.numberOfActions() == 0) {
                        return;
                    }
                    execute();
                }
            }
        }
  1. 第一反应应该是死锁问题, 先查找下可能的race condition在哪里? ES的BulkProcessor里面的synchronized方法如下:
    awaitClose, add, flush, awaitClose方法我们是在ForeachSink的close方法中调用的, 所以有竞态的应该只有add方法。
  2. add方法中, 最终条用的是BulkRequestHandler中的execute方法
        public void execute(BulkRequest bulkRequest, long executionId) {
            Runnable toRelease = () -> {};
            boolean bulkRequestSetupSuccessful = false;
            try {
                listener.beforeBulk(executionId, bulkRequest);
                semaphore.acquire();
                toRelease = semaphore::release;
                CountDownLatch latch = new CountDownLatch(1);
                retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
                    @Override
                    public void onResponse(BulkResponse response) {
                        try {
                            listener.afterBulk(executionId, bulkRequest, response);
                        } finally {
                            semaphore.release();
                            latch.countDown();
                        }
                    }
    
                    @Override
                    public void onFailure(Exception e) {
                        try {
                            listener.afterBulk(executionId, bulkRequest, e);
                        } finally {
                            semaphore.release();
                            latch.countDown();
                        }
                    }
                });
                bulkRequestSetupSuccessful = true;
                if (concurrentRequests == 0) {
                    latch.await();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.info(() -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
                listener.afterBulk(executionId, bulkRequest, e);
            } catch (Exception e) {
                logger.warn(() -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
                listener.afterBulk(executionId, bulkRequest, e);
            } finally {
                if (bulkRequestSetupSuccessful == false) {  // if we fail on client.bulk() release the semaphore
                    toRelease.run();
                }
            }
        }

其中的latch.await() 无超时的阻塞, 初步怀疑是这里导致的, 某种原因, onResponse 和 onFailure都没有回调。
可以看到执行写入操作的是Retry这个类, 其中的execute方法是在当前线程运行的, retry方法是在scheduler中执行的,scheduler是线程为1 的线程池,
execute方法

            public void execute(BulkRequest bulkRequest) {
                this.currentBulkRequest = bulkRequest;
                consumer.accept(bulkRequest, this);
            }
			
		private void retry(BulkRequest bulkRequestForRetry) {
            assert backoff.hasNext();
            TimeValue next = backoff.next();
            logger.trace("Retry of bulk request scheduled in {} ms.", next.millis());
            Runnable command = scheduler.preserveContext(() -> this.execute(bulkRequestForRetry));
            retryCancellable = scheduler.schedule(command, next, ThreadPool.Names.SAME);
        }
  1. 也就是说当每条数据需要写入到ES的时候, 检查当前满足写入条件(大小, 条数)--> 当前线程执行写入操作 --> 如果出错并且配置了retry policy的话 --> 由core size为1的scheduler来执行重试操作

  2. 接下来看Flush操作, Flush 也是由scheduler来执行的

  3. race condition
    client线程 --- add -- 当前线程尝试写入-- 失败 -- 由scheduler执行retry操作, 自己阻塞在countdownlatch的await方法
    时间到达fixed rate的时间, Flush在scheduler开始运行, 但是没有获得对象锁, 等待add方法释放锁。死锁发生。

  4. 条件发生比较苛刻, 我们发生的时候也是由于ES 频繁GC导致了大量的超时和重试。

  5. 也有人已经发现了这个bug, es的issue 47599 和 44556

  6. 按照社区的建议, 关掉了retry policy

BackPressure的常见场景和应对

常见的场景

生产者和消费者的速度不匹配, 这时候就需要回压机制(也有翻译成背压)
Producer —> Queue —> Consumer
100event/s vs 50event/s

常见的解决方案

  • 控制, 从源头减少流量, 或者扩容consumer来增加处理速率, 或者交换双方的状态信息。
  • 缓冲, 用Queue将不匹配的流量匹配。
  • 丢弃, 当consumer过载时, 直接丢弃或者自定义策略。

Trade Off

如果说想做一个回压系统, 自然对producer和consumer的逻辑侵入越少越好。首先能想到的就是利用缓冲, 这样producer和consumer的逻辑都不会进行任何变动。
当设计缓冲时, 单机的缓冲可以是简单的内存队列或者分布式的kafka, 对于缓冲, 最理想的肯定是无限的缓冲, 但是一旦缓冲是无限的, 并且长期生产者速率大于消费者, 这样的话, 部分消息永远都不会消费, 失去了无限缓冲的意义, 同时由于有限的IO, DISK资源, 无限的缓冲也是不可能的, 所以缓冲也是有限的, 最终还是要做相应的控制和丢弃策略。

常见方案

我所知道的几个常见的大数据系统在回压时所用的不同的解决方案

控制

  • 内存队列可以通过比较简单的Condition或者信号量, 做一些基本的控制策略, 做到生产者消费者交换信息。
  • TCP 拥塞和窗口机制, 接收端返回当前的WIN大小, 当发送端速度过快时, 由于内核缓冲无法将数据发送到socket的buffer, 因此send函数被阻塞, 这样实现了回压。
  • Flink的回压机制跟TCP的回压很像, 各个operator都有相应的input gate buffer, 当消费者速度过慢时,生产者的消息阻塞在写入下游算子的buffer那里, 最终阻塞在Kafka connector的poll方法上。

缓冲

  • Kafka消息队列, 通过多partition文件, 利用大的硬盘资源可以做到接近无限的缓冲, 在接近耗尽容量的时候, 通过过期策略, 来控制资源。

丢弃

  • 线程池提交时, 有相应的提交队列和拒绝策略。
  • 有些服务降级时, 直接返回默认界面, 也算是一种丢弃策略。

拉模式

  • Kafka消费者是拉模式, 但为了防止生产者消费者lag越来越大, 一定要有监控消费延迟。
  • Spark微批处理也是一种拉模式, 在默认情况下, 会拉取所有最新数据,虽然Spark的流处理也有回压处理, 但更多的目的是保护下游系统, 不在讨论范围。

尾声

在实际生产中可以参考以上成熟系统的方式来定义自己的回压处理。

实时平台构建

近期完成了公司实时作业平台的构建, 把一些经验先记录一下。

选型

数据采集

Server 端采用的是Flume 和 Logstash
Native端采用的自研的一个go service

数据队列

公认的Kafka

数据处理

鉴于公司对于Spark技术栈比较熟, 批处理也有相应的积累, 所以大部分常用的场景采用的都是Structured streaming, 一小部分需要session window 或者 CEP的业务采用Flink完成。

数据存储

Kafka, Phonenix, Ignite, Oracle

主要讲下基于Structured streaming的服务

背景

目前日志量大概每天几T, 大概可以分为用量日志, 质量日志, 行为日志等等。 需要用这些日志来进行实时统计, 告警。 需要流-流join, 流-维度表join的场景。

架构

大概使用了 Akka + Zookeeper + Redis + Spark + Spring Boot.
每个Actor都将自己的地址向Zookeeper注册,用户提交job配置的时候, 会通过一个query balancer 来找到自己合适的spark session来执行任务, 用户停止job或者查看job状态的时候也会通过query balancer。
这里面还是有不少坑的, 例如

  1. 都是实时的job, 如果job抛异常了, 外部怎么感知?(最暴力的方法, try-catch全部包起来, 一般抛异常, 向外部存储写状态和异常信息)
  2. 如果是机器突然断电, 死机, 来不急写外部存储就挂了?(更暴力, 每个StreamingQuery都开一个定时任务, 每隔5秒写一个ttl为10秒的Redis key)
  3. Structured streaming 都是实时的job, 怎么拿到每个批次的输出, 像console一样?(可以看我另一个issure #2 , 其中是写到Redis里, 按BatchId 来存储的sorted set)
  4. 怎么kill掉job?(如果Actor当前正在执行任务, 就从当前的zookeeper server list移除自己, kill的请求会通过其他的Actor来kill掉streamingQuery, ps. job提交成功之后, job与运行该job的actor group的映射关系会写到外部存储, actor group信息也会在zookeeper中体现)
  5. job之间怎么防止互相影响?(暴力, 当前job name是唯一的, 里面创建的所有的sql table view都有自己job name的前缀)
  6. 如果spark session都yarn干掉了, 怎么释放掉其他同组的Actor?(每个Actor都有定时器, 来看sparkContext是不是挂了, 挂了的话, 把自己干掉)
  7. 有没有单点?(大部分没有, Zookeeper 三台, Redis 三台哨兵模式: RDB + AOF, RDB每小时一份, 保存两天, 每天一份保存一周, 一天一次上传HDFS, Spring Boot倒是单点, 不过也没多大请求, 目前没多大事)
  8. 怎么扩容? (目前只能手动spark2-submit, 不需要改动任何东西, 当然里面的配置要改一下, 动态调整executor数量)
  9. 可以自己写代码吗, 不想用网页版的?(也可以, 单机版和网页版底层用的StreamingContext都是一样的。
  10. 怎么控制job的数量?(控制Actor的并发量, 以及zookeeper的上下线)

等等, 其他的也说不明白了, 只能说坑不少。做完是费了很多劲的。

感谢

谢谢我自己, 之前也做过类似批处理的, 所以这次做实时的第一版大概一周就写好了, 结果发现,实时的要比批的难多了。。。改了2周多。。。半个月前完成的差不多了。前端还写了一周多, 写不明白了, 放弃了React, 找前端同事写了。

后记

已经上线了。
下次把做批处理的和Flink的也记录下。

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.