BigQuery Data Transfer Service で Amazon S3 の CSV / Parquet を BigQuery に取り込む
- 公開日
- カテゴリ:BigQuery
- タグ:BigQuery,DataTransferService,Terraform

BigQuery Data Transfer Service(DTS)の Amazon S3 コネクタを使うと、S3 上のファイルを BigQuery にスケジュール取り込みできる。S3 コネクタは無料コネクタのため、データ取り込みのオーケストレーション料金はかからない(AWS 側の Egress 料金は月100GB 以下であれば無料)。
本記事では、前回の Cloud Storage コネクタ との差分を中心に、CSV と Parquet の両形式で S3 からの取り込みを検証し、Terraform で構築する。
contents
- 構成
- 前提
- AWS 側の準備
- サンプルデータ
- 転送設定の作成(CSV)
- 転送設定の作成(Parquet)
- CSV vs Parquet
- パラメータ化されたパス
- ディレクトリ構成
- Terraform
- 適用
構成
S3 バケット(CSV / Parquet ファイル)
│
│ DTS Amazon S3 コネクタ(無料)
│ (AWS アクセスキーで認証)
↓
BigQuery ネイティブテーブル(dl_s3.*)
GCS コネクタとの違い
| 項目 | GCS コネクタ | S3 コネクタ |
|---|---|---|
| 認証 | SA の GCS IAM | AWS アクセスキー(access_key_id + secret_access_key) |
| URI 形式 | gs://bucket/path(UI ではバケット名から入力) | s3://bucket/path(s3:// を含めて入力) |
| Write Disposition の UI 表記 | Write preference(MIRROR / APPEND) | Write Disposition(WRITE_TRUNCATE / WRITE_APPEND) |
Terraform data_source_id | google_cloud_storage | amazon_s3 |
前提
- GCP プロジェクトが作成済み
- Terraform がインストール済み
gcloudで認証済み- 前回記事の構成が適用済み(SA
dataform-executorが存在する状態) - BigQuery Data Transfer API が有効化済み
- AWS アカウントを持っている
AWS 側の準備
S3 バケット作成
| 項目 | 設定 |
|---|---|
| バケット名 | 任意(例: sample-etl-dts-source-202603) |
| リージョン | 任意(例: ap-northeast-1) |
| パブリックアクセス | 全てブロック |
IAM ユーザー作成
DTS が S3 にアクセスするための専用 IAM ユーザーを作成する。
| 項目 | 設定 |
|---|---|
| ユーザー名 | 任意(例: bigquery-dts-reader) |
| 許可のオプション | ポリシーを直接アタッチする |
IAM ポリシー
最小権限のカスタムポリシーを作成して IAM ユーザーにアタッチする。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::<YOUR-BUCKET-NAME>",
"arn:aws:s3:::<YOUR-BUCKET-NAME>/*"
]
}
]
}
アクセスキーの作成
IAM ユーザーのアクセスキーを作成する。ユースケースは「サードパーティーサービス」を選択する。
access_key_idとsecret_access_keyが発行される
DTS S3 コネクタは AWS アクセスキー方式のみ対応しており、IAM ロールや Workload Identity Federation には対応していない。
認証情報の管理
AWS のアクセスキーは GCP Secret Manager に登録し、Terraform から参照する。
| シークレット名 | 値 |
|---|---|
aws-dts-access-key-id | アクセスキー ID |
aws-dts-secret-access-key | シークレットアクセスキー |
サンプルデータ
GCS コネクタの記事と同じ店舗マスタを、CSV と Parquet の2形式で用意する。
CSV
store_id,store_name,prefecture,opened_at
1,東京本店,東京都,2020-04-01
2,大阪支店,大阪府,2021-01-15
3,福岡支店,福岡県,2022-06-01
Parquet
CSV と同じデータを Parquet 形式で作成する。Parquet はバイナリ形式のため cat や less では読めず、pyarrow 等のツールで中身を確認する。
import datetime
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
df = pd.DataFrame({
'store_id': [1, 2, 3],
'store_name': ['東京本店', '大阪支店', '福岡支店'],
'prefecture': ['東京都', '大阪府', '福岡県'],
'opened_at': [datetime.date(2020, 4, 1), datetime.date(2021, 1, 15), datetime.date(2022, 6, 1)],
})
schema = pa.schema([
('store_id', pa.int64()),
('store_name', pa.string()),
('prefecture', pa.string()),
('opened_at', pa.date32()),
])
table = pa.Table.from_pandas(df, schema=schema)
# Snappy 圧縮(デフォルト)で出力
pq.write_table(table, 'stores.parquet', compression='snappy')
Parquet の特徴:
- スキーマ(型情報、カラム名)がファイルに埋め込まれている
- 列単位で圧縮される(多くのツールがデフォルトで Snappy 圧縮を適用)
- 拡張子は
.parquet(圧縮されていても変わらない)
S3 への配置
s3://<YOUR-BUCKET-NAME>/csv/stores.csv
s3://<YOUR-BUCKET-NAME>/parquet/stores.parquet
宛先テーブルの作成
DTS は宛先テーブルを自動作成しない。存在しない状態で転送設定を保存しようとすると Cannot find the destination table provided in request エラーになる。
CREATE TABLE `<YOUR-PROJECT-ID>.dl_s3.stores_csv` (
store_id INT64,
store_name STRING,
prefecture STRING,
opened_at DATE
);
CREATE TABLE `<YOUR-PROJECT-ID>.dl_s3.stores_parquet` (
store_id INT64,
store_name STRING,
prefecture STRING,
opened_at DATE
);
転送設定の作成(CSV)
BigQuery コンソール → データ転送 → 「転送を作成」
ソースのタイプ
| 項目 | 設定 |
|---|---|
| ソース | Amazon S3 |
転送構成名
| 項目 | 設定 |
|---|---|
| 表示名 | load_stores_csv |
スケジュール オプション
| 項目 | 設定 |
|---|---|
| 繰り返しの頻度 | 24 時間ごと |
| 開始 | すぐに開始 |
転送先の設定
| 項目 | 設定 |
|---|---|
| データセット | dl_s3 |
データソースの詳細
| 項目 | 設定 |
|---|---|
| Destination table | stores_csv |
| Amazon S3 URI | s3://<YOUR-BUCKET-NAME>/csv/stores.csv |
| Access key ID | (IAM ユーザーのアクセスキー ID) |
| Secret access key | (IAM ユーザーのシークレットアクセスキー) |
| Write Disposition | WRITE_TRUNCATE |
| File format | CSV |
Transfer Options
| セクション | 項目 | 設定 | 説明 |
|---|---|---|---|
| All Formats | Number of errors allowed | 0 | 許容するエラー行数。0 = 1行でもエラーがあれば転送失敗 |
| All Formats | Decimal target types | (空) | DECIMAL/NUMERIC 型のマッピング先 |
| JSON, CSV | Ignore unknown values | OFF | スキーマにないカラムの値を無視するか |
| CSV | Field delimiter | , | フィールド区切り文字 |
| CSV | Header rows to skip | 1 | ヘッダー行のスキップ(デフォルト0から要変更) |
| CSV | Allow quoted newlines | OFF | ダブルクォート内の改行を許可するか |
| CSV | Allow jagged rows | OFF | カラム数不足の行を許可するか |
GCS コネクタにあった Quote character / Preserve ASCII control characters / Encoding の項目は S3 コネクタにはない。
サービスアカウント
| 項目 | 設定 |
|---|---|
| Service account | dataform-executor@<YOUR-PROJECT-ID>.iam.gserviceaccount.com |
転送実行・確認
SELECT * FROM `<YOUR-PROJECT-ID>.dl_s3.stores_csv`;
転送設定の作成(Parquet)
CSV と同じ手順で、Parquet 形式の転送設定を作成する。
データソースの詳細
| 項目 | 設定 |
|---|---|
| 表示名 | load_stores_parquet |
| Destination table | stores_parquet |
| Amazon S3 URI | s3://<YOUR-BUCKET-NAME>/parquet/stores.parquet |
| File format | PARQUET |
転送実行・確認
SELECT * FROM `<YOUR-PROJECT-ID>.dl_s3.stores_parquet`;
CSV vs Parquet
設定項目の違い
| 項目 | CSV | Parquet |
|---|---|---|
| Field delimiter | 必要 | 不要 |
| Header rows to skip | 必要 | 不要 |
| Allow quoted newlines | 設定可能 | 不要 |
| Allow jagged rows | 設定可能 | 不要 |
Parquet はスキーマがファイルに埋め込まれているため、CSV に比べて設定項目が少ない。
スキーマの扱い
CSV と Parquet でスキーマの決定方法が異なる。
| 形式 | スキーマの決定 |
|---|---|
| CSV | 宛先テーブルのスキーマに従う(テーブル定義が正) |
| Parquet | Parquet ファイル内のスキーマが宛先テーブルを上書きする |
Parquet + WRITE_TRUNCATE の場合、転送のたびにテーブルが再作成され、Parquet のスキーマで上書きされる。宛先テーブルで opened_at DATE と定義していても、Parquet 内の型が優先される。
型マッピング例:
| Parquet | BigQuery |
|---|---|
int64 | INTEGER |
string | STRING |
date32 | DATE |
timestamp[ns] | TIMESTAMP |
Parquet を使う場合、ファイル生成時に正しい型で出力することが重要。
ファイル形式の比較
| 項目 | CSV | Parquet |
|---|---|---|
| 形式 | テキスト | バイナリ |
| スキーマ | なし(外部で定義) | ファイルに埋め込み |
| 圧縮 | なし | 列単位で圧縮(Snappy 等) |
| ファイルサイズ | 大きい | 小さい |
| 人間の可読性 | あり | なし |
パラメータ化されたパス
GCS コネクタと同様に、S3 コネクタでも {run_date} 等のランタイムパラメータが使える。
s3://<YOUR-BUCKET-NAME>/parquet/stores_{run_date}.parquet
実行日が 2026-03-09 の場合、stores_20260309.parquet が読み込まれる。
ディレクトリ構成
前回の記事で構築した Terraform に dts_s3/ module を追加する。
terraform/
├── main.tf # module "dts_s3" 追加
├── dataform/ # 既存
├── spreadsheet/ # 既存
├── dts_gcs/ # 既存
└── dts_s3/ # 新規
├── variables.tf
├── secrets.tf # Secret Manager から AWS 認証情報を取得
├── transfer_csv.tf # DTS 転送設定(CSV)
└── transfer_parquet.tf # DTS 転送設定(Parquet)
GCS コネクタとの違いとして、S3 コネクタでは GCS バケットの IAM 設定が不要。代わりに Secret Manager から AWS のアクセスキーを取得し params に渡す。
Terraform
module の呼び出し
# main.tf に追記
module "dts_s3" {
source = "./dts_s3"
project_id = var.project_id
region = var.region
dataform_executor_email = module.dataform.dataform_executor_email
s3_bucket_name = "<YOUR-S3-BUCKET-NAME>"
}
dts_s3/variables.tf
variable "project_id" {
description = "GCP プロジェクト ID"
type = string
}
variable "region" {
description = "デフォルトリージョン"
type = string
}
variable "dataform_executor_email" {
description = "Dataform 実行用サービスアカウントのメールアドレス"
type = string
}
variable "s3_bucket_name" {
description = "転送元の S3 バケット名"
type = string
}
dts_s3/secrets.tf
Secret Manager から AWS 認証情報を取得する。
data "google_secret_manager_secret_version" "aws_access_key_id" {
secret = "aws-dts-access-key-id"
}
data "google_secret_manager_secret_version" "aws_secret_access_key" {
secret = "aws-dts-secret-access-key"
}
dts_s3/transfer_csv.tf
resource "google_bigquery_data_transfer_config" "load_stores_csv" {
display_name = "load_stores_csv"
location = var.region
data_source_id = "amazon_s3"
destination_dataset_id = "dl_s3"
disabled = false
schedule = "every 24 hours"
params = {
data_path = "s3://${var.s3_bucket_name}/csv/stores.csv"
destination_table_name_template = "stores_csv"
file_format = "CSV"
write_disposition = "WRITE_TRUNCATE"
field_delimiter = ","
skip_leading_rows = "1"
max_bad_records = "0"
access_key_id = data.google_secret_manager_secret_version.aws_access_key_id.secret_data
secret_access_key = data.google_secret_manager_secret_version.aws_secret_access_key.secret_data
}
service_account_name = var.dataform_executor_email
}
dts_s3/transfer_parquet.tf
resource "google_bigquery_data_transfer_config" "load_stores_parquet" {
display_name = "load_stores_parquet"
location = var.region
data_source_id = "amazon_s3"
destination_dataset_id = "dl_s3"
disabled = false
schedule = "every 24 hours"
params = {
data_path = "s3://${var.s3_bucket_name}/parquet/stores_{run_date}.parquet"
destination_table_name_template = "stores_parquet"
file_format = "PARQUET"
write_disposition = "WRITE_TRUNCATE"
access_key_id = data.google_secret_manager_secret_version.aws_access_key_id.secret_data
secret_access_key = data.google_secret_manager_secret_version.aws_secret_access_key.secret_data
}
service_account_name = var.dataform_executor_email
}
GCS コネクタとの params の違い:
| 項目 | GCS | S3 |
|---|---|---|
| パスのキー | data_path_template | data_path |
| 認証 | 不要(SA の IAM で制御) | access_key_id + secret_access_key |
write_disposition の値 | APPEND / MIRROR | WRITE_APPEND / WRITE_TRUNCATE |
適用
cd terraform
terraform init # dts_s3 module の初期化
terraform plan
terraform apply
Plan: 2 to add で以下のリソースが作成される。
| リソース | 説明 |
|---|---|
DTS 転送設定(load_stores_csv) | S3 CSV → BigQuery |
DTS 転送設定(load_stores_parquet) | S3 Parquet → BigQuery |
動作確認
apply 後、手動実行(1回限りの転送)でデータが転送されることを確認する。
SELECT * FROM `<YOUR-PROJECT-ID>.dl_s3.stores_csv`;
SELECT * FROM `<YOUR-PROJECT-ID>.dl_s3.stores_parquet`;
まとめ
Terraform で作成したリソースの全体像。
terraform/
└── dts_s3/(module)
├── secrets.tf
│ └── Secret Manager data source(AWS 認証情報の取得)
│
├── transfer_csv.tf
│ └── DTS 転送設定(S3 CSV → BigQuery、WRITE_TRUNCATE)
│
└── transfer_parquet.tf
└── DTS 転送設定(S3 Parquet → BigQuery、WRITE_TRUNCATE、{run_date})
検証で得た知見:
- S3 コネクタの認証は AWS アクセスキー方式のみ(Workload Identity Federation 非対応)
- CSV のスキーマは宛先テーブル定義に従うが、Parquet のスキーマはファイル内の型が宛先テーブルを上書きする
- Parquet を使う場合はファイル生成時に正しい型で出力することが重要
- S3 コネクタでも
{run_date}によるパラメータ化パスが使える - S3 コネクタは無料コネクタ。AWS Egress 料金は月100GB 以下なら無料

