幅広い知識と幅広いスキルを求められている系システムエンジニアです。リモートワークしかやりません。

BigQuery Storage APIを使ってGoで書き込むメモ

最初に

BigQuery Storage APIはBigQueryに即座にデータを取り込む方法として、新しいプロジェクトではStreamingよりもこっちを使えと推奨されている方法ですが、 2022/09時点であまりにもドキュメントが乏しいため とりあえずデータを書き込むところまでのメモを残しておきます。

毎月2TBの書き込みが無料枠なので、是非とも使っていきたい所存です。

注意書き

  • 使用している cloud.google.com/go/bigquery/storage/managedwriter は EXPERIMENTAL なライブラリです

ディレクトリ構成

hogehoge
|-- go.mod
|-- go.sum
|-- main.go
|-- mypb
|   `-- hoge.pb.go   # <- protocコマンドで生成される
`-- proto
    `-- hoge.proto   # <- proto2定義ファイル

流れ

1. Protocol Buffersの環境をインストール

BigQuery Storage APIは Protocol Buffers が必要なのでインストール

brew install protobuf
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest

2. proto/hoge.proto を書く

syntax = "proto2";
option go_package = "github.com/miiton/hogehoge/mypb";
package mypb;

message ExampleMsg {
    required string msg = 1;
}

3. protocコマンドを叩く

protoc --proto_path=proto --go_out=mypb --go_opt=paths=source_relative proto/hoge.proto

hoge.pb.go ができる

4. main.goの中身

managedwriter package - cloud.google.com/go/bigquery/storage/managedwriter - Go Packages にあるサンプルをコピペして少し書き換えています。

package main

import (
	"context"
	"log"
	"os"

	"cloud.google.com/go/bigquery/storage/managedwriter"
	"github.com/miiton/hogehoge/mypb"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
)

func init() {
	log.SetFlags(log.LstdFlags | log.Lshortfile)
}

func main() {
	ctx := context.Background()
	c, err := managedwriter.NewClient(ctx, "PROJECT_ID")  // 書き換えポイント
	if err != nil {
		log.Println(err)
		os.Exit(1)
	}
	defer c.Close()

	m := &mypb.ExampleMsg{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
	managedStream, err := c.NewManagedStream(ctx,
		managedwriter.WithDestinationTable("projects/PROJECT_ID/datasets/DATASET_ID/tables/TABLE_ID"),  // 書き換えポイント
		managedwriter.WithSchemaDescriptor(descriptorProto))
	if err != nil {
		log.Println(err)
		os.Exit(1)
	}

	// Define a couple of messages.
	mesgs := []*mypb.ExampleMsg{
		{
			Msg: proto.String("hogehogepien"),  // 書き換えポイント
		},
	}

	// Encode the messages into binary format.
	encoded := make([][]byte, len(mesgs))
	for k, v := range mesgs {
		b, err := proto.Marshal(v)
		if err != nil {
			log.Println(err)
			os.Exit(1)
		}
		encoded[k] = b
	}

	// Send the rows to the service, and specify an offset for managing deduplication.
	result, err := managedStream.AppendRows(ctx, encoded)
	if err != nil {
		log.Println(err)
		os.Exit(1)
	}

	// Block until the write is complete and return the result.
	_, err = result.GetResult(ctx) // 書き換えポイント
	if err != nil {
		log.Println(err)
		os.Exit(1)
	}
}

5. 実行

go run main.go

これでBigQueryの指定テーブルに書き込まれます。

参考リンク

© 2023 @miiton