Ritolabo
  1. Home
  2. GoogleCloud
  3. Workflows
  4. Workflows で DTS + Dataform の ETL パイプラインをオーケストレーションする

Workflows で DTS + Dataform の ETL パイプラインをオーケストレーションする

  • 公開日
  • カテゴリWorkflows
  • タグWorkflows,Terraform,BigQuery,DataTransferService,Dataform,自分用メモ
Workflows で DTS + Dataform の ETL パイプラインをオーケストレーションする

Google Cloud の Workflows は、サーバーレスのワークフローオーケストレーションサービス。YAML でワークフローを定義し、Google Cloud サービスへの API 呼び出しを順序制御・並列実行・エラーハンドリング付きで実行できる。ステップ数課金で、本構成のような日次1回のパイプラインであれば無料枠(月5,000内部ステップ)に収まる。

これまでの記事で DTS GCS コネクタDTS S3 コネクタDataform をそれぞれ Terraform で構築してきた。本記事では、これらを Workflows で一連のパイプラインとしてオーケストレーションし、Cloud Scheduler でスケジュール実行する構成を Terraform で構築する。あわせて、Terraform のディレクトリ構成を modules/ + environments/ に整理し、DTS モジュールを for_each パターンにリファクタする。

contents

  1. 構成
    1. DTS コネクタと Dataform API の違い
    2. エラーハンドリング
  2. 前提
  3. サービスアカウントの設計
    1. SA を分離する理由
    2. act-as チェック
    3. workflow-executor に bigquery.admin が必要な理由
  4. ディレクトリ構成
    1. DTS テーブルリストの YAML 分離
  5. Terraform
    1. environments/dev/versions.tf
    2. environments/dev/locals.tf
    3. environments/dev/api.tf
    4. environments/dev/locals_dts_gcs.tf
    5. environments/dev/locals_dts_s3.tf
    6. environments/dev/main.tf
    7. modules/dts_gcs/main.tf
    8. modules/dts_gcs/variables.tf
    9. modules/dts_gcs/outputs.tf
    10. modules/dts_s3/main.tf
    11. modules/dts_s3/variables.tf
    12. modules/dts_s3/outputs.tf
    13. modules/workflows/service_account.tf
    14. modules/workflows/workflow.tf
    15. modules/workflows/scheduler.tf
    16. modules/workflows/variables.tf
    17. modules/workflows/outputs.tf
    18. modules/workflows/templates/workflow.yaml
  6. 適用
    1. 動作確認

構成

Cloud Scheduler(cron: 毎日 09:00 JST)
  │
  ↓ トリガー(scheduler-executor SA)
Cloud Workflows(etl-pipeline)
  │ workflow-executor SA
  │
  ├── parallel ─────────────────────────────────────┐
  │   ├── gcs: DTS GCS 転送(stores)                │
  │   └── s3_parquet: DTS S3 転送                    │
  │         parallel(ネスト)                        │
  │           ├── orders                             │
  │           ├── products                           │
  │           └── users                              │
  ├── 全て完了 ←────────────────────────────────────┘
  │
  ├── Dataform CompilationResult 作成
  ├── Dataform WorkflowInvocation 作成
  └── Dataform ポーリング(30秒間隔)→ 完了

DTS 転送は parallel ブロックで並列実行し、全完了後に Dataform を実行する。S3 Parquet のように複数テーブルの転送がある場合は parallel をネスト(最大2レベル)して並列実行する。

DTS コネクタと Dataform API の違い

サービス連携方式完了待ち
DTS組み込みコネクタ(googleapis.bigquerydatatransfer.v1LRO 自動待機(明示的なポーリング不要)
DataformREST API 直接呼び出し(http.post + auth.type: OAuth230秒間隔のポーリング

DTS には組み込みコネクタがあり、startManualRuns を呼ぶだけで転送完了まで自動的にブロックする。Dataform にはコネクタが存在しないため、REST API で CompilationResult → WorkflowInvocation を作成し、ポーリングで完了を待つ。いずれも *.googleapis.com への呼び出しなので内部ステップとしてカウントされる。

エラーハンドリング

parallel
  ├── gcs: try/except → 成功(走り切る)
  └── s3_parquet: try/except
        ├── orders: 成功
        ├── products: 失敗 → エラー記録
        └── users: 成功(走り切る)
↓
check_transfer_errors: transfer_errors に1件以上
  → エラーログ出力 → raise で停止
  → Dataform には進まない

各並列分岐内に try/except を配置することで、1件失敗しても他の転送は走り切る。parallel のデフォルト動作では未ハンドルの例外が発生すると他の並列分岐がキャンセルされるため、try/except で必ず例外を捕捉する。全転送完了後にエラーの有無をチェックし、1件でもエラーがあれば Dataform には進まず停止する。

前提

サービスアカウントの設計

3つの ServiceAccount(以下、SA) を用途ごとに分離する。

Cloud Scheduler
  │
  │ scheduler-executor(roles/workflows.invoker)
  ↓
Cloud Workflows
  │
  │ workflow-executor(roles/bigquery.admin, roles/dataform.editor, roles/logging.logWriter)
  │     │
  │     │ roles/iam.serviceAccountUser(dataform-executor に対して act-as)
  │     ↓
  ├── DTS startManualRuns → dataform-executor が転送実行(GCS 読み取り、S3 認証)
  └── Dataform WorkflowInvocation → dataform-executor が BigQuery 操作
SA用途権限
dataform-executorDTS 転送実行、Dataform による BigQuery 操作roles/bigquery.user(プロジェクト)、roles/bigquery.dataViewer(dl_sample)、roles/bigquery.dataEditor(dwh_sample, dm_sample, dl_spreadsheet)、roles/storage.objectViewer(GCS バケット)
workflow-executorWorkflows のワークフロー実行(DTS・Dataform API 呼び出し)roles/bigquery.admin(DTS API)、roles/dataform.editor(Dataform API)、roles/logging.logWriter(sys.log)、roles/iam.serviceAccountUser(dataform-executor に対して)
scheduler-executorCloud Scheduler からの Workflows 起動roles/workflows.invoker

SA を分離する理由

dataform-executor に全権限を集約すると、「オーケストレーター」が「実行者」の権限も全て持つことになる。Workflows の YAML を変更できる人が、間接的に全データセットへのアクセス権を持つ。

SA を分離することで:

  • 最小権限の原則: 各 SA が必要な権限だけを持つ。workflow-executor は BigQuery のデータ自体にはアクセスできない
  • 影響範囲の限定: workflow-executor が漏洩しても BigQuery のデータに直接アクセスできない
  • 監査のしやすさ: Cloud Audit Logs で「オーケストレーションの起動」と「データの操作」が別の SA として記録される

act-as チェック

Dataform の WorkflowInvocation 作成時、Workflows の SA(workflow-executor)が Dataform の実行 SA(dataform-executor)を借用する。この借用に iam.serviceAccounts.actAs 権限(roles/iam.serviceAccountUser)が必要。

この権限がない場合、ワークフロー実行ログに以下の WARNING が記録される。

Dry run of 'CreateWorkflowInvocation' for the configured service account ...
returned the following error message: The caller does not have permission

機能的には動作するが、WARNING を解消するために workflow-executorroles/iam.serviceAccountUserdataform-executor に対して付与する。

workflow-executor に bigquery.admin が必要な理由

DTS の startManualRuns(DTS の転送設定を即時実行する API メソッド)呼び出しには bigquery.transfers.update 権限が必要だが、この権限を含むロールは roles/bigquery.admin のみ。より狭い定義済みロールは存在しない。カスタムロールで bigquery.transfers.update のみを付与する選択肢もある。

ディレクトリ構成

Terraform のディレクトリ構成を modules/ + environments/ パターンに整理する。以下、HashiCorp の Standard Module Structure および Google Cloud の推奨に従った構成。

terraform/
├── modules/
│   ├── dataform/
│   │   ├── dataform.tf
│   │   ├── service_account.tf
│   │   ├── secret_manager.tf
│   │   ├── variables.tf
│   │   └── outputs.tf
│   │
│   ├── dts_gcs/
│   │   ├── main.tf
│   │   ├── variables.tf
│   │   └── outputs.tf
│   │
│   ├── dts_s3/
│   │   ├── main.tf
│   │   ├── variables.tf
│   │   └── outputs.tf
│   │
│   ├── spreadsheet/
│   │   └── ...
│   │
│   └── workflows/
│       ├── service_account.tf
│       ├── workflow.tf
│       ├── scheduler.tf
│       ├── variables.tf
│       ├── outputs.tf
│       └── templates/
│           └── workflow.yaml
│
└── environments/
    └── dev/
        ├── main.tf               # provider + module 呼び出し
        ├── locals.tf             # project_id, region 等
        ├── versions.tf           # provider バージョン制約
        ├── api.tf                # google_project_service
        ├── locals_dts_gcs.tf     # DTS GCS 転送グループ定義
        ├── locals_dts_s3.tf      # DTS S3 転送グループ定義
        ├── dts_gcs_tables/       # GCS テーブルリスト(YAML)
        │   └── some_project.yaml
        └── dts_s3_tables/        # S3 テーブルリスト(YAML)
            └── ec_app.yaml
  • modules/ は再利用可能なリソース定義。環境に依存しない
  • environments/ は環境固有の値を定義し、modules/ を呼び出す
  • prod を追加する場合は environments/prod/ を作るだけ
  • terraform.tfvars は不使用。機密値は Secret Manager、環境固有値は locals で管理

DTS テーブルリストの YAML 分離

転送対象のテーブル一覧は YAML ファイルに切り出し、yamldecode(file(...)) で読み込む。

# dts_s3_tables/ec_app.yaml
- orders
- products
- users

YAML ファイル名はデータソース名(DB 名やプロジェクト名)にする。接続先の設定(バケット名、データセット等)は .tf ファイルに明示的に記述する。テーブルの追加・削除時は YAML を編集するだけで .tf ファイルの変更は不要。

Terraform

environments/dev/versions.tf

terraform {
  required_version = ">= 1.0"

  required_providers {
    google = {
      source  = "hashicorp/google"
      version = "~> 6.0"
    }
    google-beta = {
      source  = "hashicorp/google-beta"
      version = "~> 6.0"
    }
  }
}

environments/dev/locals.tf

locals {
  project_id        = "sample-etl-485309"
  project_number    = "771842124699"
  region            = "asia-northeast1"
  github_repo_owner = "rito328"
  github_repo_name  = "sample-bigquery-etl"
}

environments/dev/api.tf

resource "google_project_service" "dataform" {
  service            = "dataform.googleapis.com"
  disable_on_destroy = false
}

resource "google_project_service" "secret_manager" {
  service            = "secretmanager.googleapis.com"
  disable_on_destroy = false
}

resource "google_project_service" "bigquery_data_transfer" {
  service            = "bigquerydatatransfer.googleapis.com"
  disable_on_destroy = false
}

resource "google_project_service" "workflows" {
  service            = "workflows.googleapis.com"
  disable_on_destroy = false
}

resource "google_project_service" "cloud_scheduler" {
  service            = "cloudscheduler.googleapis.com"
  disable_on_destroy = false
}

environments/dev/locals_dts_gcs.tf

DTS GCS 転送グループの定義。tables は YAML ファイルから読み込む。

locals {
  dts_gcs_transfer_groups = {
    csv = {
      bucket_name         = "${local.project_id}-dts-source"
      destination_dataset = "dl_gcs"
      file_format         = "CSV"
      gcs_prefix          = "csv"
      file_extension      = ".csv"
      tables              = yamldecode(file("${path.module}/dts_gcs_tables/some_project.yaml"))
    }
  }
}

environments/dev/locals_dts_s3.tf

locals {
  dts_s3_transfer_groups = {
    parquet = {
      s3_bucket                    = "sample-etl-dts-source-202603"
      aws_access_key_id_secret     = "aws-dts-access-key-id"
      aws_secret_access_key_secret = "aws-dts-secret-access-key"
      destination_dataset          = "dl_s3"
      file_format                  = "PARQUET"
      s3_prefix                    = "parquet"
      file_extension               = ".parquet"
      tables                       = yamldecode(file("${path.module}/dts_s3_tables/ec_app.yaml"))
    }
    parquet_dated = {
      s3_bucket                    = "sample-etl-dts-source-202603"
      aws_access_key_id_secret     = "aws-dts-access-key-id"
      aws_secret_access_key_secret = "aws-dts-secret-access-key"
      destination_dataset          = "dl_s3"
      file_format                  = "PARQUET"
      s3_prefix                    = "parquet"
      file_extension               = "_{run_date}.parquet"
      destination_table_suffix     = "_parquet"
      tables                       = ["stores"]
    }
    csv = {
      s3_bucket                    = "sample-etl-dts-source-202603"
      aws_access_key_id_secret     = "aws-dts-access-key-id"
      aws_secret_access_key_secret = "aws-dts-secret-access-key"
      destination_dataset          = "dl_s3"
      file_format                  = "CSV"
      s3_prefix                    = "csv"
      file_extension               = ".csv"
      destination_table_suffix     = "_csv"
      tables                       = ["stores"]
    }
  }
}

S3 転送グループは3つ定義されている。

グループ形式テーブル説明
parquetParquetorders, products, usersec_app.yaml から読み込み
parquet_datedParquetstores{run_date} 付きパス
csvCSVstoresCSV 形式

environments/dev/main.tf

provider "google" {
  project = local.project_id
  region  = local.region
}

provider "google-beta" {
  project = local.project_id
  region  = local.region
}

module "dataform" {
  source            = "../../modules/dataform"
  project_id        = local.project_id
  project_number    = local.project_number
  region            = local.region
  github_repo_owner = local.github_repo_owner
  github_repo_name  = local.github_repo_name
}

module "spreadsheet" {
  source                  = "../../modules/spreadsheet"
  project_id              = local.project_id
  region                  = local.region
  dataform_executor_email = module.dataform.dataform_executor_email
}

module "dts_gcs" {
  source                  = "../../modules/dts_gcs"
  region                  = local.region
  dataform_executor_email = module.dataform.dataform_executor_email
  transfer_groups         = local.dts_gcs_transfer_groups
}

module "dts_s3" {
  source                  = "../../modules/dts_s3"
  region                  = local.region
  dataform_executor_email = module.dataform.dataform_executor_email
  transfer_groups         = local.dts_s3_transfer_groups
}

module "workflows" {
  source                         = "../../modules/workflows"
  project_id                     = local.project_id
  region                         = local.region
  gcs_transfer_config_id         = module.dts_gcs.transfer_config_ids["csv_stores"]
  s3_parquet_transfer_config_ids = [
    module.dts_s3.transfer_config_ids["parquet_orders"],
    module.dts_s3.transfer_config_ids["parquet_products"],
    module.dts_s3.transfer_config_ids["parquet_users"],
  ]
  dataform_repository_name       = module.dataform.repository_name
  dataform_release_config_name   = module.dataform.release_config_name
  dataform_executor_email        = module.dataform.dataform_executor_email
}

DTS モジュールの output は transfer_config_ids マップを返す。キーは {グループ名}_{テーブル名} の形式(例: csv_storesparquet_orders)。Workflows モジュールは必要な転送設定 ID をこのマップから取得する。

modules/dts_gcs/main.tf

前回の記事から for_each + transfer_groups パターンにリファクタした。

locals {
  transfers = merge([
    for group_key, group in var.transfer_groups : {
      for table in group.tables :
      "${group_key}_${table}" => {
        bucket_name           = group.bucket_name
        destination_dataset   = group.destination_dataset
        file_format           = group.file_format
        data_path_template    = "gs://${group.bucket_name}/${group.gcs_prefix}/${table}${group.file_extension}"
        destination_table     = table
        write_disposition     = group.write_disposition
        field_delimiter       = group.field_delimiter
        skip_leading_rows     = group.skip_leading_rows
        max_bad_records       = group.max_bad_records
        allow_quoted_newlines = group.allow_quoted_newlines
      }
    }
  ]...)

  bucket_names = distinct([for g in var.transfer_groups : g.bucket_name])
}

# SA に GCS バケットへの読み取り権限を付与
resource "google_storage_bucket_iam_member" "dts_source_viewer" {
  for_each = toset(local.bucket_names)
  bucket   = each.value
  role     = "roles/storage.objectViewer"
  member   = "serviceAccount:${var.dataform_executor_email}"
}

# DTS 転送設定: GCS → BigQuery
resource "google_bigquery_data_transfer_config" "gcs" {
  for_each = local.transfers

  display_name           = each.key
  location               = var.region
  data_source_id         = "google_cloud_storage"
  destination_dataset_id = each.value.destination_dataset

  params = merge(
    {
      data_path_template              = each.value.data_path_template
      destination_table_name_template = each.value.destination_table
      file_format                     = each.value.file_format
      write_disposition               = each.value.write_disposition
    },
    each.value.file_format == "CSV" ? {
      field_delimiter       = each.value.field_delimiter
      skip_leading_rows     = each.value.skip_leading_rows
      max_bad_records       = each.value.max_bad_records
      allow_quoted_newlines = each.value.allow_quoted_newlines
    } : {}
  )

  service_account_name = var.dataform_executor_email
}

transfer_groupsmerge + ネストした for でフラットなマップに展開し、for_each で個別の転送設定を作成する。CSV 固有のパラメータ(field_delimiter 等)は file_format == "CSV" の場合のみ追加する。

modules/dts_gcs/variables.tf

variable "region" {
  description = "デフォルトリージョン"
  type        = string
}

variable "dataform_executor_email" {
  description = "Dataform 実行用サービスアカウントのメールアドレス"
  type        = string
}

variable "transfer_groups" {
  description = "GCS 転送グループの定義"
  type = map(object({
    bucket_name          = string
    destination_dataset  = string
    file_format          = string
    gcs_prefix           = string
    file_extension       = string
    write_disposition    = optional(string, "MIRROR")
    tables               = list(string)
    skip_leading_rows    = optional(string, "1")
    field_delimiter      = optional(string, ",")
    max_bad_records      = optional(string, "0")
    allow_quoted_newlines = optional(string, "true")
  }))
}

modules/dts_gcs/outputs.tf

output "transfer_config_ids" {
  description = "転送設定 ID のマップ"
  value       = { for k, v in google_bigquery_data_transfer_config.gcs : k => v.name }
}

v.nameprojects/{project_id}/locations/{region}/transferConfigs/{config_id} 形式のフルパス。Workflows で startManualRunsparent に渡す値。

modules/dts_s3/main.tf

locals {
  transfers = merge([
    for group_key, group in var.transfer_groups : {
      for table in group.tables :
      "${group_key}_${table}" => {
        s3_bucket                    = group.s3_bucket
        aws_access_key_id_secret     = group.aws_access_key_id_secret
        aws_secret_access_key_secret = group.aws_secret_access_key_secret
        destination_dataset          = group.destination_dataset
        file_format                  = group.file_format
        data_path                    = "s3://${group.s3_bucket}/${group.s3_prefix}/${table}${group.file_extension}"
        destination_table            = "${table}${group.destination_table_suffix}"
        field_delimiter              = group.field_delimiter
        skip_leading_rows            = group.skip_leading_rows
        max_bad_records              = group.max_bad_records
      }
    }
  ]...)

  secret_names = distinct(flatten([
    for g in var.transfer_groups : [
      g.aws_access_key_id_secret,
      g.aws_secret_access_key_secret,
    ]
  ]))
}

# AWS 認証情報(Secret Manager から取得)
data "google_secret_manager_secret_version" "aws" {
  for_each = toset(local.secret_names)
  secret   = each.value
}

# DTS 転送設定: S3 → BigQuery
resource "google_bigquery_data_transfer_config" "s3" {
  for_each = local.transfers

  display_name           = each.key
  location               = var.region
  data_source_id         = "amazon_s3"
  destination_dataset_id = each.value.destination_dataset

  params = merge(
    {
      data_path                       = each.value.data_path
      destination_table_name_template = each.value.destination_table
      file_format                     = each.value.file_format
      write_disposition               = "WRITE_TRUNCATE"
      access_key_id                   = data.google_secret_manager_secret_version.aws[each.value.aws_access_key_id_secret].secret_data
      secret_access_key               = data.google_secret_manager_secret_version.aws[each.value.aws_secret_access_key_secret].secret_data
    },
    each.value.file_format == "CSV" ? {
      field_delimiter   = each.value.field_delimiter
      skip_leading_rows = each.value.skip_leading_rows
      max_bad_records   = each.value.max_bad_records
    } : {}
  )

  service_account_name = var.dataform_executor_email
}

Secret Manager からの認証情報取得も for_each で動的に行う。secret_names で重複を排除し、同じシークレットを複数回取得しない。

modules/dts_s3/variables.tf

variable "region" {
  description = "デフォルトリージョン"
  type        = string
}

variable "dataform_executor_email" {
  description = "Dataform 実行用サービスアカウントのメールアドレス"
  type        = string
}

variable "transfer_groups" {
  description = "S3 転送グループの定義(ソース単位)"
  type = map(object({
    s3_bucket                    = string
    aws_access_key_id_secret     = string
    aws_secret_access_key_secret = string
    destination_dataset          = string
    file_format                  = string
    s3_prefix                    = string
    file_extension               = string
    tables                       = list(string)
    destination_table_suffix     = optional(string, "")
    field_delimiter              = optional(string, ",")
    skip_leading_rows            = optional(string, "1")
    max_bad_records              = optional(string, "0")
  }))
}

modules/dts_s3/outputs.tf

output "transfer_config_ids" {
  description = "転送設定 ID のマップ"
  value       = { for k, v in google_bigquery_data_transfer_config.s3 : k => v.name }
}

modules/workflows/service_account.tf

# Workflows 実行用 SA(オーケストレーター)
resource "google_service_account" "workflow_executor" {
  account_id   = "workflow-executor"
  display_name = "Workflow Executor"
  description  = "Cloud Workflows 実行用(DTS・Dataform API 呼び出し)"
}

# DTS API 呼び出し(bigquery.transfers.update を含む最小ロール)
resource "google_project_iam_member" "workflow_executor_bq_admin" {
  project = var.project_id
  role    = "roles/bigquery.admin"
  member  = "serviceAccount:${google_service_account.workflow_executor.email}"
}

# Dataform API 呼び出し
resource "google_project_iam_member" "workflow_executor_dataform_editor" {
  project = var.project_id
  role    = "roles/dataform.editor"
  member  = "serviceAccount:${google_service_account.workflow_executor.email}"
}

# sys.log による Cloud Logging 書き込み
resource "google_project_iam_member" "workflow_executor_log_writer" {
  project = var.project_id
  role    = "roles/logging.logWriter"
  member  = "serviceAccount:${google_service_account.workflow_executor.email}"
}

# Dataform 実行用 SA の借用(CreateWorkflowInvocation 時の act-as チェック)
resource "google_service_account_iam_member" "workflow_executor_actas_dataform" {
  service_account_id = "projects/${var.project_id}/serviceAccounts/${var.dataform_executor_email}"
  role               = "roles/iam.serviceAccountUser"
  member             = "serviceAccount:${google_service_account.workflow_executor.email}"
}

# Cloud Scheduler 用 SA(ワークフロー起動元)
resource "google_service_account" "scheduler_executor" {
  account_id   = "scheduler-executor"
  display_name = "Scheduler Executor"
  description  = "Cloud Scheduler からの Workflows 起動用"
}

# Workflows 起動権限
resource "google_project_iam_member" "scheduler_executor_workflows_invoker" {
  project = var.project_id
  role    = "roles/workflows.invoker"
  member  = "serviceAccount:${google_service_account.scheduler_executor.email}"
}

modules/workflows/workflow.tf

resource "google_workflows_workflow" "etl_pipeline" {
  name            = "etl-pipeline"
  region          = var.region
  description     = "DTS 並列転送 → Dataform 実行"
  service_account = google_service_account.workflow_executor.id

  source_contents = templatefile("${path.module}/templates/workflow.yaml", {
    gcs_transfer_config_id         = var.gcs_transfer_config_id
    s3_parquet_transfer_config_ids = var.s3_parquet_transfer_config_ids
    dataform_repository_name       = var.dataform_repository_name
    dataform_release_config_name   = var.dataform_release_config_name
  })

  depends_on = [
    google_project_iam_member.workflow_executor_bq_admin,
    google_project_iam_member.workflow_executor_dataform_editor,
    google_project_iam_member.workflow_executor_log_writer,
  ]
}

templatefile で YAML テンプレートに DTS 転送設定 ID や Dataform リポジトリ名を注入する。depends_on で IAM 付与完了後にワークフローを作成する。

modules/workflows/scheduler.tf

resource "google_cloud_scheduler_job" "trigger_etl_pipeline" {
  name      = "trigger-etl-pipeline"
  region    = var.region
  schedule  = "0 9 * * *" # JST 9:00
  time_zone = "Asia/Tokyo"

  http_target {
    http_method = "POST"
    uri         = "https://workflowexecutions.googleapis.com/v1/${google_workflows_workflow.etl_pipeline.id}/executions"

    oauth_token {
      service_account_email = google_service_account.scheduler_executor.email
    }
  }

  depends_on = [
    google_project_iam_member.scheduler_executor_workflows_invoker,
  ]
}

Cloud Scheduler の HTTP ターゲットで Workflows Executions API を直接呼び出す。oauth_tokenscheduler-executor SA の認証情報を使用する。

modules/workflows/variables.tf

variable "project_id" {
  description = "GCP プロジェクト ID"
  type        = string
}

variable "region" {
  description = "デフォルトリージョン"
  type        = string
}

variable "gcs_transfer_config_id" {
  description = "GCS 転送設定のフルパス(projects/.../transferConfigs/...)"
  type        = string
}

variable "s3_parquet_transfer_config_ids" {
  description = "S3 Parquet 転送設定のフルパスリスト"
  type        = list(string)
}

variable "dataform_repository_name" {
  description = "Dataform リポジトリのフルパス"
  type        = string
}

variable "dataform_release_config_name" {
  description = "Dataform リリース設定のフルパス"
  type        = string
}

variable "dataform_executor_email" {
  description = "Dataform 実行用サービスアカウントのメールアドレス"
  type        = string
}

modules/workflows/outputs.tf

output "workflow_executor_email" {
  value = google_service_account.workflow_executor.email
}

output "scheduler_executor_email" {
  value = google_service_account.scheduler_executor.email
}

output "workflow_id" {
  value = google_workflows_workflow.etl_pipeline.id
}

modules/workflows/templates/workflow.yaml

ワークフロー定義の全体。templatefile${...} は Terraform の変数展開、$${...} は Workflows のランタイム式として出力される。

main:
  steps:
    - init:
        assign:
          - gcs_config_id: "${gcs_transfer_config_id}"
          - s3_parquet_config_ids:
%{ for id in s3_parquet_transfer_config_ids ~}
              - "${id}"
%{ endfor ~}
          - repository: "${dataform_repository_name}"
          - release_config: "${dataform_release_config_name}"
          - transfer_errors: []

    # 1. DTS 転送を並列実行(各並列分岐内で try/except)
    - run_all_transfers:
        parallel:
          shared: [transfer_errors]
          branches:
            - gcs:
                steps:
                  - try_gcs:
                      try:
                        steps:
                          - start_gcs:
                              call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.startManualRuns
                              args:
                                parent: $${gcs_config_id}
                                body:
                                  requestedRunTime: $${time.format(sys.now())}
                              result: gcs_result
                      except:
                        as: e
                        steps:
                          - record_gcs_error:
                              assign:
                                - transfer_errors: $${list.concat(transfer_errors, ["gcs - " + json.encode_to_string(e)])}

            - s3_parquet:
                steps:
                  - try_s3:
                      try:
                        steps:
                          - start_s3_all:
                              parallel:
                                for:
                                  value: config_id
                                  in: $${s3_parquet_config_ids}
                                  steps:
                                    - start_s3:
                                        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.startManualRuns
                                        args:
                                          parent: $${config_id}
                                          body:
                                            requestedRunTime: $${time.format(sys.now())}
                      except:
                        as: e
                        steps:
                          - record_s3_error:
                              assign:
                                - transfer_errors: $${list.concat(transfer_errors, ["s3_parquet - " + json.encode_to_string(e)])}

    # 2. エラーチェック
    - check_transfer_errors:
        switch:
          - condition: $${len(transfer_errors) > 0}
            steps:
              - log_transfer_errors:
                  call: sys.log
                  args:
                    text: '$${"Transfer errors - " + json.encode_to_string(transfer_errors)}'
                    severity: "ERROR"
              - raise_transfer_errors:
                  raise: '$${"DTS transfer failed. " + string(len(transfer_errors)) + " error(s). See logs for details."}'

    - log_transfers_done:
        call: sys.log
        args:
          text: "All DTS transfers completed. Starting Dataform."
          severity: "INFO"

    # 3. Dataform CompilationResult 作成
    - create_compilation:
        call: http.post
        args:
          url: $${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
          auth:
            type: OAuth2
          body:
            releaseConfig: $${release_config}
        result: compilation

    # 4. Dataform WorkflowInvocation 作成(実行開始)
    - create_invocation:
        call: http.post
        args:
          url: $${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
          auth:
            type: OAuth2
          body:
            compilationResult: $${compilation.body.name}
        result: invocation

    - log_invocation_started:
        call: sys.log
        args:
          text: '$${"Dataform invocation started - " + invocation.body.name}'
          severity: "INFO"

    # 5. Dataform ポーリングで完了待ち
    - poll_dataform:
        call: http.get
        args:
          url: $${"https://dataform.googleapis.com/v1beta1/" + invocation.body.name}
          auth:
            type: OAuth2
        result: poll_result

    - check_dataform_status:
        switch:
          - condition: $${poll_result.body.state == "SUCCEEDED"}
            next: done
          - condition: $${poll_result.body.state == "FAILED"}
            raise: '$${"Dataform execution failed - " + json.encode_to_string(poll_result.body)}'
          - condition: $${poll_result.body.state == "CANCELLED"}
            raise: "Dataform execution cancelled"

    - wait_before_poll:
        call: sys.sleep
        args:
          seconds: 30
        next: poll_dataform

    - done:
        call: sys.log
        args:
          text: "Pipeline completed successfully."
          severity: "INFO"
ワークフロー

ワークフローの処理フロー:

ステップ処理
init変数初期化(転送設定 ID、Dataform リポジトリ名)
run_all_transfersDTS 転送を parallel で並列実行。GCS と S3 Parquet を並行して起動。S3 Parquet はさらにネストした parallel + for で全テーブル分を並列実行
check_transfer_errorstransfer_errors にエラーがあれば Cloud Logging に出力し raise で停止
create_compilationDataform の CompilationResult を作成(リリース設定を元にコンパイル)
create_invocationWorkflowInvocation を作成してワークフロー実行を開始
poll_dataformcheck_dataform_statuswait_before_poll30秒間隔でポーリングし、SUCCEEDED / FAILED / CANCELLED を判定

templatefile のエスケープ

Workflows の YAML を templatefile で読み込む場合、Terraform の ${...} と Workflows の ${...} が衝突する。

記法展開タイミング用途
${...}terraform planTerraform 変数の注入(転送設定 ID 等)
$${...}Workflows ランタイムWorkflows の式評価(sys.now() 等)

file() で読み込む場合はこのエスケープは不要だが、templatefile で変数を注入する場合は $${...} と書く必要がある。

DTS コネクタの LRO 自動待機

googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.startManualRunscall で呼ぶと、コネクタが LRO(Long Running Operation)の完了を自動的にブロッキング待機する。明示的なポーリングコードは不要。デフォルトのタイムアウトは30分。

shared 変数

parallel ブロック内で書き込みが必要な変数は shared で明示的に宣言する。transfer_errorsshared に指定することで、各並列分岐からエラー情報を書き込める。shared に指定しない変数は読み取り専用。

適用

cd terraform/environments/dev

terraform init
terraform plan
terraform apply

workflows モジュールで作成されるリソースは以下。

リソース説明
google_service_account.workflow_executorWorkflows 実行用 SA
google_project_iam_member × 3workflow-executor の IAM(bigquery.admin, dataform.editor, logging.logWriter)
google_service_account_iam_memberworkflow-executor → dataform-executor の act-as 権限
google_service_account.scheduler_executorCloud Scheduler 用 SA
google_project_iam_memberscheduler-executor の workflows.invoker
google_workflows_workflow.etl_pipelineワークフロー定義
google_cloud_scheduler_job.trigger_etl_pipelineスケジューラジョブ

動作確認

apply 後、Cloud Scheduler の「強制実行」または Workflows コンソールからの手動実行でパイプライン全体が動作することを確認する。

  1. Cloud Scheduler コンソール → trigger-etl-pipeline → 「強制実行」
  2. Workflows コンソール → etl-pipeline → 実行状態を確認
  3. BigQuery で DTS の転送先テーブルにデータが入っていることを確認
  4. Dataform の実行履歴で SUCCEEDED を確認

まとめ

Terraform で作成したリソースの全体像。

terraform/
├── modules/
│   ├── dataform/
│   │   ├── dataform-executor SA + IAM
│   │   ├── Secret Manager(GitHub PAT 参照 + IAM)
│   │   └── Dataform リポジトリ + リリース設定 + ワークフロー設定
│   │
│   ├── dts_gcs/
│   │   ├── SA の GCS IAM(objectViewer)
│   │   └── DTS 転送設定(for_each)
│   │
│   ├── dts_s3/
│   │   ├── Secret Manager data source(AWS 認証情報)
│   │   └── DTS 転送設定(for_each)
│   │
│   ├── spreadsheet/
│   │   └── 外部テーブル + スケジュールドクエリ
│   │
│   └── workflows/
│       ├── workflow-executor SA + IAM
│       ├── scheduler-executor SA + IAM
│       ├── Workflows ワークフロー(etl-pipeline)
│       └── Cloud Scheduler ジョブ(trigger-etl-pipeline)
│
└── environments/
    └── dev/
        ├── locals(project_id, region 等)
        ├── API 有効化(5 API)
        ├── DTS 転送グループ定義(locals + YAML テーブルリスト)
        └── module 呼び出し(5 module)

検証で得た知見:

  • DTS には組み込みコネクタがあり LRO 自動待機で転送完了を待てる。Dataform にはコネクタがなく REST API + ポーリングが必要
  • parallel 内の未ハンドル例外は他の並列分岐をキャンセルする。各分岐内に try/except を配置して対策する
  • templatefile で Workflows YAML を読み込む場合、Workflows の式 ${...}$${...} にエスケープする
  • bigquery.transfers.update を含むロールは roles/bigquery.admin のみ。DTS API 呼び出し用の最小ロールは存在しない
  • Dataform の WorkflowInvocation 作成には act-as チェックがあり、roles/iam.serviceAccountUser が必要
  • DTS の schedule は Cloud Scheduler で一元管理する場合は不要(Terraform から削除)
  • Workflows の月間ステップ数は無料枠(5,000内部ステップ)に収まる

Author

rito

rito

  • Backend Engineer
  • Tokyo, Japan
  • PHP 5 技術者認定上級試験 認定者
  • 統計検定 3 級