BigQueryパイプラインでワークフロー作成|テーブル変換・スケジュール設定など
- 公開日
- カテゴリ:BigQuery
- タグ:BigQueryPipeline,自分用メモ

BigQuery でデータ分析基盤を構築する際、datalake 層に取り込んだデータを加工して dwh/dm 層を作成する「変換(Transform)」処理が必要になる。
この変換処理を UI ベースで簡単に設定できる BigQuery パイプライン を試す。
contents
BigQuery パイプライン
BigQueryパイプラインは、Dataform の技術をベースにした簡易版 UI ツール。
UIベースで dwh/dm テーブルの作成・依存関係管理・スケジュール実行ができる。
ETL でいうと 「T(Transform)」と「L(Load)」 を担当する。
E (Extract) → データソースからデータを取り出す
T (Transform) → データを加工・変換 ← ここ
L (Load) → 変換後のデータを格納 ← ここ
Dataform との関係
Dataform(コア技術)
├─→ BigQuery パイプライン(UIベース簡易版)
└─→ Dataform サービス(フル機能版、Terraform対応)
| 項目 | BigQueryパイプライン | Dataform |
|---|---|---|
| 設定方法 | UIのみ | コード(SQLX) + UI |
| Terraform | なし | あり |
| Git連携 | なし | あり |
| 用途 | 検証・簡易利用 | 本番運用 |
参考:
BigQuery パイプラインでできること
| 機能 | 内容 |
|---|---|
| SQLでテーブル作成 | SELECT文を書いて新しいテーブル/ビューを定義 |
| 依存関係の自動解決 | テーブル間の参照関係を自動検出、正しい順序で実行 |
| スケジュール実行 | 毎日/毎週など定時実行の設定 |
| メタデータ管理 | テーブル・カラムに説明を付与 |
参考:
1. パイプラインの作成
まず最初に、パイプラインを作成する。一つのワークフローのイメージ。この中にタスクを複数配置していく流れになる。
01. BigQuery コンソールの左メニュー「パイプライン」→「パイプラインを作成」をクリック。

02. コードアセットを保存するデフォルトのリージョンを選択。

03. 「BigQuery Pipelines へようこそ」ダイアログが表示される。

| オプション | 用途 |
|---|---|
| ユーザー認証情報で実行 [プレビュー] | 個人検証・開発時 |
| サービスアカウントで実行 | チーム開発・本番運用時 |
ユーザー認証でもスケジュール実行は可能。ただし、退職などでアカウントが削除されると全パイプラインが停止するリスクがある。本番運用ではサービスアカウントを使う。
今回は検証なので「ユーザー認証情報で実行」を選択。
上部に表示されているパイプライン名「無題のパイプライン」をクリックすると名前を設定できる。

参考:
2. タスクの作成
次に、ワークフロー内で実行するタスクを作成していく。
例えば、datalake層のデータから 2 つの dwh テーブルを作成したい場合、2つのタスクを作成するイメージ。
01. タスク追加
「タスクを追加」→「テーブル」を選択


タスクの種類:
| カテゴリ | 項目 | 用途 |
|---|---|---|
| ソースの追加 | ソースを宣言する | 既存テーブルをソースとして定義 |
| 構築 | テーブル | SQLで新しいテーブル/ビューを作成 |
| データの準備 | AIでデータクリーニング・変換 | |
| 検証 | データ品質テスト | データ品質チェック |
テーブル名を入力(例: order_summary)し、「作成」を押下すると、タスクが作成される。

02. タスク設定
タスクを作成したら、作成するテーブルのSQLなどを設定していく。
下部パネルの「開く」ボタン、またはタスクをダブルクリックでSQLエディタを開く。
001. メタデータ設定
config ブロックにテーブルの設定・説明・カラムの説明を記述する。
config {
type: "table",
name: "order_summary",
dataset: "dwh_sample",
description: "注文サマリー:ユーザー別・注文別の合計金額",
columns: {
order_id: "注文ID",
user_id: "ユーザーID",
user_name: "ユーザー名",
ordered_at: "注文日時",
total_amount: "合計金額"
}
}
| 項目 | 説明 |
|---|---|
| type | テーブルの種類(table / view) |
| name | 作成するテーブル名 |
| dataset | 保存先データセット |
| description | テーブルの説明(BigQueryに反映) |
| columns | 各カラムの説明(BigQueryに反映) |
002. SQLを入力
config ブロックの下にSQLを記述。
SELECT
o.order_id,
o.user_id,
u.name AS user_name,
o.ordered_at,
SUM(oi.quantity * oi.unit_price) AS total_amount
FROM dl_sample.orders o
JOIN dl_sample.users u ON o.user_id = u.user_id
JOIN dl_sample.order_items oi ON o.order_id = oi.order_id
GROUP BY 1, 2, 3, 4

003. タスクを実行(検証)
上部の「実行」ボタンでSQLの検証を実行できる。この時点ではテーブルは作成されない(検証のみ)。
004. パイプラインを実行(テーブル作成)
パイプライン画面に戻り、パイプラインの「実行」ボタン をクリック。
これでテーブルが作成される。メタデータ(テーブルの説明・カラムの説明)も反映される。

BigQueryパイプラインを用いてのテーブル作成を、ミニマムで実現させるまでの手順はここまで。
依存関係の確認
複数のテーブルを追加して、依存関係が自動検出されることを確認する。
作成するテーブル構成
[並列] order_summary ───┐
├─→ [直列] daily_sales_report
[並列] product_sales ───┘
| テーブル | 依存関係 | 実行タイミング |
|---|---|---|
| order_summary | dl層のみ参照 | 並列実行可 |
| product_sales | dl層のみ参照 | 並列実行可 |
| daily_sales_report | 上記2つを参照 | 上記完了後 |
依存関係の自動検出: ${ref()} 記法
パイプライン内の他テーブルを参照する場合、${ref("テーブル名")} を使う。
-- 直接指定(依存関係が自動検出されない)
FROM dwh_sample.order_summary os
-- ref記法(依存関係が自動検出される)
FROM ${ref("order_summary")} os
ref() を使うと:
- 依存関係グラフに矢印が自動追加される
- 実行順序が自動で制御される
daily_sales_report の例
config {
type: "table",
name: "daily_sales_report",
dataset: "dm_sample",
description: "日次売上レポート:注文サマリーと商品売上を結合",
columns: {
order_date: "注文日",
total_orders: "注文数",
total_revenue: "総売上",
top_category: "売上トップカテゴリ"
}
}
SELECT
DATE(os.ordered_at) AS order_date,
COUNT(DISTINCT os.order_id) AS total_orders,
SUM(os.total_amount) AS total_revenue,
(
SELECT ps.category
FROM ${ref("product_sales")} ps
ORDER BY ps.total_sales DESC
LIMIT 1
) AS top_category
FROM ${ref("order_summary")} os
GROUP BY 1
依存関係グラフ
パイプライン画面で、タスク間の依存関係が自動検出されて表示される。

order_summaryとproduct_salesは並列(矢印なし)daily_sales_reportは上記2つから矢印が来ている
スケジュール設定
パイプラインの定時自動実行を設定する。
スケジュール設定を開く
パイプライン画面上部の「スケジュール」をクリック。


スケジュールの頻度を設定

| 頻度 | 説明 |
|---|---|
| 毎時 | 1時間ごとに実行 |
| 毎日 | 1日1回、指定時刻に実行 |
| 毎週 | 週1回、指定曜日・時刻に実行 |
| 毎月 | 月1回、指定日・時刻に実行 |
| カスタム | cron形式で柔軟に指定 |
実行オプション
「優先度の高いインタラクティブ ジョブとして実行する」
| 設定 | 動作 |
|---|---|
| ON(デフォルト) | 即座に実行開始 |
| OFF | アイドル状態のリソースが空いたときに実行(バッチジョブ) |
通常はデフォルト(ON)のままでOK。コスト削減したい場合はOFFにする。
設定完了後、「スケジュールを作成」をクリックでスケジュールが設定される。
スケジュール設定の確認と実行確認
パイプライン画面から「スケジュールを見る」で設定を確認可能

実行数タブから、スケジュール実行されたことを確認できる。

参考:
まとめ
BigQuery パイプラインを使って、datalake 層のデータを加工して dwh/dm 層のテーブルを作成する流れを試した。
良い点:
- UIでサクッと設定できる
- メタデータ(テーブル・カラムの説明)も設定可能
ref()記法で依存関係を自動検出- スケジュール実行も設定可能
辛い点:
- 数が増えてきたら管理が煩雑になりそう
- 設定をコード管理できない
- パイプライン間の依存管理ができない
小規模でシンプルなワークフロー構築なら簡単に実現できて使い勝手が良い。

