
flink-sql-lineage's Issues

Execute SQL: USE Flink16_memory.`default` throws an exception

2024-06-19 11:09:09,971 INFO main com.hw.lineage.flink.LineageServiceImpl.executeSql:158 - Execute SQL: USE Flink16_memory.default
This fails at startup with:

Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: A catalog with name [Flink16_memory] does not exist.

Lineage parsing: wrong lineage when UUID() is used (Flink 1.14)

Using the author's test suite, I added the following test:

    @Test
    public void testInsertSelectUUID() {
        String sql = "INSERT INTO dwd_hudi_users (id, name, birthday) " +
                "SELECT " +
                "   ROW_NUMBER() OVER (ORDER BY ts DESC) as id," +
                "   UUID() as name," +
                "   birthday " +
                "FROM" +
                "   ods_mysql_users";
    
        String[][] expectedArray = {
                {"ods_mysql_users", "ts", "dwd_hudi_users", "id",
                        "ROW_NUMBER() OVER (ORDER BY ts DESC NULLS LAST)"},
                {"ods_mysql_users", "name", "dwd_hudi_users", "name"},
                {"ods_mysql_users", "ts", "dwd_hudi_users", "birthday"}
        };
        
        analyzeLineage(sql, expectedArray);
    }

Here, {"ods_mysql_users", "ts", "dwd_hudi_users", "birthday"} should be {"ods_mysql_users", "birthday", "dwd_hudi_users", "birthday"}. Also, the source of the name column is the UUID() function, so it should have no source column at all.
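The expectation above follows from how column origins usually propagate: a column reference is its own origin, a literal has none, and a function call inherits the union of its arguments' origins, so a zero-argument call like UUID() has no source column. The class below is a toy model of that rule under those assumptions; ToyColumnOrigins and its Col/Lit/Call types are hypothetical and are not part of flink-sql-lineage or Calcite. The ORDER BY key of ROW_NUMBER() is modeled as an argument.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/**
 * Toy model (hypothetical; not the flink-sql-lineage or Calcite API) of how
 * column origins propagate through a SELECT expression.
 */
final class ToyColumnOrigins {

    interface Expr {}

    /** A reference to an input column. */
    static final class Col implements Expr {
        final String name;
        Col(String name) { this.name = name; }
    }

    /** A constant such as 'test'. */
    static final class Lit implements Expr {
        final String value;
        Lit(String value) { this.value = value; }
    }

    /** A function call; its origins are the union of its arguments' origins. */
    static final class Call implements Expr {
        final String function;
        final List<Expr> args;
        Call(String function, Expr... args) {
            this.function = function;
            this.args = List.of(args);
        }
    }

    static Set<String> origins(Expr e) {
        Set<String> out = new TreeSet<>();
        if (e instanceof Col) {
            out.add(((Col) e).name);
        } else if (e instanceof Call) {
            for (Expr arg : ((Call) e).args) {
                out.addAll(origins(arg));
            }
        }
        // A literal, or a call with no column arguments such as UUID(), yields an empty set.
        return out;
    }

    public static void main(String[] argv) {
        Expr id = new Call("ROW_NUMBER", new Col("ts")); // ORDER BY ts modeled as an argument
        Expr name = new Call("UUID");
        Expr birthday = new Col("birthday");
        // prints [ts] [] [birthday]
        System.out.println(origins(id) + " " + origins(name) + " " + origins(birthday));
    }
}
```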

Parsing fails when a UDTF is given an alias

INSERT INTO
dwd_hudi_users
SELECT
length,
name,
word as company_name,
birthday,
ts,
DATE_FORMAT(birthday, 'yyyyMMdd')
FROM
ods_mysql_users,
LATERAL TABLE (my_split_udtf (name))
The author's example above parses the lineage correctly.
If the SQL is changed to:
INSERT INTO
dwd_hudi_users
SELECT
length,
column1 as name,
word as company_name,
birthday,
ts,
DATE_FORMAT(birthday, 'yyyyMMdd')
FROM
ods_mysql_users,
LATERAL TABLE (my_split_udtf (name)) as T (column1)
Parsing this SQL with the RelMdColumnOrigins from 2.0.0 throws an exception: org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc cannot be cast to org.apache.calcite.rel.core.TableFunctionScan.
With the RelMdColumnOrigins from main, in the call
mq.getColumnOrigins(rel.getRight(), iOutputColumn - nLeftColumns)
mappings is null and rel.getInputs().isEmpty() is true, so the returned set has size 0
and the source column cannot be resolved.
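The main-branch code resolves the right side of the join by querying rel.getRight() with the shifted index iOutputColumn - nLeftColumns. Below is a minimal sketch of that index split, assuming (hypothetically) that every UDTF output column derives from all of the UDTF's argument columns and that an alias list such as T (column1) only renames columns. LateralOrigins and its parameters are made up for illustration, and the left-table column layout in main is assumed.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical sketch (not the project's RelMdColumnOrigins) of resolving the
 * origins of one output column of "t, LATERAL TABLE (udtf(args)) AS T (...)".
 * The join output lists the left table's columns first, then the UDTF's columns.
 */
final class LateralOrigins {

    /**
     * @param outputIndex  position of the column in the join output
     * @param leftColumns  columns of the left table, in order
     * @param udtfArgCols  source columns passed to the UDTF, e.g. {name} for my_split_udtf(name)
     * @param udtfOutCount number of columns the UDTF emits; an alias list only renames them
     */
    static Set<String> originsOf(int outputIndex, List<String> leftColumns,
                                 Set<String> udtfArgCols, int udtfOutCount) {
        int nLeftColumns = leftColumns.size();
        if (outputIndex < 0 || outputIndex >= nLeftColumns + udtfOutCount) {
            throw new IndexOutOfBoundsException("no output column " + outputIndex);
        }
        if (outputIndex < nLeftColumns) {
            // Left input: a pass-through column of the source table.
            return Set.of(leftColumns.get(outputIndex));
        }
        // Right input (position outputIndex - nLeftColumns inside the UDTF row): assumed to
        // derive from all of the UDTF's argument columns, whatever alias it was given.
        return new TreeSet<>(udtfArgCols);
    }

    public static void main(String[] argv) {
        List<String> left = List.of("id", "name", "birthday", "ts"); // assumed table layout
        // Output index 4 is the UDTF's first column, i.e. column1 in T (column1).
        System.out.println(originsOf(4, left, Set.of("name"), 2));
    }
}
```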

The transform of lineage result is null when parsing UDAF

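package com.hw.lineage.flink.aggregatefunction;

import org.apache.flink.table.functions.AggregateFunction;

public class TestAggregateFunction extends AggregateFunction<String, TestAggregateFunction.TestAggregateAcc> {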

    public void accumulate(TestAggregateAcc acc, String param1, String param2, String param3) {
        acc.test = param1 + param2 + param3;
    }

    @Override
    public String getValue(TestAggregateAcc accumulator) {
        return accumulator.test;
    }

    @Override
    public TestAggregateAcc createAccumulator() {
        return new TestAggregateAcc();
    }

    public static class TestAggregateAcc {
        public String test;
    }
}

create function test_aggregate as 'com.hw.lineage.flink.aggregatefunction.TestAggregateFunction'

INSERT INTO dwd_hudi_users SELECT id, name, test_aggregate(concat_ws('_', name, 'test'), name, 'test'), birthday, ts, DATE_FORMAT(birthday, 'yyyyMMdd') FROM ods_mysql_users group by id, name, birthday, ts

When parsing the UDAF, the transform of the lineage result is null.
For example:
LineageResult(sourceCatalog=hive, sourceDatabase=default, sourceTable=ods_mysql_users, sourceColumn=name, targetCatalog=hive, targetDatabase=default, targetTable=dwd_hudi_users, targetColumn=company_name, transform=null)
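One way to produce a non-null transform for an aggregate is to render the function name over its argument expressions. In Calcite, an aggregate call's arguments are given as indices into the input row (AggregateCall.getArgList()), so the sketch below looks each index up in the input projection. AggTransform, render, and the input projection in main are hypothetical illustrations, not the project's code.

```java
import java.util.List;
import java.util.StringJoiner;

/**
 * Hypothetical sketch: render a non-null transform for an aggregate call.
 * The call's arguments are modeled as indices into the expressions produced
 * by its input (a projection below the aggregate).
 */
final class AggTransform {

    static String render(String function, List<Integer> argList, List<String> inputExprs) {
        StringJoiner joiner = new StringJoiner(", ", function + "(", ")");
        for (int index : argList) {
            joiner.add(inputExprs.get(index)); // throws if an index is out of range
        }
        return joiner.toString();
    }

    public static void main(String[] argv) {
        // Assumed input projection: group keys first, then the aggregate's extra arguments.
        List<String> input = List.of("id", "name", "birthday", "ts",
                "concat_ws('_', name, 'test')", "'test'");
        System.out.println(render("test_aggregate", List.of(4, 1, 5), input));
    }
}
```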

[Bug] Flink CEP SQL column lineage parse error

Here are my test CEP SQL and DDL statements:

CREATE TABLE `sink_table`(`agent_id` STRING, `room_id` STRING, `application_id` STRING, `type` STRING, `begin_time` BIGINT, `end_time` BIGINT) WITH(...)

CREATE TABLE `source_table`(`agent_id` STRING, `room_id` STRING, `create_time` BIGINT, `type` STRING, `application_id` STRING, `connect_time` BIGINT, `row_time` AS PROCTIME()) WITH(...)

insert into sink_table (agent_id,room_id,application_id,type,begin_time,end_time) 
select agent_id,room_id,application_id,type,begin_time,end_time 
	from source_table 
	match_recognize (partition by agent_id,room_id,application_id order by row_time 
	measures AF.type as type,last(BF.create_time) as begin_time,last(AF.create_time) as end_time 
	one row per match 
	after match SKIP PAST LAST ROW 
	pattern (BF+ AF) WITHIN INTERVAL '1' HOUR 
	define BF as BF.type = 'assign',AF as AF.type = 'pick_up' ) as T

Because of the code logic in RelMdColumnOrigins, the lineage result for this CEP SQL is wrong, for example the column lineage of sink_table.application_id.
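With ONE ROW PER MATCH, the output of MATCH_RECOGNIZE consists of the PARTITION BY columns followed by the MEASURES, so application_id should trace straight back to source_table.application_id, while begin_time and end_time trace to create_time. A hypothetical sketch of that mapping follows; MatchRecognizeOrigins is illustrative only, and the measure origins are supplied by hand rather than parsed from the expressions.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical sketch of column origins for MATCH_RECOGNIZE with ONE ROW PER MATCH:
 * output = PARTITION BY keys, then MEASURES (in declaration order).
 */
final class MatchRecognizeOrigins {

    static Map<String, Set<String>> origins(List<String> partitionKeys,
                                            LinkedHashMap<String, Set<String>> measures) {
        Map<String, Set<String>> result = new LinkedHashMap<>();
        for (String key : partitionKeys) {
            // A partition key is passed through unchanged from the input row.
            result.put(key, Set.of(key));
        }
        for (Map.Entry<String, Set<String>> m : measures.entrySet()) {
            // A measure depends on the input columns its expression references.
            result.put(m.getKey(), new TreeSet<>(m.getValue()));
        }
        return result;
    }

    public static void main(String[] argv) {
        LinkedHashMap<String, Set<String>> measures = new LinkedHashMap<>();
        measures.put("type", Set.of("type"));              // AF.type
        measures.put("begin_time", Set.of("create_time")); // last(BF.create_time)
        measures.put("end_time", Set.of("create_time"));   // last(AF.create_time)
        System.out.println(origins(List.of("agent_id", "room_id", "application_id"), measures));
    }
}
```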

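Lineage parsing fails for CROSS JOIN UNNEST. Supporting it probably requires extending the implementation of RelMdColumnOrigins.getColumnOrigins; could you add support when you have time?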

CREATE TABLE kafka_source (
id bigint,
data ARRAY<row<c1 bigint,c2 string>>
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = 'hadoop101:9092',
'topic' = 't1',
'properties.group.id' = 'g1',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);

CREATE TABLE sink_print (
id bigint,
c1 bigint,
c2 string
) WITH (
'connector' = 'print'
);

insert into sink_print
SELECT id, t.c1, t.c2
FROM kafka_source
CROSS JOIN UNNEST(data) AS t(c1,c2)
;
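A CROSS JOIN UNNEST behaves like a correlated join: the left table's columns come first, followed by the fields of the unnested element. Below is a hypothetical sketch of how an extended getColumnOrigins could map each output column, assuming the lineage is recorded as a nested field path such as data.c1; UnnestOrigins is not part of the project.

```java
import java.util.List;

/**
 * Hypothetical sketch of origins for "CROSS JOIN UNNEST(data) AS t(c1, c2)":
 * left columns pass through; each UNNEST output column traces to a field of
 * the unnested array column.
 */
final class UnnestOrigins {

    static String originOf(int outputIndex, List<String> leftColumns,
                           String arrayColumn, List<String> elementFields) {
        int nLeft = leftColumns.size();
        if (outputIndex < nLeft) {
            return leftColumns.get(outputIndex); // throws for a negative index
        }
        int field = outputIndex - nLeft;
        if (field >= elementFields.size()) {
            throw new IndexOutOfBoundsException("no output column " + outputIndex);
        }
        return arrayColumn + "." + elementFields.get(field);
    }

    public static void main(String[] argv) {
        List<String> left = List.of("id", "data");   // kafka_source columns
        List<String> fields = List.of("c1", "c2");   // ROW<c1 BIGINT, c2 STRING>
        for (int i = 0; i < left.size() + fields.size(); i++) {
            System.out.println(i + " -> " + originOf(i, left, "data", fields));
        }
    }
}
```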


Discussion: approaches to job-level lineage

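I ran the project locally and tried out the basic features. It works really well, impressive! @HamaWhiteGG, do you plan to support lineage between jobs? I have recently been working on lineage for a Flink real-time platform and would be glad to discuss it: displaying the lineage among sources, tasks, and sinks.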
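One common way to model job-level lineage is a directed graph in which each job has edges from the tables it reads and to the tables it writes; downstream impact analysis is then a graph traversal. A minimal sketch under that assumption (JobLineageGraph and the job and table names in main are hypothetical):

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch of job-level lineage: edges source table -> job -> sink table,
 * with a breadth-first search for everything downstream of a node.
 */
final class JobLineageGraph {

    private final Map<String, Set<String>> edges = new HashMap<>();

    /** Registers a job as edges source -> job and job -> sink. */
    void addJob(String job, Collection<String> sources, Collection<String> sinks) {
        for (String source : sources) {
            edges.computeIfAbsent(source, k -> new LinkedHashSet<>()).add(job);
        }
        edges.computeIfAbsent(job, k -> new LinkedHashSet<>()).addAll(sinks);
    }

    /** Returns every job and table reachable from the given node. */
    Set<String> downstream(String node) {
        Set<String> seen = new LinkedHashSet<>();
        Deque<String> queue = new ArrayDeque<>(List.of(node));
        while (!queue.isEmpty()) {
            for (String next : edges.getOrDefault(queue.poll(), Set.of())) {
                if (seen.add(next)) { // the seen set also guards against cycles
                    queue.add(next);
                }
            }
        }
        return seen;
    }

    public static void main(String[] argv) {
        JobLineageGraph g = new JobLineageGraph();
        g.addJob("job_a", List.of("ods_mysql_users"), List.of("dwd_hudi_users"));
        g.addJob("job_b", List.of("dwd_hudi_users"), List.of("dws_report"));
        System.out.println(g.downstream("ods_mysql_users"));
    }
}
```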

Error when parsing a UDAF where the number of inputs equals the number of UDF arguments


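package com.hw.lineage.flink.aggregatefunction;

import org.apache.flink.table.functions.AggregateFunction;

public class TestAggregateFunction extends AggregateFunction<String, TestAggregateFunction.TestAggregateAcc> {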

    public void accumulate(TestAggregateAcc acc, String param1, String param2, String param3) {
        acc.test = param1 + param2 + param3;
    }

    @Override
    public String getValue(TestAggregateAcc accumulator) {
        return accumulator.test;
    }

    @Override
    public TestAggregateAcc createAccumulator() {
        return new TestAggregateAcc();
    }

    public static class TestAggregateAcc {
        public String test;
    }
}

create function test_aggregate as 'com.hw.lineage.flink.aggregatefunction.TestAggregateFunction'

  String sql = "INSERT INTO dwd_hudi_users " +
                "SELECT " +
                "   id ," +
                "   name ," +
                "   test_aggregate(concat_ws('_', name, email), address, 'test')," +
                "   birthday ," +
                "   ts ," +
                "   DATE_FORMAT(birthday, 'yyyyMMdd') " +
                "FROM" +
                "   ods_mysql_user_detail group by id, name, birthday, ts ";
context.execute("CREATE TABLE IF NOT EXISTS ods_mysql_user_detail (" +
                "       id                  BIGINT PRIMARY KEY NOT ENFORCED ," +
                "       name                STRING                          ," +
                "       birthday            TIMESTAMP(3)                    ," +
                "       ts                  TIMESTAMP(3)                    ," +
                "       email               STRING                          ," +
                "       address             STRING                          ," +
                "       proc_time as proctime()                              " +
                ") WITH ( " +
                "       'connector' = 'mysql-cdc'            ," +
                "       'hostname'  = '127.0.0.1'       ," +
                "       'port'      = '3306'                 ," +
                "       'username'  = 'root'                 ," +
                "       'password'  = 'xxx'          ," +
                "       'server-time-zone' = 'Asia/Shanghai' ," +
                "       'database-name' = 'demo'             ," +
                "       'table-name'    = 'users' " +
                ")");

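The source columns of an aggregate call can be computed by looking up each entry of its argument index list in the input and taking the union of those expressions' origins; a literal argument such as 'test' contributes nothing, so the argument count need not match the number of source columns. A hypothetical sketch (AggArgOrigins and the input projection in main are illustrative assumptions, not the project's code):

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical sketch: origins of an aggregate call = union of the origins of
 * the input expressions referenced by its argument indices.
 */
final class AggArgOrigins {

    static Set<String> origins(List<Integer> argList, List<Set<String>> inputOrigins) {
        Set<String> result = new TreeSet<>();
        for (int index : argList) {
            result.addAll(inputOrigins.get(index));
        }
        return result;
    }

    public static void main(String[] argv) {
        // Assumed input projection for test_aggregate(concat_ws('_', name, email), address, 'test')
        List<Set<String>> input = List.of(
                Set.of("id"), Set.of("name"), Set.of("birthday"), Set.of("ts"),
                Set.of("name", "email"), // concat_ws('_', name, email)
                Set.of("address"),
                Set.of());               // 'test' (a literal)
        System.out.println(origins(List.of(4, 5, 6), input));
    }
}
```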
Flink SQL lineage result misses one column

Here is my SQL script:

-- source1
DROP TABLE IF EXISTS `dwd_status_log`;
CREATE TABLE `dwd_status_log`
(
    mechine STRING,
    timenow STRING,
    `value` STRING,
    pre_value STRING,
    PRIMARY KEY (timenow) NOT ENFORCED
)  
WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_status_log',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'debezium-json'
);

-- source2
DROP TABLE IF EXISTS dim_equipment_info;
CREATE TABLE IF NOT EXISTS dim_equipment_info (
    id STRING 
    ,station_type STRING 
    ,equipment_no STRING 
    ,emp_name STRING 
    ,emp_phone STRING 
    ,vendor STRING 
    ,site STRING 
    ,`floor` STRING 
    ,line STRING 
    ,model STRING 
    ,stage STRING 
    ,id_value STRING 
    ,emp_info STRING
    ,PRIMARY KEY ( id ) NOT ENFORCED
) 
WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:4000/data_warehouse_p_10046?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&nullCatalogMeansCurrent=true',
   'table-name' = 'dim_equipment_info',
   'username' = 'root',
   'password' = 'xxx'
);


-- sink
DROP TABLE IF EXISTS dws_alarm_status_count;
CREATE TABLE IF NOT EXISTS dws_alarm_status_count (
    mechine_id STRING 
    ,mechine_name STRING 
    ,trigger_date STRING 
    ,alarm_status_count BIGINT 
    ,PRIMARY KEY ( mechine_id,trigger_date ) NOT ENFORCED
) 
WITH (
    'connector' = 'kafka',
    'topic' = 'dws_alarm_status_count',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'debezium-json'
);

-- trans
INSERT INTO 
    dws_alarm_status_count 
SELECT
    dsl.mechine AS mechine_id,
    dei.equipment_no AS mechine_name,
    DATE_FORMAT(dsl.timenow, 'yyyy-MM-dd') AS `trigger_date`,
    COUNT(*) AS alarm_status_count
FROM 
    dwd_status_log dsl 
LEFT JOIN 
    dim_equipment_info dei 
ON 
    dsl.mechine = dei.id 
WHERE 
    dsl.`value` = '4' 
GROUP BY 
DATE_FORMAT(dsl.timenow, 'yyyy-MM-dd'), dsl.mechine,dei.equipment_no;

The lineage for the last column (alarm_status_count) is missing.
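COUNT(*) takes no column arguments (in Calcite its argument list is empty), so a resolver that only emits rows for non-empty origin sets would drop alarm_status_count. One possible policy, sketched hypothetically below, is to keep such target columns with a null source and their transform text; LineageRows and the string row format are assumptions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical sketch: turn per-target-column origins into lineage rows
 * without dropping target columns that have no source column, e.g. COUNT(*).
 */
final class LineageRows {

    static List<String> rows(Map<String, Set<String>> originsByTarget,
                             Map<String, String> transforms) {
        List<String> rows = new ArrayList<>();
        for (Map.Entry<String, Set<String>> e : originsByTarget.entrySet()) {
            String target = e.getKey();
            String transform = transforms.get(target); // null when no transform was recorded
            if (e.getValue().isEmpty()) {
                // No source column: keep the target with a null source instead of skipping it.
                rows.add("null -> " + target + " [" + transform + "]");
            }
            for (String source : new TreeSet<>(e.getValue())) {
                rows.add(source + " -> " + target + " [" + transform + "]");
            }
        }
        return rows;
    }

    public static void main(String[] argv) {
        Map<String, Set<String>> origins = new LinkedHashMap<>();
        origins.put("mechine_id", Set.of("dwd_status_log.mechine"));
        origins.put("alarm_status_count", Set.of()); // COUNT(*) has no column arguments
        rows(origins, Map.of("alarm_status_count", "COUNT(*)")).forEach(System.out::println);
    }
}
```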

[Discussion] Implementing an Atlas hook in Flink

Current progress of the Flink community's Atlas integration: https://docs.google.com/document/d/1wSgzPdhcwt-SlNBBqL-Zb7g8fY6bN8JwHEg7GCdsBG8/edit#heading=h.mfdxvgyhozur

PoC of the changes needed on the Flink side: https://github.com/gyfora/atlas/blob/flink-bridge/addons/flink-bridge/src/main/java/org/apache/atlas/flink/hook/FlinkAtlasHook.java

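If you are interested, you could look at that author's implementation: it is relatively non-intrusive to Flink and fairly general.
This part of the code has now been merged into the Atlas main branch (Atlas 2.2.0); the Flink side still needs some PoC validation.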

Test fails on the main branch

@Slf4j
public class SimpleTest extends AbstractBasicTest {

    @Before
    public void createTable() {
        createMysqlUser();
        createOdsUser();
    }

    @Test
    public void concat() {
        String sql = "insert into mysql_user " +
                "select id, birthday, concat(first_name, last_name) as full_name " +
                "from ods_user";
        List<Result> actualList = context.parseFieldLineage(sql);
        log.info("Lineage Result: ");
        actualList.forEach(e -> log.info(e.toString()));
    }

    protected void createMysqlUser() {
        context.execute("DROP TABLE IF EXISTS mysql_user ");
        context.execute("CREATE TABLE IF NOT EXISTS mysql_user (" +
                "       id                        BIGINT           ," +
                "       birthday                  TIMESTAMP(3)     ," +
                "       full_name                 STRING            " +
                ") WITH ( " +
                "       'connector' = 'jdbc'                 ," +
                "       'url'       = 'jdbc:mysql://127.0.0.1:3306/demo?useSSL=false&characterEncoding=UTF-8'," +
                "       'username'  = 'root'                 ," +
                "       'password'  = 'xxx'          ," +
                "       'table-name'= 'mysql_user' " +
                ")"
        );
    }

    protected void createOdsUser() {
        context.execute("DROP TABLE IF EXISTS ods_user ");
        context.execute("CREATE TABLE IF NOT EXISTS ods_user (" +
                "       id                        BIGINT           ," +
                "       birthday                  TIMESTAMP(3)     ," +
                "       first_name                STRING           ," +
                "       last_name                 STRING           ," +
                "       company_name              STRING           " +
                ") WITH ( " +
                "       'connector' = 'jdbc'                 ," +
                "       'url'       = 'jdbc:mysql://127.0.0.1:3306/demo?useSSL=false&characterEncoding=UTF-8'," +
                "       'username'  = 'root'                 ," +
                "       'password'  = 'xxx'          ," +
                "       'table-name'= 'ods_user' " +
                ")"
        );
    }
}

If I remove the company_name column from ods_user, parsing succeeds. Is something wrong with how I wrote this?

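When the WHERE clause compares a column to a constant, the source column is optimized into that constant, and the lineage of that column is lost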

Given the following Flink SQL:
CREATE TABLE demo_log_01 (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);
CREATE TABLE demo_log_02 (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);
CREATE TABLE demo_log_05 (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) WITH (
  'connector' = 'print'
);
insert into demo_log_05
select a.user_id,b.item_id,b.behavior,a.dt,a.hh from 
(select  user_id,item_id,behavior,dt,hh from demo_log_01 where dt='2023-17-02') a 
left join 
(select  user_id,item_id,behavior,dt,hh from demo_log_02) b
on a.user_id = b.user_id

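Because dt = '2023-17-02' holds for every row that passes the filter, the planner may replace a.dt in the projection with the literal, after which no column origin remains. A hypothetical workaround sketch: collect column = literal equalities from the WHERE clause and map a projected literal back to the column it replaced. ConstantFoldedOrigins is illustrative only; a literal shared by several columns is left unresolved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical workaround sketch: map projected literals that came from
 * constant folding back to the column named in a "column = literal" filter.
 */
final class ConstantFoldedOrigins {

    /**
     * @param projected      projected expressions: column names or quoted literals
     * @param equalityFilter column -> literal pairs collected from the WHERE clause
     * @return the projection with each unambiguous literal replaced by its column
     */
    static List<String> recover(List<String> projected, Map<String, String> equalityFilter) {
        Map<String, String> columnByLiteral = new HashMap<>();
        Set<String> ambiguous = new HashSet<>();
        for (Map.Entry<String, String> e : equalityFilter.entrySet()) {
            if (columnByLiteral.putIfAbsent(e.getValue(), e.getKey()) != null) {
                ambiguous.add(e.getValue()); // two columns equal the same literal
            }
        }
        List<String> result = new ArrayList<>();
        for (String expr : projected) {
            boolean resolvable = columnByLiteral.containsKey(expr) && !ambiguous.contains(expr);
            result.add(resolvable ? columnByLiteral.get(expr) : expr);
        }
        return result;
    }

    public static void main(String[] argv) {
        // Assumed projection after folding: a.dt has become the literal '2023-17-02'.
        List<String> projected = List.of("user_id", "item_id", "behavior", "'2023-17-02'", "hh");
        System.out.println(recover(projected, Map.of("dt", "'2023-17-02'")));
    }
}
```

A caveat of this design: a literal that the query projects deliberately, rather than one produced by folding, would also be mapped back to a column, so a real implementation would need to track where each literal came from.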
