AWSのS3から取得したCSVファイルのレコードを順番に取得する処理で、一部のレコードが取得できず途中で処理が終わってしまう事象が発生したのでメモ書きします。
事象が発生したのは下記コード。
package main
import (
"context"
"encoding/csv"
"fmt"
"io"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
)
type (
S3Client interface {
GetObject(ctx context.Context, key string) (*s3.GetObjectOutput, error)
}
s3Config struct {
AwsDefaultRegion string
EndPoint string
BucketName string
}
s3client struct {
clientCfg *s3Config
sdkCfg aws.Config
}
)
func main() {
cfg := s3Config{
AwsDefaultRegion: "ap-northeast-1",
EndPoint: "http://localhost:8080",
BucketName: "hogeBucket",
}
// S3クライアントの作成
c := NewS3Client(&cfg)
// ### S3のタイムアウト処理 ###
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(3)*time.Second)
defer cancel()
// S3からCSVファイルを取得
file, err := c.GetObject(ctx, "testDir/hoge.csv")
if err != nil {
fmt.Println("エラー")
}
// S3から取得したCSVファイルの読み込み
r := csv.NewReader(file.Body)
// ### csvファイルの読み込みが途中でおわる ###
for {
record, err := r.Read()
if err == io.EOF {
break
}
if err != nil {
fmt.Println("エラー")
}
fmt.Println(record)
}
}
// S3クライアントの作成する関数
func NewS3Client(cfg *s3Config) S3Client {
loadOptions := []func(*config.LoadOptions) error{config.WithRegion(cfg.AwsDefaultRegion)}
if cfg.EndPoint != "" {
endpoint := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
return aws.Endpoint{
URL: cfg.EndPoint,
}, nil
})
loadOptions = append(loadOptions, config.WithEndpointResolverWithOptions(endpoint))
}
sdkCfg, err := config.LoadDefaultConfig(context.TODO(), loadOptions...)
if err != nil {
panic(err)
}
return &s3client{cfg, sdkCfg}
}
// S3からファイルを取得する関数
func (c *s3client) GetObject(ctx context.Context, key string) (*s3.GetObjectOutput, error) {
api := s3.NewFromConfig(c.sdkCfg, func(options *s3.Options) {
options.UsePathStyle = true
})
return api.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(c.clientCfg.BucketName),
Key: aws.String(key),
})
}
原因
s3client.GetObject(ctx, ’testDir/hoge.csv')
に指定しているctx
の内容のcontext.WithTimeout()
は、s3の処理だけでなく、後続のCSVファイル読み込みにも影響を与えてしまっているのが原因のようでした。
下記コードのようにタイムアウト処理を削除したところ上手く動作するようになりました。
package main
import (
"context"
"encoding/csv"
"fmt"
"io"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
)
type (
S3Client interface {
GetObject(ctx context.Context, key string) (*s3.GetObjectOutput, error)
}
s3Config struct {
AwsDefaultRegion string
EndPoint string
BucketName string
}
s3client struct {
clientCfg *s3Config
sdkCfg aws.Config
}
)
func main() {
cfg := s3Config{
AwsDefaultRegion: "ap-northeast-1",
EndPoint: "http://localhost:8080",
BucketName: "hogeBucket",
}
// S3クライアントの作成
c := NewS3Client(&cfg)
// ### タイムアウト処理を削除 ###
ctx := context.Background()
// S3からCSVファイルを取得
file, err := c.GetObject(ctx, "testDir/hoge.csv")
if err != nil {
fmt.Println("エラー")
}
// S3から取得したCSVファイルの読み込み
r := csv.NewReader(file.Body)
// ### csvファイルの中身が全て読み込めるようになる ###
for {
record, err := r.Read()
if err == io.EOF {
break
}
if err != nil {
fmt.Println("エラー")
}
fmt.Println(record)
}
}
// S3クライアントの作成
func NewS3Client(cfg *s3Config) S3Client {
loadOptions := []func(*config.LoadOptions) error{config.WithRegion(cfg.AwsDefaultRegion)}
if cfg.EndPoint != "" {
endpoint := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
return aws.Endpoint{
URL: cfg.EndPoint,
}, nil
})
loadOptions = append(loadOptions, config.WithEndpointResolverWithOptions(endpoint))
}
sdkCfg, err := config.LoadDefaultConfig(context.TODO(), loadOptions...)
if err != nil {
panic(err)
}
return &s3client{cfg, sdkCfg}
}
// S3からファイルを取得する関数
func (c *s3client) GetObject(ctx context.Context, key string) (*s3.GetObjectOutput, error) {
api := s3.NewFromConfig(c.sdkCfg, func(options *s3.Options) {
options.UsePathStyle = true
})
return api.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(c.clientCfg.BucketName),
Key: aws.String(key),
})
}