hamawhitegg / flink-sql-lineage
The Lineage Analysis system for FlinkSQL supports advanced syntax such as Watermark, UDTF, CEP, Windowing TVFs, and CTAS.
License: Apache License 2.0
2024-06-19 11:09:09,971 INFO main com.hw.lineage.flink.LineageServiceImpl.executeSql:158 - Execute SQL: USE Flink16_memory.default
Startup exception:
Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: A catalog with name [Flink16_memory] does not exist.
It is best to put a PROCTIME() column last in the table definition, not first.
Using the author's test class, add the following code:
@Test
public void testInsertSelectUUID() {
    String sql = "INSERT INTO dwd_hudi_users (id, name, birthday) " +
            "SELECT " +
            " ROW_NUMBER() OVER (ORDER BY ts DESC) as id," +
            " UUID() as name," +
            " birthday " +
            "FROM" +
            " ods_mysql_users";
    String[][] expectedArray = {
            {"ods_mysql_users", "ts", "dwd_hudi_users", "id",
                    "ROW_NUMBER() OVER (ORDER BY ts DESC NULLS LAST)"},
            {"ods_mysql_users", "name", "dwd_hudi_users", "name"},
            {"ods_mysql_users", "ts", "dwd_hudi_users", "birthday"}
    };
    analyzeLineage(sql, expectedArray);
}
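Here the row {"ods_mysql_users", "ts", "dwd_hudi_users", "birthday"} should instead be expected as {"ods_mysql_users", "birthday", "dwd_hudi_users", "birthday"}. Also, the sourceCol of the name field comes from the UUID() function, so shouldn't it have no source column at all?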
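The correction proposed above can be written out as a small sketch. The class name ExpectedLineageSketch and the helper correctedExpected are made up for illustration, and the row layout (source table, source column, target table, target column, optional transform) is inferred from the test above. Dropping the name row assumes that UUID() reads no source column, which is the open question in this report and not confirmed behavior of the tool.

```java
import java.util.Arrays;

public class ExpectedLineageSketch {

    // Each row: sourceTable, sourceColumn, targetTable, targetColumn[, transform]
    static String[][] correctedExpected() {
        return new String[][] {
            // id is derived from ts through the window function
            {"ods_mysql_users", "ts", "dwd_hudi_users", "id",
                "ROW_NUMBER() OVER (ORDER BY ts DESC NULLS LAST)"},
            // birthday is selected unchanged, so its source column is birthday, not ts
            {"ods_mysql_users", "birthday", "dwd_hudi_users", "birthday"}
            // Assumption: no row for dwd_hudi_users.name, because UUID() takes no column input
        };
    }

    public static void main(String[] args) {
        for (String[] row : correctedExpected()) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```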
INSERT INTO
dwd_hudi_users
SELECT
length,
name,
word as company_name,
birthday,
ts,
DATE_FORMAT(birthday, 'yyyyMMdd')
FROM
ods_mysql_users,
LATERAL TABLE (my_split_udtf (name))
The lineage for the author's example above is resolved correctly.
If the SQL is changed to:
INSERT INTO
dwd_hudi_users
SELECT
length,
column1 as name,
word as company_name,
birthday,
ts,
DATE_FORMAT(birthday, 'yyyyMMdd')
FROM
ods_mysql_users,
LATERAL TABLE (my_split_udtf (name)) as T (column1)
Parsing this SQL with the RelMdColumnOrigins from 2.0.0 throws the exception: org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc cannot be cast to org.apache.calcite.rel.core.TableFunctionScan
With the RelMdColumnOrigins from main, in the call
mq.getColumnOrigins(rel.getRight(), iOutputColumn - nLeftColumns)
mappings is null and rel.getInputs().isEmpty() holds, so the returned set has size 0.
The source column cannot be resolved.
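The behavior described here matches the null-mappings branch of a column-origin lookup for a table function scan. The following is a simplified stand-in model in plain Java, not the actual Calcite or project code: the class ColumnOriginsBranchSketch, the method originsWhenNoMappings, and the use of strings for mappings and inputs are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ColumnOriginsBranchSketch {

    /**
     * Returns null when the origins are unknown, and an empty set when the
     * node is treated as a leaf that has no column origins at all.
     */
    static Set<String> originsWhenNoMappings(Set<String> mappings, List<String> inputs) {
        Set<String> origins = new HashSet<>();
        if (mappings == null) {
            if (!inputs.isEmpty()) {
                // Non-leaf node without mappings: origins cannot be determined
                return null;
            }
            // Leaf node without mappings: report "no origins" (the size-0 set seen above)
            return origins;
        }
        // Placeholder for the real resolution of mappings against the inputs
        origins.addAll(mappings);
        return origins;
    }

    public static void main(String[] args) {
        // Case from the report: mappings == null and no inputs
        System.out.println(originsWhenNoMappings(null, Collections.emptyList()));
        // Same node with an input: origins are unknown
        System.out.println(originsWhenNoMappings(null, Arrays.asList("input")));
    }
}
```

Under this model, the empty result is not an error path: the function scan is simply treated as a leaf, so the link from column1 back to ods_mysql_users.name is lost unless the function's operands are inspected separately.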
In the execution plan, the source columns cannot be resolved for two types of RelNode: FlinkLogicalTableFunctionScan and LogicalWatermarkAssigner.
[ERROR] Failed to execute goal com.github.eirslett:frontend-maven-plugin:1.8.0:yarn (yarn install) on project lineage-server-start: Failed to run task: 'yarn install' failed. org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1) -> [Help 1]
public class TestAggregateFunction extends AggregateFunction<String, TestAggregateFunction.TestAggregateAcc> {

    public void accumulate(TestAggregateAcc acc, String param1, String param2, String param3) {
        acc.test = param1 + param2 + param3;
    }

    @Override
    public String getValue(TestAggregateAcc accumulator) {
        return accumulator.test;
    }

    @Override
    public TestAggregateAcc createAccumulator() {
        return new TestAggregateAcc();
    }

    public static class TestAggregateAcc {
        public String test;
    }
}
create function test_aggregate as 'com.hw.lineage.flink.aggregatefunction.TestAggregateFunction'
INSERT INTO dwd_hudi_users SELECT id, name, test_aggregate(concat_ws('_', name, 'test'), name, 'test'), birthday, ts, DATE_FORMAT(birthday, 'yyyyMMdd') FROM ods_mysql_users group by id, name, birthday, ts
The transform of lineage result is null when parsing UDAF.
e.g.:
LineageResult(sourceCatalog=hive, sourceDatabase=default, sourceTable=ods_mysql_users, sourceColumn=name, targetCatalog=hive, targetDatabase=default, targetTable=dwd_hudi_users, targetColumn=company_name, transform=null)
Here are my test CEP SQL and DDL statements:
CREATE TABLE `sink_table`(`agent_id` STRING, `room_id` STRING, `application_id` STRING, `type` STRING, `begin_time` BIGINT, `end_time` BIGINT) WITH(...)
CREATE TABLE `source_table`(`agent_id` STRING, `room_id` STRING, `create_time` BIGINT, `type` STRING, `application_id` STRING, `connect_time` BIGINT, `row_time` AS PROCTIME()) WITH(...)
insert into sink_table (agent_id,room_id,application_id,type,begin_time,end_time)
select agent_id,room_id,application_id,type,begin_time,end_time
from source_table
match_recognize (partition by agent_id,room_id,application_id order by row_time
measures AF.type as type,last(BF.create_time) as begin_time,last(AF.create_time) as end_time
one row per match
after match SKIP PAST LAST ROW
pattern (BF+ AF) WITHIN INTERVAL '1' HOUR
define BF as BF.type = 'assign',AF as AF.type = 'pick_up' ) as T
Because of the logic in RelMdColumnOrigins, the lineage result for my CEP SQL is wrong, for example for sink_table.application_id.
column lineage
CREATE TABLE kafka_source
(
id bigint,
data ARRAY<row<c1 bigint,c2 string>>
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = 'hadoop101:9092',
'topic' = 't1',
'properties.group.id' = 'g1',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
CREATE TABLE sink_print
(
id bigint,
c1 bigint,
c2 string
) WITH (
'connector' = 'print'
);
insert into sink_print
SELECT id, t.c1, t.c2
FROM kafka_source
CROSS JOIN UNNEST(data) AS t(c1,c2)
;
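Some jobs implement business logic and have to be written as Flink Java programs; the code may access Hive tables through SQL, or access tables through the API.
Is there a plan to support Flink 1.16?
I ran the project locally and tried the basic features. It works really well, very impressive! @HamaWhiteGG, do you plan to support lineage between jobs? I have also been working on lineage for a Flink real-time platform recently and would be glad to discuss it: the goal is to show the lineage among source, task, and sink.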
public class TestAggregateFunction extends AggregateFunction<String, TestAggregateFunction.TestAggregateAcc> {

    public void accumulate(TestAggregateAcc acc, String param1, String param2, String param3) {
        acc.test = param1 + param2 + param3;
    }

    @Override
    public String getValue(TestAggregateAcc accumulator) {
        return accumulator.test;
    }

    @Override
    public TestAggregateAcc createAccumulator() {
        return new TestAggregateAcc();
    }

    public static class TestAggregateAcc {
        public String test;
    }
}
create function test_aggregate as 'com.hw.lineage.flink.aggregatefunction.TestAggregateFunction'
String sql = "INSERT INTO dwd_hudi_users " +
"SELECT " +
" id ," +
" name ," +
" test_aggregate(concat_ws('_', name, email), address, 'test')," +
" birthday ," +
" ts ," +
" DATE_FORMAT(birthday, 'yyyyMMdd') " +
"FROM" +
" ods_mysql_user_detail group by id, name, birthday, ts ";
context.execute("CREATE TABLE IF NOT EXISTS ods_mysql_user_detail (" +
" id BIGINT PRIMARY KEY NOT ENFORCED ," +
" name STRING ," +
" birthday TIMESTAMP(3) ," +
" ts TIMESTAMP(3) ," +
" email STRING ," +
" address STRING ," +
" proc_time as proctime() " +
") WITH ( " +
" 'connector' = 'mysql-cdc' ," +
" 'hostname' = '127.0.0.1' ," +
" 'port' = '3306' ," +
" 'username' = 'root' ," +
" 'password' = 'xxx' ," +
" 'server-time-zone' = 'Asia/Shanghai' ," +
" 'database-name' = 'demo' ," +
" 'table-name' = 'users' " +
")");
Here is my SQL script:
-- source1
DROP TABLE IF EXISTS `dwd_status_log`;
CREATE TABLE `dwd_status_log`
(
mechine STRING,
timenow STRING,
`value` STRING,
pre_value STRING,
PRIMARY KEY (timenow) NOT ENFORCED
)
WITH (
'connector' = 'kafka',
'topic' = 'dwd_status_log',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'debezium-json'
);
-- source2
DROP TABLE IF EXISTS dim_equipment_info;
CREATE TABLE IF NOT EXISTS dim_equipment_info (
id STRING
,station_type STRING
,equipment_no STRING
,emp_name STRING
,emp_phone STRING
,vendor STRING
,site STRING
,`floor` STRING
,line STRING
,model STRING
,stage STRING
,id_value STRING
,emp_info STRING
,PRIMARY KEY ( id ) NOT ENFORCED
)
WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:4000/data_warehouse_p_10046?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&nullCatalogMeansCurrent=true',
'table-name' = 'dim_equipment_info',
'username' = 'root',
'password' = 'xxx'
);
-- sink
DROP TABLE IF EXISTS dws_alarm_status_count;
CREATE TABLE IF NOT EXISTS dws_alarm_status_count (
mechine_id STRING
,mechine_name STRING
,trigger_date STRING
,alarm_status_count BIGINT
,PRIMARY KEY ( mechine_id,trigger_date ) NOT ENFORCED
)
WITH (
'connector' = 'kafka',
'topic' = 'dws_alarm_status_count',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'debezium-json'
);
-- trans
INSERT INTO
dws_alarm_status_count
SELECT
dsl.mechine AS mechine_id,
dei.equipment_no AS mechine_name,
DATE_FORMAT(dsl.timenow, 'yyyy-MM-dd') AS `trigger_date`,
COUNT(*) AS alarm_status_count
FROM
dwd_status_log dsl
LEFT JOIN
dim_equipment_info dei
ON
dsl.mechine = dei.id
WHERE
dsl.`value` = '4'
GROUP BY
DATE_FORMAT(dsl.timenow, 'yyyy-MM-dd'), dsl.mechine,dei.equipment_no;
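Current progress of the Flink community's Atlas integration: https://docs.google.com/document/d/1wSgzPdhcwt-SlNBBqL-Zb7g8fY6bN8JwHEg7GCdsBG8/edit#heading=h.mfdxvgyhozur
PoC of the changes required on the Flink side: https://github.com/gyfora/atlas/blob/flink-bridge/addons/flink-bridge/src/main/java/org/apache/atlas/flink/hook/FlinkAtlasHook.java
If you are interested, you can refer to that author's implementation; it is minimally invasive to Flink and fairly generic.
In Atlas 2.2.0 this part of the code has already been merged into the main branch; the Flink side still needs some PoC validation.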
@Slf4j
public class SimpleTest extends AbstractBasicTest {
@Before
public void createTable() {
createMysqlUser();
createOdsUser();
}
@Test
public void concat() {
String sql = "insert into mysql_user " +
"select id, birthday, concat(first_name, last_name) as full_name " +
"from ods_user";
List<Result> actualList = context.parseFieldLineage(sql);
log.info("Lineage Result: ");
actualList.forEach(e -> log.info(e.toString()));
}
protected void createMysqlUser() {
context.execute("DROP TABLE IF EXISTS mysql_user ");
context.execute("CREATE TABLE IF NOT EXISTS mysql_user (" +
" id BIGINT ," +
" birthday TIMESTAMP(3) ," +
" full_name STRING " +
") WITH ( " +
" 'connector' = 'jdbc' ," +
" 'url' = 'jdbc:mysql://127.0.0.1:3306/demo?useSSL=false&characterEncoding=UTF-8'," +
" 'username' = 'root' ," +
" 'password' = 'xxx' ," +
" 'table-name'= 'mysql_user' " +
")"
);
}
protected void createOdsUser() {
context.execute("DROP TABLE IF EXISTS ods_user ");
context.execute("CREATE TABLE IF NOT EXISTS ods_user (" +
" id BIGINT ," +
" birthday TIMESTAMP(3) ," +
" first_name STRING ," +
" last_name STRING ," +
" company_name STRING " +
") WITH ( " +
" 'connector' = 'jdbc' ," +
" 'url' = 'jdbc:mysql://127.0.0.1:3306/demo?useSSL=false&characterEncoding=UTF-8'," +
" 'username' = 'root' ," +
" 'password' = 'xxx' ," +
" 'table-name'= 'ods_user' " +
")"
);
}
}
If I remove the company_name field from ods_user, parsing succeeds. Is there something wrong with the way I wrote this?
When the following FlinkSQL is submitted:
`
CREATE TABLE demo_log_01 (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1'
);
CREATE TABLE demo_log_02 (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1'
);
CREATE TABLE demo_log_05 (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) WITH (
'connector' = 'print'
);
insert into demo_log_05
select a.user_id,b.item_id,b.behavior,a.dt,a.hh from
(select user_id,item_id,behavior,dt,hh from demo_log_01 where dt='2023-17-02') a
left join
(select user_id,item_id,behavior,dt,hh from demo_log_02) b
on a.user_id = b.user_id
`