Code Monkey home page Code Monkey logo

estante's People

Contributors

dependabot[bot] avatar takegue avatar

Watchers

 avatar

estante's Issues

BigQuery において 一時的な TVFやモデルを作る:

BigQueryにおける一時的なテーブルやUDFの作成

create temp functioncreate temp table 構文を利用することで、一時的なテーブルやUDFを作ることができる.
ただしこの temp 指示子は、限定的で TVFやPROCEDURE、BigQueryMLの modelなどでは作成することができない。

この記事では temp指示子が サポートされていないリソース類について作成する方法について解説する。

解決方法: BigQueryにより作成される一時隠しデータセットを間借りする

create temp table 構文により作成されるリソースは、BigQueryのシステム上では
一時的な隠しデータセットを生成している。
これは上記を実行したジョブから、作成されたテーブルのメタデータから確認できる。

そしてこれはSQL上からは一時的に参照できるデータセットであり、次の特徴を持つ。

  • GUI上では確認できない
  • INFORMATION_SCHEMA のメタデータは 利用できない
  • 時間経過と共に削除される
  • 作成者にしか参照できない。

image

このデータセットにUDFやTVFなどを作成することで一時的なTVFの作成を達成することができる

PoCコード

次のコードが上記を達成するコードとなる

declare temp_dataset string;
execute immediate format("create or replace temporary table `%s` as (select 1 as a limit 0)", generate_uuid());

set temp_dataset = (
  select as value
    ifnull(destination_table.dataset_id, error('not found job')) 
  from `region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
  where job_id = @@last_job_id
    and creation_time >= current_timestamp() - interval 30 minute
  limit 1
)
;
set @@dataset_id = temp_dataset;


-- table の作成と利用
create table `temp_table_test1`
as select 1 as a;
select * from `temp_table_test1`;

-- TVF の作成と利用
execute immediate format("""
  create or replace table function `%s.hoge`()
  as select 1 as a;
"""
  , temp_dataset
);
execute immediate format("select * from `%s.hoge`()", temp_dataset)

Limitation

  • hidden dataset配下の TVFの参照には @@dataset_idが効かないため、 execute immediate で動的なSQLを実行する必要がある

テーブルのラベルにパーティション情報を埋め込む

概要

BigQueryのパーティションは大きいサイズのテーブルに便利な機能であるが
テーブルの状態として、パーティションがどこまで存在するかなどの情報は不明慮で、実際に使ってみるまではわからない。

INFORMATION_SCHEMA.PARTITIONS を使えばこの情報を取得することができるが
テーブルのメタ情報に埋め込んでしまうのが利用者にとって便利である。

demo

データセットの名前を渡すと、ラベルの情報を更新するプロシージャを作成する。
実際にはデータ更新後に呼び出しラベル情報を更新する運用を想定している。

image

  • partition-max: 最大のパーティション
  • partition-min: 最小のパーティション
  • partiion-skip: 非連続に存在するパーティションの数. 例えば日付パーティションで、特定の日付がないと分析上困るるので記録する

プロシージャ

BigQuery Scriptを利用し、ラベル情報を埋め込むプロシージャを作成した:

https://github.com/takegue/estante/blob/master/codes/bigquery/fn/update_tables_labels_for_partition/ddl.sql#L1-L91

BigQuery: 配列に対してFULL JOINを行う

unnestにおいてfull joinはエラーとなる

次のようなクエリはエラーとなる。

select  *
from unnest([1, 3]) as lhs
full join unnest([1,2]) as rhs

unnest() オペレーターは right join に対応していないため、 full join も記述できない

解決策

concatenateした配列を用意し、これに対して left joinすることで full joinと同じ出力を得ることができる。

select distinct 
  lhs, rhs 
from unnest([1,2] || [1, 3]) as both
left join unnest([1,2]) as lhs on both = lhs
left join unnest([1,3]) as rhs on both = rhs

BigQueryのINFORMATION_SCHEMAのlabels情報をパースする

概要

INFORMATION_SCHEMA.TABLE_OPTIONS で得られる オプションの値である
option_value は STRING型であり、中身がJSON形式ではなくSQL形式である.
e.g ['[STRUCT("hoge", "fuga"), STRUCT("hoge", "fugb")]'

これは非常に扱いづらいので、パースして利用できるようにする

方法

JSON型に変換してしまい、そこからSQL型の構造に変換する

select 
  labels
from unnest(['[STRUCT("hoge", "fuga"), STRUCT("hoge", "fugb")]']) v
left join unnest([struct(
    array(
      select as struct
        label[0] as key, label[1] as value 
      from unnest(json_extract_array(parse_json(replace(replace(replace(v, "STRUCT", ""), '(', '['), ')', ']')))) as label
    ) as labels
)]) as v2

UDF

UDFにしておくと便利かもしれない

create or replace function `fn.get_bqlabel_from_option_value`(
label_option_value string
)
returns array<struct<key string, value string>>
options(description="""Get BigQuery Labels from INFORMATION_SCHEMA.TABLE_OPTIONS's option_value
Arguments
====
label_option_value: label's option_value in BigQuery INFORMATION_SCHEMA Format like `'[STRUCT("hoge", "fuga"), STRUCT("hoge", "fugb")]'`
""")
as (
array(
select as struct
string(label[0]), string(label[1]) as value
from unnest(json_extract_array(parse_json(replace(replace(replace(label_option_value, "STRUCT", ""), '(', '['), ')', ']')))) as label
)
);

マイクロサービスとしてBigQueryデータセットを運用する

  • データセットを一つのサービスとして捉えると都合が良い
  • viewをIDLとして運用する
  • 集計用のTVFあるいはKPIを提供するのもサービスの責任と都合がよい

パッケージ/ライブラリ パターン: エンティティ系

商品や顧客、URLなど特定の対象に関して
構造情報化するためのデータセット
このデータセットが公開するプライマリキーは分析の際の集計単位として利用されるため、特別な名前としてエンティティキーとする。

どのような構成にするべきか

  • core: 主要なパブリックI/F。データセット利用者は基本的にはこれを利用する
  • snapshot: 特定の日時のcoreを再現する(必要あれば)
  • history: 日時単位でのcore再現する(必要あればそれを)

パッケージ/ライブラリ パターン: イベントログ系

ダッシュボードや分析において再利用することが前提のデータセット。
Data Vaultでいうlinkテーブル
レコード単体の利用というよりも、レコード中に含まれるエンティティキーを利用した分析が主な利用
その中でもイベントログ

ユーザ行動の汎用的に利用されるだろう

どのような構成にするべきか

  • core: 主要なパブリックI/F。データセット利用者は基本的にはこれを利用する。

  • zattr_xxxx: coreを実現するために必要な属性テーブルなど

  • zmonitor_xxxx: データ監視用

  • zdatasorure: 実装テーブル. データに関するカラム追加やデータ削除などオペレーションに都合がよい形式がよい

  1. zdatasourceはwhereなどで安易にフィルタしないほうがよい
  • レコードの異常値の検知がzmonitorで可能なようにする
  • coreで指定するときにはwhereで絞れば良いので利用者にとっては問題がない
  1. マテリアイズドビューの活用
  • viewに関するパフォーマンスが問題になる場合(例えばjoin)
  • データの状態に応じて、viewの公開状態を変えたい場合

前計算が必要になる。
こういうときにマテビューが便利

ダッシュボード系

  • dashboard連携用物理テーブル:
  • analytics_stats: 集計関数
  • signal: データソース。デバッグや異常レコードの追跡のために必要

物理化すると良い(クラスタインデックス含め)

good

bad

GitHub Copilotがやってきた

GitHub Copilotがやってきた

特に有名なOSSコミッターではないが、GitHub Copilotがフリーで使えるようになったので
せっかくなので使って近未来を感じてみる:

image

Neovimでの setup

https://docs.github.com/en/copilot/getting-started-with-github-copilot/getting-started-with-github-copilot-in-neovim

ここらへんを見ながら、プラグインマネージャにcopoilot ポチポチしながら
プラグインのインストールを行う。

動かしてみる

# Good example for a GitHub Copilot beginner

ここらへんまで入力して、neovimであれば次のコマンドを入力する

:Copilot panel

こんな感じで候補が出る。 (ただ候補が出るまではまぁまぁ時間がかかる)

# Good example for a GitHub Copilot beginner

import os
import sys
import time
import datetime
import argparse
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import matplotlib.cbook as cbook
import matplotlib.ticker as ticker
import matplotlib.colors as colors
import matplotlib.cm as cm
import matplotlib.mlab as mlab
import matplotlib.gridspec as gridspec
import matplotlib.image as mpimg
import matplotlib.patches as mpatches
import matplotlib.lines as mlines
import matplotlib.transforms as mtransforms
import matplotlib.text as mtext
import matplotlib.font_manager as font_manager
import matplotlib.ticker as mticker
import matplotlib.axis as maxis
import matplotlib.artist as martist
import matplotlib.backends.backend_agg as agg
import matplotlib.backends.backend_pdf as pdf
import matplotlib.backends.backend_svg as svg
import matplotlib.backends.backend_ps as ps
import matplotlib.backends.backend_gtkagg as gtkagg
import matplotlib.backends.backend_macosx as macosx
import matplotlib.backends.backend_mpl as mpl
import matplotlib.backends.backend_qt4agg as qt4agg
import matplotlib.backends.backend_qt5agg as qt5agg
import matplotlib.backends.backend_tkagg as tkagg
import matplotlib.backends.backend_webagg as webagg
import matplotlib.backends.backend_agg as agg
import matplotlib.backends.backend_cairo as cairo
import matplotlib.backends.backend_gdk as gdk
import matplotlib.backends.backend_ps as ps
import matplotlib.backends.backend_svg as svg
import matplotlib.backends.backend_template as template
import matplot

# Good example for a GitHub Copilot beginner.

import sys
import os
import time
import datetime
import random
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.python.framework import ops
from tensorflow.python.framework import dtypes
from tensorflow.python.ops import gen_array_ops
from tensorflow.python.ops import gen_math_ops
from tensorflow.python.ops import state_ops
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import random_ops
from tensorflow.python.ops import variables
from tensorflow.python.ops import logging_ops
from tensorflow.python.ops import nn_ops
from tensorflow.python.ops import data_flow_ops
from tensorflow.python.ops import variable_scope as vs
from tensorflow.python.ops import gradients as tf_gradients
from tensorflow.python.ops import image_ops
from tensorflow.python.ops import summary_ops
from tensorflow.python.ops import histogram_ops
from tensorflow.python.ops import variable_scope
from tensorflow.python.ops import variables as tf_variables
from tensorflow.python.ops import gradients as tf_gradients
from tensorflow.python.ops import logging as tf_logging
from tensorflow.python.ops import nn as tf_nn
from tensorflow.python.ops import math_ops as tf_math_ops
from tensorflow.python.ops import array_ops as tf_array_ops
from tensorflow.python.ops import random_ops as tf_random_ops
from tensorflow.python.ops import variable_scope as tf_variable_scope
from tensorflow.python.ops import variables as tf_variables
from tensorflow.python.ops import gradients as tf_gradients
from tensorflow.python.ops import logging as tf_logging
from tensorflow.python.ops

# ...

Github Copilotは matplotlibかtensorflow推しということがよくわかる

GitHub Copilotから見えるコーディングの未来

言ってしまえば GitHub` Copilotは、超賢くなったsnippet 生成器である。

「ググってStackOverflow を読んで、それを解釈してコード片をコピペする」って動作を
アプリケーション化したものであるが、使ってみた感想としてはかなり未来を感じた。

こういうユースケースでは刺さるなと思った点をいくつか書いていこうと思う。

Pythonでグラフを描画する

notebookを利用する際に matplotlib で描画するケースというのはよくあると思う。
特にpandasなどを利用して描画するケースは頻出である。
こういった時に大変なのが、グラフの描画オプションを調整したりするところである。
たくさんあるオプションの中で適切な描画オプションを選び調整する必要がある。

# Scatter plot from order dataframe

# Read in the data
df = pd.read_csv('order_data.csv')

# Create a scatter plot of the data
plt.scatter(df['order_id'], df['order_total'])

# Add a title and axis labels
plt.title('Order Total vs. Order ID')
plt.xlabel('Order ID')
plt.ylabel('Order Total')

# Show the plot
plt.show()

上記は簡単な例ではあるが
覚える気のないオプション込みでの生成ができるようになるとかなりありがたいと思う。

定番の集計SQLを生成する

SQLというのはパターン化しやすい題材で、テーブルの形が決まっていれば
大体利用するSQLというのはだいたい形が決まってくる。

こういう退屈なSQLというのも Copilotの出番だ

-- Show unique users and revenue daily from order table
SELECT DATE(order_date) AS order_date, COUNT(DISTINCT user_id) AS unique_users, SUM(total_price) AS revenue
FROM order_table
GROUP BY DATE(order_date);

order tableを適宜いじる必要があるが、上々なSQLが生成されているように感じる。

その他、「新規ユーザの売り上げを知りたい」といった要求に対しても
それっぽいクエリを出してくれる。

-- Show order amount by newbie user from order table
SELECT
    o.user_id,
    SUM(o.amount) AS amount
FROM
    orders o
WHERE
    o.user_id IN (
        SELECT
            u.id
        FROM
            users u
        WHERE
            u.role = 'newbie'
    )
GROUP BY
    o.user_id;

こういったSQL生成にはかなり魅力を感じた。
初心者が初手で利用するには十分なんじゃないだろうか

BigQueryのタイムゾーンに関するプラクティス

BigQueryにおけるタイムゾーンの取り扱い

BigQueryにおけるタイムゾーンの取り扱いは問題を起こしやすい。
特にDATE型やdatetime型に丸め込まれた場合に問題が発生しやすい。

CURRENT_DATE()CURRENT_DATETIME() で得られる日付はデフォルトではUTC である*1
また 日付パーティションもtimestampからの変換を行っている場合はデフォルトでUTCとなる。

特定のタイムゾーンでの日付を得るためには明示的にタイムゾーンを与えなければならない

select 
  CURRENT_DATE('Asia/Tokyo')
  , CURRENT_DATETIME('Asia/Tokyo')
  , timestamp('2022-01-01', 'Asia/Tokyo')
  , timestamp_trunc(current_timestamp(), HOUR, 'Asia/Tokyo')

BigQuery コンソール上の timestampの表記の注意点

注意点としてBigQueryのコンソール画面における timestamp型の表記は
デフォルトではUTCタイムゾーンとして表示される.

JSTといったタイムゾーンの変換を行ったとしてもコンソール上はUTCタイムゾーンを基準に表示される。
先ほどのクエリ結果は、コンソール上では次のように表示される。
image

BigQuery でのタイムゾーンを含む取り扱いのプラクティス

DWHとのレイヤは timestamp型を保持する

BigQueryでタイムゾーンが型レベルで取り扱えるのは timestamp型 のみである。
データマートの作成などデータソースとしての役割を担うDWHレイヤにおいてはtimestamp型で保持することが望ましい

  • タイムゾーン含め日時を表す方の中では 最も情報量が多く明示的な型であるため
  • 変換の関数が一番豊富
  • 元がDATE型やString型であった場合も含め、timestamp型にパースしておくとよい

利用者向けに ts変換用のUDFを用意しておく

時刻の変換用はミスが出やすいため関数を用意しておくと、利用に便利である

create or replace function `fn.ts_conv`(ts timestamp)
as (
  struct(
    ts as raw
    , struct(
      string(ts, 'UTC') as pretty
      , struct(
          timestamp_trunc(ts, hour, 'UTC') as hour
          , timestamp_trunc(ts, day, 'UTC') as day
          , timestamp_trunc(ts, week, 'UTC') as week
          , timestamp_trunc(ts, month, 'UTC') as month
        ) as trunc
      , struct(
          extract(hour from ts at time zone 'UTC') as hour
          , extract(day from ts at time zone 'UTC') as day
          , extract(week from ts at time zone 'UTC') as week
          , extract(month from ts at time zone 'UTC') as month
        ) as part
    ) as utc
    , struct(
      string(ts, 'Asia/Tokyo') as pretty
      struct(
        timestamp_trunc(ts, hour, 'Asia/Tokyo') as hour
        , timestamp_trunc(ts, day, 'Asia/Tokyo') as day
        , timestamp_trunc(ts, week, 'Asia/Tokyo') as week
        , timestamp_trunc(ts, month, 'Asia/Tokyo') as month
      ) as trunc
      , struct(
          extract(hour from ts at time zone 'Asia/Tokyo') as hour
          , extract(day from ts at time zone 'Asia/Tokyo') as day
          , extract(week from ts at time zone 'Asia/Tokyo' ) as week
          , extract(month from ts  at time zone 'Asia/Tokyo') as month
        ) as part
    ) as jst
  )
);

-- 構造体のアクセス宣言のみで時間の変換が可能になる
select ts_conv(current_timestamp()).jst.trunc.hour
;

@@time_zone によるデフォルトのタイムゾーンの変換

BigQuery Scriptの機能を利用し、 デフォルトのタイムゾーンの変換が可能である (Ref. https://cloud.google.com/bigquery/docs/reference/system-variables)

set @@time_zone = 'Asia/Tokyo'

select current_date()
--> 'Asia/Tokyo'での日時となる

ただし、上記の設定はテーブル作成の際のパーティション計算には及ばない点は注意である
@@time_zoneを変更したとしても日付パーティションの作成の際に、timestampをDATEに変換したとしても
DATEはUTCを基準に計算される.

set @@time_zone = 'Asia/Tokyo';
create or replace table `sandbox.test_partition`
-- DATE(d) の日付はUTCを基準に計算される
partition by DATE(d) 
as
  select d from unnest(
    generate_timestamp_array(
      timestamp('2022-08-24', 'UTC')
      , timestamp('2022-08-25', 'UTC')
      , interval 3 hour
    )
  ) as d

Orgnization/Projectごとのデフォルトタイムゾーンの設定 (from twitter:@yutah_3)

BigQueryではタイムゾーン情報は、プロジェクトもしくは組織レベルでのデフォルト値を設定することができる。
この場合ユーザによるBigQuery Script毎の設定は不要となる。

-- 引用: https://cloud.google.com/bigquery/docs/default-configuration#default_configurations

-- Orgnization-wide configuration
ALTER ORGANIZATION
SET OPTIONS (
  `region-us.default_time_zone `= 'Asia/Tokyo'
);

-- Project-wide configuration
ALTER PROJECT project_id
SET OPTIONS (
  `region-us.default_time_zone` = 'Asia/Tokyo'
);

この現在の設定は INFORMATION_SCHEMAから確認できる

-- 現在のプロジェクトで有効な設定の状態
SELECT * FROM region-us.INFORMATION_SCHEMA.EFFECTIVE_PROJECT_OPTIONS;

-- Organization-wide Configuration 
SELECT * FROM region-us.INFORMATION_SCHEMA.ORGANIZATION_OPTIONS;

詳細は公式ドキュメントを参照するとよい。
https://cloud.google.com/bigquery/docs/default-configuration#default_configurations

References

*1. Timestamp Function | How time zones work with timestamp functions

BigQuery Script による カラムリネージ の抽出

create or replace function `v0.zgensql__clineage_queries`(
  target_dataset struct<project_id string, dataset_id string>
  , table_names array<string>
)
as (
  replace(replace(
    """
    -- Query single column for column lineage
    with lineage_sql as (
      select as struct
        table_catalog, table_schema, table_name, field_path, vhash
        , trim(format(r"select %s from `%s.%s.%s` limit 1"
          , field_path, table_catalog, table_schema, table_name
        )) as query
      from `!METADATA_COLUMN_FILED_PATH!`
        , (select generate_uuid() as vhash)
      where
        array_length(!TABLE_NAMES!) = 0
        OR table_name in unnest(!TABLE_NAMES!)
    )
    select array_agg(c) from lineage_sql as c
    """
    , "!TABLE_NAMES!", format('%T', table_names))
    , "!METADATA_COLUMN_FILED_PATH!", format('%s.%s.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS', target_dataset.project_id, target_dataset.dataset_id)
  )
);

begin
  declare clineage_query array<struct<
    table_catalog STRING, table_schema STRING, table_name STRING, field_path STRING, vhash STRING, query STRING
  >>;

  execute immediate `v0.zgensql__clineage_queries`(struct("bqmake", "zgolden"), []) into clineage_query
  ;

  for r in (select * from unnest(clineage_query)) do
    set @@query_label = array_to_string(
      [
        format("clineage__catalog:%s", r.table_catalog),
        format("clineage__schema:%s", r.table_schema),
        format("clineage__table:%s", r.table_name),
        format("clineage__field_path:%s", replace(r.field_path, ".", "_-_")),
        format("clineage__vhash:c%s", r.vhash)
      ]
      , "," 
    );
    execute immediate r.query;
    set @@query_label = null;
  end for
  ;

  with datasource as (
    select
      metadata.`@type` as type
      , c.timestamp
      , resource
      , protopayload_auditlog.servicedata_v1_bigquery.jobCompletedEvent
      , JSON_VALUE_ARRAY(metadata.tableDataRead.fields) as fields
      , string(metadata.tableDataRead.jobName) as jobName
      , c
    from `bqmake._auditlog.cloudaudit_googleapis_com_data_access` as c
    left join unnest([struct(
      safe.parse_json(protopayload_auditlog.metadataJson) as metadata
    )])
    where
      date(timestamp) >= date(current_date() - interval 1 day)
  )
  , tableReadData as (
    select 
      regexp_extract(jobName, '[^/]+$') as jobId
      , resource.labels.dataset_id
      , resource.labels.project_id
      , json_value_array(nullif(to_json_string(fields), 'null')) as fields
      , to_json(datasource) as _raw
    from datasource
  )
  , jobCompleteEvent as (
    select
      timestamp
      , jobCompletedEvent.job.jobName.jobId as jobId
      , struct(
        jobCompletedEvent.job.jobConfiguration as config
        , jobCompletedEvent.job.jobStatistics as statistics
      ) as job
      , to_json(jobCompletedEvent) as _raw
    from datasource
    where 
      jobCompletedEvent.eventName = 'query_job_completed'
      and contains_substr(jobCompletedEvent.job.jobConfiguration.labels, 'clineage')
  )
  , fmt as (
    select 
      any_value(struct(
        clienage__resource, vhash
      )).*
      , array_agg(
        struct(
          field_path as column
          , struct(
            tableReadData.project_id
            , tableReadData.dataset_id
            , tableReadData.fields
          ) as lineage
          , struct(
            job.statistics.totalSlotMs
            , job.statistics.totalProcessedBytes
            , job.statistics.endTime - job.statistics.startTime as leadTime
          ) as stats
        )
      ) as column_lineage
      , struct(
        min(timestamp) as min
        , max(timestamp) as max
      ) as analyze_span
  --    , job.statistics
    from jobCompleteEvent
    left join tableReadData using(jobId)
    left join unnest([struct(
      struct(
        `bqutil.fn.get_value`('clineage__schema',  job.config.labels) as schema
        , `bqutil.fn.get_value`('clineage__catalog',  job.config.labels) as catalog
        , `bqutil.fn.get_value`('clineage__table',  job.config.labels) as table
      ) as clienage__resource
      , replace(`bqutil.fn.get_value`('clineage__field_path',  job.config.labels), '_-_', '.') as field_path
      , `bqutil.fn.get_value`('clineage__vhash',  job.config.labels) as vhash
    )])
    group by format('%t', (clienage__resource, vhash))
    order by analyze_span.min desc
  )

  select * from fmt
;
end
;

SQLの書き換えによるSQL生成システム

SQL が持つ開発体験上の問題点

プログラミングであるような ライブラリへの切り出しやコンポーネント化が難しい

  • 仕様上テーブルの識別子が埋め込みであるため集計などの操作のSQLの再利用できないため、CTEの再利用もできない
  • 複雑化したSQLでは多段のCTE/サブクエリが発生するが、これらを検証する術が乏しく負債化しやすい。

従来型のテンプレートエンジンシステム

これらの問題を解決するデータモデリング用のフレームワークでは
SQLのテンプレートを利用したSQL生成方法が一般的である。

SQLテンプレート型エンジンの開発体験上の問題点

テンプレートエンジンはSQLが持つ表現能力の問題点の多くを解決するが、同時に次のような問題を発生させる。

  • SQLテンプレート自身が有効なSQLではない
    • SQLコードに対するフォーマッターやSQLエンジン、コード解析ツールといったエコシステムのサポートの負担
    • 生成されるSQLが実行しないと評価できないことも多く、コード規模が大規模化にするにつれ急激に開発体験が悪化する
  • テンプレートエンジン と SQLエンジンの組み合わせ的複雑化
    • SQL自身は複雑な言語であるが、この上さらにテンプレートやマクロによる生成規則の複雑な情報が重なることから煩雑になる.
    • テンプレートによるSQLの生成は解析や型制約を伴わないためバグの温床になる. このためSQLテンプレートをコードベースとして扱うときに中長期にテンプレートやマクロが負債になりやすい

提案コンセプト: 有効なSQLの有効なリライトのみによるSQL生成

SQL上安全な書き換えを提供し、それによるSQL生成アプローチをとることで
テンプレートエンジンの問題点を解決する

  • クエリの実装はあくまでSQLなので、SQLに対するエコシステムの恩恵が受けられる
  • 書き換えを前提とするため、すべての定義はひとつのSQLで表現する必要がる
  • この操作はすべてBigQuery上で操作可能であるため、外部依存を減らせる

安全な書き換え例1: Final 節の置き換え

with A as (
  select ix, ix * ix as ix2  from unnest(generated_array(1, 100, 1)) as ix
)
, B as (
 select avg(ix2) from A
)
, C as (
 select count(1) from A
)

select * from A
-- 以下のfinal節に置き換えて利用したい
-- select * from B
-- select * from C

安全な書き換え2: テーブル参照子の置き換え

同一スキーマであれば、そのテーブルの参照子は書き換え可能である

with 
super_large_A as (
  select ix, ix * ix as ix2  from unnest(generated_array(1, 100, 1)) as ix
)
, A as (
  -- デバッグ利用のみで利用
  select ix, ix * ix as ix2  from unnest(generated_array(1, 100000000, 1)) as ix
)
, __constrains_A_table as (
  -- Aとsuper_large_Aは同一スキーマである必要がある.
  select * from A
  union all
  select * from super_large_A
)
, B as (
 select avg(ix2) from A
 -- 本番時は以下に書きかえて集計したい
 -- select avg(ix2) from super_large_A
)
, C as (
 select count(1) from A
 -- 本番時は以下に書きかえて集計したい
 -- select avg(ix2) from A
)

select * from C

実行例

一般公開データセット bqmake.bqtest にデモを用意した。

以下の例では、SQLの安全な書き換えを行う bqmake.bqtest.zbqt_gensql__remake_view UDFを用意し
bqmake.bqtest.demo_sample_view に対し書き換えを実施する。
対象ViewはBigQueryによって有効なSQLであることが担保されるため、安全な書き換え操作により
有効なSQLが生成できる

-- bqmake.bqtest.demo_sample_view
with datasource as (
  select * from `bigquery-public-data.google_trends.top_terms`
)
, datasource_sampled as (
  select * from `bigquery-public-data.google_trends.top_terms`
  TABLESAMPLE SYSTEM (5 percent)
)
, __test_count as (
  select count(1) from datasource
)

select * from datasource
execute immediate `bqtest.zbqt_gensql__remake_view`(
   'demo_sample_view', '__test_count', [('datasource', 'datasource_sampled')]
);

これは次のSQLが実行される

-- bqmake.bqtest.demo_sample_view
with datasource as (
  select * from `bigquery-public-data.google_trends.top_terms`
)
, datasource_sampled as (
  select * from `bigquery-public-data.google_trends.top_terms`
  TABLESAMPLE SYSTEM (5 percent)
)
, __test_count as (
  select count(1) from datasource_sampled 
)

select * from __test_count

bqmake.bqtest.zbqt_gensql__remake_view は データセットプライベートなUDFである。
UDFであることでプロシージャなどと違いUDFで完結するため、セキュリティ安全性が保ちやすい.
他データセットのviewを利用するためには UDFを適切に複製する必要がある。
この機能はは bqmake.bqtest.init__bqtest_dataset により提供される。

どのような場合に役立つか?

SQLに対するテストコード/データの作成

SQL単体では最終出力のみでしか検証できないが、この書き換え操作が実現できると
複数のCTEが存在する場合において、例えば、ユニークが保たれているかなど
途中の任意のCTEに対しデータ検証を行うSQLを書くことができる。

識別子の書き換えにより、ローカル開発時にはモック用データを利用といった操作もできる。

同スキーマテーブルに対する操作の一元化

データアプリケーション開発などで発生するのが、スキーマが同一でデータ生成方法が異なるテーブルを生成するケースである。
この時に、ユニークネスやデータの新鮮さといった共通のデータバリデーションを行う。
この操作をViewとして定義し、上記の書き換えによるSQLを行うことで共通化されたバリデーションが行える。

BigQueryのオプション引数の実装パターン

概要

BigQueryにおいては、UDFやTVFまたはプロシージャにおける引数の省略が構文上できないため
可変長な引数を定義することができないため
挙動を変更する新しい引数を追加するために破壊的変更を生むことになってしまう。

この記事ではBigQueryにおけるOptional 引数の実装方法について載せる

通常のオプション引数の実装

関数においてデフォルト挙動が定義できるオプション引数を設けるとき
struct型を利用して次のように記述すると便利である。

  • null を渡すことで、引数の入力を省略できる。
  • 明示的な型が定義できるため、入力に対するバリデーションが可能
create or replace function `sandbox.sample_udf`(X int64, _options struct<param1 int64, prefix string >)
as
(
  format('%s %d', ifnull(_options.prefix, 'PREFIX: ' ), X + ifnull(_options.param1, 0)) 
)

ARRAYを使うパターン

  • ARRAYにより key, valueのシンプルな仕組みになる
  • ユーザによる castによるstringへの型変換をする手間が必要。
    • すべてのvalueをcast(X as string) としていかなければならない
    • pivotを利用する関係でoptionの引数定義を2か所で管理する必要がある(バリデーション用とパース用)
    • structからarrayへの変換のためにはJSのUDFを利用する必要があり、この操作の隠蔽がしづらい
    • 関数内部のパース処理が長くなりやすい
create or replace function `sandbox.sample_udf`(X int64, arr_options array<struct<key string, value string>>)
as
((
  with _options as (
    with options as (
      select * from unnest(arr_options) option
      where if(
        key in ('param1', 'prefix')
        , true
        , error(format("Invalid Option: %t", option))
        )
    )
    select as struct
      -- 任意の型にParseする
      cast(param1 as int64) as param1
      , cast(prefix as string) as prefix
    from options
      pivot (any_value(value) for key in ('param1', 'prefix'))
    limit 1
  )
  
  select format('%s %d', ifnull(_options.prefix, 'PREFIX: ' ), X + ifnull(_options.param1, 0))
  from unnest([0]), _options
))

JSONを使うパターン

  • jsonはベータの機能
  • 可読性は高い
  • パース処理は簡潔に書ける
  • バリデーションの関係で内部的にJS UDFへの依存が発生する
create or replace function `sandbox.sample_udf`(X int64, json_options JSON)
as 
((
  with _options as (
  select as struct
    safe.int64(json_options.params1) as param1
    , safe.string(json_options.prefix) as prefix
    -- Assert options
    , (
        select logical_and(if(
          key in ('correct')
          , true
          , error(format("Invalid Option: name=%t in %t'", key, json_options))
        ))
        from unnest(`bqutil.fn.json_extract_keys`(to_json_string(json_options))) key
    ) as _assert
  )
  
  select 
    format('%s %d', ifnull(_options.prefix, 'PREFIX: ' ), X + ifnull(_options.param1, 0))
  from unnest([0]), _options
  where _options._assert
))

SQLで正規分布の乱数生成

  select 
    -- normal distribution: box-muller
    sqrt(-2 * ln(rand())) * cos(rand() * 4 * atan(1.0)) as r
  from
    unnest(generate_array(1, 10000)) as v 

Analytics Hubからbqfunデータセットを利用する: 法人情報および祝日情報データセットの利用

概要

https://t.co/2BsGOQlnp4

  • Analytics Hubを使うことで、他人が公開しているデータセットを自身のデータセットのように利用できる
  • gBizINFO法人情報は、法人に関する所在、資金調達情報といった種々のデータが利用できる

bqfunのデータセットを利用する

データセットを使ってみる

追加の手順を進めることで、プロジェクト内にアクセス可能なデータセットができる:

祝日 データセット
SELECT * 
FROM `jp_holidays_preprocessed_by_bq_fun.holidays` 
where extract(year from date) between 2016 and 2022
order by date 
法人 データセット
# 特許の分布と、特許数の多い企業名について調べる
with base as (
  SELECT 
    corporate_number, count(1) as n_patent
  FROM `project-id-7288898082930342315.gbizinfo_preprocessed_by_bq_fun.patent` 
  group by corporate_number
)
, stats as (
  select 
    approx_quantiles(n_patent, 20) as q20tiles
    , approx_top_sum(corporate_number, n_patent, 10) as top10
  from base
)
, report as (
  SELECT 
    *
    , array(
      select as struct
        kv.value, name, kv.sum
      from unnest(stats.top10) kv
      left join `project-id-7288898082930342315.gbizinfo_preprocessed_by_bq_fun.basic`
        on value = corporate_number
      order by kv.sum desc
    )
  from stats
)

select * from report

image

Analytics Hubを使っていて気になったところ

今回、Analytics Hubをはじめてつかったところで気になったところについて書いておく

  • 2022/06/02 現在で Analytics Hubの検索導線からアジアリージョンが選択できない (US, EUのみ)

    • 検索導線よりも、リンク導線で辿った.
  • データセットのロケーション:

    • データセットは自身のデータセットリージョンに配置されるようになっている(今回の場合は, asia-northeast1)
    • region違いのデータは、利用する際に様々な制約を受けるので、複数データセットを利用するときに心配

BigQuery Design Guide

複数のチーム、複数人によるデータセットの管理においては

Consistency is imperative quality for User Experience

一貫性は最も注意すすべき品質である。
すべてのデータセットは一定の規則を元に構築されていることが望ましい。
これは命名やドキュメンテーション、メタデータ管理に関する一貫性を必要とする。

これは利用者に便益を生む。
ユーザは、データセット、テーブル、カラムなどの一覧を見るだけで
直感的かつ書くべきSQLが想像できることが好ましい。
また機能の「不足/不可能」がつたわることも重要なポイントである。

SQLによる分析はテーブルの結合操作により利用来ることが多い。
単一のデータセットによってすべての分析をまかなうことは難しいため
単一のデータセットに限らない一貫性が必要である。

Dataset as API

公開されたデータセット上に配置されるすべてのテーブルおよびルーティンはAPIとしての運用が必要となる。

https://cloud.google.com/apis/design

Useful one wide-table rather than normalized tables in a dataset

同一のPKを持つ複数のテーブルをユーザに公開することは避けるべきである。
ひとつのテーブルであることは単一データセットの複数テーブルであることよりよく
単一データセットの複数テーブルであることは、複数データセットは複数データセット複数テーブルであることよりよい。
ユーザが担うべき認知負荷は最小になるとよい。

ユーザが利用するテーブルは正規化されたテーブルを個別にJOINさせるような
一つのテーブルにおいてすべてのカラムがそろっているワイドテーブルを目指す方がよい。
同一のデータセットのテーブル内において、同一のPKを持つテーブルの複数用意し
ユーザに使い分けやJOINを強いることはなるべく避けた方がよい。

命名による並び替えを意識する

データセット、テーブルは制約上 Alphabetical Orderにより表示される。
人間か認知できるのはせいぜい4-5個の片手で数えられる数だけである。
これは利用者の目線に立つと大きい制約である。

利用頻度が高くなるデータセットおよびテーブルほど、並び順が最初にこなければならない。

次のような単語を利用するとよいだろう。

  • core

Resource Access Modification

多種多様なデータセットの共有において最も避けるべき複雑さは依存の管理による複雑さである。
「依存されうるものは依存される」という [[Hyrumの法則]] にしたがって
データセット開発者は依存される要素を減らすことが望ましい。

特定のデータセットに対して、ユーザが利用すべきテーブルと利用すべきではないテーブルは命名から自明でわかるとよい。
「命名による並び替えを意識する」と合わせて考えると prefixとして zなどを用いることで
ユーザが利用すべきテーブルが上位に来る。

dataset
|- core: ユーザが利用利用可能なPraimary Table
|- zcore__segment1: ユーザ利用を許容しないテーブル1. coreの構築に必要なテーブル
|- zcore__segment2: coreを作成するために必要なセグメント定義2
...

Practical Dataset Design Pattern

Entity

特定のPKに対する属性情報を取りまとめたデータセットである。

  • PKに対する保守された最新情報を公開するテーブル
  • Slowly Changing Dimension に対する履歴機能つきのテーブル
ent__[entity_nane]
|-@routine
|  | - history: as-wasの復元データの公開I/F
|- core: as-isの公開テーブル
|- zhistory:  履歴テーブルの内部データ
|- zindex__segmentX: materialized_viewによるクラスタインデックス
|- zmonitor__YYYY: 品質確認用に用いられるデータソースに関するサマリーテーブル

Sink Source Table

このデータセットはデータソースに対するまとめあげまたはユースケースにもどついたデータソースの分割を担う。

log__[log_nane]
|- all
|- android
|- iOS
|- web
|- zmonitor__YYYY: 品質確認用に用いられるデータソースに関するサマリーテーブル

Related Other Architecture

Data Vault 2.0

これはあくまで実装に対するプラクティスであり、ユーザにとって優れたインタフェースを提供するための考え方ではない。
特にRaw Vaultはユーザ向きではない。 (Data Vault 2.0自身も Data Martの存在を仮定している。)

SatelliteやHubは ユーザが利用する際には、 ひとつのワイドテーブルになっていることが望ましい。

またLinkテーブルは1:1, N:N, 1:Nの曖昧性を解消することはできないし1:1においても曖昧性が残るため
スキーマだけでは曖昧性を解消しきれないコンテキストの高いテーブルである。これはユーザにとっても利用のしづらさを生む

SELECT文内で assertを手軽にする

値の比較を行いたい時にassertのような関数が一つあると便利である。
BigQueryにはerror関数があるがこれをラップすることで次のようなassertを用意すると便利である

create temp function `assert`(expected ANY TYPE, actual ANY TYPE)
as (
  if(expected = actual, null, error(format("Expected %T: %T", expected, actual)))
);

select assert('hoge', 'hoge')

関数をひとつはさむことで、タイプチェックにもなる

wip: 削除用のインターフェースのデザイン

概要

「削除」に関するインタフェース設計は、開発者が最も気をつかう箇所のひとつだといえる。
「削除」操作は、基本取り返しがつかない破壊的な変更であるためだ。

「削除」を安易に便利にしてしまうことで、ユーザの意図しない誤入力などによって影響範囲の広い事故を起こしてしまうだろう。
例えば「rm -rf /」といった有名なコマンドは 削除コマンドに対して 便利な再帰的操作が組み合わさることによって
便利ではあるが数多くのプログラマを悩ませることとなった。現在では --no-preserve-root オプションなしに 削除操作はは実施できないように修正されている。

この記事では「削除」に関するインターフェースについていくつかの方法を列挙し、その特性について検討を行う

1. 「ゴミ箱」パターン: 論理削除によって取り戻し可能にする

ゴミ箱、DBでいえば論理削除という形で、アプリケーション上無効とするフラグを別途用意し
削除操作の取り返しがつくようにする操作がこのパターンである。

論理的に無効となった対象は一定期間をおいて物理削除される。

例: DBの論理削除, OSのゴミ箱

2. delete用フラグ

delete用のフラグオプションを別途設けることで、通常のコマンド機能に削除機能を拡張する形
コマンドを新しく設けずに既存のコマンドの一貫性を底わない形で実現できるとシンプルでかっこいい

例: git push --delete

3. 対話的な確認

削除操作を伴う場合、ユーザによる対話的応答による確認を求めるパターン
GUIなどの単一的な操作に向いている.

CLIにおいては -f, --force オプションなどをともに用いることで確認操作を省略することができる

4. State 管理パターン

例: terraform

Data Studioで使える分布機能付きのグラフ

GA4 Penguins
image image
{
  "$schema": "https://vega.github.io/schema/vega-lite/v5.json",
  "background": "black",
  "transform": [
    { "as": "v1", "calculate": "datum['$metric0']"},
    { "as": "d1", "calculate": "datum['$dimension0']"}
  ],
  "vconcat": [
    {
      "mark": "area",
      "transform": [
        {
          "density": "v1",
          "groupby": ["d1"]
        }
      ],
      "encoding": {
        "x": {"field": "value", "type": "quantitative", "title": null, "axis": {}},
        "y": {"field": "density", "type": "quantitative"},
        "color": {"field": "d1", "type": "nominal"},
        "opacity": {"value": 0.5},
        "gird": false
      }
    },
    {
      "mark": {
        "type": "boxplot",
        "extent": "min-max"
      },
      "encoding": {
        "color": {"field": "d1", "type": "nominal", "legend": null},
        "y": {"field": "d1", "type": "nominal"},
        "x": {
          "field": "v1",
          "type": "quantitative",
          "scale": {"zero": false}
        }
      }
    }
  ]    
}

wip: BigQueryにおけるスキャン量の削減方法についてのプラクティス

BigQuery の 分割テーブル について

BigQueryにおいてはスキャン量を削減する仕組みとして分割テーブルが存在する。

  • 基礎: カラム選択による削減
  • 日付シャーディングテーブル: TABLE_SUFFIX 疑似カラムによるスキャン量の削減. `'project.dataset.table*'で指定するやつ
  • パーティションテーブル: _PARTITIONTIME 疑似カラム や パーティションカラム によるスキャン量の削減
  • クラスタインデックスによる削減
  • マテリアライズドビューによるクエリ書き換えによる削減
  • 検索インデックスによる削減

パーティションのプルーニングに関する機能

  • パーティションカラムに対するプルーニングの評価

  • SELECT式におけるGROUP BY
    -疑似カラムに対する計算

  • ViewやTVFにおいてもパーティションの機能

WITH 
BASE AS (
  SELECT 
    _PARTITIONTIME as __PARTITIONTIME
    , #... 
  FROM
    mytable
)
, AGG AS (
  SELECT 
    __PARTITIONTIME
    , # ... 
  FROM
    BASE
  GROUP BY
    __PARTITIONTIME, # ...
)

SELECT 
  *
FROM 
  AGG
WHERE
  __PARTITIONTIME = TIMESTAMP("2017-10-01") -- 擬似列を利用しスキャン量が抑えられる
  • シャーディング用のワイルド―カードにおけるプルーニング
  • クラスタリングインデックスにおけるプルーニング

BigQueryにおけるテーブルスキーマの整合性の検証

テーブルが特定のスキーマと同一の型表現を持つことはしばしば重要である。
SQLにはや、JOINやワイルドカード表現含め最終的なテーブルの出力形式は暗黙的に解決する方法も多く
暗黙的な型変換やNULLに関する型推論によりテーブルの出力結果の型が意図せず変えてしまうことも多い。
こういった場合において、テーブルのカラム群の整合性を確認できることは便利である。

例えば、本番用のテーブルと現在開発中のテーブルの整合性が確認できることで
変更の際の懸念をひとつ減らすことができる。

この記事では、テーブル間のスキーマの整合性を確認する手段の方法について検討する

準備

この記事では、次のような形で テーブルが物理化されていることを前提に実行を行う。

-- データセット用意
create schema if not exists `check_table_schema`;

-- 参照テーブル: 整合性を確かめる
create or replace table `check_table_schema.reference_table`
as
select * from `bigquery-public-data.austin_bikeshare.bikeshare_stations`
;

確認したいSQL

select * replace (cast(station_id as STRING) as station_id)
from `check_table_schema.reference_table`

INFORMATION_SCHEMA.COLUMNS による 確認

カラムの整合性を確認するための、最も単純な方法は INFORMATION_SCHEMAを確認し
テーブルのスキーマを確認する方法である。

これは次のように確認することができる。

create or replace table `check_table_schema.target_table`
as
select * replace (cast(station_id as STRING) as station_id)
from `check_table_schema.reference_table`
;

with table1 as (
  select *
  from `check_table_schema.INFORMATION_SCHEMA.COLUMNS`
  where table_name = 'reference_table'
)
, table2 as (
  select
    *
  from `check_table_schema.INFORMATION_SCHEMA.COLUMNS`
  where table_name = 'target_table'
)
select
  if(
    count(1) > 0
    , error(
      string_agg(
        format('%t', (
          column_name
          , lhs.data_type
          , rhs.data_type
          , compare_result
        ))
      )
    )
    , "PASSED"
  ) as msg
from table1 as lhs
full join table2 as rhs using(column_name)
left join unnest([STRUCT(
  case 
    when lhs.data_type = rhs.data_type  then "STRICT_MATCH"
    when lhs.data_type != rhs.data_type then "STRICT_UNMATCH"
    when lhs.data_type is not null then "EXISTS_ONLY_LEFT"
    when rhs.data_type is not null then "EXISTS_ONLY_RIGHT"
  end as compare_result
)])
where compare_result not in ("STRICT_MATCH")

メリット

  • カラム名の一致および型の一致を厳密に確認することができる

デメリット

  • 対象のSQLが物理化されていることが必要。
  • スキャン量やスロット消費が一定かかる

緩いカラムの型一致

最初の手段ではスキャン量がかかる型確認方法だったが、これをゼロコストで達成したい。
つまりスキャンコストやスロット消費が無い状態で確認する状態の達成を目指す。

これは次のようなクエリで確認することができる。

with core as (
  select * replace (cast(station_id as STRING) as station_id)
  from `check_table_schema.reference_table`
)
, _validation as (
	-- ※ カラム名一致や厳密な型一致は取れない
	select * from `check_table_schema.reference_table`
	union all
	select * from core
	limit 0
)
select * from core
--> Column 1 in UNION ALL has incompatible types: INT64, STRING at [9:9]

ただし、この方法では UNION ALLオペレータでは暗黙的な型変換を許容するため、
次のようなケースではエラーを発生させることができず、緩い等価性で評価される。

with core as (
  select * replace (cast(station_id as BIGNUMERIC) as station_id)
  from `check_table_schema.reference_table`
)
, _validation as (
  -- ※ カラム名一致や厳密な型一致は取れない
  select * from `check_table_schema.reference_table`
  union all
  select * from core
  limit 0
)
select * from core

メリット

  • カラム数と緩い等価一致の確認ができる
  • CTEとして組み込みが可能で、DRYRUNなどでの検証も可能
  • ゼロコスト: スキャン量やクエリコストがかからない

デメリット

  • カラム名の合致は比較できない
  • 緩い等価確認になる

厳密なカラムの型一致

union all などによる set operatorでの確認では、簡易的で利用しやすい分暗黙的変換が行われる。
厳密な型一致で比較できない点で不便である。
これを型レベルで比較するには、次のようにすると良い。

with core as (
  select * replace (cast(station_id as BIGNUMERIC) as station_id)
  from `check_table_schema.reference_table`
)
, _validation as (
  select
    array(select as struct * from `check_table_schema.reference_table` limit 0)
    || array(select as struct * from core limit 0) --> Error
)
select * from core

メリット

  • カラム数と厳密なカラムの等価性を確認できる
  • CTEとして組み込みが可能で、DRYRUNでの実行時の検証も可能
  • ゼロコスト: スキャン量やクエリコストがかからない

デメリット

  • カラム名の合致は比較できない

テーブルの整合性の厳密一致

SQL単体では、テーブルのカラム型までは確認できるが、カラム名やカラム名の順序までは比較することができない。
BigQuery Scriptを利用し、ここまでの方法を発展させるとクエリの実行コストを抑えた
テーブルの型検証を行うことができる。

-- dee テーブルを構築
create temp table _skelton_dee
as
select 
  (
    select as struct
     (select as struct * from `check_table_schema.reference_table` limit 0).*
  ) as lhs
  , (
    select as struct
    (
	  select as struct * replace (cast(station_id as STRING) as station_id)
	  from `check_table_schema.reference_table` limit 0
	).*
  ) as rhs
;

select
  if(
    array_length(`bqutil.fn.json_extract_keys`(to_json_string(lhs))) = (
      select
        countif(lhs = rhs)
      from 
        unnest(`bqutil.fn.json_extract_keys`(to_json_string(lhs))) as lhs with offset ix
        , unnest(`bqutil.fn.json_extract_keys`(to_json_string(rhs))) as rhs with offset iy
    )
    and [lhs] || [rhs]
    , "PASSED"
    , error('table schema is not matched')
  )  
from _skelton as r

上記、手順は煩雑なためプロシージャなどで用意すると良いかもしれない。

メリット

  • 厳密な型一致とカラム名およびその順序一致を確認することができる
  • ニアゼロコスト: スキャン量はゼロだが、微量のスロット消費が存在する.
    デメリット
  • CTEとして組み込みができないため、実行時間がかかる。

まとめ

この記事では、SQLが生成するテーブルの整合性を確認するための方法を紹介した。
ここで紹介した検証方法を使うことで、スロット消費を限りなく抑えた型確認をSQL単体で、実行することができる。確認項目と用途に応じて選べると良いだろう

BigQueryでNewton法

BigQueryでNewton法

Newton法では次の漸化式で、Nの平方根を求めることができる:

$$ x_{n+1} = x_{n} - \frac{x^2 - N}{2 x} $$

漸化式で表されるものは、WITH RECURSIVEで表すことができる.
この実装は次のようになる。

このSQLにおいては、 2~1000までの数字の平方根を求めることができる。

with 
  recursive newton as (
	-- Initial step
    select 0 as n, 1 + 0.5 * (N - 1) as x, N as tgt from unnest(generate_array(2, 1000)) as N
    union all
	-- Update step
    select 
      n + 1
      , x - (x * x - tgt) / (2 * x)
      , tgt
    from newton
    where 
      -- Maximum recursive depth 
      n < 98
      -- tolerance error
      and abs(x * x - tgt) > 1e-06
  )

  select *, tgt - x*x as e from newton
  qualify n = max(n) over (partition by tgt) 
  order by tgt

ダッシュボードの究極のデータソースを考えてみる

主なパターン

  • 分析ユニットテーブル型:

    • BIツールに集計操作を任せる
    • イベントテーブルだけ取り扱う
    • 集計しないのでデバッグが楽。ストレージは多め。計算コスト高め
    • セグメントの追加が容易
  • ワイドテーブル型

    • additive
    • イベントテーブルだけ取り扱う
    • 集計しないのでデバッグが楽。ストレージは多め。計算コスト高め
  • メトリクス ピボットテーブル型

    • 指標名にラベルなどが組み込みやすい

プラクティス

  • ピボットテーブル

  • 1ページ 1 データソース

    • 複数データソースのページは多かれ早かれ苦労する
  • Additive or Non-Additive な メトリクスか

    • Additive: 再集計が部分集計結果の単純な集計操作で実現可能な指標. 総和や最大値、最小値は 部分集計された結果より集計可能である。ユニーク値計算の際のHyperLogLogのような Sketch も含む。
    • Non-Additive: 上記以外の指標
  • 配列: indexを明示的に付与しておくと無難

  • rollupやcubicオペレータを使いこなす: subotalを 集計せよ。セグメントの比較. 総計と小集計により比較可能にする

理想のテーブルの形式

こうなるとよい

解説 分析ユニット セグメント メトリクス
日時 ユーザ アイテム 日時 ユーザ アイテム Additive Debug
日時のサマリ null null
null
null
ユーザのサマリ null
  • 分析ユニット: BIでは表示しないし、使わないがBI上で必ずフィルタするために必要な列
  • セグメント: BI上で探索の際に利用するセグメント
  • メトリクス: 数値
-- Dashboard ultimate datasource
/*
create or replace table `sandbox.test2`
partition by date(time_id)
cluster by unit
as 
*/
with
 datasource as (
  select
    struct(
      timestamp_trunc(timestamp, hour) as time_id
      , user_pseudo_id as user_id
      , item.item_id
    ) as units
    , struct(
      device
      , geo
    ) as user
    , item as item
    , event_name as signal_type
    , struct(
      -- ID類
      timestamp
      , ecommerce.transaction_id
      , ecommerce.purchase_revenue
      , page_id
      , user_pseudo_id as device_id
      , user_id
      , session_id
      , item_id
      , 1 as first_visit_per_day
      , 1 as first_purchase_per_user
    ) as signal
  from
   `bigquery-public-data.ga4_obfuscated_sample_ecommerce.events_20210131`
  left join unnest([struct(
    timestamp_micros(event_timestamp) as timestamp
    , `bqutil.fn.get_value`('ga_session_id', event_params).string_value as session_id
    , `bqutil.fn.get_value`('page_location', event_params).string_value as page_id
  )])
  left join unnest(items) as item
)
, grain as (
  select
    any_value(units) as units
    , any_value(user) as user
    , any_value(item) as item
    , struct(
      count(1) as records -- 型ごとに主要統計量の取り方が定まる
      -- timestamp signal

      , struct(
        countif(signal.timestamp is not null) as nonnull
        , min(signal.timestamp) as min
        , max(signal.timestamp) as max
      ) as timestamp 
      -- identifier signal
      , struct(
        countif(signal.transaction_id is not null) as nonnull
        , hll_count.init(signal.transaction_id) as hll
      ) as `order` 
      -- numeric
      , struct(
        countif(signal.purchase_revenue is not null) as nonnull
        , sum(signal.purchase_revenue) as sum
      ) as revenue
    ) as metrics
  from
    datasource
  group by
    format('%t', units)
)

, core_stats as (
  select
    -- Unit Metrics
    any_value(_group_key).*
    -- Unit Metrics
    , struct(
      nullif(approx_count_distinct(string(grain.units.time_id)), 0) as time
      , nullif(approx_count_distinct(grain.units.user_id), 0) as users
      , nullif(approx_count_distinct(grain.units.item_id), 0) as items
      , approx_count_distinct(ifnull(grain.units.user_id, 'none') || '-' || ifnull(grain.units.item_id, 'none')) as user_item
    ) as unit_metrics
    -- Additive Metrics
    , struct(
        sum(metrics.records) as records
      , struct(
          sum(metrics.timestamp.nonnull) as nonnull
        , min(metrics.timestamp.min) as min
        , max(metrics.timestamp.min) as max
      ) as timestamp
      , struct(
          sum(metrics.order.nonnull) as nonnull
        , hll_count.merge_partial(metrics.order.hll) as hll
      ) as `order`
      , struct(
          sum(metrics.revenue.nonnull) as nonnull
        , sum(metrics.revenue.sum) as sum
        ) as revenue
      ) as metrics
  from grain
--    left join `user` using(user_id)
--    left join `item` using(item_id) -- segments
    left join unnest(
      [struct(
        struct(
          user.device.category as category
          , user.geo.country as country
        ) as user
      , struct(item.item_category) as item
      , struct(
          extract(
            dayofweek
            from
              units.time_id
          ) as dayofweek
        ) as time
      )]
    ) as context
    left join unnest([
      -- Overall
      struct(
          if(false, null, context.user) as user
        , if(false, null, context.item) as item
        , if(false, null, context.time) as time
      )
      -- Time Specific
      , struct(
          if(false, null, context.user) as user
        , if(false, null, context.item) as item
        , if(true, null, context.time) as time
        )
      -- Item Specific
      , struct(
          if(false, null, context.user) as user
        , if(true, null, context.item) as item
        , if(false, null, context.time) as time
        )
      -- User Specific
      , struct(
          if(true, null, context.user) as user
        , if(false, null, context.item) as item
        , if(false, null, context.time) as time
    )]) as segments -- marginalization for units
    left join unnest([struct('item' as unit, units.item_id as id), null]) as item
    left join unnest([struct('user' as unit, units.user_id as id), null]) as user
    left join unnest([
      struct('week' as unit, datetime(timestamp_trunc(units.time_id, week)) as id)
      , struct('day'  as unit, datetime(date(units.time_id)) as id)
      , struct('hour' as unit, datetime(units.time_id) as id)
      , null
    ]) as time
    left join unnest([
      struct(struct(time, user, item) as units, segments)
    ]) as _group_key

  group by
    -- unit
    format('%t', _group_key)
)

select
  ifnull(nullif(array_to_string([units.time.unit, units.user.unit, units.item.unit], ' x '), ''), '#overall') as unit
  , units.time.id as time_id
  , units.user.id as user_id
  , units.item.id as item_id
  , fn.json_pretty_kv(fn.json_trim_empty(to_json_string(segments)), ', ', null) as segment_label
  , * except(units) 
from core_stats
ダッシュボードで日時変化をみたい!
  • ユーザ / アイテム を nullを 指定する
ユーザの分布を知りたい
  • 日時 - アイテム を null を 指定し、 quantilesなどを計算する

Additiveな指標以外は原則BI上で定義

  • Additiveな指標のみ sumが使える。roll upしたときにはmaxを使うことで最大値が出せる
  • Non-Additiveな指標な指標はレコード数が1の時のみ有効な数として表示可能

WIP マテビューより使いやすく、動的にテーブルを自動リフレッシュする: BigQueryでのbqmakeの紹介

bqmake

  • ワークフローオーケストレーションのためのサービスの運用が不要になる
    • BigQueryで簡潔

BigQueryにおけるマテリアライズドビュー

マテリアライズドビューは、SQLによる定義を与えると、参照元の更新時刻よりデータ更新を自動的に行う仕組み。

  • 事前計算により計算結果のキャッシュ機能として機能する. 計算リソースの削減
  • 参照元と参照先の更新日時の関係からデータが自動更新されるため、ワークフローに基づいたジョブ更新が不要
  • (BigQuery) スマートクエリ書き換えなどによる元テーブルへのクエリの効率化

といったあたりでメリットがある。

詳細は以下を参照
https://cloud.google.com/bigquery/docs/materialized-views-intro?hl=ja#overview

BigQuery標準のマテリアライズドビューの問題点

  • SQLで対応する機能が限定的

    • 集計関数でつかえない機能が多く強い制約を持っている。
    • arrayやstructといった型の機能が対応していない
  • パーティションテーブルの更新のためのアライメント戦略も限定

    • 日付と1:1対応でなければならない
    • 日時 → 週次などの対応関係は作れない
    • 複数のテーブルのjoinのrefreshも可能だが、その更新基準は最初の参照テーブルなど限定的
  • BigQuery上のテーブルのpreview機能やBIサービスでデータ接続で不便

    • Data StudioのBI Engineの対象外

マテビューは強力な機能ではあるのだが上記の点不満があり
モデリングを行う上で最初の選択肢になりづらい点で難しい。

bqmake

マテビューの自作更新メリット

マテビューの問題点が解消され、利用用途が増える。
スケジュールクエリのみで実現可能なように作れば、ワークフローツールも不要でモデリングを進めることができる。
ツールへの依存を少なく作ることができるためモデリングのための第一候補にもあげやすい

wip: マテリアライズドビューを利用したデータモデリング

概要

  • BigQueryのマテリアライズドビューの機能を中心としたモデリング方法について紹介
  • マテリアライズドビューを中心としたモデリングを行うことで次の恩恵を受けることができる
    • ワークフローツールレスなデータ基盤の運用
      • BigQueryであればスケジュールクエリだけでの運用が可能
    • 宣言的な依存関係/Staleness の構築

データ基盤における2つのコンセプト: ワークフロー v.s. データフロー

データフロー型のデータ基盤を実現する

dbt (dataform)

  • 複数のツールをサポートしている

Native Materialized View

BigQueryやSnowflakeといった代表的なDWHサービスでは、マテリアライズドビューがサポートされている。

マテリアライズドビューの更新機能をBigQueryで実装する

BigQuery の Materialized Viewの概要

マテリアライズドビューは、SQLによる定義を与えると、参照元の更新時刻よりデータ更新を自動的に行う仕組み。

  • 事前計算により計算結果のキャッシュ機能として機能する. 計算リソースの削減
  • 参照元と参照先の更新日時の関係からデータが自動更新されるため、ワークフローに基づいたジョブ更新が不要

といったあたりでメリットがある。

詳細は以下を参照
https://cloud.google.com/bigquery/docs/materialized-views-intro?hl=ja#overview

BigQuery標準のマテリアライズドビューの問題点

  • SQLで対応する機能が限定的

    • 集計関数でつかえない機能が多い
    • arrayやstructといった型の機能が対応していない
  • パーティションのアライメント戦略も限定

    • 日付と1:1対応でなければならない
    • 日時 → 週次などの対応関係は作れない
  • BigQuery上のテーブルのpreview機能やBIサービスでデータ接続で不便

    • Data StudioのBI Engineの対象外

マテビューは強力な機能ではあるのだが、上記の点不満があり
モデリングを行う上で最初の選択肢になりづらい点で難しい。

マテビューの自作更新メリット

マテビューの問題点が解消され、利用用途が増える。
スケジュールクエリのみで実現可能なように作れば、ワークフローツールも不要でモデリングを進めることができる。
ツールへの依存を少なく作ることができるためモデリングのための第一候補にもあげやすい

wip: データ基盤における異常時におけるデータ公開のデザインパターン

概要

  • 可用性: 分析利用者が常にデータを使うことができるか
  • 整合性/一貫性/妥当性: DWHが提供するデータに質的に問題がないかあるか
  • freshness: データが最新かどうか

ALWAYS WELL: 未検証データをそのまま公開する

デフォルトのパターン。
データは検証が保証されない状態で公開される。
常に最新の状態で公開される。
異常がある場合に、組織的に周知可能な場合に利用できるだろう。

STAGE-CHECK-EXCHANGE : 検証を行い交換する

dataset
- public:  ユーザが利用するテーブル
- _stage:  ETLの処理により読み込まれるテーブル
  1. ETLツールは _stageに最新のデータなどを読み込む
  2. _stageに対し、データのチェック
  3. _stageが問題ない場合、publicとテーブルを入れ替える
  • かならず正常なデータが利用できる
  • 問題があった場合にデータ遅延が発生する

パターン2: 実行時データ分離パターン

dataset
- public: _stageについて フィルタするView
- _stage: 
  • _stageには正常系と異常なデータの両方が含まれる

    • _stageに対して監視用の集計クエリを仕込むことでETLの結果に対して監視がしやすくなくなる
    • streamなデータにも対応可能
  • 実行時のデータ量が増えることになる

    • マテリアライズドビューやクラスタテーブルによる最適化をを推奨
  • レコードレベルでのフィルタ操作をすることになるため、
    異常が発生した際には テーブルレベルの集計値が異常になる点に注意

パターン3: 実行時データ検証パターン

CTEにデータ検証用の集計操作を用意する:

#15

レコードレベル 検証パターン

  • 集計関数を利用した場合にエラーになるが、個々のレコードを利用する場合にはエラーを回避できる

カラム検証パターン

主キーと従属情報のような関係の場合、従属情報に問題がある場合に主キーまで 非公開になると困る
テーブル結合を利用するパターンに使うことが多い

テーブル検証パターン

- public: データ全体の整合性を分析実行時にも検証する

実行時にテーブル全体に対するデータ検証を行い、問題がある場合は例外を投げ利用をできなくする.
実行時の計算コストが増える

  • マテリアライズドビューにより軽減可能

パーティションのガードパターン

参考

BigQueryのREGEPX_関数のエスケープ処理: ベンチマークつき

BigQueryでは、REGEXP_CONTAINS などの REGEXP_系の関数が、第二引数として正規表現文字列を受け付ける。
この値はテーブルの値をそのまま渡すことも可能であり、その場合、テーブルの文字列が安全でないと問題が生じることになる。

REGEXP_CONTAINS を テーブルの値から利用する場合には適切なエスケープ操作が必要になるが
この記事ではエスケープ操作の3種の実装を行い結果を比較し、最も効率的な計算方法について調査する。

実験コード

-- JSによる書き換え
CREATE TEMPORARY FUNCTION escape_regex_pattern1(pattern STRING)
RETURNS STRING
LANGUAGE js AS r"""
  return pattern.replace(/[.*+?^${}()|[\]\\]/g,  '\\$&');
""";

--  SQLのみでのエスケープ (REPALCE版)
CREATE TEMPORARY FUNCTION escape_regex_pattern2(pattern STRING)
RETURNS STRING
AS (
  REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(
    pattern,
    '\\', '\\\\\\'),
    '|', '\\|'),
    '(', '\\('),
    ')', '\\)'),
    '{', '\\{'),
    '}', '\\}'),
    '$', '\\$'),
    '^', '\\^'),
    '*', '\\*'),
    '+', '\\+'),
    '?', '\\?'),
    '.', '\\.')
)
;

--  SQLのみでのエスケープ (REGEXP_REPLACE)
CREATE TEMPORARY FUNCTION escape_regex_pattern3(pattern STRING)
RETURNS STRING
AS (
  REGEXP_REPLACE(pattern, r'([.*+?^${}()|[\]\\])', r'\\\1')
)
;

-- テストコード
WITH test_cases AS (
  SELECT '.*+' AS pattern, r'\.\*\+' AS expected_result UNION ALL
  SELECT '$.[]()' AS pattern, r'\$\.\[\]\(\)' AS expected_result UNION ALL
  SELECT 'hello, world!' AS pattern, r'hello, world!' AS expected_result UNION ALL
  SELECT '\\[\\]' AS pattern, r'\\\[\\\]' AS expected_result UNION ALL
  SELECT '' AS pattern, '' AS expected_result UNION ALL
  SELECT REPEAT('abcdefghijklmnopqrstuvwxyz', 200) AS pattern, REPEAT('abcdefghijklmnopqrstuvwxyz', 200) AS expected_result
)
, benchmark__dataset as (
  select * from test_cases, unnest(generate_array(1, 100000))
)
, benchmark1 as (
 select escape_regex_pattern1(pattern) = expected_result as assert from benchmark__dataset 
)
, benchmark2 as (
 select escape_regex_pattern2(pattern) = expected_result as assert from benchmark__dataset 
)
, benchmark3 as (
 select escape_regex_pattern3(pattern) = expected_result as assert from benchmark__dataset 
)

select countif(assert) from benchmark1
-- select countif(assert) from benchmark2
-- select countif(assert) from benchmark3
-- select pattern, escape_regex_pattern2(pattern), expected_result from test_cases

ベンチマーク結果

benchmark1

image

benchmark2

image

benchmark3

image

結論

パフォーマンスは REPLACE > REGEXP_REPLACE > JS の順番に良いことがわかった。
REPALCEは複数(x14-5回程度)呼び出ししていることもあり、REGEXP_REPLACEとREPLACEの間で反転すると思ったが
REPLACEの方が性能として良い結果となった。

分位数(パーセンタイル)を計算するSketch: t-digest

概要

異常値が発生する状況やLong-tailな分布といった状況下においては
**値(50%ile)や99%ileといった分位数は重要な統計量となりうる。

現代においては、これらの統計量はより大きなデータ量のもと計算効率や空間効率よく計算できることが望まれる。

  • 数十千万のユーザからアクセスされるWebsiteにおける統計量の算出

    • 数百GB/日~数TBのデータ量が発生する状況においての 統計量の算出
  • 毎秒数万のリクエストを捌く、アプリケーションサーバのlatencyの測定

    • 99%ile, 99.9%ile, ...

BigQuery における近似分位数の計算

declare scale int64 default 3; 
declare buckets int64 default cast(pow(10, scale) as int64);
declare plots array<int64> default 
  array(select cast(pow(10, s) as int64) from unnest(generate_array(0, scale -1)) as s)
  || [cast(buckets/2 as int64)]
  || array(select cast(pow(10, scale) - pow(10, s) as int64) from unnest(generate_array(0, scale - 1)) as s)
  ;
declare n_data int64 default 1000000;
declare n_sample int64 default 10;


with 
  uniform as (
    select 
      v2 as sample_ix
      , rand() as r
    from
      unnest(generate_array(1, n_data)) as v 
      , unnest(generate_array(1, n_sample)) as v2
  )
  , normal_dist as (
    select
      v2 as sample_ix 
      -- normal distribution: box-muller
      , sqrt(-2 * ln(rand())) * cos(rand() * 4 * atan(1.0)) as r
    from
      unnest(generate_array(1, n_data)) as v
      , unnest(generate_array(1, n_sample)) as v2

  )
  , heavy_tailed as (
    select 
      v2 as sample_ix
      -- normal distribution: box-muller
      , POWER((-1. / 1.5) * LOG(1. - rand()), 1./0.1)
      as r
    from
      unnest(generate_array(1, n_data)) as v 
      , unnest(generate_array(1, n_sample)) as v2
  )

  , experimented as (
    with datasource as (
      select 'heavytail' as label, * from heavy_tailed
      union all 
      select 'normal', * from normal_dist
      union all
      select 'uniform', * from uniform
    )
    , groundtruth as (
      with calc as (
        select 
          label, sample_ix
          , percent_rank() over (partition by label, sample_ix order by r) * buckets as prank
          , r
        from datasource
      )
      select 
        label, sample_ix
        , cast(round(prank, 0) as int64) as qtile
        , min(r) as min
        , max(r) as max
        , (max(r) + min(r)) / 2 as mid
      from calc
      group by label, sample_ix, qtile 
    )
    , approximate as (
      select
        label, sample_ix 
        , approx_quantiles(r, buckets) as value
      from datasource
      group by label, sample_ix
    )
    
    select 
      groundtruth, round(approx, 2) as approx, round((approx - mid), 3) as err_abs, round((approx - mid) / mid, 3) as err_rel
    from approximate as A
    left join unnest(A.value) as approx with offset qtile
    left join groundtruth using(label, sample_ix, qtile)
    where qtile in unnest(plots)
    order by qtile
  )

  select  
    groundtruth.label
    , to_json(struct(scale, n_data, n_sample)) as config
    , groundtruth.qtile / buckets as qtile
    , round(any_value(groundtruth.mid), 2) as groundtruth
    , round(avg(approx), 2) as approx
    , struct(
      round(avg(err_abs), 3) as avg
      , round(stddev(err_abs), 3) as stddev
     ) as err_abs
    , struct(
      round(avg(err_rel), 3) as avg
      , round(stddev(err_rel), 3) as stddev
    ) as err_rel
  from experimented
  group by label, qtile
  order by label

t-digstによる分位数の近似計算

Snowflakeなどでは t-digest が用いられている

  • マージ可能
  • 並列可能
  • 精度保証なし

https://docs.snowflake.com/en/sql-reference/functions/approx_percentile_estimate.html

1

Sketchの更新操作 (データのマージ)

image
(from Algorithm 1, 1)

References

Footnotes

  1. Computing Extremely Accurate Quantiles Using t-Digests 2

DataCatalogのLineageの BigQueryインテグレーションがやってきた!!!

https://cloud.google.com/bigquery/docs/release-notes#December_22_2022

先日、 DataCatalogのLineage機能がPreviewになっていたのだが
その機能が BigQuery の UIとして反映されるPublic Previewのリリースが本日入りました.

これは BigQueryで下記のようなテーブルのLineageが確認できるようになる、というものでです。
これはみなが求めていたムネアツの機能のひとつですね!

image

Lineage APIを有効にする

Lineage APIを有効にするには、こちらを参考に

  • Data Lineage API
  • Data Catalog APIs
    を有効にしてください。

https://cloud.google.com/bigquery/docs/data-catalog#enable_data_lineage

Data Catalogの Lineageを確認する

上記 APIを有効にしたタイミングから Data Lineageの構築がはじまると思いますので
Data Catalogの画面から確認しましょう。

僕が試した感じの注意点としては

  • Lineageのデータが有効になるまで 10minちょっとかかる
  • 有効になった時間より、前の時間でジョブのデータは参照されない(ように見えるので)
    適宜クエリを流してやると Lineageが早く確認できる

あたりがあります

今後のBigQueryどうなる...?

Data Catalogの BigQueryインテグレーションは誰もが望んでいた待望の機能だと思います。
そのなかで 近年のデータ基盤の取り組みでもアツい メタデータのなかで、LineageがBigQueryにここまで早く統合されたことは非常に嬉しいことです。

これまでは Data Catalogの機能は 独立していたことは多く
BigQuery → Data Catalog方向のインテグレーションはあっても逆方向は多くありませんでした。
昨年から続いてきた(いろいろ煮え湯飲まされた)BQのフロントUIの刷新の成果が
ようやく実を結んだことが実感できるリリースではないでしょうか?

ここを足掛かりにその他のBigQueryのData Catalog機能がBQコンソールに統合される未来も
そう遠くないのではないかと期待せずにはいられないリリースかなと思います。

BigQueryの新レイアウトのカラムの省略表示を外す

BigQuery 新UIによるカラム名の省略表示

BigQueryの新UIにおいて、いくつか不満があるが
その一つにカラム名の表示の省略がある。

特にstruct型に対する表示が好ましくなく、次のような表示となる。

image

ツールチップを表示さればカラム名は表示されるがコピーもできなくなるため端的に煩わしい。
これを解消するためのスニペットを紹介する。

スニペット適用前後での変化

before after

スニッペット

$$('th[aria-describedby]').forEach(
    n => {
      const column_name = $('#' + n.getAttribute('aria-describedby')).textContent;
      $("b", n).textContent = column_name;
      n.replaceChildren($("b", n));
      n.style = `width: ${column_name.length}rem;`
    }
)

BigQueryのAuditLogからジョブの参照カラムの情報を抽出する

こういう感じのクエリを書けばよい

select 
  metadata.`@type` as type
  , metadata.tableDataRead.reason
  , JSON_VALUE_ARRAY(metadata.tableDataRead.fields) as fields
  , *
from `bqmake._auditlog.cloudaudit_googleapis_com_data_access` as c
left join unnest([struct(
   safe.parse_json(protopayload_auditlog.metadataJson) as metadata
)])
where 
  date(timestamp) >= date(current_date() - interval 1 day)
  and resource.type = 'bigquery_dataset'
order by timestamp desc

BigQueryテーブルのFeature Toggle

はじめに

大規模なデータ処理システムにおいてはテーブルが大規模に利用されているほどリリースのリスクは高まります。Upstreamの予期しないデータの変更やETLの変更により、BigQuery上のテーブルについてデプロイ戦略を考えることが重要です。
BigQueryにおいて バージョナイズは一つの手でありますが、これは利用者によるマイグレーション操作が必要です。
可能であれば十分な監視があると仮定のもと、ユーザが利用しているテーブルにおいて安全に開発&デプロイをできると望ましいでしょう。

本記事では、BigQueryのテーブルリリース戦略として、Feature Toggleを活用しリスクを最小限に抑える方法を紹介します。Viewを使うことでFeature Toggleを実現し、次のような出し分けできます。

  • 小規模なデータの出し分け (特定のレコードのみ)
  • ユーザによる出し分け

Feature Toggleとは?

Feature Toggle(フィーチャートグル)とは、ソフトウェアの機能を切り替えする仕組みのことです。 この仕組み自体を指しFeature Flagと呼ばれます。
アプリケーションの開発にこれを組み込むことによって、開発者は特定の条件にあてはまるユーザのみに新しく開発した機能を提供することができます。
これにより機能開発のリスクを小さく保ちつことができます。

BigQueryでFeature Toggleを実現する方法

BigQueryでは、Viewを使ってFeature Toggleを実現することができます。
次に手順を示します。

セットアップ

-- Set up
create schema if not exists `demo`;
create or replace table `demo.bikeshare_stations_blue`
copy `bigquery-public-data.austin_bikeshare.bikeshare_stations`
;

create or replace table `demo.bikeshare_stations_green`
as
  select * from `bigquery-public-data.austin_bikeshare.bikeshare_stations`
;

基礎: Feature Toggle付きViewの用意

上記のセットアップの元、Feature toggle機能つきのViewは次のSQLにより構築することができます。
この例ではユーザによって 出し分けを行い、10%のユーザには 'green'環境が提供されます。

-- Feature toggle by user
create or replace view `demo.bikeshare_stations`
as
with switched_by_user as (
  with unioned as (
    select *, 'blue' as _metadata_version from `takegue.canary.bikeshare_stations_blue`
    union all
    select *, 'green' as _metadata_version from `takegue.canary.bikeshare_stations_green`
  )
  select *
  from unioned
  where 
    case
      -- ユーザによってテーブルを出しわける 
      when mod(farm_fingerprint(session_user()), 10) = 1 then _metadata_version = 'blue'
      else _metadata_version = 'green'
    end
)

select * from switched_by_user

ユーザによってではなく、レコードのIDよる出し分けを実現することも容易です。

-- Feature toggle by user
create or replace view `demo.bikeshare_stations`
as
with switched_by_user as (
  with unioned as (
    select *, 'blue' as _metadata_version from `takegue.canary.bikeshare_stations_blue`
    union all
    select *, 'green' as _metadata_version from `takegue.canary.bikeshare_stations_green`
  )
  select *
  from unioned
  where 
    case
      -- ユーザによってテーブルを出しわける 
      when mod(farm_fingerprint(format('%t', station_id)), 10) = 1 then _metadata_version = 'blue'
	  -- レコードのIDによって出しわける
      else _metadata_version = 'green'
    end
)

select * from switched_by_user

上記を実施する際の注意点として、動的な出し分けを行う際には利用者側にはどのソースからデータを取得したかの情報を知ることができるように _metadata_version カラムがあるとデバッグが容易になります。

応用: 高度な切り替え

切り替え条件に対し、操作可能なテーブルやメタデータを組み込むことで、高度な切り替え条件を実現することが可能になります。
ここではいくつかの高度な切り替え例を示します。

テーブルのラベル情報による切り替え

テーブルのラベル情報を元にテーブルの実装を動的に切り替えることができます。
次の例では、ラベル=featureの値によってテーブルを切り替える際のコード例です。

with unioned as (
  select *, 'blue' as _source from `demo.bikeshare_stations_blue`
  union all
  select *, 'green' as _source from `demo.bikeshare_stations_green`
)
, metadata__feature as (
  select as value
    label
  from `demo.INFORMATION_SCHEMA.TABLE_OPTIONS`
  left join unnest([struct(
    -- BigQuery の option_valueのパース
    -- https://github.com/takegue/bqmake/blob/a43db847795a5e6be6aa4a3751ba0c43a6b04ff1/bigquery/%40default/v0/%40routines/zget_bqlabel_from_option/ddl.sql
    array(
      select as struct
        string(label[0]) as name, string(label[1]) as value
      from unnest(json_extract_array(safe.parse_json(replace(replace(replace(option_value, "STRUCT", ""), '(', '['), ')', ']')))) as label
    ) as labels
  )])
  left join unnest(labels) as label
  where 
    table_schema = 'canary' and table_name = 'bikeshare_stations'
    and option_name = 'labels'
    and label.name = 'feature'
)
, core as (
  select *
  from 
    unioned
  left join metadata__feature as _feat on true
  where
    case 
      when _feat.value = 'blue' then _source = 'blue'
      else _source = 'green'
    end
)

select * from core

異常系におけるフォールバック

新しく作成したテーブルは時に問題のあるデータを含むかもしれません。
そのようなケースに備え、検査用のCTEを設けて 異常がある場合には
既に安定したテーブルに切り替えることもできます!

with unioned as (
  select *, 'blue' as _source from `demo.bikeshare_stations_blue`
  union all
  select *, 'green' as _source from `demo.bikeshare_stations_green`
)
, metadata__stats as (
  select as value count(*) from `demo.bikeshare_stations_green`
)
, core as (
  select *
  from 
    unioned
  left join metadata__stats as _stats on true
  where
    case 
      when _stats >= 50 then _source = 'blue'
      else _source = 'green'
    end
)

select * from core

上記のコードに対し異常を引き起こす場合には次の操作で demo.bikeshare_stations_blue
に異常を引き起こしてみてください。

truncate from `demo.bikeshare_stations_blue`

この後に上記のコードを実行すると、 _soruce = "green" のテーブルに切り替わっていることが確認できます。

CTE内で集計することで計算コストがあがることを心配するかもしれません。
実はこれは心配はなく、計算コストが問題になる場合には、このコードはマテリアライズドビューによる最適化が可能です! 次のコードにより作成してみてください。

create materialized view `demo.bikeshare_stations`
options(
  allow_non_incremental_definition = true
  , max_staleness = interval 30 minute
)
as
-- 省略

まとめ

今回紹介した切り替え条件の他にも、条件式を工夫することで様々な切り替えが実現できます。
みなさんでぜひ工夫してみてください。
1点利用の際の注意点として今回提示した方法はテーブル自身の一貫性の一部犠牲に開発可能性や再利用性を高める方法といえます。利用の際にはこの点には注意する必要があるでしょう。

BigQueryにおいてデータセットをpublic公開する

データセットの公開範囲の特殊グループとして allAuthenticatedUsers(GCPアカウント持ちユーザ) や allUsers(認証を含まないすべてのユーザ) を追加することで
GCPアカウントを持つ全ユーザにデータセットを公開することができる

  • UDFやプロシージャなどの公開に利用可能

image

指標レイヤのシフトレフト: 指標定義はデータソースに記述する

  • データマート層 / DWH層の使い分けは認知負荷が高い

    • 「実装者観点」として合理的な分け方だが、「データ利用者観点」で誤った分け方である
  • 集計データは再利用性が低い: 使いまわすことをあきらめる方が無難

    • グレインテーブルを構成する 分析ユニット / セグメント / 指標 の3要素 最小粒度の定義は困難
      * 最小粒度にしても、ストレージコストにそこまでメリットが生まれない
    • 「計測」としての「目的」「用途」「目的達成のための方法・手段」は再利用できない
  • 集計テーブルを定義するのではなく、指標定義データソースに書く

    • レイヤーを増やすのではなく、Viewを利用しカラム方向に拡張をする

データの集計操作は不可逆性の高い操作

#15 で言及はしたが、集計操作は不可逆性の高い操作である。

  • データソースは状態を持つと考えるのが自然であり、この情報は集計によって失われやすい
    • 計測システムにおける障害や問題の修正、backfilling

「計測」は再利用できない

集計データ、とは大なり小なり意思決定のために利用される。
何かを計測するために存在する。

ここで計測とは、何かを触れると JIS Z 8103:2019 より

特定の目的をもって,測定の方法及び手段を考究し,実施し,その結果を用
いて所期の目的を達成させること。

とある。
計測における「目的」「用途」「目的達成のための方法・手段」は再利用できない

  • 「目的」: 目的に応じたイベント定義や分析ユニット
  • 「用途」: 誰がどこでそのデータを使うか
  • 「目的の達成方法・手段」: データの前処理、集計の後処理の仕方

輝きを失った黄金パターン: データウェアハウスレイヤとデータマート

データレイク/データウェアハウス/データマート の3レイヤでの分割
データウェアハウシングを進めるうえでの「データエンジニア観点」の考え方フレームワークとしてよかったが
これは「データ利用のI/Fに適していない」分割で「データ利用者観点」では関心の分離が微妙で分割方法としては誤っている。

  • データの状態が3層に依存するため、結局そのつながりをすべて認知しないといけない

    • テーブルの使い分けは認知負荷がかかり、経験ベースでの習得が必要になる
    • 計測における問題修正や仕様変更に伴って、依存をすべて適切に管理しないといけない
  • 責任の境界をこの層の間で引いてしまうと運用がかえって難しくなる

    • これらの整合性を適切に管理し続けるのは運用負荷が高い
    • 依存は厳しく管理されなければならない

「データ利用者」にとってどこに認知負荷がかかりやすいか?

  • 認知負荷がかかるのは
    • データに関する仕様は認知負荷
    • データ仕様に関する前処理 / 後処理 など
    • 複数のテーブルの使い分け
    • テーブル同士の結合操作

逆に認知負荷が低いのは

  • SQLによる集計操作: sumやcount distinctは容易に習得できる
  • ワイドテーブルの単一テーブルに大量のカラムがある場合

万能なファクトテーブル

  • よく利用するID列群とその集計値のカラムをひとつのテーブルをファクトテーブルを用意する
  • セグメントについてディメンジョンテーブルのテーブルで管理する
  • セグメントに関する指標は、ファクトテーブルにディメンジョンテーブルを結合し、再集計することでリソース効率よく管理が可能になる

ただ上記操作では

  • よく利用するID列群を選ぶ組み合わせに自由度がある

これがどういう運用になるかというと

  • 不要なID列があとからあると、ファクトテーブルは作り直しになる
    • ファクトテーブルを作りなおすと、ファクトテーブルを利用したテーブルも作り直しになる

これを避けるためにどういうことするかというと

  • すべてのID列を列挙したファクトテーブルを作る

ということになる。
こうなってくると事前計算するコストメリットがかなり薄く、これはデータソースそのものとほぼ一致する
スタースキーマは大体のケースにおいて早すぎる最適化の方法にしかならない。

「指標」レイヤのシフトレフト

「指標」定義を再利用するためにはどうするとよいのか?

指標の標準化は大事である一方、指標レイヤは実装するべきではない。
もし定義すべき指標があるなら、DWHレイヤにカラムとして、切り出していくべきである。

DWH layer

awesome_dataset.dwh

-- DWH layer
with data as (
	select * from unnest([
		struct(1 as id, "one" as label)
        ,  (2, "two")
        ,  (3, "three")
        ,  (4, "four")
        # ...
	])
)

select * from data

awesome_datamart.core

-- Datamart Layer
with 
_pre as (
  select * from `awesome_dataset.dwh`
  where mod(id, 2) = 1
)
, _agg as (
  select 
    countif(label is not null) as nonnulls
    ,  countif(distinct id) as uniques
  from _pre
)

select * from _agg

Metric Layer

select "Odd Number Uniqueness" as name, uniques as value  from `awesome_datamart.core`
-- union all ...

こうするべきでは?

  • データの前処理含めて、カラムとして追加で定義する
    • データソースと指標定義が比較できるため、デバッグも容易
    • フィルタしやすいカラムは提供してても、whereによるフィルタは直接行わない
  • データマートレイヤは集計操作を行うだけ

awesome_dataset.dwh

-- DWH layer
with data as (
	select * from unnest([
		struct(1 as id, "one" as label)
        ,  (2, "two")
        ,  (3, "three")
        ,  (4, "four")
        # ...
	])
)
, metric_columns as (
  select 
     *
     , if(mod(id, 2) = 1,  id, null) as metric__oddlabel__uniquenss
     , if(mod(id, 2) = 1,  label is not null, null) as metric__oddlabel_nonnull
   from data
)

select * from metric_columns 

awesome_datamart.core

select 
  count(distinct metric__oddlabel_uniquenss)
  , countif(metric__oddlabel_nonnull)
from `awesome_dataset.dwh`

SQLにおけるNULLの取り扱いガイド: 3値論理の落とし穴を回避する

SQLの取り扱いで最もトラブルの多い 仕様の1つが NULL に関する取扱いだろう。
例として、次のクエリを確認する。

with data as (
  select 'key1' as key, "_name_a"  as name, "X" as label
  union all   
  select 'key2' as key, "_name_b"  as name, NULL as label
  union all   
  select 'key3' as key, "_name_c"  as name, "Y" as label
)

select 
   count(1) 
from 
  data
where 
   label != "X"

-- >  1(actual)   != 2 (expected)

実際に数えたいXではないレコードは、2にも関わらず実際の出力は1しか得られない。

解説と対処方法

SQLにおいては、真理値(true, false)に加えて NULLを含めた 3値論理の扱いが原則となる。
そして、where/having では nullの取り扱いは すべて not true として取り扱われる.

null をハンドリングするには

  • ifnull / coalesce : null時に適当な値でフォールバックする関数
  • is distinct from: 引数がnullでも返り値にNULLを返さない演算子

といった方法が利用できる

-- ifnull によるハンドリング
-- 省略
select 
   count(1) 
from 
  data
where 
   ifnull(label, "Unknown")  != "X"
-- is distinct from によるハンドリング
-- 省略
select 
   count(1) 
from 
  data
where 
  label is not distinct from  "X"

注意点として, ifnull や coalesceでハンドリングする場合は、NULL同士の比較には留意する必要がある。
仕様上は NULLは true = NULL is distinct from NULL となる。
ifnullなどでハンドリングした際に一つの値に丸めことに対する弊害には留意しよう。
余談ではあるが、NULLに似た取り扱いである NaNの取り扱いでも同様といえる。

3値論理(Thee-Valued Logic)

標準的なSQLにおいて、論理値の扱いは 2値ではなくNULLを含めた3値論理ですべて考える必要がある。

比較演算 と NULL

SQL上で定義されるビルトインの演算子は、オペランドのいずれかを含むNULLを含む計算は
出力がNULLになることを覚えておく必要がある。
例えば A <= B の場合 AかBがNULLであれば、この比較演算の結果は NULL となる。

struct での取り扱い

NULLに関する取扱いで特殊となるのがstruct型がサポートする比較演算の演算(=, !=, <>, IN)である。
struct型ではstruct型同士での型比較が可能である。 (このことからJOINのusing に含むことが可能である)

nullを含むstruct型では 次の例のような挙動を行う。

select struct(1, null) = struct(2, null)
--> false
select struct(2, null) = struct(2, null)
--> null
select struct(null, null) = struct(2, null)
--> null

これはという挙動で、 struc型の等価比較は各フィールドの等価比較を論理積AND(論理演算の項を参照)で結合した結果と等価である。

この動作は直感的にはかなり問題を生みやすいため、struct型の等価比較の利用はおススメしない
もしstruct型の比較を利用する場合には、次のようにformat演算などで文字列化しての比較をオススメする。
この場合においてはnullの等価性の定義としては不正確な演算ではあるが、
人間にとっては比較的直感的な取り扱いとして利用できる。

select format('%t', struct(2, null)) = format('%t', struct(2, null))
--> true

論理演算

ここまでNULLを含む演算子での取り扱いを考えてきたが、三値論理を考えるうえで
さらに複雑になるのが論理演算での取り扱いである。

次の3値での論理演算での結果を確認してほしい。

select true AND NULL 
--> null

select true AND NULL 
--> null

select true OR NULL 
--> true

select false OR NULL 
--> null

特にNULLに関する取扱いで間違えることが多いのが
論理積ANDにおけるNULLの取り扱いである、whereで複数条件を利用する際に

 -- ...
  where 
      condition1
      AND condition2
      AND condition3

と頻出するこの表現が正しく機能するためには
condition1-3の結果がすべてNULLでなければならない
1つの条件でもNULLが含まれる場合、全体として null, つまり not true となるため条件に合致しなくなる。

3値論理演算の真理値表

T = true, U = NULL, F = false として、真理値表を記述すると次のようになる。
3値論理においてNULLは不定(trueでもfalseでも良い)としてとり扱うことで理解がしやすいだろう

NOT

NOT演算は入力がNULLであれば、出力もNULLという扱い

V1 T U F
F U T
AND
V1\V2 T U F
T T U F
U U U F
F F F F

ANDの演算は 一方のオペランドがfalseの時、他の値が不定(NULL)であろうとtrueであろうと
全体をfalseと判定できる。

  • not any(e is false)
OR
V1\V2 T U F
T T T T
U T U U
F T U F

ORの演算は 一方のオペランドがtrueであれば、他の値が不定(NULL)であろうとtrueであろうと
全体をtrueと判定できる。

悪魔合体: 比較演算と論理演算の混合

さてここまでで論理演算によるNULLの取り扱いを確認してきた。
次のケースにおいて、XとYがどのような場合にNULLとなるか確認してみてほしい

  1. NOT ((10<= X) OR (Y < 20))
  2. NOT ((10<= X) AND (Y < 20))

集計関数における nullの取り扱い

最後にNULLの取り扱いとなるのは、 集計関数だ。

https://cloud.google.com/bigquery/docs/reference/standard-sql/aggregate-function-calls

集計関数においてもNULLの考慮が必要となる。

  • count: nullでも1つとして数え上げられる
  • sum: nullの場合は無視される(≒0として加算)
  • avg: 分母はcount, 分子はsum として数え上げられる。
  • min, max: NULLの値は無視されて計算される

avgはcountとsumの異なる挙動が複合的に動くため、カラムのNULLの比率が不透明の場合での利用を推奨しない。
利用する場合は IGNORE NULLS/RESPECT NULLS といった形で, NULLを明示的にに利用するか否かを示すことをお勧めする。

ORDER句におけるNULLの取り扱い

ODER BYにおける並び替えにおいて、NULLは デフォルトでは 昇順で必ず最初に来る値 としてふるまう。
つまり、どの値よりも小さい値として扱われる。
実はこの挙動は、変更可能である。 ORDER BY句のオペランドに後に
次の句によって NULLを度の値よりも小さい値か、NULLをどの値よりも大きい値を選択的に選ぶことができる

https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#order_by_clause

  • ORDER BY A ASC NULLS FIRST
  • ORDER BY A AC NULLS LAST

複数行の文字列のtrim

BigQueryのSQLにおいては、複数行の文字列に対するtrimを行う関数は実装されていないので
なので、自身で作る必要がある。

次のような関数を用意すればよい

create or replace function `fn.trim_multiline_string`(s string)
returns string
as
((
  select as value
    string_agg(nullif(trim(line), ''), '\n')
  from unnest(split(s, '\n')) line
))
;

select `fn.trim_multiline_string`("
hoge
    ,  fuga
") 
-- > "hoge\n, fuga"

https://github.com/takegue/estante/blob/master/codes/bigquery/fn/trim_multiline_string/ddl.sql

Slowly Changing Dimension Type 2 on BigQuery

Slowly Changing Dimension

DWH上でデータを整備するにあたって、特定のテーブルにおいては属性情報が時間経過とともに変化する場合がある。
このようなデータについては現在の状態と過去の状態を総合的に取り扱う必要がある。

例として、station_idをナチュラルキーとする次の駅テーブルを考えてみる
ある時点Aでは上記で表されていた、テーブルがある次点Bで次のように変化する (Close → Open)

時点A:

station_id name status location ...
12345 品川駅 Open 品川 ...
12346 高輪ゲートウェイ駅 Close 五反田 ...

時点B:

station_id name status location ...
12345 品川駅 Open 品川 ...
12346 高輪ゲートウェイ駅 Open 五反田 ...

このようなディメンションを Slowly Changing Dimension (以下SCD)と呼ぶ。
SCDのの取り扱い方法にはいくつか種類がありType 1- 6について定義される.
この中でも特に実践的に取り扱う手法はType 2 と Type4である。

Type 2では対象の元テーブルに対して、変化のあったレコードレベルでの記録を行う。
先の例でいえば、Type 2では次のように valid_from, valid_to の 有効期間のカラムを導入することで実現する。

station_id valid_from valid_to name status location ...
12345 timestamp A null 品川駅 Open 品川 ...
12346 timestamp B null 高輪ゲートウェイ駅 Open 五反田 ...
12346 timestamp A timestamp B 高輪ゲートウェイ駅 Close 五反田 ...

これを BigQueryで実現する方法について取り扱う

実装クエリ

テーブルの作成

元テーブルに対して、SCD Type2 でデータを記録する実態となるテーブルを次のように生成をしている。

create or replace table `sandbox.stations_scd_type2`
partition by DATE(valid_to)
cluster by unique_key
as
select  
  entity.station_id as unique_key
  , version_hash
  , timestamp(null) as valid_from
  , timestamp(null) as valid_to
  , entity
from `bigquery-public-data.austin_bikeshare.bikeshare_stations` as entity 
 , (select as value generate_uuid() ) as version_hash
limit 0
  • unique_key: 対象のユニークキー. クラスタインデックスのため切り出している
  • version_hash: 実行時のバージョンhash. これに対してuniqueカウントすることで、変更数を知ることができる。
  • valid_from, valid_to: データの有効期間を表すカラム
  • entity: 差分の検出を簡単にするために構造体化している

BigQueryにおいては パーティションがあるが、valid_toに対して制御を行うことで
valid_to IS NULL の時に 現在の状態が抽出できる.
また クラスタインデックスを unique_keyに作成することで実行コストの最適化が行える。

現在の状態の取得

現在の状態を再現するためには、valid_to が nullであるレコードを参照するだけである

SELECT 
  entity.*
FROM `project-id-7288898082930342315.sandbox.stations_scd_type2` 
where valid_to is null

過去の特定状態の取得

任意の時間時点 snapshot_point での状態を再現するためには、
次のクエリを実行すればよい。TVFにしておくことで簡単にアクセスできる。

SELECT 
  entity.*
FROM `project-id-7288898082930342315.sandbox.stations_scd_type2` 
where 
  -- when snapshot_point is null, use latest version
  (valid_from <= snapshot_point and ifnull(snapshot_point < valid_to, true))
  or
   (snapshot_point is null and valid_to is null)

更新式

作成したテーブルに対して、次のMERGEクエリを任意の時間間隔で実行することで
SCD Type2の履歴テーブルを構築することができる。

merge `sandbox.stations_scd_type2` T
using 
  (
    SELECT 
      entity.station_id as unique_key
      , uuid as version_hash
      , timestamp '2010-01-01' as valid_from
      , timestamp(null) as valid_to
      , entity
    FROM 
      -- 例として、 特定のIDのみのデータを置き換えている
      (select * replace(if(station_id in (2499), "closed", status) as status) from `bigquery-public-data.austin_bikeshare.bikeshare_stations`) as entity
    , (select as value generate_uuid()) uuid
  ) as S
on
  T.valid_to is null
  and S.unique_key = T.unique_key
  and format('%t', S.entity) = format('%t', T.entity)
-- Insert new records changed  
when not matched then
  insert row
-- Deprecate old records changed
when 
  not matched by source 
  and T.valid_to is null then  
  update set
    valid_to = current_timestamp()

-->  affect 2 records

注意点

元テーブルのスキーマ変更、上記実装では、元テーブルにスキーマ変更があった場合にも
派生テーブルである上記テーブルについてmigration操作を行わなければならない。

SQLにおいて 実行時にデータ検証を行う

ビューのCTEにテストを記述し、実行時評価を行うことでアドホックなデータ検証を行うことができる。

この方法は、フレームワークの導入前などにも簡単に導入できる
"とりあえず"書いておける

実行時の計算コストが増加することになるが、マテリアライズドビューを使うことによってこれは軽減可能。

with datasource as (
  select v.* from unnest([
    struct(1 as id, 'one' as name)
    , struct(2 as id, 'two' as name)
    , struct(3 as id, 'three' as name)
    , struct(3 as id, 'three' as name)
  ]) as v
)
, _validation as (
  with uniqueness as (
    select
      "#1 Uniquness" as title
      , format("%s should be unique. count=%d", unique_key, count(1)) as message
      , 1 = count(1) as assertion
    from datasource
    left join unnest([struct(
       format('%t', (id)) as unique_key
    )])
    group by unique_key
  )

  , test as (
    select as struct
      title
      , string_agg(distinct message, '\n') as error
    from (
      select * from uniqueness
    )
    where not assertion
    group by title
  )

  select as value
    if(
      count(error) > 0
      , error(format('Failed: %s', string_agg(error, ',' limit 10)))
      , true
    ) as errors
  from test
  where error is not null
)

select
  datasource.*
from datasource, _validation
where
  _validation

BigQueryのジョブ履歴からデータリネージを生成する

RECURISVE構文とINFORMATION_SCHEMA.JOBS_BY_* を組み合わせることで
ジョブの実行履歴からデータリネージを生成することができる

RECURSIVE CTE による再帰的SQLの記述

RECURSIVE構文を利用することにより、SQLの最適的取り扱いが可能である。

INFORMATION_SCHEMAによる参照テーブルの取得

またBigQueryが提供するメタデータ INFORMATION_SCHEMA.JOBS_BY_* によってクエリの実行履歴が取得できるが
これには referenced_tables といった形でクエリの実行時に参照したテーブルの情報が記録されている。

実装

次の実装は、ジョブの実行履歴からテーブルのリネージを生成する例となる。
ついでにテーブルの依存関係だけでなくユーザの呼び出しクエリも記載している。

これを利用することで、

  • ユーザのテーブル利用状態の取得
  • 利用されているテーブルの依存関係の一覧

などができるだろう

感想

WITH RECURSIVEは TVFなどで関数化することができないため、ちょっと使いづらく困った

-- BigQuery Table Depdencies
-- depth = -1 means query destination tables in ordinal usage
with recursive lineage as (
select
format('%s.%s.%s', dst_project, dst_dataset, dst_table) as destination
, 0 as depth
, relations.* except(unique_key)
from relations
where unique_key not like '%(User)%'
and not starts_with(dst_table, '_')
and not starts_with(dst_dataset, '_script')
union all
select
destination
, depth + 1 as depth
, lineage.src_project as dst_project
, lineage.src_dataset as dst_dataset
, lineage.src_table as dst_table
, relations.* except(dst_project, dst_dataset, dst_table, unique_key)
from lineage
join relations
on (relations.dst_project, relations.dst_dataset, relations.dst_table)
= (lineage.src_project, lineage.src_dataset, lineage.src_table)
)
, job as (
select
job_id
, user_email
, creation_time
, end_time - start_time as processed_time
, start_time - creation_time as wait_time
, query
, statement_type
, total_bytes_processed
, total_slot_ms
, destination_table
, referenced_tables
from `project-id-7288898082930342315.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
where
destination_table.table_id is not null
and error_result.reason is null
and state = 'DONE'
)
, relations as (
select
if(
is_temporary and is_anonymous_query
, format('(User) -> %t', ref)
, format('%t <- %t', destination_table, ref)
) as unique_key
, any_value(destination_table).project_id as dst_project
, any_value(destination_table).dataset_id as dst_dataset
, any_value(normalized_dst_table) as dst_table
, any_value(ref).project_id as src_project
, any_value(ref).dataset_id as src_dataset
, any_value(normalized_ref_table) as src_table
, max(creation_time) as job_latest
, approx_top_sum(query, unix_seconds(creation_time), 10)[safe_offset(0)].value as query
, approx_count_distinct(user_email) as n_user
, approx_count_distinct(query) as n_queries
, approx_count_distinct(job_id) as n_job
, sum(total_bytes_processed) as total_bytes
, approx_quantiles(processed_time_ms, 10) as processed_time__quantiles
, approx_quantiles(wait_time_ms, 10) as wait_time__quantiles
, sum(total_slot_ms) as total_slots_ms
, approx_quantiles(total_slot_ms, 10) as total_slots_ms__quantiles
from job, unnest(referenced_tables) as ref
left join unnest([struct(
extract(millisecond from processed_time)
+ extract(second from processed_time) * 1000
+ extract(minute from processed_time) * 60 * 1000
+ extract(hour from processed_time) * 60 * 60 * 1000
as processed_time_ms
, extract(millisecond from wait_time)
+ extract(second from wait_time) * 1000
+ extract(minute from wait_time) * 60 * 1000
+ extract(hour from wait_time) * 60 * 60 * 1000
as wait_time_ms
, regexp_extract(ref.table_id, r'\d+$') as _src_suffix_number
, regexp_extract(destination_table.table_id, r'\d+$') as _dst_suffix_number
, destination_table = ref as is_self_reference
, starts_with(destination_table.dataset_id, '_') and char_length(destination_table.dataset_id) > 40 as is_temporary
, starts_with(destination_table.table_id, 'anon') as is_anonymous_query
)])
left join unnest([struct(
if(safe.parse_date('%Y%m%d', _src_suffix_number) is not null, regexp_replace(ref.table_id, r'\d+$', '*'), ref.table_id) as normalized_ref_table
, if(safe.parse_date('%Y%m%d', _dst_suffix_number) is not null, regexp_replace(destination_table.table_id, r'\d+$', '*'), destination_table.table_id) as normalized_dst_table
)])
where
not is_self_reference
and not statement_type in ('INSERT', 'DELETE', 'ALTER_TABLE', 'DROP_TABLE')
and statement_type is not null
group by unique_key
)
, user_query as (
select
format('%s.%s.%s', src_project, src_dataset, src_table) as destination
, -1 as depth
, dst_project as dst_project
, string(null) as dst_dataset
, string(null) as dst_table
, relations.* except(unique_key, dst_project, dst_dataset, dst_table)
from relations
where starts_with(unique_key, '(User)')
and not starts_with(src_table, '_')
and not starts_with(src_dataset, '_script')
union all
select * from lineage
)
select * from user_query
order by destination, depth

集計の際にデバッグ可能な情報を埋め込む: SQLを検証可能にする

概要

集計は怖いぞ

  • 集計クエリが負債化しやすいのは、集計操作が不可逆な操作でなおかつデータソースというas-wasの状態を含むから
  • 集計クエリには、集計対象の情報だけでなく集計元やデバッグ情報を埋め込もう
  • こういったメタ情報を埋め込むといいんじゃないかという情報を紹介する

どういう情報を埋め込むとよいか

次のような情報を埋め込むとよいと考えている(他何かいいのあれば募集)
逆にとらえると集計という操作は、元テーブルに対して次のような情報を失うことを意味する。
(そりゃ運用できなくなるな。コンテキストが高すぎる)

  • 集計対象のテーブルに対するメタデータ: 対象のテーブルは時間経過とともに変わるかもしれない

    • データソース名
    • 集計日時の範囲
    • 集計時でのテーブルのレコード数
  • 集計対象のカラムに対するメタデータ

    • 主要な統計量
    • ユーザやページといった主要なエンティティに関する統計量
      • 特定のユーザIDによるskewが発生しているなどがあとからわかると嬉しい
    • approx_top_countや approx_quantilesが便利である
  • 集計操作に関するメタデータ

    • 集計条件
    • 条件内外のデータ
  • (JOINする場合) 結合操作に関するメタデータ

    • テーブルとテーブル間の関係は後からではわからない
    • 1:1なのか1:Nなのか 1: 0or1 なのか... といったことがわかるもの
  • 実行時のジョブに対するメタデータ

    • 実行日時
    • ジョブの作成に利用したそのほかパラメータ
      • BigQueryであれば __PARTITIONTIME などのパラメータ
      • 集計条件などを入れるとよいかもしれない
      • 関数などのバージョン番号

集計クエリ1 (group by単体)

上記をもとに、集計クエリを書き換えてみる

before:

with source as (
  select * from `table`  --> このテーブルは 日々replaceされるかもしれない. 古いレコードは削除されるかもしれない
)
, agg as (
  select 
     dim1 --> dim1のラベルは減ったり増えたり、欠損するようになるかもしれない
     , dim2
    -- 何かしら集計したい指標
     , sum(value) as sum_value --> 特定のユーザ (user_id)の寄与が99%かもしれない. 値域が変わるかもしれない
  from source
  where 
    dim4 = "some_condition"  --> ドリフトにより 条件外の時のデータが当時と違って増えるかもしれない. dim4のratio情報が必要
  group by dim1, dim2 
)

select * from agg

after:

with source as (
  select  
    _PARTITIONTIME  as __PARTITIONTIME
    , * 
  from `table` 
  where _PARTITIONTIME between "2022-06-10" and "2022-06-12"
)
, _metadata_table as (
   select as struct
    "`table`" as name
     , min(__PARTITIONTIME) as __PARTITIONTIME_from
      , max(__PARTITIONTIME) as __PARTITIONTIME_to
     ,  count(1) as n_records
     -- フィルタ操作に対するメタデータ
     ,  countif(dim4 = "some_condition")  as n_matched
   from source
)
, agg as (
  select 
     dim1
     , dim2
     , sum(value) as sum_value
     -- metadata
     , struct(
       struct(
          -- valueの統計値
          struct(
            min(value) as min
            , min(value) as max
            , avg(value) as avg
            , approx_qunatiles(value, 10) q10tiles
          ) as value
          -- valueに関わるエンティティの統計情報
          , struct(
 	   approx_top_sum(user_id, sum_value, 10) as top5_sum
	      , approx_top_count(user_id, 10) as top10_count
          ) as user
       ) as column
       ,  _metadata_table as table
       , struct(
         current_timestamp() as created_at
       ) as job
     ) as _metadata
  from source, _metadata_table
  group by dim1, dim2
)

select * from agg

Slowly Changing Dimension Type 4 on BigQuery

Typeに対する記述が誤り

Slowly Changing Dimension

Type 2

  • unique_key に対して、行レベルでヒストリを保持する
  • 一部の高頻度で状態が変わるカラムを保持している場合、レコード数の増加が大変になる
    • 更新頻度が低い場合、こちらが有利

Type 4

  • unique_key に対して、カラムレベルでヒストリを保持する
  • カラムをpivotし、行として扱うことになるため文字列だけストレージコスト増える
    • 更新頻度が高い場合、こちらが有利
    • 最終利用時には pivotをうまく使う必要があり、これが少々厄介

実装

テーブル設計

  • valid_to に対して パーティションを切るとよい

    • valid_to is NULL であるレコードが 最新の状態となり、コスト効率が良い
  • ユニークキーにクラスタインデックスを貼っておくとよい

SCD Type4 用のクエリ/テーブル生成

テーブルの カラムに対して動的に生成する必要がある

https://github.com/takegue/estante/blob/c997d9bf300f905573dc2386cd0765f7e777712e/codes/bigquery/fn/generate_sql_scd_type4/ddl.sql

更新式

  1. 冪等: 複数回実行したとしても、レコードの値が変わらない限り最新分のレコードのみ追加される
merge `sandbox.test_scd_type4` T
using 
  (
      select 
         src.unique_key 
         , version_hash
         , current_timestamp() as valid_from
         , timestamp(null) as  valid_to
      from 
         src as src
         ,  (select as value generate_uuid()) as version_hash
  ) as S
on
  T.valid_to is null
  and S.unique_key = T.unique_key
  and S.column_name = T.column_name
  -- Evaluate `struct(1, null) = struct(2, null)` is false, `struct(1, null) = struct(1, null)` is false
  and format('%t', S.column_value) = format('%t', T.column_value)

-- Insert New Record
when not matched then
  insert row
-- Deprecated Old Rercord
when 
  not matched by source 
  and T.valid_to is null then  
  update set
    valid_to = current_timestamp()

利用:

  1. 特定の時間時点での履歴を

BigQueryにおける 三値論理の all, any関数の実装

BigQuery上における all, anyの誤った実装例

Pythonでいうallやany, javasciptにおける allやsomeといった関数を
SQL上で実現しようと考えると unnest と logical_or , logical_and が思い浮かぶ

declare arr array<bool> default [true, false];
select
  (select logical_and(v) from unnest(arr) as v ) as ret

注意としてこれらはallやsomeと等価にならない
これは空集合に対しての値定義が一致しなたいためである。

また logical_or や logical_and は三値論理
nullの値を無視するつくりになっており、三値論理正しく実現していない。
例えば次のようなケースである

いずれかの値がnullであるケース:

  • 論理積であれば いずれか falseであれば falseになる. それ以外は不定
  • 論理積であれば いずれか trueであれば trueになる. それ以外は不定

allやanyと同等の結果を実現するためには、これらについて適切にハンドリングする必要がある

BigQuery上における 3値論理を実現する all, anyの正しい実装例

any

create temp function `any`(arr array<bool>)
as 
 ((
   select as value 
    if(
      array_length(arr) > 0
      , if(logical_or(v), true, if(countif(v is null) > 0, null, false))
      , false
    )
   from unnest(arr) as v
 ))
;
assert `any`([true, false]) = true;
assert `any`([true, null]) = true;
assert `any`([false, null]) is null;
assert `any`([]) = false;

all

create temp function `all`(arr array<bool>)
as 
 ((
   select as value 
    if(
      array_length(arr) > 0
      , if(logical_and(v), if(countif(v is null) > 0, null, true), false)
      , true
    ) 
   from unnest(arr) as v
 ))
;

assert `all`([true, false]) = false;
assert `all`([false, null]) = false;
assert `all`([true, null]) is null;
assert `all`([]) = true;

BigQueryで `LIMIT` を使いこなす

SQLにおける limit 節は、データの出力行数を制御するためのパラメータであり
カラムナ型データベースであるBigQueryにおいては 一見役割が薄く見える節でありますが

BigQueryのSQLエンジンにおいては、このLIMIT節について高度な最適化がされており
これらを使いこなすことでいくつかの恩恵を受けることができます。
ここではこれらのtipsについていくつかまとめていきます。

ORDER BY と LIMIT の組み合わせによる最適化

ORDER BY節はテーブルの行数が多い場合に特にコストが高くなる計算のひとつにあげられますが、LIMIT節と組み合わせることで恩恵を受けることができます。
LIMIT節があることでLIMIT操作を割り当てられたslotが十分な結果を保持した段階もしくは
計算処理打ち切りが条件が明確な場合において、前段のslot消費を止めることができるためです。

これは BigQueryのクエリ最適化のプラクティスにも挙げられている手法のひとつです

-- 最適化されたSQLの例: 次から引用
-- https://cloud.google.com/bigquery/docs/best-practices-performance-filter-order#use_limit_with_an_order_by_clause
SELECT  id,  reputation,  creation_date,  DENSE_RANK() OVER (ORDER BY creation_date) AS user_rankFROM bigquery-public-data.stackoverflow.users  
ORDER BY user_rank ASCLIMIT 10;

実際のBigQueryのエンジンがどのようにslotを配置するかを視覚的に理解するには次の動画が参考になります

BigQuery 管理者リファレンス ガイド: クエリの最適化

LIMIT 0: 一見無用に見える便利なテクニック

LIMIT 0は行を出力しない構文で、一見意味のない構文に見えるかもしれませんが
LIMIT 0に施された最適化はリソース消費をゼロもしくは極限に抑えることができます
これを使うことで次のようなことができます

  • 出力をともわないクエリのDRY RUN: ViewやTVFに破壊的変更がないか確認することができます
  • 空テーブルの構築: SQLのスキーマを確認する際に便利です
  • 複数のSQLのスキーマ整合性の確認

このようなSQLがあったとしましょう。

with complex_1 as (
  -- ...
)
, complex_2 as (
  -- ...
  -- from complext_1
)
, complex_3 as (
  -- ...
  -- from complex_2
)

select * from complex_3

スキーマの確認すらも容易ではないですが、次のようなSQLを書くことで
簡単にスキーマ出力を得ることができます!

create temp table `check_schema`
as
with complex_1 as (
  -- ...
)
, complex_2 as (
  -- ...
  -- from complext_1
)
, complex_3 as (
  -- ...
  -- from complex_2
)

select * from complex_3
limit 0

require_partition_filter オプションの検証をバイパスし、スキーマチェックする

require_partition_filter はパーティション構造をもつテーブルに対して
パーティションを指定しないクエリの記述を禁止するオプションである.

このテーブルに対して limit 0 を使うと パーティションに関するフィルタを指定しない状態においてもクエリのチェックが可能になる。
これによって巨大なパーティションテーブルとのスキーマの整合性チェックなどが容易にできる

次のクエリは実行可能な有効なクエリである.

create or replace temp table `takegue.sandbox.sample_partition_table`
partition by date_jst
options(
  require_partition_filter  = true
)
as select * from `takegue.sandbox.sample_partition_table` limit 0

;

with _source as (
  SELECT * FROM `takegue.sandbox.sample_partition_table`
)
, _query as (
  SELECT date(null) as date_jst
)

select * from _source
union all
select * from _query
limit 0

LIMIT 節の制限: 定数値の使用のみが許される

ここまで紹介してきたようにLIMIT節は有用な用途が存在する一方でいくつかの制限があります

  • BigQuery Scriptの変数を使うことはできません

  • クエリパラメータは使うことができます @limit_num

  • array_agg などの 配列集計で動的な制限をかけたい場合には where節で条件指定をすることで、limitと同等の出力を得ることができます。

declare limit_num int64;
set limit_num = 3;

select 
  [
    select
      u
    from
	  unnest(generate_array(1, 10)) as u
	-- Error!: limit limit_num
	-- Passed: limit @limit_num
	limit 3
  ]

この制限は次のような書き換えを行うことで回避することができます。

declare limit_num int64;
set limit_num = 3;

select 
  [
    select
      u
    from
	  unnest(generate_array(1, 10)) as u 
	    with offset ix
	where
	   ix < 3
	-- Passed: ix < limit_num
	-- Passed: ix < @limit_num
  ]

まとめ

LIMIT 節にまつわるいくつかのtipsを紹介しました。
ここで紹介したtipsがみなさんの役に立てれば幸いです!

BigQueryで特定のクエリの参照テーブルを抽出する

特定のクエリが依存するテーブルを抽出したい

viewやTVFによって定義されたテーブルが依存する物理テーブルを取得したい場合がある。
特に孫依存になっている場合は、そのテーブルのリネージを管理するうえで、祖先の物理テーブルを把握することは特に重要といえる。
こういった時にどうすればいいか、というのが今回の焦点である。

解決方法

BigQueryの INFORMATION_SCHEMA.VIEWS などにおいては、こういった参照テーブルの情報は保存されておらず
INFORMATION_SCHEMA.JOBS_BY_USER などの実行情報から参照しなければならない。

-- Prepare Sample TVF
create schema if not exists `sandbox`;
create or replace table function `sandbox.sample_tvf`(target_date DATE)
as
  select * from `bigquery-public-data.ga4_obfuscated_sample_ecommerce.events*`
  where parse_date(_TABLE_SUFFIX, '%Y%m%d') = target_date

-- Analyze referenced tables
execute immediate 'select * from `sandbox.sample_tvf`("2022-06-20")';
set last_job_id = @@last_job_id;
set ret = (
  select as value
    referenced_tables
  from `region-us.INFORMATION_SCHEMA.JOBS_BY_USER`
  where job_id = last_job_id
  and date(creation_time) = current_date()
  order by start_time desc
  limit 1
);

注意点としては、この方法ではテーブル関数やビューテーブルの場合、
それらのリソース先まで参照が解決されるため、これらの呼び出しを抽出することはできない。

Pro tip: 参照解析のためのクエリ実行のリソース消費量を最低にする

上記のクエリのままだと、参照解析のためのクエリ実行のリソース消費量が大きい

  • on-demand: テーブルのスキャン量に応じた課金が発生する
  • flat-rate: スロット消費が発生する

やりたいことは参照解析だけであるため、これらのリソース消費量をなるべく低く抑えたい。

カラムを参照しないクエリを投げる

-- 課金されず、スロット消費が最低となる
select 1 from `bigquery-public-data.ga4_obfuscated_sample_ecommerce.events_*` limit 0

参照解決のためのerrorを意図的に起こしBigQueryの課金を回避する

on-demand課金の場合、BigQuery は エラーが発生した場合のクエリは課金されない仕組みである。
そして INFORMATION_SCHEMA.JOBS_BY_USER は エラーが起きた場合においても、参照クエリが解析される仕様である。

この仕様を組み合わせて、あえてエラーが起こるクエリを実行することで
テーブルスキャンを回避した参照テーブルの抽出が可能となる。
また 実行時エラーにより、行レベルのスキャンが行われるため、スロット消費もこちらの方が低い。

-- エラーにより課金されず、スロット消費も最小限となる
select 1 from `bigquery-public-data.ga4_obfuscated_sample_ecommerce.events_*` where error("try error") limit 0

CTEで定義のみ行い、参照を行わない

実はCTEだけ定義を用意して、そちらを一切クエリ中で利用せずとも参照解析ができる

-- エラーにより課金されず、スロット消費も最小限となる
with A as (
  select * from `bigquery-public-data.ga4_obfuscated_sample_ecommerce.events_*` 
)
select error('intentional error')

Procedure化する

プロシージャ化しておくと、ちょっとしたときに使えて便利である。
https://github.com/takegue/estante/blob/master/codes/bigquery/fn/get_query_referenced_tables/ddl.sql

Smart Tuning利用可能なテーブルのプロファイル用SQLを生成する

概要

未知のテーブルや既存の大きいデータのテーブルの利用に対しては
初手ではデータのプロファイリングを行いたい

  1. 充足率(non-nullの比率)はどうなっているか
  2. uniqunessかどうか
  3. カラムがどういう値域を持っているか、その分布はどうなっているか

といったことを調べたくなる。

この計算はテーブルのデータ検証で頻出する表現であり、
BigQueryにおいては、マテリアライズドビュー化しておくことでSmart Tuning機能によるクエリ書き換え機能も期待できる
https://cloud.google.com/bigquery/docs/materialized-views-use#smart-tuning

このようなSQLを一定自動生成できるとデータ調査が非常にはかどるので実装しておきたい

実装

テーブルのパス情報に対して、次の用なSQLを生成する。
このSQLではカラムごとの型情報に基づいて集計指標が、カラムとして展開される。

  • 動的なINFORMATION_SCHEMAの参照が必要なため、プロシージャでの実装が必要となる
  • materialized view の 生成には、DDL構文を別途記述する必要がある
  select
   partition_key
   , null as group_keys   , count(1) as count
     -- station_id (f1)
          , 'station_id' as f1__name
          , count(station_id is not null) as f1__nonnull
          , approx_count_distinct(station_id) as f1__unique
          , hll_count.init(station_id) as f1__hll
          , sum(station_id) as f1__sum
          , avg(station_id) as f1__avg
          , min(station_id) as f1__min
          , max(station_id) as f1__max
        
          , approx_top_count(station_id, 5) as f1__top_count
          , approx_quantiles(station_id, 20) as f1__20quantile
    -- name (f2)
          , 'name' as f2__name
          , count(name is not null) as f2__nonnull
          , approx_count_distinct(name) as f2__unique
          , hll_count.init(name) as f2__hll
          , avg(CHARACTER_LENGTH(name)) as f2__avg_len
          , min(CHARACTER_LENGTH(name)) as f2__min_len
          , max(CHARACTER_LENGTH(name)) as f2__max_len
        
            , approx_top_count(name, 5) as f2__top_count
            , approx_quantiles(name, 20) as f2__20quantile
   # ...

https://github.com/takegue/estante/blob/master/codes/bigquery/fn/generate_sql_table_stats/ddl.sql

option カラム

  • ("materialized_view_mode", "true") : materialized view用の syntaxのみをサポートした関数で集計を行う

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.