BigQuery Storage APIを使ってGoで書き込むメモ
最初に
BigQuery Storage APIはBigQueryに即座にデータを取り込む方法として、新しいプロジェクトではStreamingよりもこっちを使えと推奨されている方法ですが、 2022/09時点であまりにもドキュメントが乏しいため とりあえずデータを書き込むところまでのメモを残しておきます。
毎月2TBの書き込みが無料枠なので、是非とも使っていきたい所存です。
注意書き
- 使用している
cloud.google.com/go/bigquery/storage/managedwriter
は EXPERIMENTAL なライブラリです
ディレクトリ構成
hogehoge
|-- go.mod
|-- go.sum
|-- main.go
|-- mypb
| `-- hoge.pb.go # <- protocコマンドで生成される
`-- proto
`-- hoge.proto # <- proto2定義ファイル
流れ
1. Protocol Buffersの環境をインストール
BigQuery Storage APIは Protocol Buffers が必要なのでインストール
brew install protobuf
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
2. proto/hoge.proto
を書く
syntax = "proto2";
option go_package = "github.com/miiton/hogehoge/mypb";
package mypb;
message ExampleMsg {
required string msg = 1;
}
3. protocコマンドを叩く
protoc --proto_path=proto --go_out=mypb --go_opt=paths=source_relative proto/hoge.proto
hoge.pb.go
ができる
4. main.goの中身
managedwriter package - cloud.google.com/go/bigquery/storage/managedwriter - Go Packages にあるサンプルをコピペして少し書き換えています。
package main
import (
"context"
"log"
"os"
"cloud.google.com/go/bigquery/storage/managedwriter"
"github.com/miiton/hogehoge/mypb"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
)
func init() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
}
func main() {
ctx := context.Background()
c, err := managedwriter.NewClient(ctx, "PROJECT_ID") // 書き換えポイント
if err != nil {
log.Println(err)
os.Exit(1)
}
defer c.Close()
m := &mypb.ExampleMsg{}
descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
managedStream, err := c.NewManagedStream(ctx,
managedwriter.WithDestinationTable("projects/PROJECT_ID/datasets/DATASET_ID/tables/TABLE_ID"), // 書き換えポイント
managedwriter.WithSchemaDescriptor(descriptorProto))
if err != nil {
log.Println(err)
os.Exit(1)
}
// Define a couple of messages.
mesgs := []*mypb.ExampleMsg{
{
Msg: proto.String("hogehogepien"), // 書き換えポイント
},
}
// Encode the messages into binary format.
encoded := make([][]byte, len(mesgs))
for k, v := range mesgs {
b, err := proto.Marshal(v)
if err != nil {
log.Println(err)
os.Exit(1)
}
encoded[k] = b
}
// Send the rows to the service, and specify an offset for managing deduplication.
result, err := managedStream.AppendRows(ctx, encoded)
if err != nil {
log.Println(err)
os.Exit(1)
}
// Block until the write is complete and return the result.
_, err = result.GetResult(ctx) // 書き換えポイント
if err != nil {
log.Println(err)
os.Exit(1)
}
}
5. 実行
go run main.go
これでBigQueryの指定テーブルに書き込まれます。