My goal is to use the AWS Kinesis API to create a Kinesis stream with a particular name if it doesn't already exist and then write to it whether it was there in the first place or not.
This is what I've come up with so far. Attempt to create the stream. If it fails with code 400 and returns a request ID then maybe the stream already exists. Then write to the stream to make sure it's there. In Go:
k := kinesis.New(session.New())
_, err := k.CreateStream(&kinesis.CreateStreamInput{
ShardCount: aws.Int64(2),
StreamName: aws.String("stream"),
})
if err != nil {
if reqerr, ok := err.(awserr.RequestFailure); ok {
if reqerr.RequestID() == "" {
log.Fatal("request was not delivered as it has no ID",
reqerr.Code(),
reqerr.Message(),
)
}
if reqerr.StatusCode() != 400 {
log.Fatal("unexpected status code", reqerr.StatusCode())
}
} else {
log.Fatal(err)
}
}
// Code 400 + requestID does not necessarily mean that the stream exists
// So write to the stream to confirm it exists
_, err = k.PutRecord(&kinesis.PutRecordInput{
Data: []byte("Hello Kinesis"),
PartitionKey: aws.String("partitionkey"),
StreamName: aws.String("stream"),
})
if err != nil {
log.Fatal(err)
}
The approach above seems convoluted and more importantly I don't think it effectively matches on the exact error I'm expecting. Doing a string compare on the error message seems like a bad choice too because that could easily change.
I'm wondering if there is a more reliable and straightforward way to achieve this? Listing all the available streams to search is a pain because it is a linear search and involves multiple requests with new values of ExclusiveStartStreamName
.