Action Engine
Loading...
Searching...
No Matches
byte_chunking.h
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#ifndef ACTIONENGINE_STORES_BYTE_CHUNKING_H_
16#define ACTIONENGINE_STORES_BYTE_CHUNKING_H_
17
18#include <algorithm>
19#include <cstddef>
20#include <cstdint>
21#include <iterator>
22#include <optional>
23#include <utility>
24#include <variant>
25#include <vector>
26
27#include <absl/log/check.h>
28#include <absl/status/statusor.h>
29
30#include "actionengine/data/msgpack.h" // IWYU pragma: keep
31#include "actionengine/stores/local_chunk_store.h"
32
33namespace act::data {
34
// One raw octet. This is the element type of every payload buffer
// (`std::vector<Byte>`) declared in this header.
using Byte = uint8_t;
36
// Numeric tag for each packet flavour. Each value is mirrored by the `kType`
// constant of the packet struct named in the per-enumerator comment.
// NOTE(review): presumably these tags also appear in the serialized form of a
// packet -- confirm against the serializer before renumbering anything.
enum BytePacketType {
  kCompleteBytes = 0x00,            // Tag of CompleteBytesPacket.
  kByteChunk = 0x01,                // Tag of ByteChunkPacket.
  kLengthSuffixedByteChunk = 0x02,  // Tag of LengthSuffixedByteChunkPacket.
};
42
43struct CompleteBytesPacket {
44 static constexpr Byte kType = BytePacketType::kCompleteBytes;
45
46 static uint32_t GetSerializedMetadataSize(uint64_t transient_id);
47
48 std::vector<Byte> serialized_message;
49 uint64_t transient_id;
50};
51
52struct ByteChunkPacket {
53 static constexpr Byte kType = BytePacketType::kByteChunk;
54
55 static uint32_t GetSerializedMetadataSize(uint64_t transient_id,
56 uint32_t seq);
57
58 std::vector<Byte> chunk;
59 uint32_t seq;
60 uint64_t transient_id;
61};
62
63struct LengthSuffixedByteChunkPacket {
64 static constexpr Byte kType = BytePacketType::kLengthSuffixedByteChunk;
65
66 static uint32_t GetSerializedMetadataSize(uint64_t transient_id, uint32_t seq,
67 uint32_t length);
68
69 std::vector<Byte> chunk;
70 uint32_t length;
71 uint32_t seq;
72 uint64_t transient_id;
73};
74
75using BytePacket = std::variant<CompleteBytesPacket, ByteChunkPacket,
76 LengthSuffixedByteChunkPacket>;
77
78BytePacket ProducePacket(std::vector<Byte>::const_iterator it,
79 std::vector<Byte>::const_iterator end,
80 uint64_t transient_id, uint32_t packet_size,
81 uint32_t seq = 0, int32_t length = -1,
82 bool force_no_length = false);
83
84absl::StatusOr<BytePacket> ParseBytePacket(Byte* data, size_t size);
85
86std::vector<BytePacket> SplitBytesIntoPackets(const std::vector<Byte>& data,
87 uint64_t transient_id,
88 uint64_t packet_size = 65536);
89
90uint64_t GetTransientIdFromPacket(const BytePacket& packet);
91
92std::vector<Byte> SerializeBytePacket(BytePacket packet);
93
94class ChunkedBytes {
95 public:
96 absl::StatusOr<std::vector<Byte>> ConsumeCompleteBytes();
97 absl::StatusOr<bool> FeedPacket(BytePacket packet);
98 absl::StatusOr<bool> FeedSerializedPacket(std::vector<Byte> data);
99
100 private:
101 LocalChunkStore chunk_store_;
102 size_t total_message_size_ = 0;
103 uint32_t total_expected_chunks_ = -1;
104};
105
106} // namespace act::data
107
#endif  // ACTIONENGINE_STORES_BYTE_CHUNKING_H_