15#ifndef ACTIONENGINE_STORES_CHUNK_STORE_READER_H_
16#define ACTIONENGINE_STORES_CHUNK_STORE_READER_H_
25#include <absl/base/nullability.h>
26#include <absl/base/thread_annotations.h>
27#include <absl/log/check.h>
28#include <absl/status/status.h>
29#include <absl/status/statusor.h>
30#include <absl/time/time.h>
33#include "actionengine/data/serialization.h"
36#include "actionengine/util/status_macros.h"
// The timeout for reading chunks from the store. Defaults to
// absl::InfiniteDuration(). NextFragment() falls back to this value when it is
// called without an explicit timeout.
60 absl::Duration
timeout = absl::InfiniteDuration();
// Defaults to 0.
// NOTE(review): presumably the seq (ordered reads) or arrival offset
// (unordered reads) at which reading begins — confirm against the
// implementation file.
62 size_t start_seq_or_offset = 0;
// Reads the next chunk from the ChunkStore.
//
// `timeout` is optional and defaults to std::nullopt.
// NOTE(review): the definition is out of line; presumably std::nullopt falls
// back to options_.timeout the way NextFragment() does — confirm.
//
// Returns a StatusOr holding an optional Chunk, so a call can yield an error
// status, an empty optional, or a chunk.
// NOTE(review): an empty optional presumably signals the end of the stream —
// confirm against the implementation.
160 absl::StatusOr<std::optional<Chunk>>
Next(
161 std::optional<absl::Duration> timeout = std::nullopt);
163 absl::StatusOr<std::optional<NodeFragment>> NextFragment(
164 std::optional<absl::Duration> timeout = std::nullopt) {
165 act::MutexLock lock(&mu_);
166 absl::StatusOr<std::optional<std::pair<int, Chunk>>> seq_and_chunk =
167 GetNextSeqAndChunkFromBuffer(timeout.value_or(options_.
timeout));
168 RETURN_IF_ERROR(seq_and_chunk.status());
169 ASSIGN_OR_RETURN(
const int64_t final_seq, chunk_store_->GetFinalSeq());
170 if (!seq_and_chunk->has_value()) {
173 if (seq_and_chunk->value().second.IsNull()) {
179 auto& chunk = seq_and_chunk->value().second;
180 const int seq = seq_and_chunk->value().first;
181 return NodeFragment{std::string(chunk_store_->GetId()), std::move(chunk),
182 seq, final_seq == -1 || seq != final_seq};
193 template <
typename T>
194 absl::StatusOr<std::optional<T>>
Next(
195 std::optional<absl::Duration> timeout = std::nullopt) {
196 ASSIGN_OR_RETURN(std::optional<Chunk> chunk,
Next(timeout));
200 if (chunk->metadata && chunk->metadata->mimetype ==
"__status__") {
201 absl::StatusOr<absl::Status> status = ConvertTo<absl::Status>(*chunk);
203 return status.status();
209 return absl::FailedPreconditionError(
210 "Next<T>() cannot return an OK status chunk if T is not "
213 ASSIGN_OR_RETURN(T result, FromChunkAs<T>(*std::move(chunk)));
214 return std::optional{result};
221 template <
typename T>
// The thread-safety annotation requires the caller to hold mu_ exclusively.
// NOTE(review): judging by the name, this starts the background prefetch if it
// is neither running nor already finished — confirm against the definition.
235 void EnsurePrefetchIsRunningOrHasCompleted()
236 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Takes a concrete (non-optional) `timeout` and returns a StatusOr holding an
// optional Chunk. The caller must hold mu_ exclusively.
// NOTE(review): presumably the chunk-only counterpart of
// GetNextSeqAndChunkFromBuffer() — confirm against the definition.
238 absl::StatusOr<std::optional<
Chunk>> GetNextChunkFromBuffer(
239 absl::Duration timeout) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Returns a StatusOr holding an optional (seq, chunk) pair: NextFragment()
// reads `.first` as the seq and `.second` as the chunk. NextFragment() calls
// this while holding mu_ and passes the already-resolved timeout.
// The caller must hold mu_ exclusively.
240 absl::StatusOr<std::optional<std::pair<
int,
Chunk>>>
241 GetNextSeqAndChunkFromBuffer(absl::Duration timeout)
242 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Const member returning a StatusOr holding an optional (int, Chunk) pair.
// The caller must hold mu_ exclusively.
// NOTE(review): presumably used when the `ordered` option is false, to take
// the next chunk in arrival order straight from the store — confirm.
244 absl::StatusOr<std::optional<std::pair<
int,
Chunk>>>
245 GetNextUnorderedSeqAndChunkFromStore() const
246 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Returns an absl::Status; the caller must hold mu_ exclusively.
// NOTE(review): presumably the body of the background prefetch fiber, with the
// result surfaced through GetStatus() — confirm against the definition.
248 absl::Status RunPrefetchLoop() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Uniquely owned fiber.
// NOTE(review): presumably the background prefetch fiber that Cancel() stops —
// confirm.
253 std::unique_ptr<thread::Fiber> fiber_;
254 std::unique_ptr<thread::Channel<std::optional<std::pair<
int,
Chunk>>>>
// Counter starting at 0.
// NOTE(review): presumably the number of chunks read so far — confirm where
// it is incremented.
256 int total_chunks_read_ = 0;
// NOTE(review): presumably the prefetch-loop status that GetStatus() reports —
// confirm.
258 absl::Status status_;
// Mutex named by the ABSL_EXCLUSIVE_LOCKS_REQUIRED annotations on the private
// helpers; NextFragment() acquires it before reading from the buffer.
// Declared mutable so that const member functions can lock it.
259 mutable act::Mutex mu_;
266 std::optional<absl::Duration> timeout);
270 std::optional<absl::Duration> timeout);
274 std::optional<T>& value) {
275 absl::StatusOr<std::optional<T>> next_value = reader.
Next<T>();
276 CHECK_OK(next_value.status())
277 <<
"Failed to read next value: " << next_value.status();
278 value = *std::move(next_value);
285 absl::StatusOr<std::optional<T>> status_or_element = reader.Next<T>();
286 CHECK_OK(status_or_element.status())
287 <<
"Failed to read next element: " << status_or_element.status();
289 std::optional<T>& element = *status_or_element;
294 value.push_back(*std::move(element));
307std::unique_ptr<ChunkStoreReader>& operator>>(
308 std::unique_ptr<ChunkStoreReader>& reader, T& value) {
314std::shared_ptr<ChunkStoreReader>& operator>>(
315 std::shared_ptr<ChunkStoreReader>& reader, T& value) {
Concurrency utilities for ActionEngine.
An abstract interface for raw data storage and retrieval for ActionEngine nodes.
Definition chunk_store.h:84
A utility for reading chunks from a ChunkStore.
Definition chunk_store_reader.h:77
absl::StatusOr< std::optional< Chunk > > Next(std::optional< absl::Duration > timeout=std::nullopt)
Reads the next chunk from the ChunkStore.
Definition chunk_store_reader.cc:86
ChunkStoreReader(ChunkStore *absl_nonnull chunk_store, ChunkStoreReaderOptions options=ChunkStoreReaderOptions())
Constructs a ChunkStoreReader for the given ChunkStore, setting options if provided.
Definition chunk_store_reader.cc:51
const ChunkStoreReaderOptions & GetOptions() const
Returns the current options of the ChunkStoreReader.
Definition chunk_store_reader.h:135
absl::Status GetStatus() const
Returns the status of the background prefetch loop.
Definition chunk_store_reader.cc:286
void Cancel() const
Cancels the background prefetch fiber and stops reading from the store.
Definition chunk_store_reader.cc:71
absl::StatusOr< std::optional< T > > Next(std::optional< absl::Duration > timeout=std::nullopt)
Same as Next(), but casts the chunk to the specified type T.
Definition chunk_store_reader.h:194
void SetOptions(const ChunkStoreReaderOptions &options)
Sets the options for the ChunkStoreReader.
Definition chunk_store_reader.cc:79
An abstract interface for raw data storage and retrieval for ActionEngine nodes.
Options for the ChunkStoreReader.
Definition chunk_store_reader.h:41
absl::Duration timeout
The timeout for reading chunks from the store, which applies to the Next() method.
Definition chunk_store_reader.h:60
bool remove_chunks
Whether to remove chunks from the store after reading them. If false, chunks will remain in the store...
Definition chunk_store_reader.h:52
bool ordered
Whether to read chunks in the explicit seq order. If false, chunks will be read in the order they arrive.
Definition chunk_store_reader.h:46
size_t n_chunks_to_buffer
The number of chunks to buffer in memory before the background fiber blocks on reading more chunks.
Definition chunk_store_reader.h:56
ActionEngine data structures used to implement actions and nodes (data streams).