Workflows で DTS + Dataform の ETL パイプラインをオーケストレーションする
- 公開日
- カテゴリ:Workflows
- タグ:Workflows,Terraform,BigQuery,DataTransferService,Dataform,自分用メモ

Google Cloud の Workflows は、サーバーレスのワークフローオーケストレーションサービス。YAML でワークフローを定義し、Google Cloud サービスへの API 呼び出しを順序制御・並列実行・エラーハンドリング付きで実行できる。ステップ数課金で、本構成のような日次1回のパイプラインであれば無料枠(月5,000内部ステップ)に収まる。
これまでの記事で DTS GCS コネクタ、DTS S3 コネクタ、Dataform をそれぞれ Terraform で構築してきた。本記事では、これらを Workflows で一連のパイプラインとしてオーケストレーションし、Cloud Scheduler でスケジュール実行する構成を Terraform で構築する。あわせて、Terraform のディレクトリ構成を modules/ + environments/ に整理し、DTS モジュールを for_each パターンにリファクタする。
contents
- 構成
- 前提
- サービスアカウントの設計
- ディレクトリ構成
- Terraform
- environments/dev/versions.tf
- environments/dev/locals.tf
- environments/dev/api.tf
- environments/dev/locals_dts_gcs.tf
- environments/dev/locals_dts_s3.tf
- environments/dev/main.tf
- modules/dts_gcs/main.tf
- modules/dts_gcs/variables.tf
- modules/dts_gcs/outputs.tf
- modules/dts_s3/main.tf
- modules/dts_s3/variables.tf
- modules/dts_s3/outputs.tf
- modules/workflows/service_account.tf
- modules/workflows/workflow.tf
- modules/workflows/scheduler.tf
- modules/workflows/variables.tf
- modules/workflows/outputs.tf
- modules/workflows/templates/workflow.yaml
- 適用
構成
Cloud Scheduler(cron: 毎日 09:00 JST)
│
↓ トリガー(scheduler-executor SA)
Cloud Workflows(etl-pipeline)
│ workflow-executor SA
│
├── parallel ─────────────────────────────────────┐
│ ├── gcs: DTS GCS 転送(stores) │
│ └── s3_parquet: DTS S3 転送 │
│ parallel(ネスト) │
│ ├── orders │
│ ├── products │
│ └── users │
├── 全て完了 ←────────────────────────────────────┘
│
├── Dataform CompilationResult 作成
├── Dataform WorkflowInvocation 作成
└── Dataform ポーリング(30秒間隔)→ 完了
DTS 転送は parallel ブロックで並列実行し、全完了後に Dataform を実行する。S3 Parquet のように複数テーブルの転送がある場合は parallel をネスト(最大2レベル)して並列実行する。
DTS コネクタと Dataform API の違い
| サービス | 連携方式 | 完了待ち |
|---|---|---|
| DTS | 組み込みコネクタ(googleapis.bigquerydatatransfer.v1) | LRO 自動待機(明示的なポーリング不要) |
| Dataform | REST API 直接呼び出し(http.post + auth.type: OAuth2) | 30秒間隔のポーリング |
DTS には組み込みコネクタがあり、startManualRuns を呼ぶだけで転送完了まで自動的にブロックする。Dataform にはコネクタが存在しないため、REST API で CompilationResult → WorkflowInvocation を作成し、ポーリングで完了を待つ。いずれも *.googleapis.com への呼び出しなので内部ステップとしてカウントされる。
エラーハンドリング
parallel
├── gcs: try/except → 成功(走り切る)
└── s3_parquet: try/except
├── orders: 成功
├── products: 失敗 → エラー記録
└── users: 成功(走り切る)
↓
check_transfer_errors: transfer_errors に1件以上
→ エラーログ出力 → raise で停止
→ Dataform には進まない
各並列分岐内に try/except を配置することで、1件失敗しても他の転送は走り切る。parallel のデフォルト動作では未ハンドルの例外が発生すると他の並列分岐がキャンセルされるため、try/except で必ず例外を捕捉する。全転送完了後にエラーの有無をチェックし、1件でもエラーがあれば Dataform には進まず停止する。
前提
- Google Cloud プロジェクトが作成済み
- Terraform がインストール済み
gcloudで認証済み- Dataform 環境を Terraform で構築する の構成が適用済み(SA
dataform-executorが存在する状態) - DTS GCS コネクタ、DTS S3 コネクタ の構成が適用済み
サービスアカウントの設計
3つの ServiceAccount(以下、SA) を用途ごとに分離する。
Cloud Scheduler
│
│ scheduler-executor(roles/workflows.invoker)
↓
Cloud Workflows
│
│ workflow-executor(roles/bigquery.admin, roles/dataform.editor, roles/logging.logWriter)
│ │
│ │ roles/iam.serviceAccountUser(dataform-executor に対して act-as)
│ ↓
├── DTS startManualRuns → dataform-executor が転送実行(GCS 読み取り、S3 認証)
└── Dataform WorkflowInvocation → dataform-executor が BigQuery 操作
| SA | 用途 | 権限 |
|---|---|---|
dataform-executor | DTS 転送実行、Dataform による BigQuery 操作 | roles/bigquery.user(プロジェクト)、roles/bigquery.dataViewer(dl_sample)、roles/bigquery.dataEditor(dwh_sample, dm_sample, dl_spreadsheet)、roles/storage.objectViewer(GCS バケット) |
workflow-executor | Workflows のワークフロー実行(DTS・Dataform API 呼び出し) | roles/bigquery.admin(DTS API)、roles/dataform.editor(Dataform API)、roles/logging.logWriter(sys.log)、roles/iam.serviceAccountUser(dataform-executor に対して) |
scheduler-executor | Cloud Scheduler からの Workflows 起動 | roles/workflows.invoker |
SA を分離する理由
dataform-executor に全権限を集約すると、「オーケストレーター」が「実行者」の権限も全て持つことになる。Workflows の YAML を変更できる人が、間接的に全データセットへのアクセス権を持つ。
SA を分離することで:
- 最小権限の原則: 各 SA が必要な権限だけを持つ。
workflow-executorは BigQuery のデータ自体にはアクセスできない - 影響範囲の限定:
workflow-executorが漏洩しても BigQuery のデータに直接アクセスできない - 監査のしやすさ: Cloud Audit Logs で「オーケストレーションの起動」と「データの操作」が別の SA として記録される
act-as チェック
Dataform の WorkflowInvocation 作成時、Workflows の SA(workflow-executor)が Dataform の実行 SA(dataform-executor)を借用する。この借用に iam.serviceAccounts.actAs 権限(roles/iam.serviceAccountUser)が必要。
この権限がない場合、ワークフロー実行ログに以下の WARNING が記録される。
Dry run of 'CreateWorkflowInvocation' for the configured service account ...
returned the following error message: The caller does not have permission
機能的には動作するが、WARNING を解消するために workflow-executor に roles/iam.serviceAccountUser を dataform-executor に対して付与する。
workflow-executor に bigquery.admin が必要な理由
DTS の startManualRuns(DTS の転送設定を即時実行する API メソッド)呼び出しには bigquery.transfers.update 権限が必要だが、この権限を含むロールは roles/bigquery.admin のみ。より狭い定義済みロールは存在しない。カスタムロールで bigquery.transfers.update のみを付与する選択肢もある。
ディレクトリ構成
Terraform のディレクトリ構成を modules/ + environments/ パターンに整理する。以下、HashiCorp の Standard Module Structure および Google Cloud の推奨に従った構成。
terraform/
├── modules/
│ ├── dataform/
│ │ ├── dataform.tf
│ │ ├── service_account.tf
│ │ ├── secret_manager.tf
│ │ ├── variables.tf
│ │ └── outputs.tf
│ │
│ ├── dts_gcs/
│ │ ├── main.tf
│ │ ├── variables.tf
│ │ └── outputs.tf
│ │
│ ├── dts_s3/
│ │ ├── main.tf
│ │ ├── variables.tf
│ │ └── outputs.tf
│ │
│ ├── spreadsheet/
│ │ └── ...
│ │
│ └── workflows/
│ ├── service_account.tf
│ ├── workflow.tf
│ ├── scheduler.tf
│ ├── variables.tf
│ ├── outputs.tf
│ └── templates/
│ └── workflow.yaml
│
└── environments/
└── dev/
├── main.tf # provider + module 呼び出し
├── locals.tf # project_id, region 等
├── versions.tf # provider バージョン制約
├── api.tf # google_project_service
├── locals_dts_gcs.tf # DTS GCS 転送グループ定義
├── locals_dts_s3.tf # DTS S3 転送グループ定義
├── dts_gcs_tables/ # GCS テーブルリスト(YAML)
│ └── some_project.yaml
└── dts_s3_tables/ # S3 テーブルリスト(YAML)
└── ec_app.yaml
modules/は再利用可能なリソース定義。環境に依存しないenvironments/は環境固有の値を定義し、modules/を呼び出す- prod を追加する場合は
environments/prod/を作るだけ terraform.tfvarsは不使用。機密値は Secret Manager、環境固有値はlocalsで管理
DTS テーブルリストの YAML 分離
転送対象のテーブル一覧は YAML ファイルに切り出し、yamldecode(file(...)) で読み込む。
# dts_s3_tables/ec_app.yaml
- orders
- products
- users
YAML ファイル名はデータソース名(DB 名やプロジェクト名)にする。接続先の設定(バケット名、データセット等)は .tf ファイルに明示的に記述する。テーブルの追加・削除時は YAML を編集するだけで .tf ファイルの変更は不要。
Terraform
environments/dev/versions.tf
terraform {
required_version = ">= 1.0"
required_providers {
google = {
source = "hashicorp/google"
version = "~> 6.0"
}
google-beta = {
source = "hashicorp/google-beta"
version = "~> 6.0"
}
}
}
environments/dev/locals.tf
locals {
project_id = "sample-etl-485309"
project_number = "771842124699"
region = "asia-northeast1"
github_repo_owner = "rito328"
github_repo_name = "sample-bigquery-etl"
}
environments/dev/api.tf
resource "google_project_service" "dataform" {
service = "dataform.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "secret_manager" {
service = "secretmanager.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "bigquery_data_transfer" {
service = "bigquerydatatransfer.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "workflows" {
service = "workflows.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "cloud_scheduler" {
service = "cloudscheduler.googleapis.com"
disable_on_destroy = false
}
environments/dev/locals_dts_gcs.tf
DTS GCS 転送グループの定義。tables は YAML ファイルから読み込む。
locals {
dts_gcs_transfer_groups = {
csv = {
bucket_name = "${local.project_id}-dts-source"
destination_dataset = "dl_gcs"
file_format = "CSV"
gcs_prefix = "csv"
file_extension = ".csv"
tables = yamldecode(file("${path.module}/dts_gcs_tables/some_project.yaml"))
}
}
}
environments/dev/locals_dts_s3.tf
locals {
dts_s3_transfer_groups = {
parquet = {
s3_bucket = "sample-etl-dts-source-202603"
aws_access_key_id_secret = "aws-dts-access-key-id"
aws_secret_access_key_secret = "aws-dts-secret-access-key"
destination_dataset = "dl_s3"
file_format = "PARQUET"
s3_prefix = "parquet"
file_extension = ".parquet"
tables = yamldecode(file("${path.module}/dts_s3_tables/ec_app.yaml"))
}
parquet_dated = {
s3_bucket = "sample-etl-dts-source-202603"
aws_access_key_id_secret = "aws-dts-access-key-id"
aws_secret_access_key_secret = "aws-dts-secret-access-key"
destination_dataset = "dl_s3"
file_format = "PARQUET"
s3_prefix = "parquet"
file_extension = "_{run_date}.parquet"
destination_table_suffix = "_parquet"
tables = ["stores"]
}
csv = {
s3_bucket = "sample-etl-dts-source-202603"
aws_access_key_id_secret = "aws-dts-access-key-id"
aws_secret_access_key_secret = "aws-dts-secret-access-key"
destination_dataset = "dl_s3"
file_format = "CSV"
s3_prefix = "csv"
file_extension = ".csv"
destination_table_suffix = "_csv"
tables = ["stores"]
}
}
}
S3 転送グループは3つ定義されている。
| グループ | 形式 | テーブル | 説明 |
|---|---|---|---|
parquet | Parquet | orders, products, users | ec_app.yaml から読み込み |
parquet_dated | Parquet | stores | {run_date} 付きパス |
csv | CSV | stores | CSV 形式 |
environments/dev/main.tf
provider "google" {
project = local.project_id
region = local.region
}
provider "google-beta" {
project = local.project_id
region = local.region
}
module "dataform" {
source = "../../modules/dataform"
project_id = local.project_id
project_number = local.project_number
region = local.region
github_repo_owner = local.github_repo_owner
github_repo_name = local.github_repo_name
}
module "spreadsheet" {
source = "../../modules/spreadsheet"
project_id = local.project_id
region = local.region
dataform_executor_email = module.dataform.dataform_executor_email
}
module "dts_gcs" {
source = "../../modules/dts_gcs"
region = local.region
dataform_executor_email = module.dataform.dataform_executor_email
transfer_groups = local.dts_gcs_transfer_groups
}
module "dts_s3" {
source = "../../modules/dts_s3"
region = local.region
dataform_executor_email = module.dataform.dataform_executor_email
transfer_groups = local.dts_s3_transfer_groups
}
module "workflows" {
source = "../../modules/workflows"
project_id = local.project_id
region = local.region
gcs_transfer_config_id = module.dts_gcs.transfer_config_ids["csv_stores"]
s3_parquet_transfer_config_ids = [
module.dts_s3.transfer_config_ids["parquet_orders"],
module.dts_s3.transfer_config_ids["parquet_products"],
module.dts_s3.transfer_config_ids["parquet_users"],
]
dataform_repository_name = module.dataform.repository_name
dataform_release_config_name = module.dataform.release_config_name
dataform_executor_email = module.dataform.dataform_executor_email
}
DTS モジュールの output は transfer_config_ids マップを返す。キーは {グループ名}_{テーブル名} の形式(例: csv_stores、parquet_orders)。Workflows モジュールは必要な転送設定 ID をこのマップから取得する。
modules/dts_gcs/main.tf
前回の記事から for_each + transfer_groups パターンにリファクタした。
locals {
transfers = merge([
for group_key, group in var.transfer_groups : {
for table in group.tables :
"${group_key}_${table}" => {
bucket_name = group.bucket_name
destination_dataset = group.destination_dataset
file_format = group.file_format
data_path_template = "gs://${group.bucket_name}/${group.gcs_prefix}/${table}${group.file_extension}"
destination_table = table
write_disposition = group.write_disposition
field_delimiter = group.field_delimiter
skip_leading_rows = group.skip_leading_rows
max_bad_records = group.max_bad_records
allow_quoted_newlines = group.allow_quoted_newlines
}
}
]...)
bucket_names = distinct([for g in var.transfer_groups : g.bucket_name])
}
# SA に GCS バケットへの読み取り権限を付与
resource "google_storage_bucket_iam_member" "dts_source_viewer" {
for_each = toset(local.bucket_names)
bucket = each.value
role = "roles/storage.objectViewer"
member = "serviceAccount:${var.dataform_executor_email}"
}
# DTS 転送設定: GCS → BigQuery
resource "google_bigquery_data_transfer_config" "gcs" {
for_each = local.transfers
display_name = each.key
location = var.region
data_source_id = "google_cloud_storage"
destination_dataset_id = each.value.destination_dataset
params = merge(
{
data_path_template = each.value.data_path_template
destination_table_name_template = each.value.destination_table
file_format = each.value.file_format
write_disposition = each.value.write_disposition
},
each.value.file_format == "CSV" ? {
field_delimiter = each.value.field_delimiter
skip_leading_rows = each.value.skip_leading_rows
max_bad_records = each.value.max_bad_records
allow_quoted_newlines = each.value.allow_quoted_newlines
} : {}
)
service_account_name = var.dataform_executor_email
}
transfer_groups を merge + ネストした for でフラットなマップに展開し、for_each で個別の転送設定を作成する。CSV 固有のパラメータ(field_delimiter 等)は file_format == "CSV" の場合のみ追加する。
modules/dts_gcs/variables.tf
variable "region" {
description = "デフォルトリージョン"
type = string
}
variable "dataform_executor_email" {
description = "Dataform 実行用サービスアカウントのメールアドレス"
type = string
}
variable "transfer_groups" {
description = "GCS 転送グループの定義"
type = map(object({
bucket_name = string
destination_dataset = string
file_format = string
gcs_prefix = string
file_extension = string
write_disposition = optional(string, "MIRROR")
tables = list(string)
skip_leading_rows = optional(string, "1")
field_delimiter = optional(string, ",")
max_bad_records = optional(string, "0")
allow_quoted_newlines = optional(string, "true")
}))
}
modules/dts_gcs/outputs.tf
output "transfer_config_ids" {
description = "転送設定 ID のマップ"
value = { for k, v in google_bigquery_data_transfer_config.gcs : k => v.name }
}
v.name は projects/{project_id}/locations/{region}/transferConfigs/{config_id} 形式のフルパス。Workflows で startManualRuns の parent に渡す値。
modules/dts_s3/main.tf
locals {
transfers = merge([
for group_key, group in var.transfer_groups : {
for table in group.tables :
"${group_key}_${table}" => {
s3_bucket = group.s3_bucket
aws_access_key_id_secret = group.aws_access_key_id_secret
aws_secret_access_key_secret = group.aws_secret_access_key_secret
destination_dataset = group.destination_dataset
file_format = group.file_format
data_path = "s3://${group.s3_bucket}/${group.s3_prefix}/${table}${group.file_extension}"
destination_table = "${table}${group.destination_table_suffix}"
field_delimiter = group.field_delimiter
skip_leading_rows = group.skip_leading_rows
max_bad_records = group.max_bad_records
}
}
]...)
secret_names = distinct(flatten([
for g in var.transfer_groups : [
g.aws_access_key_id_secret,
g.aws_secret_access_key_secret,
]
]))
}
# AWS 認証情報(Secret Manager から取得)
data "google_secret_manager_secret_version" "aws" {
for_each = toset(local.secret_names)
secret = each.value
}
# DTS 転送設定: S3 → BigQuery
resource "google_bigquery_data_transfer_config" "s3" {
for_each = local.transfers
display_name = each.key
location = var.region
data_source_id = "amazon_s3"
destination_dataset_id = each.value.destination_dataset
params = merge(
{
data_path = each.value.data_path
destination_table_name_template = each.value.destination_table
file_format = each.value.file_format
write_disposition = "WRITE_TRUNCATE"
access_key_id = data.google_secret_manager_secret_version.aws[each.value.aws_access_key_id_secret].secret_data
secret_access_key = data.google_secret_manager_secret_version.aws[each.value.aws_secret_access_key_secret].secret_data
},
each.value.file_format == "CSV" ? {
field_delimiter = each.value.field_delimiter
skip_leading_rows = each.value.skip_leading_rows
max_bad_records = each.value.max_bad_records
} : {}
)
service_account_name = var.dataform_executor_email
}
Secret Manager からの認証情報取得も for_each で動的に行う。secret_names で重複を排除し、同じシークレットを複数回取得しない。
modules/dts_s3/variables.tf
variable "region" {
description = "デフォルトリージョン"
type = string
}
variable "dataform_executor_email" {
description = "Dataform 実行用サービスアカウントのメールアドレス"
type = string
}
variable "transfer_groups" {
description = "S3 転送グループの定義(ソース単位)"
type = map(object({
s3_bucket = string
aws_access_key_id_secret = string
aws_secret_access_key_secret = string
destination_dataset = string
file_format = string
s3_prefix = string
file_extension = string
tables = list(string)
destination_table_suffix = optional(string, "")
field_delimiter = optional(string, ",")
skip_leading_rows = optional(string, "1")
max_bad_records = optional(string, "0")
}))
}
modules/dts_s3/outputs.tf
output "transfer_config_ids" {
description = "転送設定 ID のマップ"
value = { for k, v in google_bigquery_data_transfer_config.s3 : k => v.name }
}
modules/workflows/service_account.tf
# Workflows 実行用 SA(オーケストレーター)
resource "google_service_account" "workflow_executor" {
account_id = "workflow-executor"
display_name = "Workflow Executor"
description = "Cloud Workflows 実行用(DTS・Dataform API 呼び出し)"
}
# DTS API 呼び出し(bigquery.transfers.update を含む最小ロール)
resource "google_project_iam_member" "workflow_executor_bq_admin" {
project = var.project_id
role = "roles/bigquery.admin"
member = "serviceAccount:${google_service_account.workflow_executor.email}"
}
# Dataform API 呼び出し
resource "google_project_iam_member" "workflow_executor_dataform_editor" {
project = var.project_id
role = "roles/dataform.editor"
member = "serviceAccount:${google_service_account.workflow_executor.email}"
}
# sys.log による Cloud Logging 書き込み
resource "google_project_iam_member" "workflow_executor_log_writer" {
project = var.project_id
role = "roles/logging.logWriter"
member = "serviceAccount:${google_service_account.workflow_executor.email}"
}
# Dataform 実行用 SA の借用(CreateWorkflowInvocation 時の act-as チェック)
resource "google_service_account_iam_member" "workflow_executor_actas_dataform" {
service_account_id = "projects/${var.project_id}/serviceAccounts/${var.dataform_executor_email}"
role = "roles/iam.serviceAccountUser"
member = "serviceAccount:${google_service_account.workflow_executor.email}"
}
# Cloud Scheduler 用 SA(ワークフロー起動元)
resource "google_service_account" "scheduler_executor" {
account_id = "scheduler-executor"
display_name = "Scheduler Executor"
description = "Cloud Scheduler からの Workflows 起動用"
}
# Workflows 起動権限
resource "google_project_iam_member" "scheduler_executor_workflows_invoker" {
project = var.project_id
role = "roles/workflows.invoker"
member = "serviceAccount:${google_service_account.scheduler_executor.email}"
}
modules/workflows/workflow.tf
resource "google_workflows_workflow" "etl_pipeline" {
name = "etl-pipeline"
region = var.region
description = "DTS 並列転送 → Dataform 実行"
service_account = google_service_account.workflow_executor.id
source_contents = templatefile("${path.module}/templates/workflow.yaml", {
gcs_transfer_config_id = var.gcs_transfer_config_id
s3_parquet_transfer_config_ids = var.s3_parquet_transfer_config_ids
dataform_repository_name = var.dataform_repository_name
dataform_release_config_name = var.dataform_release_config_name
})
depends_on = [
google_project_iam_member.workflow_executor_bq_admin,
google_project_iam_member.workflow_executor_dataform_editor,
google_project_iam_member.workflow_executor_log_writer,
]
}
templatefile で YAML テンプレートに DTS 転送設定 ID や Dataform リポジトリ名を注入する。depends_on で IAM 付与完了後にワークフローを作成する。
modules/workflows/scheduler.tf
resource "google_cloud_scheduler_job" "trigger_etl_pipeline" {
name = "trigger-etl-pipeline"
region = var.region
schedule = "0 9 * * *" # JST 9:00
time_zone = "Asia/Tokyo"
http_target {
http_method = "POST"
uri = "https://workflowexecutions.googleapis.com/v1/${google_workflows_workflow.etl_pipeline.id}/executions"
oauth_token {
service_account_email = google_service_account.scheduler_executor.email
}
}
depends_on = [
google_project_iam_member.scheduler_executor_workflows_invoker,
]
}
Cloud Scheduler の HTTP ターゲットで Workflows Executions API を直接呼び出す。oauth_token で scheduler-executor SA の認証情報を使用する。
modules/workflows/variables.tf
variable "project_id" {
description = "GCP プロジェクト ID"
type = string
}
variable "region" {
description = "デフォルトリージョン"
type = string
}
variable "gcs_transfer_config_id" {
description = "GCS 転送設定のフルパス(projects/.../transferConfigs/...)"
type = string
}
variable "s3_parquet_transfer_config_ids" {
description = "S3 Parquet 転送設定のフルパスリスト"
type = list(string)
}
variable "dataform_repository_name" {
description = "Dataform リポジトリのフルパス"
type = string
}
variable "dataform_release_config_name" {
description = "Dataform リリース設定のフルパス"
type = string
}
variable "dataform_executor_email" {
description = "Dataform 実行用サービスアカウントのメールアドレス"
type = string
}
modules/workflows/outputs.tf
output "workflow_executor_email" {
value = google_service_account.workflow_executor.email
}
output "scheduler_executor_email" {
value = google_service_account.scheduler_executor.email
}
output "workflow_id" {
value = google_workflows_workflow.etl_pipeline.id
}
modules/workflows/templates/workflow.yaml
ワークフロー定義の全体。templatefile で ${...} は Terraform の変数展開、$${...} は Workflows のランタイム式として出力される。
main:
steps:
- init:
assign:
- gcs_config_id: "${gcs_transfer_config_id}"
- s3_parquet_config_ids:
%{ for id in s3_parquet_transfer_config_ids ~}
- "${id}"
%{ endfor ~}
- repository: "${dataform_repository_name}"
- release_config: "${dataform_release_config_name}"
- transfer_errors: []
# 1. DTS 転送を並列実行(各並列分岐内で try/except)
- run_all_transfers:
parallel:
shared: [transfer_errors]
branches:
- gcs:
steps:
- try_gcs:
try:
steps:
- start_gcs:
call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.startManualRuns
args:
parent: $${gcs_config_id}
body:
requestedRunTime: $${time.format(sys.now())}
result: gcs_result
except:
as: e
steps:
- record_gcs_error:
assign:
- transfer_errors: $${list.concat(transfer_errors, ["gcs - " + json.encode_to_string(e)])}
- s3_parquet:
steps:
- try_s3:
try:
steps:
- start_s3_all:
parallel:
for:
value: config_id
in: $${s3_parquet_config_ids}
steps:
- start_s3:
call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.startManualRuns
args:
parent: $${config_id}
body:
requestedRunTime: $${time.format(sys.now())}
except:
as: e
steps:
- record_s3_error:
assign:
- transfer_errors: $${list.concat(transfer_errors, ["s3_parquet - " + json.encode_to_string(e)])}
# 2. エラーチェック
- check_transfer_errors:
switch:
- condition: $${len(transfer_errors) > 0}
steps:
- log_transfer_errors:
call: sys.log
args:
text: '$${"Transfer errors - " + json.encode_to_string(transfer_errors)}'
severity: "ERROR"
- raise_transfer_errors:
raise: '$${"DTS transfer failed. " + string(len(transfer_errors)) + " error(s). See logs for details."}'
- log_transfers_done:
call: sys.log
args:
text: "All DTS transfers completed. Starting Dataform."
severity: "INFO"
# 3. Dataform CompilationResult 作成
- create_compilation:
call: http.post
args:
url: $${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
auth:
type: OAuth2
body:
releaseConfig: $${release_config}
result: compilation
# 4. Dataform WorkflowInvocation 作成(実行開始)
- create_invocation:
call: http.post
args:
url: $${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
auth:
type: OAuth2
body:
compilationResult: $${compilation.body.name}
result: invocation
- log_invocation_started:
call: sys.log
args:
text: '$${"Dataform invocation started - " + invocation.body.name}'
severity: "INFO"
# 5. Dataform ポーリングで完了待ち
- poll_dataform:
call: http.get
args:
url: $${"https://dataform.googleapis.com/v1beta1/" + invocation.body.name}
auth:
type: OAuth2
result: poll_result
- check_dataform_status:
switch:
- condition: $${poll_result.body.state == "SUCCEEDED"}
next: done
- condition: $${poll_result.body.state == "FAILED"}
raise: '$${"Dataform execution failed - " + json.encode_to_string(poll_result.body)}'
- condition: $${poll_result.body.state == "CANCELLED"}
raise: "Dataform execution cancelled"
- wait_before_poll:
call: sys.sleep
args:
seconds: 30
next: poll_dataform
- done:
call: sys.log
args:
text: "Pipeline completed successfully."
severity: "INFO"

ワークフローの処理フロー:
| ステップ | 処理 |
|---|---|
init | 変数初期化(転送設定 ID、Dataform リポジトリ名) |
run_all_transfers | DTS 転送を parallel で並列実行。GCS と S3 Parquet を並行して起動。S3 Parquet はさらにネストした parallel + for で全テーブル分を並列実行 |
check_transfer_errors | transfer_errors にエラーがあれば Cloud Logging に出力し raise で停止 |
create_compilation | Dataform の CompilationResult を作成(リリース設定を元にコンパイル) |
create_invocation | WorkflowInvocation を作成してワークフロー実行を開始 |
poll_dataform → check_dataform_status → wait_before_poll | 30秒間隔でポーリングし、SUCCEEDED / FAILED / CANCELLED を判定 |
templatefile のエスケープ
Workflows の YAML を templatefile で読み込む場合、Terraform の ${...} と Workflows の ${...} が衝突する。
| 記法 | 展開タイミング | 用途 |
|---|---|---|
${...} | terraform plan 時 | Terraform 変数の注入(転送設定 ID 等) |
$${...} | Workflows ランタイム | Workflows の式評価(sys.now() 等) |
file() で読み込む場合はこのエスケープは不要だが、templatefile で変数を注入する場合は $${...} と書く必要がある。
DTS コネクタの LRO 自動待機
googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.startManualRuns を call で呼ぶと、コネクタが LRO(Long Running Operation)の完了を自動的にブロッキング待機する。明示的なポーリングコードは不要。デフォルトのタイムアウトは30分。
shared 変数
parallel ブロック内で書き込みが必要な変数は shared で明示的に宣言する。transfer_errors を shared に指定することで、各並列分岐からエラー情報を書き込める。shared に指定しない変数は読み取り専用。
適用
cd terraform/environments/dev
terraform init
terraform plan
terraform apply
workflows モジュールで作成されるリソースは以下。
| リソース | 説明 |
|---|---|
google_service_account.workflow_executor | Workflows 実行用 SA |
google_project_iam_member × 3 | workflow-executor の IAM(bigquery.admin, dataform.editor, logging.logWriter) |
google_service_account_iam_member | workflow-executor → dataform-executor の act-as 権限 |
google_service_account.scheduler_executor | Cloud Scheduler 用 SA |
google_project_iam_member | scheduler-executor の workflows.invoker |
google_workflows_workflow.etl_pipeline | ワークフロー定義 |
google_cloud_scheduler_job.trigger_etl_pipeline | スケジューラジョブ |
動作確認
apply 後、Cloud Scheduler の「強制実行」または Workflows コンソールからの手動実行でパイプライン全体が動作することを確認する。
- Cloud Scheduler コンソール →
trigger-etl-pipeline→ 「強制実行」 - Workflows コンソール →
etl-pipeline→ 実行状態を確認 - BigQuery で DTS の転送先テーブルにデータが入っていることを確認
- Dataform の実行履歴で SUCCEEDED を確認
まとめ
Terraform で作成したリソースの全体像。
terraform/
├── modules/
│ ├── dataform/
│ │ ├── dataform-executor SA + IAM
│ │ ├── Secret Manager(GitHub PAT 参照 + IAM)
│ │ └── Dataform リポジトリ + リリース設定 + ワークフロー設定
│ │
│ ├── dts_gcs/
│ │ ├── SA の GCS IAM(objectViewer)
│ │ └── DTS 転送設定(for_each)
│ │
│ ├── dts_s3/
│ │ ├── Secret Manager data source(AWS 認証情報)
│ │ └── DTS 転送設定(for_each)
│ │
│ ├── spreadsheet/
│ │ └── 外部テーブル + スケジュールドクエリ
│ │
│ └── workflows/
│ ├── workflow-executor SA + IAM
│ ├── scheduler-executor SA + IAM
│ ├── Workflows ワークフロー(etl-pipeline)
│ └── Cloud Scheduler ジョブ(trigger-etl-pipeline)
│
└── environments/
└── dev/
├── locals(project_id, region 等)
├── API 有効化(5 API)
├── DTS 転送グループ定義(locals + YAML テーブルリスト)
└── module 呼び出し(5 module)
検証で得た知見:
- DTS には組み込みコネクタがあり LRO 自動待機で転送完了を待てる。Dataform にはコネクタがなく REST API + ポーリングが必要
parallel内の未ハンドル例外は他の並列分岐をキャンセルする。各分岐内にtry/exceptを配置して対策するtemplatefileで Workflows YAML を読み込む場合、Workflows の式${...}は$${...}にエスケープするbigquery.transfers.updateを含むロールはroles/bigquery.adminのみ。DTS API 呼び出し用の最小ロールは存在しない- Dataform の
WorkflowInvocation作成には act-as チェックがあり、roles/iam.serviceAccountUserが必要 - DTS の
scheduleは Cloud Scheduler で一元管理する場合は不要(Terraform から削除) - Workflows の月間ステップ数は無料枠(5,000内部ステップ)に収まる

