Ritolabo
  1. Home
  2. GoogleCloud
  3. BigQuery
  4. BigQueryパイプラインでワークフロー作成|テーブル変換・スケジュール設定など

BigQueryパイプラインでワークフロー作成|テーブル変換・スケジュール設定など

  • 公開日
  • カテゴリBigQuery
  • タグBigQueryPipeline,自分用メモ
BigQueryパイプラインでワークフロー作成|テーブル変換・スケジュール設定など

BigQuery でデータ分析基盤を構築する際、datalake 層に取り込んだデータを加工して dwh/dm 層を作成する「変換(Transform)」処理が必要になる。

この変換処理を UI ベースで簡単に設定できる BigQuery パイプライン を試す。

contents

  1. BigQuery パイプライン
    1. Dataform との関係
  2. BigQuery パイプラインでできること
  3. 1. パイプラインの作成
  4. 2. タスクの作成
    1. 01. タスク追加
    2. 02. タスク設定
  5. 依存関係の確認
    1. 作成するテーブル構成
    2. 依存関係の自動検出: ${ref()} 記法
    3. daily_sales_report の例
    4. 依存関係グラフ
  6. スケジュール設定
    1. スケジュール設定を開く
    2. 実行オプション
    3. スケジュール設定の確認と実行確認

BigQuery パイプライン

BigQueryパイプラインは、Dataform の技術をベースにした簡易版 UI ツール。

UIベースで dwh/dm テーブルの作成・依存関係管理・スケジュール実行ができる。

ETL でいうと 「T(Transform)」と「L(Load)」 を担当する。

E (Extract)     → データソースからデータを取り出す
T (Transform)   → データを加工・変換 ← ここ
L (Load)        → 変換後のデータを格納 ← ここ

Dataform との関係

Dataform(コア技術)
    ├─→ BigQuery パイプライン(UIベース簡易版)
    └─→ Dataform サービス(フル機能版、Terraform対応)
項目BigQueryパイプラインDataform
設定方法UIのみコード(SQLX) + UI
Terraformなしあり
Git連携なしあり
用途検証・簡易利用本番運用

参考:

BigQuery パイプラインでできること

機能内容
SQLでテーブル作成SELECT文を書いて新しいテーブル/ビューを定義
依存関係の自動解決テーブル間の参照関係を自動検出、正しい順序で実行
スケジュール実行毎日/毎週など定時実行の設定
メタデータ管理テーブル・カラムに説明を付与

参考:

1. パイプラインの作成

まず最初に、パイプラインを作成する。一つのワークフローのイメージ。この中にタスクを複数配置していく流れになる。

01. BigQuery コンソールの左メニュー「パイプライン」→「パイプラインを作成」をクリック。

パイプラインメニュー

02. コードアセットを保存するデフォルトのリージョンを選択。

リージョン選択

03. 「BigQuery Pipelines へようこそ」ダイアログが表示される。

実行ユーザー選択
オプション用途
ユーザー認証情報で実行 [プレビュー]個人検証・開発時
サービスアカウントで実行チーム開発・本番運用時

ユーザー認証でもスケジュール実行は可能。ただし、退職などでアカウントが削除されると全パイプラインが停止するリスクがある。本番運用ではサービスアカウントを使う。

今回は検証なので「ユーザー認証情報で実行」を選択。

上部に表示されているパイプライン名「無題のパイプライン」をクリックすると名前を設定できる。

パイプライン名設定

参考:

2. タスクの作成

次に、ワークフロー内で実行するタスクを作成していく。

例えば、datalake層のデータから 2 つの dwh テーブルを作成したい場合、2つのタスクを作成するイメージ。

01. タスク追加

「タスクを追加」→「テーブル」を選択

タスクを追加
タスクを追加

タスクの種類:

カテゴリ項目用途
ソースの追加ソースを宣言する既存テーブルをソースとして定義
構築テーブルSQLで新しいテーブル/ビューを作成
データの準備AIでデータクリーニング・変換
検証データ品質テストデータ品質チェック

テーブル名を入力(例: order_summary)し、「作成」を押下すると、タスクが作成される。

パイプライン画面

02. タスク設定

タスクを作成したら、作成するテーブルのSQLなどを設定していく。

下部パネルの「開く」ボタン、またはタスクをダブルクリックでSQLエディタを開く。

001. メタデータ設定

config ブロックにテーブルの設定・説明・カラムの説明を記述する。

config {
  type: "table",
  name: "order_summary",
  dataset: "dwh_sample",
  description: "注文サマリー:ユーザー別・注文別の合計金額",
  columns: {
    order_id: "注文ID",
    user_id: "ユーザーID",
    user_name: "ユーザー名",
    ordered_at: "注文日時",
    total_amount: "合計金額"
  }
}
項目説明
typeテーブルの種類(table / view
name作成するテーブル名
dataset保存先データセット
descriptionテーブルの説明(BigQueryに反映)
columns各カラムの説明(BigQueryに反映)

002. SQLを入力

config ブロックの下にSQLを記述。

SELECT
  o.order_id,
  o.user_id,
  u.name AS user_name,
  o.ordered_at,
  SUM(oi.quantity * oi.unit_price) AS total_amount
FROM dl_sample.orders o
JOIN dl_sample.users u ON o.user_id = u.user_id
JOIN dl_sample.order_items oi ON o.order_id = oi.order_id
GROUP BY 1, 2, 3, 4
config,SQL入力後

003. タスクを実行(検証)

上部の「実行」ボタンでSQLの検証を実行できる。この時点ではテーブルは作成されない(検証のみ)。

004. パイプラインを実行(テーブル作成)

パイプライン画面に戻り、パイプラインの「実行」ボタン をクリック。

これでテーブルが作成される。メタデータ(テーブルの説明・カラムの説明)も反映される。

order_summaryテーブル詳細

BigQueryパイプラインを用いてのテーブル作成を、ミニマムで実現させるまでの手順はここまで。

依存関係の確認

複数のテーブルを追加して、依存関係が自動検出されることを確認する。

作成するテーブル構成

[並列] order_summary ───┐
                        ├─→ [直列] daily_sales_report
[並列] product_sales ───┘
テーブル依存関係実行タイミング
order_summarydl層のみ参照並列実行可
product_salesdl層のみ参照並列実行可
daily_sales_report上記2つを参照上記完了後

依存関係の自動検出: ${ref()} 記法

パイプライン内の他テーブルを参照する場合、${ref("テーブル名")} を使う。

-- 直接指定(依存関係が自動検出されない)
FROM dwh_sample.order_summary os

-- ref記法(依存関係が自動検出される)
FROM ${ref("order_summary")} os

ref() を使うと:

  • 依存関係グラフに矢印が自動追加される
  • 実行順序が自動で制御される

daily_sales_report の例

config {
  type: "table",
  name: "daily_sales_report",
  dataset: "dm_sample",
  description: "日次売上レポート:注文サマリーと商品売上を結合",
  columns: {
    order_date: "注文日",
    total_orders: "注文数",
    total_revenue: "総売上",
    top_category: "売上トップカテゴリ"
  }
}

SELECT
  DATE(os.ordered_at) AS order_date,
  COUNT(DISTINCT os.order_id) AS total_orders,
  SUM(os.total_amount) AS total_revenue,
  (
    SELECT ps.category
    FROM ${ref("product_sales")} ps
    ORDER BY ps.total_sales DESC
    LIMIT 1
  ) AS top_category
FROM ${ref("order_summary")} os
GROUP BY 1

依存関係グラフ

パイプライン画面で、タスク間の依存関係が自動検出されて表示される。

依存関係グラフ
  • order_summaryproduct_sales は並列(矢印なし)
  • daily_sales_report は上記2つから矢印が来ている

スケジュール設定

パイプラインの定時自動実行を設定する。

スケジュール設定を開く

パイプライン画面上部の「スケジュール」をクリック。

パイプライン画面
スケジュール設定メニュー

スケジュールの頻度を設定

スケジュール設定メニュー
頻度説明
毎時1時間ごとに実行
毎日1日1回、指定時刻に実行
毎週週1回、指定曜日・時刻に実行
毎月月1回、指定日・時刻に実行
カスタムcron形式で柔軟に指定

実行オプション

「優先度の高いインタラクティブ ジョブとして実行する」

設定動作
ON(デフォルト)即座に実行開始
OFFアイドル状態のリソースが空いたときに実行(バッチジョブ)

通常はデフォルト(ON)のままでOK。コスト削減したい場合はOFFにする。

設定完了後、「スケジュールを作成」をクリックでスケジュールが設定される。

スケジュール設定の確認と実行確認

パイプライン画面から「スケジュールを見る」で設定を確認可能

スケジュール設定

実行数タブから、スケジュール実行されたことを確認できる。

スケジュール設定

参考:

まとめ

BigQuery パイプラインを使って、datalake 層のデータを加工して dwh/dm 層のテーブルを作成する流れを試した。

良い点:

  • UIでサクッと設定できる
  • メタデータ(テーブル・カラムの説明)も設定可能
  • ref() 記法で依存関係を自動検出
  • スケジュール実行も設定可能

辛い点:

  • 数が増えてきたら管理が煩雑になりそう
  • 設定をコード管理できない
  • パイプライン間の依存管理ができない

小規模でシンプルなワークフロー構築なら簡単に実現できて使い勝手が良い。

Author

rito

rito

  • Backend Engineer
  • Tokyo, Japan
  • PHP 5 技術者認定上級試験 認定者
  • 統計検定 3 級