hamawhitegg / flink-sql-lineage
The Lineage Analysis system for FlinkSQL supports advanced syntax such as Watermark, UDTF, CEP, Windowing TVFs, and CTAS.
License: Apache License 2.0
Some business-logic jobs have to be written in Flink Java; the code may access Hive tables through SQL, or access tables through the API.
It is best to put PROCTIME() columns last rather than first.
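As an illustration of the PROCTIME() recommendation above, here is a minimal sketch of a DDL string with the processing-time column declared last. The class name `ProctimeLastDdl` is hypothetical, the table and column names are borrowed from the CEP example elsewhere on this page, and the `datagen` connector is only a stand-in.

```java
/** Hypothetical example; table and column names are borrowed from the CEP issue on this page. */
final class ProctimeLastDdl {

    /** Builds a CREATE TABLE statement whose PROCTIME() column comes after all physical columns. */
    static String ddl() {
        return "CREATE TABLE source_table (\n"
            + "  agent_id STRING,\n"
            + "  room_id STRING,\n"
            + "  create_time BIGINT,\n"
            + "  row_time AS PROCTIME()\n" // processing-time column declared last
            + ") WITH (\n"
            + "  'connector' = 'datagen'\n" // stand-in connector, an assumption
            + ")";
    }

    public static void main(String[] args) {
        System.out.println(ddl());
    }
}
```

The string could be passed to whatever executes DDL in your setup, in the same way the test code on this page passes DDL strings to `context.execute`.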
Here is my SQL script:
-- source1
DROP TABLE IF EXISTS `dwd_status_log`;
CREATE TABLE `dwd_status_log`
(
mechine STRING,
timenow STRING,
`value` STRING,
pre_value STRING,
PRIMARY KEY (timenow) NOT ENFORCED
)
WITH (
'connector' = 'kafka',
'topic' = 'dwd_status_log',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'debezium-json'
);
-- source2
DROP TABLE IF EXISTS dim_equipment_info;
CREATE TABLE IF NOT EXISTS dim_equipment_info (
id STRING
,station_type STRING
,equipment_no STRING
,emp_name STRING
,emp_phone STRING
,vendor STRING
,site STRING
,`floor` STRING
,line STRING
,model STRING
,stage STRING
,id_value STRING
,emp_info STRING
,PRIMARY KEY ( id ) NOT ENFORCED
)
WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:4000/data_warehouse_p_10046?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&nullCatalogMeansCurrent=true',
'table-name' = 'dim_equipment_info',
'username' = 'root',
'password' = 'xxx'
);
-- sink
DROP TABLE IF EXISTS dws_alarm_status_count;
CREATE TABLE IF NOT EXISTS dws_alarm_status_count (
mechine_id STRING
,mechine_name STRING
,trigger_date STRING
,alarm_status_count BIGINT
,PRIMARY KEY ( mechine_id,trigger_date ) NOT ENFORCED
)
WITH (
'connector' = 'kafka',
'topic' = 'dws_alarm_status_count',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'debezium-json'
);
-- trans
INSERT INTO
dws_alarm_status_count
SELECT
dsl.mechine AS mechine_id,
dei.equipment_no AS mechine_name,
DATE_FORMAT(dsl.timenow, 'yyyy-MM-dd') AS `trigger_date`,
COUNT(*) AS alarm_status_count
FROM
dwd_status_log dsl
LEFT JOIN
dim_equipment_info dei
ON
dsl.mechine = dei.id
WHERE
dsl.`value` = '4'
GROUP BY
DATE_FORMAT(dsl.timenow, 'yyyy-MM-dd'), dsl.mechine,dei.equipment_no;
When the following FlinkSQL is given as input:
`
CREATE TABLE demo_log_01 (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1'
);
CREATE TABLE demo_log_02 (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1'
);
CREATE TABLE demo_log_05 (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) WITH (
'connector' = 'print'
);
insert into demo_log_05
select a.user_id,b.item_id,b.behavior,a.dt,a.hh from
(select user_id,item_id,behavior,dt,hh from demo_log_01 where dt='2023-17-02') a
left join
(select user_id,item_id,behavior,dt,hh from demo_log_02) b
on a.user_id = b.user_id
`
minimal steps to reproduce:
sh sbin/start-docker-compose.sh
output
[INFO] /Users/haojiangfeng/codes/chainbase/cornflake-deployment/flink-sql-lineage/lineage-flink1.14.x/src/main/java:-1: info: compiling
[INFO] Compiling 5 source files to /Users/haojiangfeng/codes/chainbase/cornflake-deployment/flink-sql-lineage/lineage-flink1.14.x/target/classes at 1694156256794
[ERROR] error: scala.reflect.internal.MissingRequirementError: object java.lang.Object in compiler mirror not found.
[ERROR] at scala.reflect.internal.MissingRequirementError$.signal(MissingRequirementError.scala:17)
[ERROR] at scala.reflect.internal.MissingRequirementError$.notFound(MissingRequirementError.scala:18)
[ERROR] at scala.reflect.internal.Mirrors$RootsBase.$anonfun$getModuleOrClass$4(Mirrors.scala:54)
[ERROR] at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:54)
[ERROR] at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:45)
[ERROR] at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:45)
[ERROR] at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:66)
[ERROR] at scala.reflect.internal.Mirrors$RootsBase.getClassByName(Mirrors.scala:102)
[ERROR] at scala.reflect.internal.Mirrors$RootsBase.getRequiredClass(Mirrors.scala:105)
[ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.ObjectClass$lzycompute(Definitions.scala:267)
[ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.ObjectClass(Definitions.scala:267)
[ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.init(Definitions.scala:1447)
[ERROR] at scala.tools.nsc.Global$Run.<init>(Global.scala:1126)
[ERROR] at scala.tools.nsc.MainClass.doCompile(Main.scala:24)
[ERROR] at scala.tools.nsc.Driver.process(Driver.scala:55)
[ERROR] at scala.tools.nsc.Driver.main(Driver.scala:68)
[ERROR] at scala.tools.nsc.Main.main(Main.scala)
[ERROR] at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
[ERROR] at java.base/java.lang.reflect.Method.invoke(Method.java:578)
[ERROR] at scala_maven_executions.MainHelper.runMain(MainHelper.java:164)
[ERROR] at scala_maven_executions.MainWithArgsInFile.main(MainWithArgsInFile.java:26)
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for flink-sql-lineage 1.0.0:
[INFO]
[INFO] flink-sql-lineage .................................. SUCCESS [ 0.838 s]
[INFO] lineage-common ..................................... SUCCESS [ 2.371 s]
[INFO] lineage-loader ..................................... SUCCESS [ 1.567 s]
[INFO] lineage-flink1.14.x ................................ FAILURE [ 9.781 s]
[INFO] lineage-flink1.16.x ................................ SKIPPED
[INFO] lineage-client ..................................... SKIPPED
[INFO] lineage-web ........................................ SKIPPED
[INFO] lineage-server ..................................... SKIPPED
[INFO] lineage-server-domain .............................. SKIPPED
[INFO] lineage-server-infrastructure ...................... SKIPPED
[INFO] lineage-server-application ......................... SKIPPED
[INFO] lineage-server-interfaces .......................... SKIPPED
[INFO] lineage-server-start ............................... SKIPPED
[INFO] lineage-dist ....................................... SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 14.733 s
[INFO] Finished at: 2023-09-08T14:57:37+08:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.3.1:compile (scala-compile-first) on project lineage-flink1.14.x: wrap: org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1) -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR] mvn <args> -rf :lineage-flink1.14.x
Finish maven package
Build, create, and start containers with Docker Compose
[+] Running 1/1
! lineage-server Warning 5.5s
[+] Building 1.7s (6/8) docker:orbstack
=> [lineage-server internal] load build definition from Dockerfile 0.0s
=> => transferring dockerfile: 484B 0.0s
=> [lineage-server internal] load .dockerignore 0.0s
=> => transferring context: 2B 0.0s
=> [lineage-server internal] load metadata for docker.io/library/openjdk:8-jdk-alpine 1.5s
=> [lineage-server internal] load build context 0.0s
=> => transferring context: 2B 0.0s
=> CANCELED [lineage-server 1/4] FROM docker.io/library/openjdk:8-jdk-alpine@sha256:94792824df2df33402f201713f932b58cb9de94a0cd524164a0f2283343547b3 0.1s
=> => resolve docker.io/library/openjdk:8-jdk-alpine@sha256:94792824df2df33402f201713f932b58cb9de94a0cd524164a0f2283343547b3 0.0s
=> => sha256:94792824df2df33402f201713f932b58cb9de94a0cd524164a0f2283343547b3 1.64kB / 1.64kB 0.0s
=> => sha256:ad0c0c268e337200ace4256a6c40ea7eb01c467bde519aae8a6dc8c6ac103d53 947B / 947B 0.0s
=> => sha256:e4105db9d4690c236b378feec3c07e3dbcc9efbd7e4e51d0a5df9a3b01b9e372 3.40kB / 3.40kB 0.0s
=> ERROR [lineage-server 2/4] COPY lineage-dist/dist/flink-sql-lineage-1.0.0.tgz /opt/workspace/ 0.0s
------
> [lineage-server 2/4] COPY lineage-dist/dist/flink-sql-lineage-1.0.0.tgz /opt/workspace/:
------
failed to solve: failed to compute cache key: failed to calculate checksum of ref 25e18e2d-1858-4647-846d-49be8142f7f5::9lhxl66okxasrzr93bi5l5z1y: failed to walk /var/lib/docker/tmp/buildkit-mount572364169/lineage-dist/dist: lstat /var/lib/docker/tmp/buildkit-mount572364169/lineage-dist/dist: no such file or directory
After I removed the flink-1.14.x dependency, it still showed 'compiled failed'.
The Docker Compose process failed because the tgz file was missing.
complete logs:
Start maven packaging
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< com.hw.lineage:flink-sql-lineage >------------------
[INFO] Building flink-sql-lineage 1.0.0
[INFO] from pom.xml
[INFO] --------------------------------[ pom ]---------------------------------
[INFO]
[INFO] --- clean:3.1.0:clean (default-clean) @ flink-sql-lineage ---
[INFO]
[INFO] --- spotless:2.27.1:check (default) @ flink-sql-lineage ---
[INFO] Sorting file /var/folders/2k/ftdpqjh92g31qr9cwddx6cvr0000gn/T/pom3976339903314232846.xml
[INFO] Pom file is already sorted, exiting
[INFO]
[INFO] --- source:3.2.1:jar-no-fork (attach-sources) @ flink-sql-lineage ---
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1.067 s
[INFO] Finished at: 2023-09-08T16:11:36+08:00
[INFO] ------------------------------------------------------------------------
Finish maven package
Build, create, and start containers with Docker Compose
[+] Running 1/1
! lineage-server Warning 6.2s
[+] Building 3.3s (6/8) docker:orbstack
=> [lineage-server internal] load build definition from Dockerfile 0.0s
=> => transferring dockerfile: 484B 0.0s
=> [lineage-server internal] load .dockerignore 0.0s
=> => transferring context: 2B 0.0s
=> [lineage-server internal] load metadata for docker.io/library/openjdk:8-jdk-alpine 3.2s
=> [lineage-server internal] load build context 0.0s
=> => transferring context: 2B 0.0s
=> CANCELED [lineage-server 1/4] FROM docker.io/library/openjdk:8-jdk-alpine@sha256:94792824df2df33402f201713f932b58cb9de94a0cd524164a0f2283343547b3 0.1s
=> => resolve docker.io/library/openjdk:8-jdk-alpine@sha256:94792824df2df33402f201713f932b58cb9de94a0cd524164a0f2283343547b3 0.0s
=> => sha256:94792824df2df33402f201713f932b58cb9de94a0cd524164a0f2283343547b3 1.64kB / 1.64kB 0.0s
=> => sha256:ad0c0c268e337200ace4256a6c40ea7eb01c467bde519aae8a6dc8c6ac103d53 947B / 947B 0.0s
=> => sha256:e4105db9d4690c236b378feec3c07e3dbcc9efbd7e4e51d0a5df9a3b01b9e372 3.40kB / 3.40kB 0.0s
=> ERROR [lineage-server 2/4] COPY lineage-dist/dist/flink-sql-lineage-1.0.0.tgz /opt/workspace/ 0.0s
------
> [lineage-server 2/4] COPY lineage-dist/dist/flink-sql-lineage-1.0.0.tgz /opt/workspace/:
------
failed to solve: failed to compute cache key: failed to calculate checksum of ref 25e18e2d-1858-4647-846d-49be8142f7f5::tz9r05h1yopyoygg7hfz8c6iu: failed to walk /var/lib/docker/tmp/buildkit-mount1522522218/lineage-dist/dist: lstat /var/lib/docker/tmp/buildkit-mount1522522218/lineage-dist/dist: no such file or directory
Lists all container information
NAME IMAGE COMMAND SERVICE CREATED STATUS PORTS
Deployment completed
It takes tens of seconds for the service to start, please wait a moment.
Swagger API: http://127.0.0.1:8194/swagger-ui/index.html
Knife4j API: http://127.0.0.1:8194/doc.html
Quick Catalog API: http://127.0.0.1:8194/catalogs/1
Index Page: http://127.0.0.1:8194/index.html
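Regarding the Scala compilation failure in the first log above: the stack trace contains `java.base/...` frames, which only exist on JDK 9 or later, while the Dockerfile is based on `openjdk:8-jdk-alpine`. A commonly reported cause of `object java.lang.Object in compiler mirror not found` is running an older Scala compiler on a newer JDK. That this is the cause here is an assumption, not something confirmed in the thread. The sketch below only reports which Java version a JVM is running on. The class `JdkCheckSketch` and its methods are hypothetical helpers.

```java
/** Hypothetical helper for checking the running Java version; not part of the project. */
final class JdkCheckSketch {

    /** Parses a java.specification.version value: "1.8" gives 8, "11" gives 11, "20" gives 20. */
    static int majorVersion(String specVersion) {
        String v = specVersion.startsWith("1.") ? specVersion.substring(2) : specVersion;
        int dot = v.indexOf('.');
        if (dot >= 0) {
            v = v.substring(0, dot);
        }
        return Integer.parseInt(v);
    }

    /** True for JDK 9 and later, where the JDK classes live in modules such as java.base. */
    static boolean usesModuleSystem(String specVersion) {
        return majorVersion(specVersion) >= 9;
    }

    public static void main(String[] args) {
        String spec = System.getProperty("java.specification.version");
        System.out.println("Running on Java " + spec
            + (usesModuleSystem(spec) ? " (JDK 9 or later)" : " (JDK 8 or earlier)"));
    }
}
```

If the check reports JDK 9 or later, pointing `JAVA_HOME` at a JDK 8 installation before running `sh sbin/start-docker-compose.sh` is one thing to try.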
In the execution plan, source columns cannot be resolved for two types of RelNode: FlinkLogicalTableFunctionScan and LogicalWatermarkAssigner.
Is there a plan to support Flink 1.16?
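I ran the project locally and tried the basic features; it works well and is very impressive! @HamaWhiteGG, do you plan to support lineage between jobs? I have recently been working on lineage for a Flink real-time platform as well, so we could discuss it together: showing the lineage among source, task, and sink.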
Here are my test CEP SQL and DDL statements:
CREATE TABLE `sink_table`(`agent_id` STRING, `room_id` STRING, `application_id` STRING, `type` STRING, `begin_time` BIGINT, `end_time` BIGINT) WITH(...)
CREATE TABLE `source_table`(`agent_id` STRING, `room_id` STRING, `create_time` BIGINT, `type` STRING, `application_id` STRING, `connect_time` BIGINT, `row_time` AS PROCTIME()) WITH(...)
insert into sink_table (agent_id,room_id,application_id,type,begin_time,end_time)
select agent_id,room_id,application_id,type,begin_time,end_time
from source_table
match_recognize (partition by agent_id,room_id,application_id order by row_time
measures AF.type as type,last(BF.create_time) as begin_time,last(AF.create_time) as end_time
one row per match
after match SKIP PAST LAST ROW
pattern (BF+ AF) WITHIN INTERVAL '1' HOUR
define BF as BF.type = 'assign',AF as AF.type = 'pick_up' ) as T
Because of the logic in RelMdColumnOrigins, the lineage result for my CEP SQL is wrong, for example the column lineage of sink_table.application_id.
INSERT INTO
dwd_hudi_users
SELECT
length,
name,
word as company_name,
birthday,
ts,
DATE_FORMAT(birthday, 'yyyyMMdd')
FROM
ods_mysql_users,
LATERAL TABLE (my_split_udtf (name))
The lineage for this example provided by the author can be parsed.
If the SQL is changed to:
INSERT INTO
dwd_hudi_users
SELECT
length,
column1 as name,
word as company_name,
birthday,
ts,
DATE_FORMAT(birthday, 'yyyyMMdd')
FROM
ods_mysql_users,
LATERAL TABLE (my_split_udtf (name)) as T (column1)
Parsing this SQL with the RelMdColumnOrigins from 2.0.0 throws the exception: org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc cannot be cast to org.apache.calcite.rel.core.TableFunctionScan
With the RelMdColumnOrigins from main, in the call
mq.getColumnOrigins(rel.getRight(), iOutputColumn - nLeftColumns)
mappings is null and rel.getInputs().isEmpty() is true, so the returned set has size 0
and the source column cannot be resolved.
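The branch described above can be modelled with a simplified, self-contained sketch. It is not the Calcite or flink-sql-lineage code: the class `UdtfOriginSketch`, its methods, and the string-based column names are hypothetical. The first method mirrors the reported behaviour (no mappings and no inputs gives an empty origin set). The second shows one possible fallback, offered only as an assumption: treating the columns passed to the table function as the origins of its output columns.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified model; not the Calcite or flink-sql-lineage implementation. */
final class UdtfOriginSketch {

    /**
     * Models the reported case where the column mappings are null.
     * Returns null (origin unknown) if the scan has inputs, otherwise an empty set.
     */
    static Set<String> originsWithoutMappings(List<String> inputs) {
        if (!inputs.isEmpty()) {
            return null; // origin unknown
        }
        return new LinkedHashSet<>(); // leaf node: no origins, so the lineage is lost here
    }

    /**
     * Possible fallback (an assumption): every column passed to the table
     * function counts as an origin of each of its output columns.
     */
    static Set<String> originsFromArguments(String sourceTable, List<String> argumentColumns) {
        Set<String> origins = new LinkedHashSet<>();
        for (String column : argumentColumns) {
            origins.add(sourceTable + "." + column);
        }
        return origins;
    }

    public static void main(String[] args) {
        // my_split_udtf(name) has no relational inputs, only a column argument.
        List<String> noInputs = Collections.emptyList();
        System.out.println(originsWithoutMappings(noInputs));
        System.out.println(originsFromArguments("ods_mysql_users", Collections.singletonList("name")));
    }
}
```

Under that fallback, `column1` in the query above would trace back to `ods_mysql_users.name`.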
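Current progress on integrating Flink with Atlas in the Flink community: https://docs.google.com/document/d/1wSgzPdhcwt-SlNBBqL-Zb7g8fY6bN8JwHEg7GCdsBG8/edit#heading=h.mfdxvgyhozur
PoC of the changes needed on the Flink side: https://github.com/gyfora/atlas/blob/flink-bridge/addons/flink-bridge/src/main/java/org/apache/atlas/flink/hook/FlinkAtlasHook.java
If you are interested, you could look at that author's implementation; it is fairly non-invasive to Flink and also fairly generic.
The Atlas 2.2.0 code for this part has now been merged into the main branch; the Flink side still needs some PoC validation.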
@Slf4j
public class SimpleTest extends AbstractBasicTest {
@Before
public void createTable() {
createMysqlUser();
createOdsUser();
}
@Test
public void concat() {
String sql = "insert into mysql_user " +
"select id, birthday, concat(first_name, last_name) as full_name " +
"from ods_user";
List<Result> actualList = context.parseFieldLineage(sql);
log.info("Lineage Result: ");
actualList.forEach(e -> log.info(e.toString()));
}
protected void createMysqlUser() {
context.execute("DROP TABLE IF EXISTS mysql_user ");
context.execute("CREATE TABLE IF NOT EXISTS mysql_user (" +
" id BIGINT ," +
" birthday TIMESTAMP(3) ," +
" full_name STRING " +
") WITH ( " +
" 'connector' = 'jdbc' ," +
" 'url' = 'jdbc:mysql://127.0.0.1:3306/demo?useSSL=false&characterEncoding=UTF-8'," +
" 'username' = 'root' ," +
" 'password' = 'xxx' ," +
" 'table-name'= 'mysql_user' " +
")"
);
}
protected void createOdsUser() {
context.execute("DROP TABLE IF EXISTS ods_user ");
context.execute("CREATE TABLE IF NOT EXISTS ods_user (" +
" id BIGINT ," +
" birthday TIMESTAMP(3) ," +
" first_name STRING ," +
" last_name STRING ," +
" company_name STRING " +
") WITH ( " +
" 'connector' = 'jdbc' ," +
" 'url' = 'jdbc:mysql://127.0.0.1:3306/demo?useSSL=false&characterEncoding=UTF-8'," +
" 'username' = 'root' ," +
" 'password' = 'xxx' ," +
" 'table-name'= 'ods_user' " +
")"
);
}
}
If I remove the company_name column from ods_user, parsing succeeds. Is there something wrong with the way I wrote this?
Using the author's tests, I added the following code:
@Test
public void testInsertSelectUUID() {
String sql = "INSERT INTO dwd_hudi_users (id, name, birthday) " +
"SELECT " +
" ROW_NUMBER() OVER (ORDER BY ts DESC) as id," +
" UUID() as name," +
" birthday " +
"FROM" +
" ods_mysql_users";
String[][] expectedArray = {
{"ods_mysql_users", "ts", "dwd_hudi_users", "id",
"ROW_NUMBER() OVER (ORDER BY ts DESC NULLS LAST)"},
{"ods_mysql_users", "name", "dwd_hudi_users", "name"},
{"ods_mysql_users", "ts", "dwd_hudi_users", "birthday"}
};
analyzeLineage(sql, expectedArray);
}
Here {"ods_mysql_users", "ts", "dwd_hudi_users", "birthday"} should be expected as {"ods_mysql_users", "birthday", "dwd_hudi_users", "birthday"}. Also, the source of the name field is the UUID() function, so there should be no source column for it, right?
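The expectation proposed in this comment could be written out as follows. This is only a sketch of the reporter's reading, not a confirmed result from the project: the class `ExpectedLineageSketch` is hypothetical, and whether a row for `name` should be emitted at all is exactly the open question.

```java
import java.util.Arrays;

/** Hypothetical holder for the expectation proposed in the comment above. */
final class ExpectedLineageSketch {

    /** Each row is {sourceTable, sourceColumn, targetTable, targetColumn[, transform]}. */
    static String[][] proposedExpectation() {
        return new String[][] {
            {"ods_mysql_users", "ts", "dwd_hudi_users", "id",
                "ROW_NUMBER() OVER (ORDER BY ts DESC NULLS LAST)"},
            // birthday is selected directly, so its origin is birthday, not ts
            {"ods_mysql_users", "birthday", "dwd_hudi_users", "birthday"}
            // no row for name: UUID() reads no source column (the reporter's reading)
        };
    }

    public static void main(String[] args) {
        for (String[] row : proposedExpectation()) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```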
[ERROR] Failed to execute goal com.github.eirslett:frontend-maven-plugin:1.8.0:yarn (yarn install) on project lineage-server-start: Failed to run task: 'yarn install' failed. org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1) -> [Help 1]