Ritolabo
  1. Home
  2. GoogleCloud
  3. BigQuery
  4. BigQuery Data Transfer Service で Cloud Storage の CSV を BigQuery に取り込む

BigQuery Data Transfer Service で Cloud Storage の CSV を BigQuery に取り込む

  • 公開日
  • カテゴリBigQuery
  • タグBigQuery,DataTransferService,Terraform
BigQuery Data Transfer Service で Cloud Storage の CSV を BigQuery に取り込む

BigQuery Data Transfer Service(DTS)の Cloud Storage コネクタを使うと、GCS 上のファイルを BigQuery にスケジュール取り込みできる。GCS コネクタは無料コネクタのため、転送オーケストレーション費用はかからない。

本記事では、この構成を検証し Terraform で構築する。

contents

  1. 構成
    1. Write preference
    2. SA のアーキテクチャ
  2. 前提
  3. サンプルデータ
    1. 宛先テーブルの作成
  4. 転送設定の作成(UI)
    1. ソースのタイプ
    2. 転送構成名
    3. スケジュール オプション
    4. 転送先の設定
    5. データソースの詳細
    6. Transfer Options - All Formats
    7. Transfer Options - CSV
    8. サービスアカウント
  5. パラメータ化されたパス
  6. バックフィル(過去日付の再実行)
    1. 注意点
  7. エラー時の挙動
  8. 転送設定の編集制限
  9. ディレクトリ構成
  10. Terraform
    1. module の呼び出し
    2. dts_gcs/variables.tf
    3. dts_gcs/bucket.tf
    4. dts_gcs/transfer.tf
  11. 適用
    1. 動作確認

構成

GCS バケット(CSV ファイル)
  │
  │ DTS Cloud Storage コネクタ(無料)
  ↓
BigQuery ネイティブテーブル(dl_gcs.stores)

DTS は宛先テーブルにデータを書き込む。宛先テーブルは事前に作成しておく必要がある(自動作成はされない)。

Write preference

モード動作ユースケース
MIRROR全件洗い替え固定ファイルの全量同期(マスタデータなど)
APPEND毎回全行を追記パラメータ化されたパスと組み合わせて日付別ファイルを蓄積

APPEND は差分検知をしない。固定ファイル + APPEND だと実行のたびに同じデータが重複する。APPEND で増分取り込みを実現するには {run_date} 等のパラメータ化されたパスと組み合わせる。

SA のアーキテクチャ

DTS の転送は2つの SA が連携して動く。

GCS バケット
  │
  │ dataform-executor(roles/storage.objectViewer)
  │ → GCS からファイルを読む
  ↓
DTS サービスエージェント
  (service-{project_number}@gcp-sa-bigquerydatatransfer.iam.gserviceaccount.com)
  │
  │ BigQuery データ編集者(自動付与)
  │ → BigQuery にデータを書き込む
  ↓
dl_gcs.stores
SA役割権限管理方法
dataform-executorGCS からの読み取りroles/storage.objectViewer(バケット単位)Terraform
DTS サービスエージェントBigQuery への書き込みBigQuery データ編集者(データセット単位)Google 管理(自動付与)

前提

  • GCP プロジェクトが作成済み
  • Terraform がインストール済み
  • gcloud で認証済み
  • Dataform 環境を Terraform で構築する の構成が適用済み(サービスアカウント dataform-executor が存在する状態)
  • BigQuery Data Transfer API が有効化済み(前回の記事 で有効化済み)
  • GCS バケットが作成済み
  • dl_gcs データセットと宛先テーブルが作成済み

サンプルデータ

店舗マスタの CSV を使用する。日付別ファイルで日次取り込みするケースを想定。

store_id,store_name,prefecture,opened_at
1,東京本店,東京都,2020-04-01
2,大阪支店,大阪府,2021-01-15
3,福岡支店,福岡県,2022-06-01

宛先テーブルの作成

DTS は宛先テーブルを自動作成しないため、事前に作成する。

CREATE TABLE `<YOUR-PROJECT-ID>.dl_gcs.stores` (
  store_id INT64,
  store_name STRING,
  prefecture STRING,
  opened_at DATE
);

転送設定の作成(UI)

BigQuery コンソール → データ転送 → 「転送を作成」

ソースのタイプ

項目設定
ソースGoogle Cloud Storage

転送構成名

項目設定
表示名load_stores

スケジュール オプション

項目設定
繰り返しの頻度24 時間ごと
開始すぐに開始

スケジュール時刻は UTC で入力する。JST 09:00 に実行したい場合は UTC 00:00 と設定する。UI の「開始日と実行時間」は JST で表示されるが、繰り返しの時刻設定は UTC。

転送先の設定

項目設定
データセットdl_gcs
宛先テーブルのタイプNative table

データソースの詳細

項目設定
Destination tablestores
Cloud Storage URI<BUCKET-NAME>/stores_{run_date}.csv
Write preferenceAPPEND
Delete source files after transferOFF
File formatCSV

Transfer Options - All Formats

項目設定説明
Number of errors allowed01行でもエラーがあれば転送失敗
Decimal target types(空)デフォルトのまま
Ignore unknown valuesOFFスキーマにないカラムを無視しない

Transfer Options - CSV

項目設定説明
Field delimiter,カンマ区切り
Quote character(空)デフォルト("
Header rows to skip1ヘッダー行をスキップ(デフォルト0から要変更)
Allow quoted newlinesONフィールド内改行を許可
Allow jagged rowsOFFカラム数不足の行を許可しない
Preserve ASCII control charactersOFF制御文字を保持しない
EncodingUTF8

サービスアカウント

項目設定
Service accountdataform-executor@<YOUR-PROJECT-ID>.iam.gserviceaccount.com

SA には GCS バケットへの roles/storage.objectViewer が必要。権限がないと storage.objects.get エラーになる。

パラメータ化されたパス

日付別ファイルの取り込みにはランタイムパラメータを使う。

パラメータ説明例(2/28 実行時)
{run_date}実行日(%Y%m%d20260228
{run_time|"%Y%m%d"}実行時刻をフォーマット指定20260228
{run_time-24h|"%Y%m%d"}1日前の日付20260227
{run_time+9h|"%Y%m%d"}UTC+9(JST 変換)20260228

Cloud Storage URI に stores_{run_date}.csv と指定すると、実行日に対応するファイルが自動的に読み込まれる。

参考: Cloud Storage 転送のランタイム パラメータ

バックフィル(過去日付の再実行)

転送が失敗した場合のリカバリとして、DTS にはバックフィル機能がある。

BigQuery コンソール → データ転送 → 対象の転送設定 → 「スケジュール バックフィルを実行」

開始日・終了日で日付範囲を指定すると、その範囲内のスケジュール時刻に対応する日付分が実行される。

例: 月曜出社で土日月の3日分をリカバリ
  1. バックフィル(開始: 土 0:00 / 終了: 月 当日時刻)→ 土・日分を実行
  2. 「1 回限りの転送を行う」→ 当日(月曜)分を実行

注意点

  • 当日分: バックフィルでは実行不可の場合がある。「1 回限りの転送を行う」で対応
  • 終了日: 対象日のスケジュール時刻を含む範囲にする(例: 2/26〜2/27 の2日分なら終了日は 2/28 0:00
  • データ重複: APPEND モードでも処理済みデータは重複しない(再実行時にスキップされる)

エラー時の挙動

ケース結果
ファイルが存在しない成功扱い。データ変更なし。MIRROR でもテーブルは上書きされない
CSV のカラム数がスキーマと不一致エラーCSV processing encountered too many errors

ファイル不在が成功になるため、実運用ではレコード数0件を検知する仕組みが必要。

転送設定の編集制限

作成後に編集不可になる項目がある。

項目作成後の編集
表示名可能
スケジュール可能
Write preference可能
Destination table不可
Cloud Storage URI不可

URI や宛先テーブルを変更する場合は、新規作成 → 旧設定を削除。Terraform 管理であれば destroy & recreate になるため、この制約は問題にならない。

ディレクトリ構成

前回の記事で構築した Terraform に dts_gcs/ module を追加する。

terraform/
├── main.tf              # module "dts_gcs" 追加
├── dataform/            # 既存
├── spreadsheet/         # 既存
└── dts_gcs/             # 新規
    ├── variables.tf
    ├── bucket.tf        # SA の GCS 読み取り権限
    └── transfer.tf      # DTS 転送設定

データセット・テーブル・バケットは既に運用中の前提のため Terraform で管理しない。DTS 転送設定と SA の IAM のみ管理する。

Terraform

module の呼び出し

ルートの main.tf に module 呼び出しを追加する。

# main.tf に追記
module "dts_gcs" {
  source                  = "./dts_gcs"
  project_id              = var.project_id
  region                  = var.region
  dataform_executor_email = module.dataform.dataform_executor_email
  bucket_name             = "${var.project_id}-dts-source"
}

dts_gcs/variables.tf

variable "project_id" {
  description = "GCP プロジェクト ID"
  type        = string
}

variable "region" {
  description = "デフォルトリージョン"
  type        = string
}

variable "dataform_executor_email" {
  description = "Dataform 実行用サービスアカウントのメールアドレス"
  type        = string
}

variable "bucket_name" {
  description = "DTS 転送元の GCS バケット名"
  type        = string
}

dts_gcs/bucket.tf

# SA に GCS バケットへの読み取り権限を付与
resource "google_storage_bucket_iam_member" "dts_source_viewer" {
  bucket = var.bucket_name
  role   = "roles/storage.objectViewer"
  member = "serviceAccount:${var.dataform_executor_email}"
}

dts_gcs/transfer.tf

# DTS 転送設定: GCS → BigQuery
resource "google_bigquery_data_transfer_config" "load_stores" {
  display_name           = "load_stores"
  location               = var.region
  data_source_id         = "google_cloud_storage"
  destination_dataset_id = "dl_gcs"
  disabled               = false

  # UTC 00:00 = JST 09:00
  schedule = "every 24 hours"

  params = {
    data_path_template              = "gs://${var.bucket_name}/stores_{run_date}.csv"
    destination_table_name_template = "stores"
    file_format                     = "CSV"
    write_disposition               = "APPEND"
    skip_leading_rows               = "1"
    field_delimiter                 = ","
    max_bad_records                 = "0"
    allow_quoted_newlines            = "true"
  }

  service_account_name = var.dataform_executor_email
}
項目説明
data_source_idgoogle_cloud_storage を指定(GCS コネクタ)
destination_dataset_id書き込み先データセット(既存)
disabledfalsetrue にするとバックフィルも含め一切実行不可になる
scheduleevery 24 hours で日次実行(UTC 00:00 開始)
data_path_templateGCS パス。{run_date} で日付パラメータを展開
write_dispositionAPPEND で増分追記
skip_leading_rowsCSV ヘッダー行のスキップ
allow_quoted_newlinesフィールド内改行の許可
service_account_nameGCS 読み取りに使用する SA

適用

cd terraform

terraform init    # dts_gcs module の初期化
terraform plan
terraform apply

Plan: 2 to add で以下のリソースが作成される。

リソース説明
SA の IAM(objectViewer)GCS バケットへの読み取り権限
DTS 転送設定(load_storesGCS → BigQuery の定期取り込み

動作確認

apply 後、バックフィルで過去日付の CSV が転送されることを確認する。

SELECT * FROM `<YOUR-PROJECT-ID>.dl_gcs.stores`;

まとめ

Terraform で作成したリソースの全体像。

terraform/
└── dts_gcs/(module)
    ├── bucket.tf
    │   └── SA の IAM(GCS objectViewer)
    │
    └── transfer.tf
        └── DTS 転送設定(GCS → BigQuery、APPEND、{run_date})

検証で得た知見:

  • 宛先テーブルは事前作成が必要(DTS は自動作成しない)
  • Cloud Storage URI と Destination table は作成後に変更不可
  • スケジュール時刻は UTC で入力(Terraform も同様)
  • disabled = true にするとバックフィルも含め一切実行不可
  • ファイル不在は成功扱い(レコード数0件の検知が必要)
  • APPEND モードでもバックフィル再実行時にデータは重複しない
  • GCS コネクタは無料。BigQuery ストレージ料金のみ発生

Author

rito

rito

  • Backend Engineer
  • Tokyo, Japan
  • PHP 5 技術者認定上級試験 認定者
  • 統計検定 3 級