
flink-sql-lineage's People

Contributors

asteriachiu, dependabot[bot], hamawhitegg, tonybobam, yangstar

flink-sql-lineage's Issues

Flink SQL lineage result misses one column

Here is my SQL script:

-- source1
DROP TABLE IF EXISTS `dwd_status_log`;
CREATE TABLE `dwd_status_log`
(
    mechine STRING,
    timenow STRING,
    `value` STRING,
    pre_value STRING,
    PRIMARY KEY (timenow) NOT ENFORCED
)  
WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_status_log',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'debezium-json'
);

-- source2
DROP TABLE IF EXISTS dim_equipment_info;
CREATE TABLE IF NOT EXISTS dim_equipment_info (
    id STRING 
    ,station_type STRING 
    ,equipment_no STRING 
    ,emp_name STRING 
    ,emp_phone STRING 
    ,vendor STRING 
    ,site STRING 
    ,`floor` STRING 
    ,line STRING 
    ,model STRING 
    ,stage STRING 
    ,id_value STRING 
    ,emp_info STRING
    ,PRIMARY KEY ( id ) NOT ENFORCED
) 
WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:4000/data_warehouse_p_10046?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&nullCatalogMeansCurrent=true',
   'table-name' = 'dim_equipment_info',
   'username' = 'root',
   'password' = 'xxx'
);


-- sink
DROP TABLE IF EXISTS dws_alarm_status_count;
CREATE TABLE IF NOT EXISTS dws_alarm_status_count (
    mechine_id STRING 
    ,mechine_name STRING 
    ,trigger_date STRING 
    ,alarm_status_count BIGINT 
    ,PRIMARY KEY ( mechine_id,trigger_date ) NOT ENFORCED
) 
WITH (
    'connector' = 'kafka',
    'topic' = 'dws_alarm_status_count',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'debezium-json'
);

-- trans
INSERT INTO 
    dws_alarm_status_count 
SELECT
    dsl.mechine AS mechine_id,
    dei.equipment_no AS mechine_name,
    DATE_FORMAT(dsl.timenow, 'yyyy-MM-dd') AS `trigger_date`,
    COUNT(*) AS alarm_status_count
FROM 
    dwd_status_log dsl 
LEFT JOIN 
    dim_equipment_info dei 
ON 
    dsl.mechine = dei.id 
WHERE 
    dsl.`value` = '4' 
GROUP BY 
    DATE_FORMAT(dsl.timenow, 'yyyy-MM-dd'), dsl.mechine, dei.equipment_no;

The result is missing the lineage entry for the last column.
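
For reference, a minimal sketch of how this case could be checked against the parser, modeled on the AbstractBasicTest-style tests shown further down this page (the class and method names are hypothetical, imports and packages are omitted as in those snippets, and the three CREATE TABLE statements above are assumed to have been registered through context.execute first):

@Slf4j
public class AlarmStatusLineageTest extends AbstractBasicTest {

    @Test
    public void missingLastColumnLineage() {
        // The INSERT from the script above, condensed into one statement.
        String sql = "INSERT INTO dws_alarm_status_count "
                + "SELECT dsl.mechine AS mechine_id, dei.equipment_no AS mechine_name, "
                + "DATE_FORMAT(dsl.timenow, 'yyyy-MM-dd') AS trigger_date, "
                + "COUNT(*) AS alarm_status_count "
                + "FROM dwd_status_log dsl "
                + "LEFT JOIN dim_equipment_info dei ON dsl.mechine = dei.id "
                + "WHERE dsl.`value` = '4' "
                + "GROUP BY DATE_FORMAT(dsl.timenow, 'yyyy-MM-dd'), dsl.mechine, dei.equipment_no";

        // The sink has four columns; per the report above, the entry for the
        // last one is missing from the parsed result.
        List<Result> results = context.parseFieldLineage(sql);
        results.forEach(r -> log.info(r.toString()));
    }
}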

When the WHERE clause compares a column to a constant, the optimizer folds the source column into the constant, so that column's lineage is lost

When the following Flink SQL is submitted:
CREATE TABLE demo_log_01 (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);
CREATE TABLE demo_log_02 (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);
CREATE TABLE demo_log_05 (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) WITH (
  'connector' = 'print'
);
insert into demo_log_05
select a.user_id,b.item_id,b.behavior,a.dt,a.hh from 
(select  user_id,item_id,behavior,dt,hh from demo_log_01 where dt='2023-17-02') a 
left join 
(select  user_id,item_id,behavior,dt,hh from demo_log_02) b
on a.user_id = b.user_id

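A minimal sketch of how this can be observed, reusing the parseFieldLineage entry point from the project's tests (the method name is hypothetical, and the three datagen/print tables above are assumed to have been registered already): parse the statement with and without the constant predicate and compare the entries for demo_log_05.dt.

// Inside an AbstractBasicTest subclass, as in the SimpleTest further down this page.
@Test
public void constantPredicateDropsLineage() {
    String withFilter = "insert into demo_log_05 "
            + "select a.user_id, b.item_id, b.behavior, a.dt, a.hh from "
            + "(select user_id, item_id, behavior, dt, hh from demo_log_01 where dt = '2023-17-02') a "
            + "left join (select user_id, item_id, behavior, dt, hh from demo_log_02) b "
            + "on a.user_id = b.user_id";
    String withoutFilter = withFilter.replace("where dt = '2023-17-02'", "");

    // Per the report, demo_log_05.dt keeps its lineage without the filter,
    // but loses it once dt is folded into the constant.
    context.parseFieldLineage(withoutFilter).forEach(r -> log.info(r.toString()));
    context.parseFieldLineage(withFilter).forEach(r -> log.info(r.toString()));
}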

failed to deploy with docker-compose

Minimal steps to reproduce:

sh sbin/start-docker-compose.sh

Output:

[INFO] /Users/haojiangfeng/codes/chainbase/cornflake-deployment/flink-sql-lineage/lineage-flink1.14.x/src/main/java:-1: info: compiling
[INFO] Compiling 5 source files to /Users/haojiangfeng/codes/chainbase/cornflake-deployment/flink-sql-lineage/lineage-flink1.14.x/target/classes at 1694156256794
[ERROR] error: scala.reflect.internal.MissingRequirementError: object java.lang.Object in compiler mirror not found.
[ERROR]         at scala.reflect.internal.MissingRequirementError$.signal(MissingRequirementError.scala:17)
[ERROR]         at scala.reflect.internal.MissingRequirementError$.notFound(MissingRequirementError.scala:18)
[ERROR]         at scala.reflect.internal.Mirrors$RootsBase.$anonfun$getModuleOrClass$4(Mirrors.scala:54)
[ERROR]         at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:54)
[ERROR]         at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:45)
[ERROR]         at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:45)
[ERROR]         at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:66)
[ERROR]         at scala.reflect.internal.Mirrors$RootsBase.getClassByName(Mirrors.scala:102)
[ERROR]         at scala.reflect.internal.Mirrors$RootsBase.getRequiredClass(Mirrors.scala:105)
[ERROR]         at scala.reflect.internal.Definitions$DefinitionsClass.ObjectClass$lzycompute(Definitions.scala:267)
[ERROR]         at scala.reflect.internal.Definitions$DefinitionsClass.ObjectClass(Definitions.scala:267)
[ERROR]         at scala.reflect.internal.Definitions$DefinitionsClass.init(Definitions.scala:1447)
[ERROR]         at scala.tools.nsc.Global$Run.<init>(Global.scala:1126)
[ERROR]         at scala.tools.nsc.MainClass.doCompile(Main.scala:24)
[ERROR]         at scala.tools.nsc.Driver.process(Driver.scala:55)
[ERROR]         at scala.tools.nsc.Driver.main(Driver.scala:68)
[ERROR]         at scala.tools.nsc.Main.main(Main.scala)
[ERROR]         at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
[ERROR]         at java.base/java.lang.reflect.Method.invoke(Method.java:578)
[ERROR]         at scala_maven_executions.MainHelper.runMain(MainHelper.java:164)
[ERROR]         at scala_maven_executions.MainWithArgsInFile.main(MainWithArgsInFile.java:26)
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for flink-sql-lineage 1.0.0:
[INFO] 
[INFO] flink-sql-lineage .................................. SUCCESS [  0.838 s]
[INFO] lineage-common ..................................... SUCCESS [  2.371 s]
[INFO] lineage-loader ..................................... SUCCESS [  1.567 s]
[INFO] lineage-flink1.14.x ................................ FAILURE [  9.781 s]
[INFO] lineage-flink1.16.x ................................ SKIPPED
[INFO] lineage-client ..................................... SKIPPED
[INFO] lineage-web ........................................ SKIPPED
[INFO] lineage-server ..................................... SKIPPED
[INFO] lineage-server-domain .............................. SKIPPED
[INFO] lineage-server-infrastructure ...................... SKIPPED
[INFO] lineage-server-application ......................... SKIPPED
[INFO] lineage-server-interfaces .......................... SKIPPED
[INFO] lineage-server-start ............................... SKIPPED
[INFO] lineage-dist ....................................... SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  14.733 s
[INFO] Finished at: 2023-09-08T14:57:37+08:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.3.1:compile (scala-compile-first) on project lineage-flink1.14.x: wrap: org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1) -> [Help 1]
[ERROR] 
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] 
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
[ERROR] 
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn <args> -rf :lineage-flink1.14.x
Finish maven package
Build, create, and start containers with Docker Compose
[+] Running 1/1
 ! lineage-server Warning                                                                                                                                                          5.5s 
[+] Building 1.7s (6/8)                                                                                                                                                 docker:orbstack
 => [lineage-server internal] load build definition from Dockerfile                                                                                                                0.0s
 => => transferring dockerfile: 484B                                                                                                                                               0.0s
 => [lineage-server internal] load .dockerignore                                                                                                                                   0.0s
 => => transferring context: 2B                                                                                                                                                    0.0s
 => [lineage-server internal] load metadata for docker.io/library/openjdk:8-jdk-alpine                                                                                             1.5s
 => [lineage-server internal] load build context                                                                                                                                   0.0s
 => => transferring context: 2B                                                                                                                                                    0.0s
 => CANCELED [lineage-server 1/4] FROM docker.io/library/openjdk:8-jdk-alpine@sha256:94792824df2df33402f201713f932b58cb9de94a0cd524164a0f2283343547b3                              0.1s
 => => resolve docker.io/library/openjdk:8-jdk-alpine@sha256:94792824df2df33402f201713f932b58cb9de94a0cd524164a0f2283343547b3                                                      0.0s
 => => sha256:94792824df2df33402f201713f932b58cb9de94a0cd524164a0f2283343547b3 1.64kB / 1.64kB                                                                                     0.0s
 => => sha256:ad0c0c268e337200ace4256a6c40ea7eb01c467bde519aae8a6dc8c6ac103d53 947B / 947B                                                                                         0.0s
 => => sha256:e4105db9d4690c236b378feec3c07e3dbcc9efbd7e4e51d0a5df9a3b01b9e372 3.40kB / 3.40kB                                                                                     0.0s
 => ERROR [lineage-server 2/4] COPY lineage-dist/dist/flink-sql-lineage-1.0.0.tgz /opt/workspace/                                                                                  0.0s
------
 > [lineage-server 2/4] COPY lineage-dist/dist/flink-sql-lineage-1.0.0.tgz /opt/workspace/:
------
failed to solve: failed to compute cache key: failed to calculate checksum of ref 25e18e2d-1858-4647-846d-49be8142f7f5::9lhxl66okxasrzr93bi5l5z1y: failed to walk /var/lib/docker/tmp/buildkit-mount572364169/lineage-dist/dist: lstat /var/lib/docker/tmp/buildkit-mount572364169/lineage-dist/dist: no such file or directory

After I removed the flink-1.14.x dependency, it still reported a compile failure.
The docker-compose step then failed because of the missing tgz file.

Complete logs:

Start maven packaging
[INFO] Scanning for projects...
[INFO] 
[INFO] ------------------< com.hw.lineage:flink-sql-lineage >------------------
[INFO] Building flink-sql-lineage 1.0.0
[INFO]   from pom.xml
[INFO] --------------------------------[ pom ]---------------------------------
[INFO] 
[INFO] --- clean:3.1.0:clean (default-clean) @ flink-sql-lineage ---
[INFO] 
[INFO] --- spotless:2.27.1:check (default) @ flink-sql-lineage ---
[INFO] Sorting file /var/folders/2k/ftdpqjh92g31qr9cwddx6cvr0000gn/T/pom3976339903314232846.xml
[INFO] Pom file is already sorted, exiting
[INFO] 
[INFO] --- source:3.2.1:jar-no-fork (attach-sources) @ flink-sql-lineage ---
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.067 s
[INFO] Finished at: 2023-09-08T16:11:36+08:00
[INFO] ------------------------------------------------------------------------
Finish maven package
Build, create, and start containers with Docker Compose
[+] Running 1/1
 ! lineage-server Warning                                                                                                                                                                                                                                                                                          6.2s 
[+] Building 3.3s (6/8)                                                                                                                                                                                                                                                                                 docker:orbstack
 => [lineage-server internal] load build definition from Dockerfile                                                                                                                                                                                                                                                0.0s
 => => transferring dockerfile: 484B                                                                                                                                                                                                                                                                               0.0s
 => [lineage-server internal] load .dockerignore                                                                                                                                                                                                                                                                   0.0s
 => => transferring context: 2B                                                                                                                                                                                                                                                                                    0.0s
 => [lineage-server internal] load metadata for docker.io/library/openjdk:8-jdk-alpine                                                                                                                                                                                                                             3.2s
 => [lineage-server internal] load build context                                                                                                                                                                                                                                                                   0.0s
 => => transferring context: 2B                                                                                                                                                                                                                                                                                    0.0s
 => CANCELED [lineage-server 1/4] FROM docker.io/library/openjdk:8-jdk-alpine@sha256:94792824df2df33402f201713f932b58cb9de94a0cd524164a0f2283343547b3                                                                                                                                                              0.1s
 => => resolve docker.io/library/openjdk:8-jdk-alpine@sha256:94792824df2df33402f201713f932b58cb9de94a0cd524164a0f2283343547b3                                                                                                                                                                                      0.0s
 => => sha256:94792824df2df33402f201713f932b58cb9de94a0cd524164a0f2283343547b3 1.64kB / 1.64kB                                                                                                                                                                                                                     0.0s
 => => sha256:ad0c0c268e337200ace4256a6c40ea7eb01c467bde519aae8a6dc8c6ac103d53 947B / 947B                                                                                                                                                                                                                         0.0s
 => => sha256:e4105db9d4690c236b378feec3c07e3dbcc9efbd7e4e51d0a5df9a3b01b9e372 3.40kB / 3.40kB                                                                                                                                                                                                                     0.0s
 => ERROR [lineage-server 2/4] COPY lineage-dist/dist/flink-sql-lineage-1.0.0.tgz /opt/workspace/                                                                                                                                                                                                                  0.0s
------
 > [lineage-server 2/4] COPY lineage-dist/dist/flink-sql-lineage-1.0.0.tgz /opt/workspace/:
------
failed to solve: failed to compute cache key: failed to calculate checksum of ref 25e18e2d-1858-4647-846d-49be8142f7f5::tz9r05h1yopyoygg7hfz8c6iu: failed to walk /var/lib/docker/tmp/buildkit-mount1522522218/lineage-dist/dist: lstat /var/lib/docker/tmp/buildkit-mount1522522218/lineage-dist/dist: no such file or directory
Lists all container information
NAME      IMAGE     COMMAND   SERVICE   CREATED   STATUS    PORTS


Deployment completed
It takes tens of seconds for the service to start, please wait a moment.

Swagger API: http://127.0.0.1:8194/swagger-ui/index.html
Knife4j API: http://127.0.0.1:8194/doc.html
Quick Catalog API: http://127.0.0.1:8194/catalogs/1
Index Page: http://127.0.0.1:8194/index.html

Discussion of an implementation approach for job-level lineage

I ran the project locally and tried the basic features; it works really well, impressive! @HamaWhiteGG, do you plan to add lineage between jobs? I am also working on lineage for a Flink real-time platform and would be happy to discuss it together: showing the lineage among source, task, and sink.

[Bug] Flink CEP SQL column lineage parse error

Here are my test CEP SQL and DDL statements:

CREATE TABLE `sink_table`(`agent_id` STRING, `room_id` STRING, `application_id` STRING, `type` STRING, `begin_time` BIGINT, `end_time` BIGINT) WITH(...)

CREATE TABLE `source_table`(`agent_id` STRING, `room_id` STRING, `create_time` BIGINT, `type` STRING, `application_id` STRING, `connect_time` BIGINT, `row_time` AS PROCTIME()) WITH(...)

insert into sink_table (agent_id,room_id,application_id,type,begin_time,end_time) 
select agent_id,room_id,application_id,type,begin_time,end_time 
	from source_table 
	match_recognize (partition by agent_id,room_id,application_id order by row_time 
	measures AF.type as type,last(BF.create_time) as begin_time,last(AF.create_time) as end_time 
	one row per match 
	after match SKIP PAST LAST ROW 
	pattern (BF+ AF) WITHIN INTERVAL '1' HOUR 
	define BF as BF.type = 'assign',AF as AF.type = 'pick_up' ) as T

Given the code logic in RelMdColumnOrigins, the lineage result for my CEP SQL is wrong, for example the lineage of the application_id column in sink_table.
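
A minimal sketch of how the case can be driven through the same parseFieldLineage entry point used by the project's tests (the method name is hypothetical, and the two tables above are assumed to have been registered already). Since agent_id, room_id, and application_id are plain PARTITION BY keys, one would expect their origins to be the same-named columns of source_table:

// Inside an AbstractBasicTest subclass, as in the SimpleTest further down this page.
@Test
public void matchRecognizeLineage() {
    String sql = "insert into sink_table (agent_id, room_id, application_id, type, begin_time, end_time) "
            + "select agent_id, room_id, application_id, type, begin_time, end_time "
            + "from source_table "
            + "match_recognize (partition by agent_id, room_id, application_id order by row_time "
            + "measures AF.type as type, last(BF.create_time) as begin_time, last(AF.create_time) as end_time "
            + "one row per match "
            + "after match SKIP PAST LAST ROW "
            + "pattern (BF+ AF) WITHIN INTERVAL '1' HOUR "
            + "define BF as BF.type = 'assign', AF as AF.type = 'pick_up') as T";

    // Expected: application_id originates from source_table.application_id (it is only a
    // PARTITION BY key); the report above is that the parsed result for it is wrong.
    context.parseFieldLineage(sql).forEach(r -> log.info(r.toString()));
}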

Parse error when a UDTF is used with an alias

INSERT INTO
dwd_hudi_users
SELECT
length,
name,
word as company_name,
birthday,
ts,
DATE_FORMAT(birthday, 'yyyyMMdd')
FROM
ods_mysql_users,
LATERAL TABLE (my_split_udtf (name))
The author's example above parses into a correct lineage;
but if the SQL is changed to
INSERT INTO
dwd_hudi_users
SELECT
length,
column1 as name,
word as company_name,
birthday,
ts,
DATE_FORMAT(birthday, 'yyyyMMdd')
FROM
ods_mysql_users,
LATERAL TABLE (my_split_udtf (name)) as T (column1)
Parsing this SQL with the 2.0.0 RelMdColumnOrigins throws an org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc cannot be cast to org.apache.calcite.rel.core.TableFunctionScan exception.
With the RelMdColumnOrigins on main, in
mq.getColumnOrigins(rel.getRight(), iOutputColumn - nLeftColumns)
mappings is null and rel.getInputs().isEmpty() is true, so a set of size 0 is returned and the source column cannot be resolved.
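
For a side-by-side reproduction, a minimal sketch in the same test style (the method name is hypothetical; dwd_hudi_users, ods_mysql_users, and my_split_udtf are assumed to be registered already). The first statement parses, and per the report the second one, which only adds the alias T (column1), fails:

// Inside an AbstractBasicTest subclass, as in the SimpleTest further down this page.
@Test
public void udtfWithTableAlias() {
    String withoutAlias = "INSERT INTO dwd_hudi_users "
            + "SELECT length, name, word as company_name, birthday, ts, DATE_FORMAT(birthday, 'yyyyMMdd') "
            + "FROM ods_mysql_users, LATERAL TABLE (my_split_udtf(name))";
    String withAlias = "INSERT INTO dwd_hudi_users "
            + "SELECT length, column1 as name, word as company_name, birthday, ts, DATE_FORMAT(birthday, 'yyyyMMdd') "
            + "FROM ods_mysql_users, LATERAL TABLE (my_split_udtf(name)) as T (column1)";

    // Reported to parse correctly.
    context.parseFieldLineage(withoutAlias).forEach(r -> log.info(r.toString()));

    // Reported to throw "FlinkLogicalCalc cannot be cast to TableFunctionScan" on 2.0.0,
    // and to resolve no source column on main.
    context.parseFieldLineage(withAlias).forEach(r -> log.info(r.toString()));
}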

[Discussion] Adding an AtlasHook implementation in Flink

Current progress in the Flink community on Atlas integration: https://docs.google.com/document/d/1wSgzPdhcwt-SlNBBqL-Zb7g8fY6bN8JwHEg7GCdsBG8/edit#heading=h.mfdxvgyhozur

PoC validating the changes needed on the Flink side: https://github.com/gyfora/atlas/blob/flink-bridge/addons/flink-bridge/src/main/java/org/apache/atlas/flink/hook/FlinkAtlasHook.java

If you are interested, you could take a look at that author's implementation; it is minimally invasive to Flink and fairly generic.
The Atlas side of this code has already been merged into trunk for Atlas 2.2.0; some PoC validation is still needed on the Flink side.

Test error on the main branch

@Slf4j
public class SimpleTest extends AbstractBasicTest {

    @Before
    public void createTable() {
        createMysqlUser();
        createOdsUser();
    }

    @Test
    public void concat() {
        String sql = "insert into mysql_user " +
                "select id, birthday, concat(first_name, last_name) as full_name " +
                "from ods_user";
        List<Result> actualList = context.parseFieldLineage(sql);
        log.info("Linage Result: ");
        actualList.forEach(e -> log.info(e.toString()));
    }

    protected void createMysqlUser() {
        context.execute("DROP TABLE IF EXISTS mysql_user ");
        context.execute("CREATE TABLE IF NOT EXISTS mysql_user (" +
                "       id                        BIGINT           ," +
                "       birthday                  TIMESTAMP(3)     ," +
                "       full_name                 STRING            " +
                ") WITH ( " +
                "       'connector' = 'jdbc'                 ," +
                "       'url'       = 'jdbc:mysql://127.0.0.1:3306/demo?useSSL=false&characterEncoding=UTF-8'," +
                "       'username'  = 'root'                 ," +
                "       'password'  = 'xxx'          ," +
                "       'table-name'= 'mysql_user' " +
                ")"
        );
    }

    protected void createOdsUser() {
        context.execute("DROP TABLE IF EXISTS ods_user ");
        context.execute("CREATE TABLE IF NOT EXISTS ods_user (" +
                "       id                        BIGINT           ," +
                "       birthday                  TIMESTAMP(3)     ," +
                "       first_name                STRING           ," +
                "       last_name                 STRING           ," +
                "       company_name              STRING           " +
                ") WITH ( " +
                "       'connector' = 'jdbc'                 ," +
                "       'url'       = 'jdbc:mysql://127.0.0.1:3306/demo?useSSL=false&characterEncoding=UTF-8'," +
                "       'username'  = 'root'                 ," +
                "       'password'  = 'xxx'          ," +
                "       'table-name'= 'ods_user' " +
                ")"
        );
    }
}

If I remove the company_name column from ods_user, the parsing succeeds. Is there something wrong with how I wrote this?

Wrong column lineage when UUID() is used in lineage parsing (Flink 1.14)

I added the following code to the author's tests:

    @Test
    public void testInsertSelectUUID() {
        String sql = "INSERT INTO dwd_hudi_users (id, name, birthday) " +
                "SELECT " +
                "   ROW_NUMBER() OVER (ORDER BY ts DESC) as id," +
                "   UUID() as name," +
                "   birthday " +
                "FROM" +
                "   ods_mysql_users";
    
        String[][] expectedArray = {
                {"ods_mysql_users", "ts", "dwd_hudi_users", "id",
                        "ROW_NUMBER() OVER (ORDER BY ts DESC NULLS LAST)"},
                {"ods_mysql_users", "name", "dwd_hudi_users", "name"},
                {"ods_mysql_users", "ts", "dwd_hudi_users", "birthday"}
        };
        
        analyzeLineage(sql, expectedArray);
    }

其中 {"ods_mysql_users", "ts", "dwd_hudi_users", "birthday"} 期望应该是 {"ods_mysql_users", "birthday", "dwd_hudi_users", "birthday"} name字段的sourceCol 是uuid函数 应该是没有列吧
