Action Engine
Loading...
Searching...
No Matches
chunk_store_reader.h
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#ifndef ACTIONENGINE_STORES_CHUNK_STORE_READER_H_
16#define ACTIONENGINE_STORES_CHUNK_STORE_READER_H_
17
18#include <algorithm>
19#include <cstddef>
20#include <memory>
21#include <optional>
22#include <utility>
23#include <vector>
24
25#include <absl/base/nullability.h>
26#include <absl/base/thread_annotations.h>
27#include <absl/log/check.h>
28#include <absl/status/status.h>
29#include <absl/status/statusor.h>
30#include <absl/time/time.h>
31
33#include "actionengine/data/serialization.h"
36#include "actionengine/util/status_macros.h"
37
38namespace act {
39
// Whether to read chunks in the explicit seq order. Defaults to true.
46 bool ordered = true;
47
// Whether to remove chunks from the store after reading them. Defaults to
// true.
52 bool remove_chunks = true;
53
// The number of chunks to buffer in memory before the background fiber blocks
// on reading more chunks. Defaults to 32.
56 size_t n_chunks_to_buffer = 32;
57
// The timeout for reading chunks from the store, which applies to the Next()
// method. Defaults to absl::InfiniteDuration().
60 absl::Duration timeout = absl::InfiniteDuration();
61
// NOTE(review): presumably the seq (ordered mode) or arrival offset
// (unordered mode) at which reading starts -- confirm against the
// implementation. Defaults to 0.
62 size_t start_seq_or_offset = 0;
63};
64
78 public:
89 explicit ChunkStoreReader(
90 ChunkStore* absl_nonnull chunk_store,
92
93 // This class is not copyable or movable.
94 ChunkStoreReader(const ChunkStoreReader&) = delete;
95 ChunkStoreReader& operator=(const ChunkStoreReader&) = delete;
96
98
// Cancels the background prefetch fiber and stops reading from the store.
107 void Cancel() const;
108
// Sets the options for the ChunkStoreReader.
126 void SetOptions(const ChunkStoreReaderOptions& options);
127
134 [[nodiscard]]
136 return options_;
137 }
138
// Reads the next chunk from the ChunkStore.
//
// `timeout` defaults to std::nullopt.
// NOTE(review): presumably the timeout from the reader's options is used in
// that case, as NextFragment() in this header does -- confirm in the .cc.
160 absl::StatusOr<std::optional<Chunk>> Next(
161 std::optional<absl::Duration> timeout = std::nullopt);
162
163 absl::StatusOr<std::optional<NodeFragment>> NextFragment(
164 std::optional<absl::Duration> timeout = std::nullopt) {
165 act::MutexLock lock(&mu_);
166 absl::StatusOr<std::optional<std::pair<int, Chunk>>> seq_and_chunk =
167 GetNextSeqAndChunkFromBuffer(timeout.value_or(options_.timeout));
168 RETURN_IF_ERROR(seq_and_chunk.status());
169 ASSIGN_OR_RETURN(const int64_t final_seq, chunk_store_->GetFinalSeq());
170 if (!seq_and_chunk->has_value()) {
171 return std::nullopt;
172 }
173 if (seq_and_chunk->value().second.IsNull()) {
174 // If the chunk is null, it means that the stream has ended.
175 // TODO: this logic is not ideal, as it does not allow to distinguish
176 // between an empty chunk and the end of the stream. We should rethink it.
177 return std::nullopt;
178 }
179 auto& chunk = seq_and_chunk->value().second;
180 const int seq = seq_and_chunk->value().first;
181 return NodeFragment{std::string(chunk_store_->GetId()), std::move(chunk),
182 seq, final_seq == -1 || seq != final_seq};
183 }
184
193 template <typename T>
194 absl::StatusOr<std::optional<T>> Next(
195 std::optional<absl::Duration> timeout = std::nullopt) {
196 ASSIGN_OR_RETURN(std::optional<Chunk> chunk, Next(timeout));
197 if (!chunk) {
198 return std::nullopt;
199 }
200 if (chunk->metadata && chunk->metadata->mimetype == "__status__") {
201 absl::StatusOr<absl::Status> status = ConvertTo<absl::Status>(*chunk);
202 if (!status.ok()) {
203 return status.status();
204 }
205 // Nested status
206 if (!status->ok()) {
207 return *status;
208 }
209 return absl::FailedPreconditionError(
210 "Next<T>() cannot return an OK status chunk if T is not "
211 "absl::Status. ");
212 }
213 ASSIGN_OR_RETURN(T result, FromChunkAs<T>(*std::move(chunk)));
214 return std::optional{result};
215 }
216
217 // Definitions follow in the header for some well-known types. If the next
218 // chunk cannot be read, this operator will crash. Prefer to use the Next()
219 // method instead, which returns an absl::StatusOr<std::optional<T>>,
220 // unless you are sure that the next chunk will always be available.
221 template <typename T>
222 friend ChunkStoreReader& operator>>(ChunkStoreReader& reader, T& value);
223
// Returns the status of the background prefetch loop.
230 absl::Status GetStatus() const;
231
232 private:
233 friend class Action;
234
// Private helpers. Every one of them is annotated as requiring mu_ to be held
// exclusively by the caller.

// NOTE(review): judging by the name, makes sure the background prefetch has
// been started (or has already finished) -- confirm in the .cc.
235 void EnsurePrefetchIsRunningOrHasCompleted()
236 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
237
// Buffer reads bounded by `timeout`: the first yields only the chunk, the
// second yields the chunk together with an int (its seq, going by the name
// and by how NextFragment() uses the pair).
238 absl::StatusOr<std::optional<Chunk>> GetNextChunkFromBuffer(
239 absl::Duration timeout) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
240 absl::StatusOr<std::optional<std::pair<int, Chunk>>>
241 GetNextSeqAndChunkFromBuffer(absl::Duration timeout)
242 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
243
// NOTE(review): presumably the store read used when options.ordered is
// false -- confirm in the .cc.
244 absl::StatusOr<std::optional<std::pair<int, Chunk>>>
245 GetNextUnorderedSeqAndChunkFromStore() const
246 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
247
// NOTE(review): presumably the body of the background prefetch fiber; its
// result is likely what GetStatus() reports -- confirm in the .cc.
248 absl::Status RunPrefetchLoop() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
249
// The store this reader reads from. The pointer itself is const and is
// annotated as non-null.
250 ChunkStore* const absl_nonnull chunk_store_;
252
// NOTE(review): presumably the fiber that runs the background prefetch loop
// -- confirm in the .cc.
253 std::unique_ptr<thread::Fiber> fiber_;
// Channel of optional (int, Chunk) pairs.
// NOTE(review): presumably filled by the prefetch loop and drained by the
// GetNext*FromBuffer helpers -- confirm in the .cc.
254 std::unique_ptr<thread::Channel<std::optional<std::pair<int, Chunk>>>>
255 buffer_;
// Starts at 0.
256 int total_chunks_read_ = 0;
257
// NOTE(review): presumably the prefetch-loop status that GetStatus() returns
// -- confirm in the .cc.
258 absl::Status status_;
// The mutex named by the ABSL_EXCLUSIVE_LOCKS_REQUIRED annotations on the
// private helpers; mutable so that const member functions can lock it.
259 mutable act::Mutex mu_;
260};
261
262// These specializations simply take the mutex and call the internal methods
263// GetNextSeqAndChunkFromBuffer and GetNextChunkFromBuffer.
// Declaration of the explicit specialization of Next<T>() for
// T = std::pair<int, Chunk>, i.e. the variant that yields the int paired with
// the chunk in addition to the chunk itself.
264template <>
265absl::StatusOr<std::optional<std::pair<int, Chunk>>> ChunkStoreReader::Next(
266 std::optional<absl::Duration> timeout);
267
// Declaration of the explicit specialization of Next<T>() for
// T = absl::Status. The generic Next<T>() rejects an OK "__status__" chunk
// with FailedPreconditionError "if T is not absl::Status"; this specialization
// covers the T = absl::Status case.
268template <>
269absl::StatusOr<std::optional<absl::Status>> ChunkStoreReader::Next(
270 std::optional<absl::Duration> timeout);
271
272template <typename T>
273ChunkStoreReader& operator>>(ChunkStoreReader& reader,
274 std::optional<T>& value) {
275 absl::StatusOr<std::optional<T>> next_value = reader.Next<T>();
276 CHECK_OK(next_value.status())
277 << "Failed to read next value: " << next_value.status();
278 value = *std::move(next_value);
279 return reader;
280}
281
282template <typename T>
283ChunkStoreReader& operator>>(ChunkStoreReader& reader, std::vector<T>& value) {
284 while (true) {
285 absl::StatusOr<std::optional<T>> status_or_element = reader.Next<T>();
286 CHECK_OK(status_or_element.status())
287 << "Failed to read next element: " << status_or_element.status();
288
289 std::optional<T>& element = *status_or_element;
290 if (!element) {
291 break;
292 }
293
294 value.push_back(*std::move(element));
295 }
296 return reader;
297}
298
299template <typename T>
300ChunkStoreReader* absl_nonnull operator>>(ChunkStoreReader* absl_nonnull reader,
301 T& value) {
302 *reader >> value;
303 return reader;
304}
305
306template <typename T>
307std::unique_ptr<ChunkStoreReader>& operator>>(
308 std::unique_ptr<ChunkStoreReader>& reader, T& value) {
309 *reader >> value;
310 return reader;
311}
312
313template <typename T>
314std::shared_ptr<ChunkStoreReader>& operator>>(
315 std::shared_ptr<ChunkStoreReader>& reader, T& value) {
316 *reader >> value;
317 return reader;
318}
319
320} // namespace act
321
322#endif // ACTIONENGINE_STORES_CHUNK_STORE_READER_H_
Concurrency utilities for ActionEngine.
Definition action.h:63
An abstract interface for raw data storage and retrieval for ActionEngine nodes.
Definition chunk_store.h:84
A utility for reading chunks from a ChunkStore.
Definition chunk_store_reader.h:77
absl::StatusOr< std::optional< Chunk > > Next(std::optional< absl::Duration > timeout=std::nullopt)
Reads the next chunk from the ChunkStore.
Definition chunk_store_reader.cc:86
ChunkStoreReader(ChunkStore *absl_nonnull chunk_store, ChunkStoreReaderOptions options=ChunkStoreReaderOptions())
Constructs a ChunkStoreReader for the given ChunkStore, setting options if provided.
Definition chunk_store_reader.cc:51
const ChunkStoreReaderOptions & GetOptions() const
Returns the current options of the ChunkStoreReader.
Definition chunk_store_reader.h:135
absl::Status GetStatus() const
Returns the status of the background prefetch loop.
Definition chunk_store_reader.cc:286
void Cancel() const
Cancels the background prefetch fiber and stops reading from the store.
Definition chunk_store_reader.cc:71
absl::StatusOr< std::optional< T > > Next(std::optional< absl::Duration > timeout=std::nullopt)
Same as Next(), but casts the chunk to the specified type T.
Definition chunk_store_reader.h:194
void SetOptions(const ChunkStoreReaderOptions &options)
Sets the options for the ChunkStoreReader.
Definition chunk_store_reader.cc:79
An abstract interface for raw data storage and retrieval for ActionEngine nodes.
Definition types.h:104
Options for the ChunkStoreReader.
Definition chunk_store_reader.h:41
absl::Duration timeout
The timeout for reading chunks from the store, which applies to the Next() method.
Definition chunk_store_reader.h:60
bool remove_chunks
Whether to remove chunks from the store after reading them. If false, chunks will remain in the store...
Definition chunk_store_reader.h:52
bool ordered
Whether to read chunks in the explicit seq order. If false, chunks will be read in the order they arr...
Definition chunk_store_reader.h:46
size_t n_chunks_to_buffer
The number of chunks to buffer in memory before the background fiber blocks on reading more chunks.
Definition chunk_store_reader.h:56
ActionEngine data structures used to implement actions and nodes (data streams).