Question
I have a recurring crash (panic: runtime error: slice bounds out of range
) from the flatbuffers
generated server files when attempting to access a part of a buffer (created with flatbuffers
) which contains a message streamed from one client to a server.
This problem only appears in integration of both client and server. When tested on their own, both client and server succeed in using flatbuffers
and no crash occurs in the server // see the boundary tests below
Knowing that:
- the sent and received byte[]
is the same, (which excludes a problem in the communication method)
- the sent data is correctly formed before it is put in the flatbuffers
' buffer
and sent.
What could be causing this?
Problem context and details
I have a c++
client and a go
server that communicate using FlatBuffers
.
The client and server both have automated boundary-tests which confirm that they each "correctly" use flatbuffers
the way they should. (i.e. the client creates the buffer before sending it and the server receives before access it)
These tests work.
We are using FlatBuffers-v1.10.0
My problem is that when they are used together, the following error always occurs in the server when accessing to the buffer:
panic: runtime error: slice bounds out of range
goroutine 19 [running]:
github.com/google/flatbuffers/go.(*Table).GetVOffsetT(...)
/home/.../go/github.com/google/flatbuffers/go/table.go:134
github.com/google/flatbuffers/go.(*Table).OffsetT(0x4000045c68, 0x4000000004, 0x4000160008)
/home/.../go/github.com/google/flatbuffers/go/table.go:16 +0xf0
github.com/PhantomIntelligence/Server/lib/Protocol/Stream.(*StreamedData).Id(0x4000045c68, 0x4000045c68)
/home/.../go/github.com/PhantomIntelligence/Server/lib/Protocol/Stream/SteamedData.go:30 +0x2c
github.com/PhantomIntelligence/Server/dataAccess/conversion/flatBuffers.ConvertStreamMessage(0x4000015a000, 0xa7c, 0xe00, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
/home/.../go/github.com/PhantomIntelligence/Server/dataAccess/conversion/flatBuffers/deserialization.go:55 +0x1fc
... // the rest is irrelevant for this question's purpose
Data causing the crash
The crash always happen on the first buffer received by the server when it tries to access it to use it. More specifically, it always happen when attempting to access the either the (0th, 12th or 15th) ContainerData
of the 7th Container
.
Know that for this use case, we send a StreamedData with 16 Containers, each holding 16 ContainerData
Code details
Note that only the structure is remains from my original implementation. I am not allowed to divulgate the nature of the data we manipulate. Thus the object names might seem weird, but the structure is the relevant part.
Schema compilation
The c++
client runs this during its compilation:
flatc -o lib/ --no-js-exports --no-prefix --scoped-enums --cpp schemas/*.fbs
The go
server runs this during its compilation:
flatc -o lib/ --no-js-exports --no-prefix --scoped-enums --go schemas/*.fbs
Schemas
Both the client and server use the following schema:
Please note that this extract is not the complete schema, only the part relevant for this question.
// file: stream_streamed_data.fbs
namespace Protocol.Stream;
table ContainerData {
id:uint16 = 0;
a:uint16 = 0;
b:uint16 = 0;
c:int16 = 0;
d:int16 = 0;
e:uint8 = 0;
}
table Container {
id:uint16 = 65535;
data:[ContainerData];
}
table StreamedData {
id:uint16 = 0;
containers:[Container];
}
----
// file: stream_payload.fbs
include "stream_streamed_data.fbs";
/* some more includes ... */
namespace Protocol.Stream;
union PayloadContent { StreamedData, ..., Request, Result }
table Payload {
sensor_id:string;
content:PayloadContent;
}
----
// file: header.fbs
namespace Protocol;
table Header {
source_id:string;
destination_id:string;
timestamp:string;
}
----
// file: protocol_message.fbs
include "headers.fbs";
include "stream_payload.fbs";
/* some more includes ... */
namespace Protocol;
union Content { Stream.Payload, ..., Other.Payload }
table Message {
header:Header;
content:Content;
}
root_type Message;
The client has to stream Protocol.Message
s that contain Protocol.Stream.Payload
, which themselves contain a Protocol.Stream.StreamedMessage
.
Client implementation
#ifndef CLIENT_FLATBUFFERSCONVERTER_HPP
#define CLIENT_FLATBUFFERSCONVERTER_HPP
#include <flatbuffers/flatbuffers.h>
#include <lib/protocol/protocol_message_generated.h>
#include <chrono>
#include <iostream>
namespace ServerCommunication {
enum class ProtocolStructure : uint8_t {
NONE = 0,
/* some more types ... */
SENSOR_MESSAGE = 12,
/* some more types ... */
UNRECOGNIZED_TYPE = // A NUMBER,
MIN = NONE,
MAX = UNRECOGNIZED_TYPE
};
typedef uint8_t* FlatBuffersBytes;
struct BytesToSend {
FlatBuffersBytes data;
size_t dataLength;
};
namespace BufferSize {
size_t const STREAMED_MESSAGE = 2560;
}
template<class T>
class FlatBuffersConverter {
public:
explicit FlatBuffersConverter() = delete;
~FlatBuffersConverter() noexcept = delete;
FlatBuffersConverter(FlatBuffersConverter const& other) = delete;
FlatBuffersConverter(FlatBuffersConverter&& other) noexcept = delete;
FlatBuffersConverter& operator=(FlatBuffersConverter const& other)& = delete;
FlatBuffersConverter& operator=(FlatBuffersConverter&& other)& noexcept = delete;
static BytesToSend const convertSensorMessage(typename T::Message&& message) noexcept;
private:
static std::string const generateSerializationTimestamp() noexcept;
};
template<class T>
BytesToSend const FlatBuffersConverter<T>::convertStreamMessage(typename T::StreamMessage&& message) noexcept {
flatbuffers::FlatBufferBuilder builder(BufferSize::STREAM_MESSAGE);
auto streamIdFromMessage = message.getStreamId();
auto streamId = builder.CreateString(streamIdFromMessage.c_str(), streamIdFromMessage.size());
auto source = streamId;
auto destination = builder.CreateString("Server");
auto timestamp = builder.CreateString(generateSerializationTimestamp());
auto header = Protocol::CreateHeader(
builder,
source,
destination,
timestamp);
auto containersFromStreamMessage = message.getContainers();
std::vector<flatbuffers::Offset<Protocol::Stream::Container>> containerVector;
auto containerIterator = containersFromStreamMessage->begin();
auto containerEnd = containersFromStreamMessage->end();
for (; containerIterator != containerEnd; ++containerIterator) {
auto dataFromContainer = containerIterator->getData();
std::vector<flatbuffers::Offset<GatewayProtocol::Stream::ContainerData>> containerDataVector;
auto containerDataIterator = dataFromContainer->begin();
auto containerDataEnd = dataFromContainer->end();
for (; containerDataIterator != containerDataEnd; ++containerDataIterator) {
auto track = Protocol::Stream::CreateContainerData(
builder,
containerDataIterator->id,
containerDataIterator->a,
containerDataIterator->b,
containerDataIterator->c,
containerDataIterator->d,
containerDataIterator->e);
containerDataVector.push_back(containerData);
}
auto containerDataFBVector = builder.CreateVector(containerDataVector);
auto container = Protocol::Stream::CreateContainer(
builder,
containerIterator->id,
containerDataFBVector);
containerVector.push_back(container);
}
auto containers = builder.CreateVector(containerVector);
auto streamMessageContent = Protocol::Stream::CreateStreamedData(
builder,
message.messageId,
containers);
auto streamPayload = Protocol::Stream::CreatePayload(
builder,
streamId,
Protocol::Stream::PayloadContent::StreamedData,
streamMessageContent.Union());
auto convertedMessage = Protocol::CreateMessage(
builder,
header,
Protocol::Content::Stream_Payload,
sensorPayload.Union());
builder.Finish(convertedMessage);
auto size = builder.GetSize();
auto data = builder.GetBufferPointer();
BytesToSend bytesToSend{data, size};
return bytesToSend;
}
template<class T>
std::string const FlatBuffersConverter<T>::generateSerializationTimestamp() noexcept {
std::size_t const ARBITRARY_BIG_ENOUGH_SIZE = 128;
auto timestamp = std::chrono::high_resolution_clock::now();
auto time_tTimestamp = std::chrono::system_clock::to_time_t(timestamp);
auto utcTime = gmtime(&time_tTimestamp);
char charArrayTime[ARBITRARY_BIG_ENOUGH_SIZE];
auto numberOfCharacterWritten = strftime(charArrayTime, sizeof(charArrayTime), "%D %T", utcTime);
std::string formattedTime(std::begin(charArrayTime), std::begin(charArrayTime) + numberOfCharacterWritten);
return formattedTime;
}
}
#endif //CLIENT_FLATBUFFERSCONVERTER_HPP
Client Boundary test
#ifndef CLIENT_SERVERCOMMUNICATORTEST_CPP
#define CLIENT_SERVERCOMMUNICATORTEST_CPP
#include <gtest/gtest.h>
/* some other include */
#include "spirit-sensor-gateway/server-communication/WebSocketServerCommunicationStrategy.hpp"
#include "test/utilities/stub/WebSocketServerStub.h" // <--- Receives and accesses the buffer
using ServerCommunication::WebSocketServerCommunicationStrategy;
using Stub::WebSocketServerStub;
using TestFunctions::DataTestUtil;
class WebSocketServerCommunicatorTest : public ::testing::Test {
protected:
};
TEST_F(WebSocketServerCommunicatorTest, given_aNumberOfRandomStreamDataMessage_when_send_then_theDataIsPutInFlatBuffersAndReceivedByTheServer) {
auto numberOfMessageToSend = 10;
WebSocketServerStub webSocketServerStub;
WebSocketServerCommunicationStrategy<Sensor::Test::ServerLike::Structures> webSocketServerCommunicationStrategy;
webSocketServerStub.run();
webSocketServerCommunicationStrategy.openConnection(webSocketServerStub.getAddress());
ServerStructuresLists::StreamDataMessages sentStreamDataMessages;
for (auto i = 0; i < numberOfMessageToSend; i++) {
auto streamDataMessage = DataTestUtil::createRandomStreamDataMessage();
auto streamDataMessageCopy = DataModel::StreamDataMessage(streamDataMessage);
sentStreamDataMessages.push_back(std::move(streamDataMessageCopy));
webSocketServerCommunicationStrategy.sendMessage(std::move(streamDataMessage));
}
auto numberOfReceivedMessages = webSocketServerStub.getNumberOfMessageReceived();
while(numberOfMessageToSend != numberOfReceivedMessages) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
std::this_thread::yield();
numberOfReceivedMessages = webSocketServerStub.getNumberOfMessageReceived();
}
webSocketServerCommunicationStrategy.closeConnection();
auto receivedStreamDataMessages = webSocketServerStub.getStreamDataMessages();
auto sameMessageSentAndReceived = sentStreamDataMessages.size() == receivedStreamDataMessages.size();
for (auto streamDataMessageIndex = 0;
sameMessageSentAndReceived && streamDataMessageIndex < sentStreamDataMessages.size(); ++streamDataMessageIndex) {
sameMessageSentAndReceived = sameMessageSentAndReceived &&
sentStreamDataMessages.front() == receivedStreamDataMessages.front();
sentStreamDataMessages.pop_front();
receivedStreamDataMessages.pop_front();
}
if (!sameMessageSentAndReceived) {
while (!sentStreamDataMessages.empty() && !receivedStreamDataMessages.empty()) {
std::cout << "Sent: " << sentStreamDataMessages.front().toString() << std::endl;
std::cout << "Received: " << receivedStreamDataMessages.front().toString() << std::endl;
sentStreamDataMessages.pop_front();
receivedStreamDataMessages.pop_front();
}
}
ASSERT_TRUE(sameMessageSentAndReceived);
}
#endif //CLIENT_SERVERCOMMUNICATORTEST_CPP
Client's ServerStub FlatBuffer Conversion function
#include "ServerStubFlatBuffersConverter.h"
using Stub::ServerFlatBuffersConverter;
using ServerCommunication::ProtocolStructure;
ServerFlatBuffersConverter::StreamDataMessage
ServerFlatBuffersConverter::convertToStreamDataMessage(const ServerCommunication::FlatBuffersBytes flatBuffersBytes) {
// the identification step has been done at this point, we know the []byte holds a StreamedData
auto message = Protocol::GetMessage(flatBuffersBytes);
auto streamDataMessagePayload = message->content_as_Stream_Payload();
auto streamedData = streamDataMessagePayload->content_as_StreamedData();
auto messageId = streamedData->id();
auto streamId = flatbuffers::GetString(streamDataMessagePayload->stream_id());
auto ContainersBuffer = streamedData->containers();
std::vector<DataModel::MessageContainer> containers;
for (flatbuffers::uoffset_t containerIndex = 0;
containerIndex < ContainersBuffer->Length();
++containerIndex) {
auto containerFromBuffer = ContainersBuffer->Get(containerIndex);
auto containerId = containerFromBuffer->id();
auto containerDatasBuffer = containerFromBuffer->data();
std::vector<DataModel::ContainerData> containerDatas;
for (auto dataIterator = containerDatasBuffer->begin();
dataIterator != containerDatasBuffer->end();
++dataIterator) {
auto data = DataModel::ContainerData{
dataIterator->id(),
dataIterator->a(),
dataIterator->b(),
dataIterator->c(),
dataIterator->d(),
dataIterator->e()
};
containerDatas.push_back(data);
}
auto container = DataModel::MessageContainer(containerId, containerDatas);
containers.push_back(container);
}
StreamDataMessage message(streamId, messageId, containers);
return message;
}
Server Implementation
package flatBuffers
import (
"github.com/PhantomIntelligence/Server/domain/protocol"
"github.com/PhantomIntelligence/Server/lib/Protocol"
"github.com/PhantomIntelligence/Server/lib/Protocol/Stream"
"github.com/google/flatbuffers/go"
)
type GatewayMessageType = byte
const (
NONE = 0
/* some more types ... */
SENSOR_MESSAGE = 12
/* some more types ... */
UNRECOGNIZED_TYPE = // A NUMBER
)
func ConvertStreamMessage(messageBytes []byte) protocol.StreamMessage {
// the identification step has been done at this point, we know the []byte holds a StreamedData
var protocolMessageFlatBuffersTable = new(flatbuffers.Table)
var protocolMessageContentFlatBuffersTable = new(flatbuffers.Table)
var clientMessageOffset = Protocol.GetRootAsMessage(messageBytes, 0)
var header = new(Protocol.Header)
clientMessageOffset.Header(header)
clientMessageOffset.Content(protocolMessageFlatBuffersTable)
var messageTimestampString = string(header.Timestamp())
var streamedPayload = new(Stream.Payload)
streamedPayload.Init(protocolMessageFlatBuffersTable.Bytes, protocolMessageFlatBuffersTable.Pos)
streamedPayload.Content(protocolMessageContentFlatBuffersTable)
var streamIdFromClient = string(streamedPayload.StreamId())
var streamedDataFromClient = new(Stream.StreamedData)
streamedDataFromClient.Init(protocolMessageContentFlatBuffersTable.Bytes, protocolMessageContentFlatBuffersTable.Pos)
var numberOfContainers = streamedDataFromClient.ContainersLength()
var containers []protocol.Container
for containerIndex := 0; containerIndex < numberOfContainers; containerIndex++ {
var containerFromStream = new(Stream.Container)
streamedDataFromClient.Containers(containerFromStream, containerIndex)
var numberOfContainerDatas = containerFromStream.ContainerDatasLength()
var datas []protocol.ContainerData
for containerDataIndex := 0; containerDataIndex < numberOfContainerDatas; containerDataIndex++ {
var dataFromContainer = new(Stream.ContainerData)
containerFromStream.Data(dataFromContainer, dataIndex)
datas = append(datas, protocol.ContainerData{
Id: protocol.IdType(dataFromContainer.Id()), // <--- This line crashes ! always @ containerIndex = 6, containerDataIndex = 0, 12 or 15
A: protocol.AType(dataFromContainer.A()),
B: protocol.BType(dataFromContainer.B()),
C: protocol.CType(dataFromContainer.C()),
D: protocol.DType(dataFromContainer.D()),
E: protocol.EType(dataFromContainer.E()),
})
}
containers = append(containers, protocol.Container{
ContainerId: protocol.ContainerIdType(containerFromStream.Id()),
ContainerDatas: datas,
})
}
var streamedMessage = protocol.StreamedMessage{
StreamId: protocol.SensorIdType(streamIdFromClient),
MessageId: protocol.MessageIdType(streamedDataFromClient.Id()),
Containers: containers,
}
return streamedMessage
}
Server Boundary test
This test pass, we also have a similar test that sends n random data instead of 1 and it passes too
package receptionFromGateway_test
import (
"/* some more imports */
"github.com/PhantomIntelligence/Server/test/utilities/clientStub"
"os"
"runtime"
"testing"
"time"
)
func TestFlatBuffersReceptionAndAccessFromClient(test *testing.T) {
defer os.RemoveAll("./logs")
test.Run("given_aStreamedDataMessageSentFromClientStub"+
"_when_receivedAndAccessedByServer"+
"_then_streamedDataMessageIntegrityIsConserved", func(subTest *testing.T) {
sentStreamedDataMessage := utilities.GenerateRandomStreamedDataMessage(16, 16) // 16 container, 16 data each
deserializer := serialization.NewFlatBufferDeserializationFilter()
pipe := dataFlow.NewPipe(deserializer)
procedure := dataFlowMock.NewProcedurePassToPipeThenSave(pipe)
pipeline := dataFlow.NewPipeline(procedure)
client := clientStub.NewWebSocketCommunicator() // <-- this calls `convertStreamMessageToFlatBuffers` written below
server := serving.NewServer()
server.Router.Mediator.Pipeline = pipeline
go server.Serve(":3591")
runtime.Gosched()
time.Sleep(50 * time.Millisecond)
client.Start()
client.Send(sentStreamedDataMessage)
runtime.Gosched()
time.Sleep(50 * time.Millisecond)
client.Stop()
pipeline.GetProducingPipe().TerminateProcess()
var receivedStreamedDataMessage = pipeline.GetProducingPipe().Filter.(*dataFlowMock.FilterSaveObjectReceived).ObjectReceived
utilities.AssertEqual(subTest, receivedStreamedDataMessage, sentStreamedDataMessage)
})
}
Server ClientStub's serialization function
package client
Stub
import (
"github.com/PhantomIntelligence/Server/dataAccess/conversion/flatBuffers"
"github.com/PhantomIntelligence/Server/domain/protocol"
"github.com/PhantomIntelligence/Server/lib/Protocol"
"github.com/PhantomIntelligence/Server/lib/Protocol/Stream"
"github.com/google/flatbuffers/go"
)
const (
streamedDataMessageInitialSize = 2560
)
func convertStreamMessageToFlatBuffers(message protocol.StreamMessage) []byte {
builder := flatbuffers.NewBuilder(streamedDataMessageInitialSize)
var streamIdOffset = builder.CreateString(string(message.StreamId))
var headerOffset = createFlatBufferHeaders(builder, streamIdOffset)
var numberOfContainers int
var containerOffsets = make(map[int]flatbuffers.UOffsetT)
for containerIndex, container := range message.Containers {
var numberOfData int
var containerDataOffsets = make(map[int]flatbuffers.UOffsetT)
for containerIndex, container := range container.ContainerDatas {
Stream.ContainerDataStart(builder)
Stream.ContainerDataAddId(builder, uint16(container.ContainerDataId))
Stream.ContainerDataAddA(builder, uint16(container.A))
Stream.ContainerDataAddB(builder, uint16(container.B))
Stream.ContainerDataAddC(builder, int16(container.C))
Stream.ContainerDataAddD(builder, int16(container.D))
Stream.ContainerDataAddE(builder, byte(container.E))
containerDataOffset := Stream.ContainerDataEnd(builder)
containerDataOffsets[containerIndex] = containerDataOffset
numberOfData = containerIndex + 1
}
Stream.ContainerStartDataVector(builder, numberOfData)
// FlatBuffer UOffsetT vector requires to be filled by pre-pending backward the offsets
for dataOffsetIndex := numberOfData - 1; dataOffsetIndex >= 0; dataOffsetIndex-- {
builder.PrependUOffsetT(containerDataOffsets[dataOffsetIndex])
}
var dataOffsetVector = builder.EndVector(numberOfData)
Stream.ContainerStart(builder)
Stream.ContainerAddId(builder, uint16(container.ContainerId))
Stream.ContainerAddData(builder, dataOffsetVector)
containerOffset := Stream.ContainerEnd(builder)
containerOffsets[containerIndex] = containerOffset
numberOfContainers = containerIndex + 1
}
Stream.StreamedDataStartContainersVector(builder, numberOfContainers)
// FlatBuffer UOffsetT vector requires to be filled by pre-pending backward the offsets
for containerOffsetIndex := numberOfContainers - 1; containerOffsetIndex >= 0; containerOffsetIndex-- {
builder.PrependUOffsetT(containerOffsets[containerOffsetIndex])
}
var containerOffsetVector = builder.EndVector(numberOfContainers)
Stream.StreamedDataStart(builder)
Stream.StreamedDataAddId(builder, uint16(message.MessageId))
Stream.StreamedDataAddContainers(builder, containerOffsetVector)
var streamedDataMessageOffset = Stream.StreamedDataEnd(builder)
Stream.PayloadStart(builder)
Stream.PayloadAddSensorId(builder, streamIdOffset)
Stream.PayloadAddContentType(builder, Stream.PayloadContentStreamedData)
Stream.PayloadAddContent(builder, streamedDataMessageOffset)
var streamPayloadOffset = Stream.PayloadEnd(builder)
Protocol.MessageStart(builder)
Protocol.MessageAddHeader(builder, headerOffset)
Protocol.MessageAddContentType(builder, Protocol.ContentStream_Payload)
Protocol.MessageAddContent(builder, streamPayloadOffset)
clientStreamMessageOffset := Protocol.MessageEnd(builder)
builder.Finish(clientStreamMessageOffset)
flatBuffersBytes := builder.Bytes[builder.Head():]
return flatBuffersBytes
}
func createFlatBufferHeaders(builder *flatbuffers.Builder, sensorIdOffset flatbuffers.UOffsetT) flatbuffers.UOffsetT {
destinationIdOffset := builder.CreateString("Server")
offset := flatBuffers.CreateHeaderOffset(builder, destinationIdOffset, sensorIdOffset)
return offset
}