dongying9712 2019-03-05 11:47
浏览 174
已采纳

并行执行DynamoDB查询(全局二级索引的BatchGetItems)

The idea here is run multiple DynamoDB queries in parallel as the query is run over a GSI. As of now BatchGetItems doesn't support querying over Indexes and the recommended approach is to Query the data in parallel. I'm using go routines with wg to take care of the executions of the routines in parallel.

The input to the function is an array of strings with an ID, the outputs are the attributes of the Ids.

When the function is run locally, there is no issue, however, when the function is run on AWS-Lambda, the returned data keeps growing;

ie; Input 2 items should output 2 items. If the function is tested on AWS-Lambda,

  • 1st time the function returns 2 item
  • 2nd time it returns 4 items (same items are repeated 2 times)
  • 3rd time it returns 6 items (same items are repeated 4 times)

and so on. Here is a snippet of the code. Is there something not handled correctly that's having the lambda output the extra set of data every-time the lambda is run?

package main

import (
    "context"
    "fmt"
    "os"
    "sync"
    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/dynamodb"
    "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
)

//Final Output Interface
var bulkOutput []interface{}

func exitWithError(err error) {
    fmt.Fprintln(os.Stderr, err)
    os.Exit(1)
}

//LambdaInputJSON input for the lambda handler
type LambdaInputJSON struct {
    Ids      []string `json:"ids,omitempty"`
}

//HandleRequest : Lambda entry point
func HandleRequest(ctx context.Context, data LambdaInputJSON) ([]interface{}, error) {
    return DynamoDBBatchGetRecords(data), nil
}

func main() {
    lambda.Start(HandleRequest)
}

func DynamoDBBatchGetRecords(a LambdaInputJSON) []interface{} {

    var wg sync.WaitGroup
    var mutex = &sync.Mutex{}

    iterations := len(a.Ids)
    wg.Add(iterations)
    for i := 0; i < iterations; i++ {
        go QueryOutput(a.Ids[i], &wg, mutex)
    }

    wg.Wait()
    return bulkOutput

}

//QueryOutput GoRoutine
func QueryOutput(data string, wg *sync.WaitGroup, mtx *sync.Mutex) {
    var outputData []interface{}
    defer wg.Done()
    sess, err := session.NewSession(&aws.Config{
        Region: aws.String("aws-region"),
    })
    if err != nil {
        exitWithError(fmt.Errorf("failed to make Query API call, %v", err))
    }
    ddb := dynamodb.New(sess)
    queryInput := &dynamodb.QueryInput{
        Limit:                aws.Int64(1),
        TableName:            aws.String("table-name"),
        IndexName:            aws.String("gsi-index"),
        ScanIndexForward:     aws.Bool(false),
        ConsistentRead:       aws.Bool(false),
        KeyConditions: map[string]*dynamodb.Condition{
            "column_name": {
                ComparisonOperator: aws.String("EQ"),
                AttributeValueList: []*dynamodb.AttributeValue{
                    {
                        S: aws.String(data),
                    },
                },
            },
        },
    }
    output, err := ddb.Query(queryInput)
    if err != nil {
        exitWithError(fmt.Errorf("Failed to make Query API call, %v", err))
    }
    err = dynamodbattribute.UnmarshalListOfMaps(output.Items, &outputData)
    if err != nil {
        exitWithError(fmt.Errorf("Failed to unmarshal Query result items, %v", err))
    }
    mtx.Lock()
    bulkOutput = append(bulkOutput, outputData)
    mtx.Unlock()
}
  • 写回答

1条回答 默认 最新

  • doufen3786 2019-03-07 08:56
    关注

    According to documentation, global variables are independent of your Lambda function's handler code. This was causing the buffer to build up over time.

    Rectified reference pasted below.

    package main
    
    import (
        "context"
        "fmt"
        "os"
        "sync"
    
        "github.com/aws/aws-lambda-go/lambda"
        "github.com/aws/aws-sdk-go/aws"
        "github.com/aws/aws-sdk-go/aws/session"
        "github.com/aws/aws-sdk-go/service/dynamodb"
        "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
    )
    
    func exitWithError(err error) {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(1)
    }
    
    //HandleRequest : Lambda entry point
    func HandleRequest(ctx context.Context, data LambdaInputJSON) ([]interface{}, error) {
        output := DynamoDBBatchGetRecords(data)
        return output, nil
    }
    
    func main() {
        lambda.Start(HandleRequest)
    }
    
    func DynamoDBBatchGetRecords(a LambdaInputJSON) []interface{} {
        var dataOut []interface{}
        var wg = &sync.WaitGroup{}
        var mtx = &sync.Mutex{}
    
        iterations := len(a.Ids)
        wg.Add(iterations)
        for i := 0; i < i; i++ {
            go func(i int) {
                defer wg.Done()
                var outputData []interface{}
                sess, err := session.NewSession(&aws.Config{
                    Region: aws.String("aws-region"),
                })
                if err != nil {
                    exitWithError(fmt.Errorf("failed to make Query API call, %v", err))
                }
                ddb := dynamodb.New(sess)
                queryInput := &dynamodb.QueryInput{
                    Limit:            aws.Int64(1),
                    TableName:        aws.String("table"),
                    IndexName:        aws.String("index"),
                    ScanIndexForward: aws.Bool(false),
                    ConsistentRead: aws.Bool(false),
                    KeyConditions: map[string]*dynamodb.Condition{
                        "index-column": {
                            ComparisonOperator: aws.String("EQ"),
                            AttributeValueList: []*dynamodb.AttributeValue{
                                {
                                    S: aws.String(a.Ids[i]),
                                },
                            },
                        },
                    },
                }
                output, err := ddb.Query(queryInput)
    
                if err != nil {
                    exitWithError(fmt.Errorf("E1 failed to make Query API call, %v", err))
                }
                err = dynamodbattribute.UnmarshalListOfMaps(output.Items, &outputData)
                if err != nil {
                    exitWithError(fmt.Errorf("E2 failed to unmarshal Query result items, %v", err))
                }
    
                mtx.Lock()
                dataOut = append(dataOut, outputData[0])
                mtx.Unlock()
    
            }(i)
        }
        wg.Wait()
        return dataOut
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 随身WiFi网络灯亮但是没有网络,如何解决?
  • ¥15 gdf格式的脑电数据如何处理matlab
  • ¥20 重新写的代码替换了之后运行hbuliderx就这样了
  • ¥100 监控抖音用户作品更新可以微信公众号提醒
  • ¥15 UE5 如何可以不渲染HDRIBackdrop背景
  • ¥70 2048小游戏毕设项目
  • ¥20 mysql架构,按照姓名分表
  • ¥15 MATLAB实现区间[a,b]上的Gauss-Legendre积分
  • ¥15 delphi webbrowser组件网页下拉菜单自动选择问题
  • ¥15 linux驱动,linux应用,多线程