dqayok7935 2018-11-06 23:06
浏览 158
已采纳

FlatBuffers的多语言集成问题

Question

I have a recurring crash (panic: runtime error: slice bounds out of range) from the flatbuffers generated server files when attempting to access a part of a buffer (created with flatbuffers) which contains a message streamed from one client to a server.
This problem only appears in integration of both client and server. When tested on their own, both client and server succeed in using flatbuffers and no crash occurs in the server // see the boundary tests below
Knowing that:
- the sent and received byte[] is the same, (which excludes a problem in the communication method)
- the sent data is correctly formed before it is put in the flatbuffers' buffer and sent.
What could be causing this?

Problem context and details

I have a c++ client and a go server that communicate using FlatBuffers.
The client and server both have automated boundary-tests which confirm that they each "correctly" use flatbuffers the way they should. (i.e. the client creates the buffer before sending it and the server receives before access it)
These tests work. We are using FlatBuffers-v1.10.0

My problem is that when they are used together, the following error always occurs in the server when accessing to the buffer:

panic: runtime error: slice bounds out of range  

goroutine 19 [running]:
github.com/google/flatbuffers/go.(*Table).GetVOffsetT(...)
     /home/.../go/github.com/google/flatbuffers/go/table.go:134
github.com/google/flatbuffers/go.(*Table).OffsetT(0x4000045c68, 0x4000000004, 0x4000160008)
     /home/.../go/github.com/google/flatbuffers/go/table.go:16 +0xf0
github.com/PhantomIntelligence/Server/lib/Protocol/Stream.(*StreamedData).Id(0x4000045c68, 0x4000045c68)
     /home/.../go/github.com/PhantomIntelligence/Server/lib/Protocol/Stream/SteamedData.go:30 +0x2c
github.com/PhantomIntelligence/Server/dataAccess/conversion/flatBuffers.ConvertStreamMessage(0x4000015a000, 0xa7c, 0xe00, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
     /home/.../go/github.com/PhantomIntelligence/Server/dataAccess/conversion/flatBuffers/deserialization.go:55 +0x1fc
... // the rest is irrelevant for this question's purpose 

Data causing the crash

The crash always happen on the first buffer received by the server when it tries to access it to use it. More specifically, it always happen when attempting to access the either the (0th, 12th or 15th) ContainerData of the 7th Container.

Know that for this use case, we send a StreamedData with 16 Containers, each holding 16 ContainerData


Code details

Note that only the structure is remains from my original implementation. I am not allowed to divulgate the nature of the data we manipulate. Thus the object names might seem weird, but the structure is the relevant part.

Schema compilation

The c++ client runs this during its compilation:

flatc -o lib/ --no-js-exports --no-prefix --scoped-enums --cpp schemas/*.fbs

The go server runs this during its compilation:

flatc -o lib/ --no-js-exports --no-prefix --scoped-enums --go schemas/*.fbs

Schemas

Both the client and server use the following schema:

Please note that this extract is not the complete schema, only the part relevant for this question.

// file: stream_streamed_data.fbs
namespace Protocol.Stream;
table ContainerData {
  id:uint16 = 0;
  a:uint16 = 0;
  b:uint16 = 0;
  c:int16 = 0;
  d:int16 = 0;
  e:uint8 = 0;
}
table Container {
  id:uint16 = 65535;
  data:[ContainerData];
}
table StreamedData {
  id:uint16 = 0;
  containers:[Container];
}
---- 
// file: stream_payload.fbs
include "stream_streamed_data.fbs";
/* some more includes ... */

namespace Protocol.Stream;
union PayloadContent { StreamedData, ..., Request, Result } 

table Payload {
  sensor_id:string;
  content:PayloadContent;
}

---- 
// file: header.fbs

namespace Protocol;
table Header {
  source_id:string;
  destination_id:string;
  timestamp:string;
}

---- 
// file: protocol_message.fbs
include "headers.fbs";
include "stream_payload.fbs";
/* some more includes ... */

namespace Protocol;
union Content { Stream.Payload, ..., Other.Payload } 

table Message {
  header:Header;
  content:Content;
}

root_type Message;  

The client has to stream Protocol.Messages that contain Protocol.Stream.Payload, which themselves contain a Protocol.Stream.StreamedMessage.

Client implementation

#ifndef CLIENT_FLATBUFFERSCONVERTER_HPP
#define CLIENT_FLATBUFFERSCONVERTER_HPP

#include <flatbuffers/flatbuffers.h>
#include <lib/protocol/protocol_message_generated.h>
#include <chrono>
#include <iostream>

namespace ServerCommunication {

    enum class ProtocolStructure : uint8_t {
        NONE = 0,
        /* some more types ... */

        SENSOR_MESSAGE = 12,

        /* some more types ... */

        UNRECOGNIZED_TYPE = // A NUMBER,
        MIN = NONE,
        MAX = UNRECOGNIZED_TYPE
    };

    typedef uint8_t* FlatBuffersBytes;

    struct BytesToSend {
        FlatBuffersBytes data;
        size_t dataLength;
    };

    namespace BufferSize {
        size_t const STREAMED_MESSAGE = 2560;
    }

    template<class T>
    class FlatBuffersConverter {

    public:

        explicit FlatBuffersConverter() = delete;

        ~FlatBuffersConverter() noexcept = delete;

        FlatBuffersConverter(FlatBuffersConverter const& other) = delete;

        FlatBuffersConverter(FlatBuffersConverter&& other) noexcept = delete;

        FlatBuffersConverter& operator=(FlatBuffersConverter const& other)& = delete;

        FlatBuffersConverter& operator=(FlatBuffersConverter&& other)& noexcept = delete;

        static BytesToSend const convertSensorMessage(typename T::Message&& message) noexcept;

    private:

        static std::string const generateSerializationTimestamp() noexcept;


    };

    template<class T>
    BytesToSend const FlatBuffersConverter<T>::convertStreamMessage(typename T::StreamMessage&& message) noexcept {
        flatbuffers::FlatBufferBuilder builder(BufferSize::STREAM_MESSAGE);

        auto streamIdFromMessage = message.getStreamId();
        auto streamId = builder.CreateString(streamIdFromMessage.c_str(), streamIdFromMessage.size());
        auto source = streamId;
        auto destination = builder.CreateString("Server");
        auto timestamp = builder.CreateString(generateSerializationTimestamp());
        auto header = Protocol::CreateHeader(
                builder,
                source,
                destination,
                timestamp);

        auto containersFromStreamMessage = message.getContainers();
        std::vector<flatbuffers::Offset<Protocol::Stream::Container>> containerVector;
        auto containerIterator = containersFromStreamMessage->begin();
        auto containerEnd = containersFromStreamMessage->end();

        for (; containerIterator != containerEnd; ++containerIterator) {

            auto dataFromContainer = containerIterator->getData();
            std::vector<flatbuffers::Offset<GatewayProtocol::Stream::ContainerData>> containerDataVector;
            auto containerDataIterator = dataFromContainer->begin();
            auto containerDataEnd = dataFromContainer->end();

            for (; containerDataIterator != containerDataEnd; ++containerDataIterator) {
                auto track = Protocol::Stream::CreateContainerData(
                        builder,
                        containerDataIterator->id,
                        containerDataIterator->a,
                        containerDataIterator->b,
                        containerDataIterator->c,
                        containerDataIterator->d,
                        containerDataIterator->e);
                containerDataVector.push_back(containerData);
            }

            auto containerDataFBVector = builder.CreateVector(containerDataVector);

            auto container = Protocol::Stream::CreateContainer(
                    builder,
                    containerIterator->id,
                    containerDataFBVector);
            containerVector.push_back(container);
        }

        auto containers = builder.CreateVector(containerVector);
        auto streamMessageContent = Protocol::Stream::CreateStreamedData(
                builder,
                message.messageId,
                containers);

        auto streamPayload = Protocol::Stream::CreatePayload(
                builder,
                streamId,
                Protocol::Stream::PayloadContent::StreamedData,
                streamMessageContent.Union());

        auto convertedMessage = Protocol::CreateMessage(
                builder,
                header,
                Protocol::Content::Stream_Payload,
                sensorPayload.Union());

        builder.Finish(convertedMessage);

        auto size = builder.GetSize();
        auto data = builder.GetBufferPointer();
        BytesToSend bytesToSend{data, size};
        return bytesToSend;
    }

    template<class T>
    std::string const FlatBuffersConverter<T>::generateSerializationTimestamp() noexcept {
        std::size_t const ARBITRARY_BIG_ENOUGH_SIZE = 128;
        auto timestamp = std::chrono::high_resolution_clock::now();
        auto time_tTimestamp = std::chrono::system_clock::to_time_t(timestamp);
        auto utcTime = gmtime(&time_tTimestamp);
        char charArrayTime[ARBITRARY_BIG_ENOUGH_SIZE];
        auto numberOfCharacterWritten = strftime(charArrayTime, sizeof(charArrayTime), "%D %T", utcTime);
        std::string formattedTime(std::begin(charArrayTime), std::begin(charArrayTime) + numberOfCharacterWritten);
        return formattedTime;
    }

}

#endif //CLIENT_FLATBUFFERSCONVERTER_HPP

Client Boundary test

#ifndef CLIENT_SERVERCOMMUNICATORTEST_CPP
#define CLIENT_SERVERCOMMUNICATORTEST_CPP

#include <gtest/gtest.h>

/* some other include */
#include "spirit-sensor-gateway/server-communication/WebSocketServerCommunicationStrategy.hpp"
#include "test/utilities/stub/WebSocketServerStub.h" // <--- Receives and accesses the buffer

using ServerCommunication::WebSocketServerCommunicationStrategy;
using Stub::WebSocketServerStub;
using TestFunctions::DataTestUtil;

class WebSocketServerCommunicatorTest : public ::testing::Test {
protected:
};

TEST_F(WebSocketServerCommunicatorTest, given_aNumberOfRandomStreamDataMessage_when_send_then_theDataIsPutInFlatBuffersAndReceivedByTheServer) {

    auto numberOfMessageToSend = 10;

    WebSocketServerStub webSocketServerStub;

    WebSocketServerCommunicationStrategy<Sensor::Test::ServerLike::Structures> webSocketServerCommunicationStrategy;
    webSocketServerStub.run();
    webSocketServerCommunicationStrategy.openConnection(webSocketServerStub.getAddress());

    ServerStructuresLists::StreamDataMessages sentStreamDataMessages;

    for (auto i = 0; i < numberOfMessageToSend; i++) {
        auto streamDataMessage = DataTestUtil::createRandomStreamDataMessage();
        auto streamDataMessageCopy = DataModel::StreamDataMessage(streamDataMessage);

        sentStreamDataMessages.push_back(std::move(streamDataMessageCopy));
        webSocketServerCommunicationStrategy.sendMessage(std::move(streamDataMessage));
    }

    auto numberOfReceivedMessages = webSocketServerStub.getNumberOfMessageReceived();
    while(numberOfMessageToSend != numberOfReceivedMessages) {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        std::this_thread::yield();
        numberOfReceivedMessages = webSocketServerStub.getNumberOfMessageReceived();
    }

    webSocketServerCommunicationStrategy.closeConnection();
    auto receivedStreamDataMessages = webSocketServerStub.getStreamDataMessages();

    auto sameMessageSentAndReceived = sentStreamDataMessages.size() == receivedStreamDataMessages.size();

    for (auto streamDataMessageIndex = 0;
         sameMessageSentAndReceived && streamDataMessageIndex < sentStreamDataMessages.size(); ++streamDataMessageIndex) {
        sameMessageSentAndReceived = sameMessageSentAndReceived &&
                                     sentStreamDataMessages.front() == receivedStreamDataMessages.front();
        sentStreamDataMessages.pop_front();
        receivedStreamDataMessages.pop_front();
    }
    if (!sameMessageSentAndReceived) {
        while (!sentStreamDataMessages.empty() && !receivedStreamDataMessages.empty()) {
            std::cout << "Sent:     " << sentStreamDataMessages.front().toString() << std::endl;
            std::cout << "Received: " << receivedStreamDataMessages.front().toString() << std::endl;
            sentStreamDataMessages.pop_front();
            receivedStreamDataMessages.pop_front();
        }
    }

    ASSERT_TRUE(sameMessageSentAndReceived);
}

#endif //CLIENT_SERVERCOMMUNICATORTEST_CPP

Client's ServerStub FlatBuffer Conversion function

#include "ServerStubFlatBuffersConverter.h"

using Stub::ServerFlatBuffersConverter;
using ServerCommunication::ProtocolStructure;

ServerFlatBuffersConverter::StreamDataMessage
ServerFlatBuffersConverter::convertToStreamDataMessage(const ServerCommunication::FlatBuffersBytes flatBuffersBytes) {
    // the identification step has been done at this point, we know the []byte holds a StreamedData
    auto message = Protocol::GetMessage(flatBuffersBytes);
    auto streamDataMessagePayload = message->content_as_Stream_Payload();
    auto streamedData = streamDataMessagePayload->content_as_StreamedData();
    auto messageId = streamedData->id();
    auto streamId = flatbuffers::GetString(streamDataMessagePayload->stream_id());

    auto ContainersBuffer = streamedData->containers();

    std::vector<DataModel::MessageContainer> containers;
    for (flatbuffers::uoffset_t containerIndex = 0;
         containerIndex < ContainersBuffer->Length();
         ++containerIndex) {
        auto containerFromBuffer = ContainersBuffer->Get(containerIndex);

        auto containerId = containerFromBuffer->id();
        auto containerDatasBuffer = containerFromBuffer->data();
        std::vector<DataModel::ContainerData> containerDatas;
        for (auto dataIterator = containerDatasBuffer->begin();
             dataIterator != containerDatasBuffer->end();
             ++dataIterator) {
            auto data = DataModel::ContainerData{
                    dataIterator->id(),
                    dataIterator->a(),
                    dataIterator->b(),
                    dataIterator->c(),
                    dataIterator->d(),
                    dataIterator->e()
            };
            containerDatas.push_back(data);
        }
        auto container = DataModel::MessageContainer(containerId, containerDatas);
        containers.push_back(container);
    }

    StreamDataMessage message(streamId, messageId, containers);
    return message;
}

Server Implementation

package flatBuffers

import (
    "github.com/PhantomIntelligence/Server/domain/protocol"
    "github.com/PhantomIntelligence/Server/lib/Protocol"
    "github.com/PhantomIntelligence/Server/lib/Protocol/Stream"
    "github.com/google/flatbuffers/go"
)

type GatewayMessageType = byte

const (
    NONE                     = 0
     /* some more types ... */
    SENSOR_MESSAGE           = 12
    /* some more types ... */
    UNRECOGNIZED_TYPE        = // A NUMBER
)

func ConvertStreamMessage(messageBytes []byte) protocol.StreamMessage {
    // the identification step has been done at this point, we know the []byte holds a StreamedData
    var protocolMessageFlatBuffersTable = new(flatbuffers.Table)
    var protocolMessageContentFlatBuffersTable = new(flatbuffers.Table)

    var clientMessageOffset = Protocol.GetRootAsMessage(messageBytes, 0)

    var header = new(Protocol.Header)
    clientMessageOffset.Header(header)
    clientMessageOffset.Content(protocolMessageFlatBuffersTable)

    var messageTimestampString = string(header.Timestamp())

    var streamedPayload = new(Stream.Payload)
    streamedPayload.Init(protocolMessageFlatBuffersTable.Bytes, protocolMessageFlatBuffersTable.Pos)
    streamedPayload.Content(protocolMessageContentFlatBuffersTable)

    var streamIdFromClient = string(streamedPayload.StreamId())

    var streamedDataFromClient = new(Stream.StreamedData)
    streamedDataFromClient.Init(protocolMessageContentFlatBuffersTable.Bytes, protocolMessageContentFlatBuffersTable.Pos)

    var numberOfContainers = streamedDataFromClient.ContainersLength()
    var containers []protocol.Container
    for containerIndex := 0; containerIndex < numberOfContainers; containerIndex++ {
        var containerFromStream = new(Stream.Container)
        streamedDataFromClient.Containers(containerFromStream, containerIndex)

        var numberOfContainerDatas = containerFromStream.ContainerDatasLength()
        var datas []protocol.ContainerData
        for containerDataIndex := 0; containerDataIndex < numberOfContainerDatas; containerDataIndex++ {
            var dataFromContainer = new(Stream.ContainerData)
            containerFromStream.Data(dataFromContainer, dataIndex)
            datas = append(datas, protocol.ContainerData{
                Id:         protocol.IdType(dataFromContainer.Id()), // <--- This line crashes ! always @ containerIndex = 6, containerDataIndex = 0, 12 or 15
                A:        protocol.AType(dataFromContainer.A()),
                B:       protocol.BType(dataFromContainer.B()),
                C:           protocol.CType(dataFromContainer.C()),
                D:    protocol.DType(dataFromContainer.D()),
                E: protocol.EType(dataFromContainer.E()),
            })
        }

        containers = append(containers, protocol.Container{
            ContainerId:               protocol.ContainerIdType(containerFromStream.Id()),
            ContainerDatas:                datas,
        })
    }

    var streamedMessage = protocol.StreamedMessage{
        StreamId:  protocol.SensorIdType(streamIdFromClient),
        MessageId: protocol.MessageIdType(streamedDataFromClient.Id()),
        Containers:    containers,
    }

    return  streamedMessage
}

Server Boundary test

This test pass, we also have a similar test that sends n random data instead of 1 and it passes too

package receptionFromGateway_test

import (
    "/* some more imports */
    "github.com/PhantomIntelligence/Server/test/utilities/clientStub"
    "os"
    "runtime"
    "testing"
    "time"
)

func TestFlatBuffersReceptionAndAccessFromClient(test *testing.T) {
    defer os.RemoveAll("./logs")

    test.Run("given_aStreamedDataMessageSentFromClientStub"+
        "_when_receivedAndAccessedByServer"+
        "_then_streamedDataMessageIntegrityIsConserved", func(subTest *testing.T) {
        sentStreamedDataMessage := utilities.GenerateRandomStreamedDataMessage(16, 16) // 16 container, 16 data each

        deserializer := serialization.NewFlatBufferDeserializationFilter()
        pipe := dataFlow.NewPipe(deserializer)
        procedure := dataFlowMock.NewProcedurePassToPipeThenSave(pipe)
        pipeline := dataFlow.NewPipeline(procedure)

        client := clientStub.NewWebSocketCommunicator() // <-- this calls  `convertStreamMessageToFlatBuffers` written below
        server := serving.NewServer()
        server.Router.Mediator.Pipeline = pipeline

        go server.Serve(":3591")
        runtime.Gosched()
        time.Sleep(50 * time.Millisecond)
        client.Start()
        client.Send(sentStreamedDataMessage)
        runtime.Gosched()
        time.Sleep(50 * time.Millisecond)
        client.Stop()

        pipeline.GetProducingPipe().TerminateProcess()

        var receivedStreamedDataMessage = pipeline.GetProducingPipe().Filter.(*dataFlowMock.FilterSaveObjectReceived).ObjectReceived
        utilities.AssertEqual(subTest, receivedStreamedDataMessage, sentStreamedDataMessage)
    })
}

Server ClientStub's serialization function

package client

Stub

import (
    "github.com/PhantomIntelligence/Server/dataAccess/conversion/flatBuffers"
    "github.com/PhantomIntelligence/Server/domain/protocol"
    "github.com/PhantomIntelligence/Server/lib/Protocol"
    "github.com/PhantomIntelligence/Server/lib/Protocol/Stream"
    "github.com/google/flatbuffers/go"
)

const (
    streamedDataMessageInitialSize = 2560
)

func convertStreamMessageToFlatBuffers(message protocol.StreamMessage) []byte {
    builder := flatbuffers.NewBuilder(streamedDataMessageInitialSize)

    var streamIdOffset = builder.CreateString(string(message.StreamId))

    var headerOffset = createFlatBufferHeaders(builder, streamIdOffset)

    var numberOfContainers int
    var containerOffsets = make(map[int]flatbuffers.UOffsetT)
    for containerIndex, container := range message.Containers {

        var numberOfData int
        var containerDataOffsets = make(map[int]flatbuffers.UOffsetT)
        for containerIndex, container := range container.ContainerDatas {
            Stream.ContainerDataStart(builder)
            Stream.ContainerDataAddId(builder, uint16(container.ContainerDataId))
            Stream.ContainerDataAddA(builder, uint16(container.A))
            Stream.ContainerDataAddB(builder, uint16(container.B))
            Stream.ContainerDataAddC(builder, int16(container.C))
            Stream.ContainerDataAddD(builder, int16(container.D))
            Stream.ContainerDataAddE(builder, byte(container.E))

            containerDataOffset := Stream.ContainerDataEnd(builder)
            containerDataOffsets[containerIndex] = containerDataOffset

            numberOfData = containerIndex + 1
        }

        Stream.ContainerStartDataVector(builder, numberOfData)
        // FlatBuffer UOffsetT vector requires to be filled by pre-pending backward the offsets
        for dataOffsetIndex := numberOfData - 1; dataOffsetIndex >= 0; dataOffsetIndex-- {
            builder.PrependUOffsetT(containerDataOffsets[dataOffsetIndex])
        }
        var dataOffsetVector = builder.EndVector(numberOfData)

        Stream.ContainerStart(builder)
        Stream.ContainerAddId(builder, uint16(container.ContainerId))
        Stream.ContainerAddData(builder, dataOffsetVector)

        containerOffset := Stream.ContainerEnd(builder)
        containerOffsets[containerIndex] = containerOffset

        numberOfContainers = containerIndex + 1
    }

    Stream.StreamedDataStartContainersVector(builder, numberOfContainers)
    // FlatBuffer UOffsetT vector requires to be filled by pre-pending backward the offsets
    for containerOffsetIndex := numberOfContainers - 1; containerOffsetIndex >= 0; containerOffsetIndex-- {
        builder.PrependUOffsetT(containerOffsets[containerOffsetIndex])
    }
    var containerOffsetVector = builder.EndVector(numberOfContainers)

    Stream.StreamedDataStart(builder)
    Stream.StreamedDataAddId(builder, uint16(message.MessageId))
    Stream.StreamedDataAddContainers(builder, containerOffsetVector)
    var streamedDataMessageOffset = Stream.StreamedDataEnd(builder)

    Stream.PayloadStart(builder)
    Stream.PayloadAddSensorId(builder, streamIdOffset)
    Stream.PayloadAddContentType(builder, Stream.PayloadContentStreamedData)
    Stream.PayloadAddContent(builder, streamedDataMessageOffset)
    var streamPayloadOffset = Stream.PayloadEnd(builder)

    Protocol.MessageStart(builder)
    Protocol.MessageAddHeader(builder, headerOffset)
    Protocol.MessageAddContentType(builder, Protocol.ContentStream_Payload)
    Protocol.MessageAddContent(builder, streamPayloadOffset)
    clientStreamMessageOffset := Protocol.MessageEnd(builder)

    builder.Finish(clientStreamMessageOffset)

    flatBuffersBytes := builder.Bytes[builder.Head():]
    return flatBuffersBytes
}

func createFlatBufferHeaders(builder *flatbuffers.Builder, sensorIdOffset flatbuffers.UOffsetT) flatbuffers.UOffsetT {
    destinationIdOffset := builder.CreateString("Server")
    offset := flatBuffers.CreateHeaderOffset(builder, destinationIdOffset, sensorIdOffset)
    return offset
}
  • 写回答

2条回答 默认 最新

  • dsplos5731 2018-11-06 23:39
    关注

    Not sure if this the entire problem, but a quick glance at convertStreamMessage shows a major problem: the FlatBufferBuilder is a local variable which goes out of scope just as you are returning a raw pointer to its internal data which is being destructed.

    You either want to make sure the FlatBufferBuilder outlives the sending of the data, or use FlatBufferBuilder::Release() as the return value instead.

    Also note that in debugging these problems, you could have used the C++ verifier (or any code that reads the data) to check the data just before sending it, which would have caught this problem earlier.

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(1条)

报告相同问题?

悬赏问题

  • ¥50 comfyui下连接animatediff节点生成视频质量非常差的原因
  • ¥20 有关区间dp的问题求解
  • ¥15 多电路系统共用电源的串扰问题
  • ¥15 slam rangenet++配置
  • ¥15 有没有研究水声通信方面的帮我改俩matlab代码
  • ¥15 ubuntu子系统密码忘记
  • ¥15 信号傅里叶变换在matlab上遇到的小问题请求帮助
  • ¥15 保护模式-系统加载-段寄存器
  • ¥15 电脑桌面设定一个区域禁止鼠标操作
  • ¥15 求NPF226060磁芯的详细资料