flink-rookie's People

Contributors

springmoon

flink-rookie's Issues

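Flink CDC: syncing a whole MySQL database to StarRocks

Hi, in CustomDebeziumDeserializationSchema, the enum values in the parseRecord method don't seem to cover every type, do they? If a MySQL column type (decimal, date, datetime) isn't found in that enum, that column's data is dropped. For example, if a row has three columns and one column's type doesn't match, the row ends up with only two columns... and the load then fails. (PS: I'm not sure whether I'm understanding this correctly...)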
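A minimal sketch of the behaviour the reporter is asking for, not the repository's actual parseRecord (its code is not shown here): convert decimal, date and datetime explicitly and fall back to a string for any other type, so no column is ever skipped and the column count stays the same. Column and RowConverter are hypothetical names, and the logical-type names assume Debezium's MySQL defaults (decimal.handling.mode=precise, default time.precision.mode, DATETIME with precision 0 to 3).

```scala
import java.math.BigDecimal
import java.time.{Instant, LocalDate, ZoneOffset}

// Hypothetical, simplified stand-in for one Debezium column: its name, the
// Kafka Connect logical-type name (null for plain types) and the raw value.
final case class Column(name: String, logicalName: String, value: Any)

object RowConverter {
  // Convert one column. Unknown logical types fall back to String.valueOf
  // instead of being skipped, so every column stays in the output row.
  def convert(c: Column): String = c.logicalName match {
    case "io.debezium.time.Date" =>
      // Assumed encoding: DATE as days since 1970-01-01 (INT32).
      LocalDate.ofEpochDay(c.value.asInstanceOf[Int].toLong).toString
    case "io.debezium.time.Timestamp" =>
      // Assumed encoding: DATETIME(0..3) as epoch milliseconds (INT64), no zone.
      Instant.ofEpochMilli(c.value.asInstanceOf[Long])
        .atOffset(ZoneOffset.UTC).toLocalDateTime.toString
    case "org.apache.kafka.connect.data.Decimal" =>
      // Assumed: in precise mode the value arrives as java.math.BigDecimal.
      c.value.asInstanceOf[BigDecimal].toPlainString
    case _ =>
      // Fallback: keep the column rather than dropping it.
      String.valueOf(c.value)
  }

  // One (name, value) pair per input column, in the original order.
  def parseRecord(columns: Seq[Column]): Seq[(String, String)] =
    columns.map(c => c.name -> convert(c))
}
```

The key design choice is the catch-all branch: a type the converter does not recognise still produces a value, so a three-column row always yields three columns.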

Problem with the pv count in WordCountDistinct

The pv-counting code is as follows:

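import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

    val input = env.addSource(new RadomFunction)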
      .map(s => {
        val tmp = s.split(",")
        (tmp(0), tmp(1))
      })
      .keyBy(0)
      .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
      .evictor(TimeEvictor.of(Time.seconds(0), true))
      .process(new ProcessWindowFunction[(String, String), (String, String, Long), Tuple, TimeWindow] {
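        /*
        State is used here because by default a window fires only once, when it ends, and then outputs its result.
        For a long window, such as a one-day window, waiting until the day is over to output the result is no better than running a batch job.
        So large windows get a trigger that emits intermediate results at a fixed interval.
        The evictor is added because every time the trigger fires, all the data in the window takes part in the computation, so each record is processed many times, which is wasteful; the evictor removes data that has already been processed so it is not computed again.
        Evicting already-processed data leaves the window incomplete, so state is needed to store the intermediate results we need.
         */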
        var wordState: MapState[String, String] = _
        var pvCount: ValueState[Long] = _

        override def open(parameters: Configuration): Unit = {
          // new MapStateDescriptor[String, String]("word", classOf[String], classOf[String])
          wordState = getRuntimeContext.getMapState(new MapStateDescriptor[String, String]("word", classOf[String], classOf[String]))
          pvCount = getRuntimeContext.getState[Long](new ValueStateDescriptor[Long]("pvCount", classOf[Long]))
        }

        override def process(key: Tuple, context: Context, elements: Iterable[(String, String)], out: Collector[(String, String, Long)]): Unit = {


          var pv = 0;
          val elementsIterator = elements.iterator
          // Iterate over the window data and collect the distinct words
          while (elementsIterator.hasNext) {
            pv += 1
            val word = elementsIterator.next()._2
            wordState.put(word, null)
          }
          // add current
          pvCount.update(pvCount.value() + pv)
          var count: Long = 0
          val wordIterator = wordState.keys().iterator()
          while (wordIterator.hasNext) {
            wordIterator.next()
            count += 1
          }
          // uv
          out.collect((key.getField(0), "uv", count))
          out.collect(key.getField(0), "pv", pv)

        }
      })
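The comment inside process() explains why state is needed once the evictor clears the window after each firing. The following plain-Scala model (no Flink dependency) sketches that reasoning for the uv count. EvictedWindowModel, fire and uvWithoutState are hypothetical names; each fire call is assumed to receive only the elements that arrived since the previous firing, and a Set stands in for the MapState.

```scala
import scala.collection.mutable

// Hypothetical plain-Scala model of one key's one-day window with a
// ContinuousProcessingTimeTrigger and a TimeEvictor that evicts every
// element after each firing.
final class EvictedWindowModel {
  // Stand-in for MapState[String, String]: it survives across firings,
  // unlike the evicted window contents.
  private val wordState = mutable.Set.empty[String]

  // Returns the uv emitted by this firing: distinct words seen so far today.
  def fire(newWords: Seq[String]): Long = {
    wordState ++= newWords
    wordState.size.toLong
  }
}

object EvictedWindowModel {
  // Without state, a firing could only count distinct words in its own batch.
  def uvWithoutState(newWords: Seq[String]): Long = newWords.distinct.size.toLong
}
```

For example, firings with batches Seq("a", "b") and Seq("b", "c") emit uv 2 and then 3, while counting only the second batch would give 2.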

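The problem in the key code block:

          var pv = 0; // initialized to 0 here
          val elementsIterator = elements.iterator
          // Iterate over the window data and collect the distinct words
          while (elementsIterator.hasNext) { // accumulates only this firing's pv; it does not include the count kept in state
            pv += 1
            val word = elementsIterator.next()._2
            wordState.put(word, null)
          }
          // add current
          pvCount.update(pvCount.value() + pv) // the pv state is updated, but the local pv never gets the state value added, so out.collect(...) emits only this firing's pv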

Fix: the corrected key code block

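          var pv: Long = 0L // initialized to 0 here; declared Long so the Long state value can be added to it
          val elementsIterator = elements.iterator
          // Iterate over the window data and collect the distinct words
          while (elementsIterator.hasNext) {
            pv += 1
            val word = elementsIterator.next()._2
            wordState.put(word, null)
          }
          // current pv + pv kept in state
          pv += pvCount.value()
          // add current
          pvCount.update(pv) // the state now only needs the already-computed pv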
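The effect of the fix on the emitted pv can be checked with a plain-Scala model of the pv part of process() over successive firings. PvModel and its methods are hypothetical; pvState stands in for ValueState[Long], and each call is assumed to receive the number of elements left in the window after the previous eviction.

```scala
// Hypothetical plain-Scala model of the pv logic in process().
final class PvModel {
  // Stand-in for ValueState[Long]; an unset Scala Long state reads as 0.
  private var pvState: Long = 0L

  // Original logic: the state is updated correctly,
  // but the per-firing count is what gets emitted.
  def fireOriginal(newElements: Int): Long = {
    val pv = newElements.toLong
    pvState = pvState + pv
    pv
  }

  // Fixed logic: add the stored value first, then update and emit the total.
  def fireFixed(newElements: Int): Long = {
    var pv: Long = newElements.toLong
    pv += pvState
    pvState = pv
    pv
  }
}
```

Over two firings with 3 and 2 new elements, fireOriginal emits 3 and then 2, while fireFixed emits 3 and then 5, the running total for the day.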
