Ritolabo
  1. Home
  2. GoogleCloud
  3. BigQuery
  4. BigQuery Data Transfer Service で Amazon S3 の CSV / Parquet を BigQuery に取り込む

BigQuery Data Transfer Service で Amazon S3 の CSV / Parquet を BigQuery に取り込む

  • 公開日
  • カテゴリBigQuery
  • タグBigQuery,DataTransferService,Terraform
BigQuery Data Transfer Service で Amazon S3 の CSV / Parquet を BigQuery に取り込む

BigQuery Data Transfer Service(DTS)の Amazon S3 コネクタを使うと、S3 上のファイルを BigQuery にスケジュール取り込みできる。S3 コネクタは無料コネクタのため、データ取り込みのオーケストレーション料金はかからない(AWS 側の Egress 料金は月100GB 以下であれば無料)。

本記事では、前回の Cloud Storage コネクタ との差分を中心に、CSV と Parquet の両形式で S3 からの取り込みを検証し、Terraform で構築する。

contents

  1. 構成
    1. GCS コネクタとの違い
  2. 前提
  3. AWS 側の準備
    1. S3 バケット作成
    2. IAM ユーザー作成
    3. IAM ポリシー
    4. アクセスキーの作成
  4. サンプルデータ
    1. CSV
    2. Parquet
    3. S3 への配置
    4. 宛先テーブルの作成
  5. 転送設定の作成(CSV)
    1. ソースのタイプ
    2. 転送構成名
    3. スケジュール オプション
    4. 転送先の設定
    5. データソースの詳細
    6. Transfer Options
    7. サービスアカウント
    8. 転送実行・確認
  6. 転送設定の作成(Parquet)
    1. データソースの詳細
    2. 転送実行・確認
  7. CSV vs Parquet
    1. 設定項目の違い
    2. スキーマの扱い
    3. ファイル形式の比較
  8. パラメータ化されたパス
  9. ディレクトリ構成
  10. Terraform
    1. module の呼び出し
    2. dts_s3/variables.tf
    3. dts_s3/secrets.tf
    4. dts_s3/transfer_csv.tf
    5. dts_s3/transfer_parquet.tf
  11. 適用
    1. 動作確認

構成

S3 バケット(CSV / Parquet ファイル)
  │
  │ DTS Amazon S3 コネクタ(無料)
  │ (AWS アクセスキーで認証)
  ↓
BigQuery ネイティブテーブル(dl_s3.*)

GCS コネクタとの違い

項目GCS コネクタS3 コネクタ
認証SA の GCS IAMAWS アクセスキー(access_key_id + secret_access_key)
URI 形式gs://bucket/path(UI ではバケット名から入力)s3://bucket/paths3:// を含めて入力)
Write Disposition の UI 表記Write preference(MIRROR / APPEND)Write Disposition(WRITE_TRUNCATE / WRITE_APPEND)
Terraform data_source_idgoogle_cloud_storageamazon_s3

前提

  • GCP プロジェクトが作成済み
  • Terraform がインストール済み
  • gcloud で認証済み
  • 前回記事の構成が適用済み(SA dataform-executor が存在する状態)
  • BigQuery Data Transfer API が有効化済み
  • AWS アカウントを持っている

AWS 側の準備

S3 バケット作成

項目設定
バケット名任意(例: sample-etl-dts-source-202603
リージョン任意(例: ap-northeast-1
パブリックアクセス全てブロック

IAM ユーザー作成

DTS が S3 にアクセスするための専用 IAM ユーザーを作成する。

項目設定
ユーザー名任意(例: bigquery-dts-reader
許可のオプションポリシーを直接アタッチする

IAM ポリシー

最小権限のカスタムポリシーを作成して IAM ユーザーにアタッチする。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::<YOUR-BUCKET-NAME>",
                "arn:aws:s3:::<YOUR-BUCKET-NAME>/*"
            ]
        }
    ]
}

アクセスキーの作成

IAM ユーザーのアクセスキーを作成する。ユースケースは「サードパーティーサービス」を選択する。

  • access_key_idsecret_access_key が発行される

DTS S3 コネクタは AWS アクセスキー方式のみ対応しており、IAM ロールや Workload Identity Federation には対応していない。

認証情報の管理

AWS のアクセスキーは GCP Secret Manager に登録し、Terraform から参照する。

シークレット名
aws-dts-access-key-idアクセスキー ID
aws-dts-secret-access-keyシークレットアクセスキー

サンプルデータ

GCS コネクタの記事と同じ店舗マスタを、CSV と Parquet の2形式で用意する。

CSV

store_id,store_name,prefecture,opened_at
1,東京本店,東京都,2020-04-01
2,大阪支店,大阪府,2021-01-15
3,福岡支店,福岡県,2022-06-01

Parquet

CSV と同じデータを Parquet 形式で作成する。Parquet はバイナリ形式のため catless では読めず、pyarrow 等のツールで中身を確認する。

import datetime

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

df = pd.DataFrame({
    'store_id': [1, 2, 3],
    'store_name': ['東京本店', '大阪支店', '福岡支店'],
    'prefecture': ['東京都', '大阪府', '福岡県'],
    'opened_at': [datetime.date(2020, 4, 1), datetime.date(2021, 1, 15), datetime.date(2022, 6, 1)],
})

schema = pa.schema([
    ('store_id', pa.int64()),
    ('store_name', pa.string()),
    ('prefecture', pa.string()),
    ('opened_at', pa.date32()),
])
table = pa.Table.from_pandas(df, schema=schema)

# Snappy 圧縮(デフォルト)で出力
pq.write_table(table, 'stores.parquet', compression='snappy')

Parquet の特徴:

  • スキーマ(型情報、カラム名)がファイルに埋め込まれている
  • 列単位で圧縮される(多くのツールがデフォルトで Snappy 圧縮を適用)
  • 拡張子は .parquet(圧縮されていても変わらない)

S3 への配置

s3://<YOUR-BUCKET-NAME>/csv/stores.csv
s3://<YOUR-BUCKET-NAME>/parquet/stores.parquet

宛先テーブルの作成

DTS は宛先テーブルを自動作成しない。存在しない状態で転送設定を保存しようとすると Cannot find the destination table provided in request エラーになる。

CREATE TABLE `<YOUR-PROJECT-ID>.dl_s3.stores_csv` (
  store_id INT64,
  store_name STRING,
  prefecture STRING,
  opened_at DATE
);

CREATE TABLE `<YOUR-PROJECT-ID>.dl_s3.stores_parquet` (
  store_id INT64,
  store_name STRING,
  prefecture STRING,
  opened_at DATE
);

転送設定の作成(CSV)

BigQuery コンソール → データ転送 → 「転送を作成」

ソースのタイプ

項目設定
ソースAmazon S3

転送構成名

項目設定
表示名load_stores_csv

スケジュール オプション

項目設定
繰り返しの頻度24 時間ごと
開始すぐに開始

転送先の設定

項目設定
データセットdl_s3

データソースの詳細

項目設定
Destination tablestores_csv
Amazon S3 URIs3://<YOUR-BUCKET-NAME>/csv/stores.csv
Access key ID(IAM ユーザーのアクセスキー ID)
Secret access key(IAM ユーザーのシークレットアクセスキー)
Write DispositionWRITE_TRUNCATE
File formatCSV

Transfer Options

セクション項目設定説明
All FormatsNumber of errors allowed0許容するエラー行数。0 = 1行でもエラーがあれば転送失敗
All FormatsDecimal target types(空)DECIMAL/NUMERIC 型のマッピング先
JSON, CSVIgnore unknown valuesOFFスキーマにないカラムの値を無視するか
CSVField delimiter,フィールド区切り文字
CSVHeader rows to skip1ヘッダー行のスキップ(デフォルト0から要変更)
CSVAllow quoted newlinesOFFダブルクォート内の改行を許可するか
CSVAllow jagged rowsOFFカラム数不足の行を許可するか

GCS コネクタにあった Quote character / Preserve ASCII control characters / Encoding の項目は S3 コネクタにはない。

サービスアカウント

項目設定
Service accountdataform-executor@<YOUR-PROJECT-ID>.iam.gserviceaccount.com

転送実行・確認

SELECT * FROM `<YOUR-PROJECT-ID>.dl_s3.stores_csv`;

転送設定の作成(Parquet)

CSV と同じ手順で、Parquet 形式の転送設定を作成する。

データソースの詳細

項目設定
表示名load_stores_parquet
Destination tablestores_parquet
Amazon S3 URIs3://<YOUR-BUCKET-NAME>/parquet/stores.parquet
File formatPARQUET

転送実行・確認

SELECT * FROM `<YOUR-PROJECT-ID>.dl_s3.stores_parquet`;

CSV vs Parquet

設定項目の違い

項目CSVParquet
Field delimiter必要不要
Header rows to skip必要不要
Allow quoted newlines設定可能不要
Allow jagged rows設定可能不要

Parquet はスキーマがファイルに埋め込まれているため、CSV に比べて設定項目が少ない。

スキーマの扱い

CSV と Parquet でスキーマの決定方法が異なる。

形式スキーマの決定
CSV宛先テーブルのスキーマに従う(テーブル定義が正)
ParquetParquet ファイル内のスキーマが宛先テーブルを上書きする

Parquet + WRITE_TRUNCATE の場合、転送のたびにテーブルが再作成され、Parquet のスキーマで上書きされる。宛先テーブルで opened_at DATE と定義していても、Parquet 内の型が優先される。

型マッピング例:

ParquetBigQuery
int64INTEGER
stringSTRING
date32DATE
timestamp[ns]TIMESTAMP

Parquet を使う場合、ファイル生成時に正しい型で出力することが重要。

ファイル形式の比較

項目CSVParquet
形式テキストバイナリ
スキーマなし(外部で定義)ファイルに埋め込み
圧縮なし列単位で圧縮(Snappy 等)
ファイルサイズ大きい小さい
人間の可読性ありなし

パラメータ化されたパス

GCS コネクタと同様に、S3 コネクタでも {run_date} 等のランタイムパラメータが使える。

s3://<YOUR-BUCKET-NAME>/parquet/stores_{run_date}.parquet

実行日が 2026-03-09 の場合、stores_20260309.parquet が読み込まれる。

ディレクトリ構成

前回の記事で構築した Terraform に dts_s3/ module を追加する。

terraform/
├── main.tf              # module "dts_s3" 追加
├── dataform/            # 既存
├── spreadsheet/         # 既存
├── dts_gcs/             # 既存
└── dts_s3/              # 新規
    ├── variables.tf
    ├── secrets.tf       # Secret Manager から AWS 認証情報を取得
    ├── transfer_csv.tf  # DTS 転送設定(CSV)
    └── transfer_parquet.tf  # DTS 転送設定(Parquet)

GCS コネクタとの違いとして、S3 コネクタでは GCS バケットの IAM 設定が不要。代わりに Secret Manager から AWS のアクセスキーを取得し params に渡す。

Terraform

module の呼び出し

# main.tf に追記
module "dts_s3" {
  source                  = "./dts_s3"
  project_id              = var.project_id
  region                  = var.region
  dataform_executor_email = module.dataform.dataform_executor_email
  s3_bucket_name          = "<YOUR-S3-BUCKET-NAME>"
}

dts_s3/variables.tf

variable "project_id" {
  description = "GCP プロジェクト ID"
  type        = string
}

variable "region" {
  description = "デフォルトリージョン"
  type        = string
}

variable "dataform_executor_email" {
  description = "Dataform 実行用サービスアカウントのメールアドレス"
  type        = string
}

variable "s3_bucket_name" {
  description = "転送元の S3 バケット名"
  type        = string
}

dts_s3/secrets.tf

Secret Manager から AWS 認証情報を取得する。

data "google_secret_manager_secret_version" "aws_access_key_id" {
  secret = "aws-dts-access-key-id"
}

data "google_secret_manager_secret_version" "aws_secret_access_key" {
  secret = "aws-dts-secret-access-key"
}

dts_s3/transfer_csv.tf

resource "google_bigquery_data_transfer_config" "load_stores_csv" {
  display_name           = "load_stores_csv"
  location               = var.region
  data_source_id         = "amazon_s3"
  destination_dataset_id = "dl_s3"
  disabled               = false

  schedule = "every 24 hours"

  params = {
    data_path                       = "s3://${var.s3_bucket_name}/csv/stores.csv"
    destination_table_name_template = "stores_csv"
    file_format                     = "CSV"
    write_disposition               = "WRITE_TRUNCATE"
    field_delimiter                 = ","
    skip_leading_rows               = "1"
    max_bad_records                 = "0"
    access_key_id                   = data.google_secret_manager_secret_version.aws_access_key_id.secret_data
    secret_access_key               = data.google_secret_manager_secret_version.aws_secret_access_key.secret_data
  }

  service_account_name = var.dataform_executor_email
}

dts_s3/transfer_parquet.tf

resource "google_bigquery_data_transfer_config" "load_stores_parquet" {
  display_name           = "load_stores_parquet"
  location               = var.region
  data_source_id         = "amazon_s3"
  destination_dataset_id = "dl_s3"
  disabled               = false

  schedule = "every 24 hours"

  params = {
    data_path                       = "s3://${var.s3_bucket_name}/parquet/stores_{run_date}.parquet"
    destination_table_name_template = "stores_parquet"
    file_format                     = "PARQUET"
    write_disposition               = "WRITE_TRUNCATE"
    access_key_id                   = data.google_secret_manager_secret_version.aws_access_key_id.secret_data
    secret_access_key               = data.google_secret_manager_secret_version.aws_secret_access_key.secret_data
  }

  service_account_name = var.dataform_executor_email
}

GCS コネクタとの params の違い:

項目GCSS3
パスのキーdata_path_templatedata_path
認証不要(SA の IAM で制御)access_key_id + secret_access_key
write_disposition の値APPEND / MIRRORWRITE_APPEND / WRITE_TRUNCATE

適用

cd terraform

terraform init    # dts_s3 module の初期化
terraform plan
terraform apply

Plan: 2 to add で以下のリソースが作成される。

リソース説明
DTS 転送設定(load_stores_csvS3 CSV → BigQuery
DTS 転送設定(load_stores_parquetS3 Parquet → BigQuery

動作確認

apply 後、手動実行(1回限りの転送)でデータが転送されることを確認する。

SELECT * FROM `<YOUR-PROJECT-ID>.dl_s3.stores_csv`;
SELECT * FROM `<YOUR-PROJECT-ID>.dl_s3.stores_parquet`;

まとめ

Terraform で作成したリソースの全体像。

terraform/
└── dts_s3/(module)
    ├── secrets.tf
    │   └── Secret Manager data source(AWS 認証情報の取得)
    │
    ├── transfer_csv.tf
    │   └── DTS 転送設定(S3 CSV → BigQuery、WRITE_TRUNCATE)
    │
    └── transfer_parquet.tf
        └── DTS 転送設定(S3 Parquet → BigQuery、WRITE_TRUNCATE、{run_date})

検証で得た知見:

  • S3 コネクタの認証は AWS アクセスキー方式のみ(Workload Identity Federation 非対応)
  • CSV のスキーマは宛先テーブル定義に従うが、Parquet のスキーマはファイル内の型が宛先テーブルを上書きする
  • Parquet を使う場合はファイル生成時に正しい型で出力することが重要
  • S3 コネクタでも {run_date} によるパラメータ化パスが使える
  • S3 コネクタは無料コネクタ。AWS Egress 料金は月100GB 以下なら無料

Author

rito

rito

  • Backend Engineer
  • Tokyo, Japan
  • PHP 5 技術者認定上級試験 認定者
  • 統計検定 3 級