15#ifndef ACTIONENGINE_STORES_BYTE_CHUNKING_H_
16#define ACTIONENGINE_STORES_BYTE_CHUNKING_H_
27#include <absl/log/check.h>
28#include <absl/status/statusor.h>
30#include "actionengine/data/msgpack.h"
31#include "actionengine/stores/local_chunk_store.h"
38 kCompleteBytes = 0x00,
40 kLengthSuffixedByteChunk = 0x02,
43struct CompleteBytesPacket {
44 static constexpr Byte kType = BytePacketType::kCompleteBytes;
46 static uint32_t GetSerializedMetadataSize(uint64_t transient_id);
48 std::vector<Byte> serialized_message;
49 uint64_t transient_id;
52struct ByteChunkPacket {
53 static constexpr Byte kType = BytePacketType::kByteChunk;
55 static uint32_t GetSerializedMetadataSize(uint64_t transient_id,
58 std::vector<Byte> chunk;
60 uint64_t transient_id;
63struct LengthSuffixedByteChunkPacket {
64 static constexpr Byte kType = BytePacketType::kLengthSuffixedByteChunk;
66 static uint32_t GetSerializedMetadataSize(uint64_t transient_id, uint32_t seq,
69 std::vector<Byte> chunk;
72 uint64_t transient_id;
// Tagged union over the three packet structs declared in this header:
// CompleteBytesPacket, ByteChunkPacket and LengthSuffixedByteChunkPacket.
// Each of those structs declares a `uint64_t transient_id` member and a
// `kType` constant taken from BytePacketType.
75using BytePacket = std::variant<CompleteBytesPacket, ByteChunkPacket,
76 LengthSuffixedByteChunkPacket>;
// Returns one BytePacket for the bytes delimited by the iterators `it` and
// `end`. Declaration only; the definition is not part of this header.
//
// Parameters:
//   it, end          - const iterators into a std::vector<Byte>.
//   transient_id     - 64-bit id; every packet struct in this header carries
//                      a field of the same name and type.
//   packet_size      - 32-bit size value.
//   seq              - defaults to 0.
//   length           - defaults to -1.
//   force_no_length  - defaults to false.
//
// NOTE(review): presumably `packet_size` caps the serialized packet size,
// `seq` is the chunk's sequence number, `length == -1` means "no total length
// supplied", and `force_no_length` suppresses the length-suffixed variant --
// confirm against the definition.
78BytePacket ProducePacket(std::vector<Byte>::const_iterator it,
79 std::vector<Byte>::const_iterator end,
80 uint64_t transient_id, uint32_t packet_size,
81 uint32_t seq = 0, int32_t length = -1,
82 bool force_no_length =
false);
// Takes a raw buffer (`data`, `size`) and returns either a BytePacket or a
// non-OK absl::Status. Declaration only.
//
// NOTE(review): presumably `size` is the number of bytes readable at `data`
// and a non-OK status reports a malformed or truncated packet -- confirm
// against the definition.
// NOTE(review): `data` is a non-const pointer; if the parser does not modify
// the buffer, `const Byte*` would state that contract -- verify before
// changing.
84absl::StatusOr<BytePacket> ParseBytePacket(Byte* data,
size_t size);
// Returns a vector of BytePacket values for the byte buffer `data`.
// Declaration only.
//
// Parameters:
//   data          - source bytes, taken by const reference.
//   transient_id  - 64-bit id (same name/type as the field on every packet
//                   struct in this header).
//   packet_size   - defaults to 65536.
//
// NOTE(review): presumably every returned packet carries `transient_id` and
// serializes to at most `packet_size` bytes -- confirm against the definition.
// NOTE(review): `packet_size` is uint64_t here but uint32_t in ProducePacket;
// check whether values above UINT32_MAX are rejected or narrowed.
86std::vector<BytePacket> SplitBytesIntoPackets(
const std::vector<Byte>& data,
87 uint64_t transient_id,
88 uint64_t packet_size = 65536);
// Returns a 64-bit id for `packet`. Declaration only.
// All three BytePacket alternatives declare a `uint64_t transient_id` member;
// presumably this returns that member of the active alternative -- confirm
// against the definition.
90uint64_t GetTransientIdFromPacket(
const BytePacket& packet);
// Returns a byte vector for `packet`. Declaration only.
// Presumably the output is the wire form accepted by ParseBytePacket --
// confirm against the definition.
// NOTE(review): `packet` is taken by value, so passing an lvalue copies the
// variant (including its byte vector); callers that no longer need the packet
// can pass it with std::move.
92std::vector<Byte> SerializeBytePacket(BytePacket packet);
// Returns either a byte vector or a non-OK absl::Status. Declaration only;
// the enclosing class header is not visible in this section.
// NOTE(review): presumably yields the reassembled message once all chunks
// have been fed and errors otherwise; the name suggests it also releases the
// stored data -- confirm against the definition.
96 absl::StatusOr<std::vector<Byte>> ConsumeCompleteBytes();
// Accepts one BytePacket (by value) and returns a bool or a non-OK
// absl::Status. Declaration only.
// NOTE(review): the meaning of the bool is not visible here; presumably true
// signals that the message is now complete -- confirm against the definition.
97 absl::StatusOr<bool> FeedPacket(BytePacket packet);
// Accepts a byte vector (by value) and returns a bool or a non-OK
// absl::Status. Declaration only.
// NOTE(review): presumably parses `data` as a serialized packet and forwards
// it to FeedPacket, returning the same bool -- confirm against the
// definition.
98 absl::StatusOr<bool> FeedSerializedPacket(std::vector<Byte> data);
// Chunk storage; LocalChunkStore comes from
// "actionengine/stores/local_chunk_store.h".
101 LocalChunkStore chunk_store_;
// Byte counter, starts at 0. Presumably the running size of the message being
// reassembled -- confirm against the definition.
102 size_t total_message_size_ = 0;
// NOTE(review): -1 converted to uint32_t is 4294967295 (UINT32_MAX), so this
// starts at the maximum value; presumably a sentinel for "chunk count not yet
// known". Comparisons against it must use the unsigned value, not a signed
// -1 of a wider type.
103 uint32_t total_expected_chunks_ = -1;