springmoon / flink-rookie
Code repository for the "Flink 菜鸟" WeChat official account
License: Apache License 2.0
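Hi, in CustomDebeziumDeserializationSchema, the enum values handled in the parseRecord method do not seem to be complete. If a MySQL column type (decimal, date, datetime) cannot be found among those enum values, that column's data gets dropped. For example, if a row has three columns and the type of one column does not match, the row ends up with only two columns, and the import fails. (PS: I am not sure whether I understand this correctly.)
The PV statistics in the code are computed as follows: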
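The concern above can be illustrated with a minimal sketch in plain Scala. It does not reproduce the repository's parseRecord method or any Debezium API: the `Column` case class, the type names, and the `convert` function are all hypothetical. It only shows one possible way to keep a column whose type is not listed, by falling back to its string form instead of skipping it.

```scala
object FallbackSketch {
  // Hypothetical column description; NOT the types used by the repository.
  final case class Column(name: String, typeName: String, raw: Any)

  // Convert one column. Type names that are not listed fall back to the
  // string form of the value, so the column is kept rather than skipped.
  def convert(c: Column): (String, Any) = c.typeName match {
    case "INT32"  => c.name -> c.raw.toString.toInt
    case "INT64"  => c.name -> c.raw.toString.toLong
    case "STRING" => c.name -> c.raw.toString
    case _        => c.name -> c.raw.toString // fallback branch (assumption)
  }

  def main(args: Array[String]): Unit = {
    val row = Seq(
      Column("id", "INT64", 1L),
      Column("price", "DECIMAL", BigDecimal("9.90")),
      Column("created", "DATETIME", "2021-01-01 00:00:00")
    )
    // All three columns survive the conversion.
    println(row.map(convert).size)
  }
}
```

Whether a string fallback is acceptable depends on what the downstream sink expects for decimal and date columns, so this is a starting point rather than a recommendation.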
val input = env.addSource(new RadomFunction)
  .map(s => {
    val tmp = s.split(",")
    (tmp(0), tmp(1))
  })
  .keyBy(0)
  .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
  .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
  .evictor(TimeEvictor.of(Time.seconds(0), true))
  .process(new ProcessWindowFunction[(String, String), (String, String, Long), Tuple, TimeWindow] {
    /*
    State is used here because, by default, a window fires its computation only once, when the window ends, and then outputs the result.
    For a long window, such as a one-day window, waiting until the end of the day to output the result would be no better than running a batch job.
    So large windows add a trigger to output intermediate results at a fixed frequency.
    An evictor is added because every time the trigger fires, all data in the window takes part in the computation, so the same data is processed many times, which is wasteful. Evicting the data that has already been computed avoids the repeated computation.
    Evicting already-computed data leaves the window data incomplete, so state is needed to store the intermediate results.
    */
    var wordState: MapState[String, String] = _
    var pvCount: ValueState[Long] = _
    override def open(parameters: Configuration): Unit = {
      // new MapStateDescriptor[String, String]("word", classOf[String], classOf[String])
      wordState = getRuntimeContext.getMapState(new MapStateDescriptor[String, String]("word", classOf[String], classOf[String]))
      pvCount = getRuntimeContext.getState[Long](new ValueStateDescriptor[Long]("pvCount", classOf[Long]))
    }
    override def process(key: Tuple, context: Context, elements: Iterable[(String, String)], out: Collector[(String, String, Long)]): Unit = {
      var pv = 0;
      val elementsIterator = elements.iterator
      // Iterate over the window data and collect the unique words
      while (elementsIterator.hasNext) {
        pv += 1
        val word = elementsIterator.next()._2
        wordState.put(word, null)
      }
      // add current
      pvCount.update(pvCount.value() + pv)
      var count: Long = 0
      val wordIterator = wordState.keys().iterator()
      while (wordIterator.hasNext) {
        wordIterator.next()
        count += 1
      }
      // uv
      out.collect((key.getField(0), "uv", count))
      out.collect(key.getField(0), "pv", pv)
    }
  })
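Problem in the key code block
var pv = 0; // initialized to 0 here
val elementsIterator = elements.iterator
// Iterate over the window data and collect the unique words
while (elementsIterator.hasNext) { // the loop counts only the current data, so pv is the current pv and excludes the value kept in state
  pv += 1
  val word = elementsIterator.next()._2
  wordState.put(word, null)
}
// add current
pvCount.update(pvCount.value() + pv) // the pv state is updated here, but the local pv still does not include the value from state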
Fix
Corrected key code block
var pv = 0; // initialized to 0 here
val elementsIterator = elements.iterator
// Iterate over the window data and collect the unique words
while (elementsIterator.hasNext) {
  pv += 1
  val word = elementsIterator.next()._2
  wordState.put(word, null)
}
// current pv + pv stored in state
pv += pvCount.value()
// add current
pvCount.update(pv) // the state now only needs to be updated with the already-computed pv
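The difference between the two versions can be checked without Flink. The following is a minimal sketch in plain Scala, assuming (as the comment in the quoted code describes) that the evictor leaves only the elements that arrived since the previous firing. `Counter` is a hypothetical stand-in for `ValueState[Long]`, and `emitOriginal` and `emitFixed` are illustrative names, not functions from the repository.

```scala
object PvStateSketch {
  // Hypothetical stand-in for ValueState[Long]; the counter starts at 0.
  final class Counter { var value: Long = 0L }

  // Original logic: the state is updated, but the emitted value is only the
  // number of elements seen in the current firing.
  def emitOriginal(state: Counter, batch: Seq[String]): Long = {
    val pv = batch.size.toLong
    state.value += pv
    pv
  }

  // Proposed logic: add the stored count first, then store and emit the total.
  def emitFixed(state: Counter, batch: Seq[String]): Long = {
    val pv = batch.size.toLong + state.value
    state.value = pv
    pv
  }

  def main(args: Array[String]): Unit = {
    // Each inner Seq holds the elements left in the window at one firing,
    // i.e. only the elements that arrived since the previous firing.
    val firings = Seq(Seq("a", "b", "c"), Seq("a", "d"), Seq("e"))

    val s1 = new Counter
    val original = firings.map(emitOriginal(s1, _))
    val s2 = new Counter
    val fixed = firings.map(emitFixed(s2, _))

    println(s"original: $original") // original: List(3, 2, 1)
    println(s"fixed:    $fixed")    // fixed:    List(3, 5, 6)
  }
}
```

In both versions the stored count ends at 6. Only the emitted value differs: the original emits the per-firing count, while the corrected version emits the running total.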