2022.09.20  

【Go】S3から取得したCSVファイルが一部しか読み込めない

Go    

AWSのS3から取得したCSVファイルのレコードを順番に取得する処理で、一部のレコードが取得できず途中で処理が終わってしまう事象が発生したのでメモ書きします。

事象が発生したのは下記コード。

package main

import (
    "context"
    "encoding/csv"
    "fmt"
    "io"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

type (
    S3Client interface {
        GetObject(ctx context.Context, key string) (*s3.GetObjectOutput, error)
    }

    s3Config struct {
        AwsDefaultRegion string
        EndPoint         string
        BucketName       string
    }

    s3client struct {
        clientCfg *s3Config
        sdkCfg    aws.Config
    }
)

func main() {

    cfg := s3Config{
        AwsDefaultRegion: "ap-northeast-1",
        EndPoint:         "http://localhost:8080",
        BucketName:       "hogeBucket",
    }

    // S3クライアントの作成
    c := NewS3Client(&cfg)

    // ### S3のタイムアウト処理 ###
    ctx, cancel := context.WithTimeout(context.Background(), time.Duration(3)*time.Second)
    defer cancel()

    // S3からCSVファイルを取得
    file, err := c.GetObject(ctx, "testDir/hoge.csv")
    if err != nil {
        fmt.Println("エラー")
    }

    // S3から取得したCSVファイルの読み込み
    r := csv.NewReader(file.Body)

    // ### csvファイルの読み込みが途中でおわる ###
    for {
        record, err := r.Read()
        if err == io.EOF {
            break
        }
        if err != nil {
            fmt.Println("エラー")
        }
        fmt.Println(record)
    }
}

// S3クライアントの作成する関数
func NewS3Client(cfg *s3Config) S3Client {
    loadOptions := []func(*config.LoadOptions) error{config.WithRegion(cfg.AwsDefaultRegion)}

    if cfg.EndPoint != "" {
        endpoint := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
            return aws.Endpoint{
                URL: cfg.EndPoint,
            }, nil
        })
        loadOptions = append(loadOptions, config.WithEndpointResolverWithOptions(endpoint))
    }

    sdkCfg, err := config.LoadDefaultConfig(context.TODO(), loadOptions...)
    if err != nil {
        panic(err)
    }
    return &s3client{cfg, sdkCfg}
}

// S3からファイルを取得する関数
func (c *s3client) GetObject(ctx context.Context, key string) (*s3.GetObjectOutput, error) {
    api := s3.NewFromConfig(c.sdkCfg, func(options *s3.Options) {
        options.UsePathStyle = true
    })

    return api.GetObject(ctx, &s3.GetObjectInput{
        Bucket: aws.String(c.clientCfg.BucketName),
        Key:    aws.String(key),
    })
}

原因

s3client.GetObject(ctx, ’testDir/hoge.csv')に指定しているctxの内容のcontext.WithTimeout()は、s3の処理だけでなく、後続のCSVファイル読み込みにも影響を与えてしまっているのが原因のようでした。

下記コードのようにタイムアウト処理を削除したところ上手く動作するようになりました。

package main

import (
    "context"
    "encoding/csv"
    "fmt"
    "io"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

type (
    S3Client interface {
        GetObject(ctx context.Context, key string) (*s3.GetObjectOutput, error)
    }

    s3Config struct {
        AwsDefaultRegion string
        EndPoint         string
        BucketName       string
    }

    s3client struct {
        clientCfg *s3Config
        sdkCfg    aws.Config
    }
)

func main() {

    cfg := s3Config{
        AwsDefaultRegion: "ap-northeast-1",
        EndPoint:         "http://localhost:8080",
        BucketName:       "hogeBucket",
    }

    // S3クライアントの作成
    c := NewS3Client(&cfg)

    // ### タイムアウト処理を削除 ###
    ctx := context.Background()

    // S3からCSVファイルを取得
    file, err := c.GetObject(ctx, "testDir/hoge.csv")
    if err != nil {
        fmt.Println("エラー")
    }

    // S3から取得したCSVファイルの読み込み
    r := csv.NewReader(file.Body)

    // ### csvファイルの中身が全て読み込めるようになる ###
    for {
        record, err := r.Read()
        if err == io.EOF {
            break
        }
        if err != nil {
            fmt.Println("エラー")
        }
        fmt.Println(record)
    }
}

// S3クライアントの作成
func NewS3Client(cfg *s3Config) S3Client {
    loadOptions := []func(*config.LoadOptions) error{config.WithRegion(cfg.AwsDefaultRegion)}

    if cfg.EndPoint != "" {
        endpoint := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
            return aws.Endpoint{
                URL: cfg.EndPoint,
            }, nil
        })
        loadOptions = append(loadOptions, config.WithEndpointResolverWithOptions(endpoint))
    }

    sdkCfg, err := config.LoadDefaultConfig(context.TODO(), loadOptions...)
    if err != nil {
        panic(err)
    }
    return &s3client{cfg, sdkCfg}
}

// S3からファイルを取得する関数
func (c *s3client) GetObject(ctx context.Context, key string) (*s3.GetObjectOutput, error) {
    api := s3.NewFromConfig(c.sdkCfg, func(options *s3.Options) {
        options.UsePathStyle = true
    })

    return api.GetObject(ctx, &s3.GetObjectInput{
        Bucket: aws.String(c.clientCfg.BucketName),
        Key:    aws.String(key),
    })
}
コメント
現在コメントはありません。
コメントする
コメント入力

名前 (※ 必須)

メールアドレス (※ 必須 画面には表示されません)

送信