15#ifndef ACTIONENGINE_NODES_ASYNC_NODE_H_
16#define ACTIONENGINE_NODES_ASYNC_NODE_H_
25#include <absl/base/nullability.h>
26#include <absl/base/optimization.h>
27#include <absl/base/thread_annotations.h>
28#include <absl/container/flat_hash_map.h>
29#include <absl/log/check.h>
30#include <absl/log/log.h>
31#include <absl/status/status.h>
32#include <absl/status/statusor.h>
33#include <absl/strings/str_cat.h>
34#include <absl/time/clock.h>
35#include <absl/time/time.h>
38#include "actionengine/data/serialization.h"
40#include "actionengine/net/stream.h"
42#include "actionengine/stores/chunk_store_reader.h"
43#include "actionengine/stores/chunk_store_writer.h"
44#include "actionengine/util/status_macros.h"
// Constructs an AsyncNode. Every argument has a default, so the node can be
// default-constructed; `explicit` prevents implicit conversion from a string.
//   id          - identifier for the node; defaults to the empty string.
//   node_map    - pointer to a NodeMap, annotated absl_nullable, so nullptr is
//                 an accepted value. NOTE(review): presumably the map this node
//                 belongs to - confirm in async_node.cc.
//   chunk_store - backing ChunkStore, passed as a unique_ptr by value, so the
//                 caller gives up ownership. NOTE(review): what happens for the
//                 nullptr default is decided in async_node.cc, not visible here.
83 explicit AsyncNode(std::string_view
id =
"",
84 NodeMap* absl_nullable node_map =
nullptr,
85 std::unique_ptr<ChunkStore> chunk_store =
nullptr);
// Writes one Chunk to the node and reports the outcome as an absl::Status.
//   value - the chunk to write, taken by value.
//   seq   - sequence number for the chunk. NOTE(review): the default of -1
//           presumably means "no explicit position, let the writer pick" -
//           confirm in async_node.cc.
//   final - NOTE(review): presumably marks this chunk as the last one of the
//           stream - confirm.
116 auto Put(
Chunk value,
int seq = -1,
bool final =
false) -> absl::Status;
// Typed convenience overload of Put(). The out-of-line definition further down
// in this header converts `value` with ToChunk(), can return the conversion's
// status instead of writing, and otherwise forwards the resulting chunk with
// the same `seq` and `final` to Put().
// `seq` and `final` have the same defaults as the Chunk overload.
148 template <
typename T>
149 auto Put(T value,
int seq = -1,
bool final =
false) -> absl::Status;
// Returns the node's id as a std::string by value. Const, so it can be called
// on a const node; [[nodiscard]] makes the compiler warn if the result is
// dropped.
175 [[nodiscard]]
auto GetId()
const -> std::string;
// Reads the next Chunk from the node.
//   timeout - optional limit on how long to wait. NOTE(review): ConsumeAs()
//             replaces std::nullopt with the reader's configured timeout;
//             presumably the reader applies the same default here - confirm.
// Returns a non-OK status on failure, otherwise an optional Chunk.
// NOTE(review): std::nullopt appears to mean "nothing at the current offset"
// (that is the wording of the error ConsumeAs() returns) - confirm.
194 absl::StatusOr<std::optional<Chunk>>
Next(
195 std::optional<absl::Duration> timeout = std::nullopt);
// Typed overload of Next(): yields the next item as a T instead of a raw
// Chunk. The definition later in this header forwards `timeout` to the
// reader's Next<T>() and returns its result unchanged.
197 template <
typename T>
198 auto Next(std::optional<absl::Duration> timeout = std::nullopt)
199 -> absl::StatusOr<std::optional<T>>;
// Variant of Next() whose return type carries no status: the result is just an
// optional Chunk. NOTE(review): the typed NextOrDie<T>() in this header
// CHECK-fails on a non-OK status; this overload presumably does the same, but
// its definition is in async_node.cc - confirm.
201 std::optional<Chunk> NextOrDie(
202 std::optional<absl::Duration> timeout = std::nullopt);
204 template <
typename T>
205 auto NextOrDie(std::optional<absl::Duration> timeout = std::nullopt)
// Reads a single item as a T and expects the node to hold nothing after it.
// Per the definition later in this header, it can fail with:
//   - FailedPrecondition: "empty at current offset" or "must be empty after
//     consuming the item";
//   - DeadlineExceeded: the first read took longer than the timeout;
//   - any error propagated from the underlying reads.
//   timeout - if std::nullopt, the reader's configured timeout is used.
208 template <
typename T>
209 absl::StatusOr<T> ConsumeAs(
210 std::optional<absl::Duration> timeout = std::nullopt);
// Returns an absl::Status describing the reader side of this node. Const
// member. NOTE(review): presumably the status of the default reader - the
// definition is in async_node.cc; confirm.
213 auto GetReaderStatus()
const -> absl::Status;
215 -> std::unique_ptr<ChunkStoreReader>;
219 template <
typename T>
222 template <
typename T>
227 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
230 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
233 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Internal write routine with the same parameters and defaults as
// Put(Chunk, seq, final). The thread-safety annotation requires the caller to
// already hold mu_ exclusively, so this must not be called without the lock.
// NOTE(review): presumably the implementation Put() calls once it has locked
// mu_ - confirm in async_node.cc.
234 absl::Status PutInternal(
Chunk chunk,
int seq = -1,
bool final =
false)
235 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Raw, nullable pointer to a NodeMap; null by default. NOTE(review): raw
// pointer suggests non-owning - confirm who controls the NodeMap's lifetime.
237 NodeMap* absl_nullable node_map_ =
nullptr;
// The chunk store held by this node through a unique_ptr.
238 std::unique_ptr<ChunkStore> chunk_store_;
// Mutex protecting the members annotated ABSL_GUARDED_BY(mu_) below. Declared
// mutable so it can be locked from const member functions.
240 mutable act::Mutex mu_;
// Condition variable annotated as guarded by mu_; also mutable.
241 mutable act::CondVar cv_ ABSL_GUARDED_BY(mu_);
// Default reader and writer for this node; both may only be touched with mu_
// held. NOTE(review): presumably created on first use by GetReader() /
// GetWriter() - confirm in async_node.cc.
242 std::unique_ptr<ChunkStoreReader> default_reader_ ABSL_GUARDED_BY(mu_);
243 std::unique_ptr<ChunkStoreWriter> default_writer_ ABSL_GUARDED_BY(mu_);
248 auto chunk = ToChunk(std::move(value));
250 return chunk.status();
252 return Put(*std::move(chunk), seq,
final);
257 -> absl::StatusOr<std::optional<T>> {
259 return reader.
Next<T>(timeout);
264 -> absl::StatusOr<std::optional<NodeFragment>> {
265 ChunkStoreReader& reader = GetReader();
266 return reader.NextFragment(timeout);
// Typed NextOrDie<T>(): same as Next<T>() but with the status stripped from
// the return type. (The `template <typename T>` line and the closing brace are
// missing from this listing.)
270auto AsyncNode::NextOrDie(std::optional<absl::Duration> timeout)
271 -> std::optional<T> {
272 auto next = Next<T>(timeout);
// CHECK_OK terminates the process if the read did not succeed, so callers
// never observe an error from this function.
273 CHECK_OK(next.status());
// Status is known OK here; move the optional<T> out of the StatusOr.
274 return *std::move(next);
// Reads one item as T and then performs a second read whose result must be
// empty. NOTE(review): several lines of this definition (guards, closing
// braces, the second read's argument) are missing from this listing; the
// comments below describe only what is visible.
278absl::StatusOr<T> AsyncNode::ConsumeAs(std::optional<absl::Duration> timeout) {
// Fall back to the reader's configured timeout when the caller passed
// std::nullopt. After this line `timeout` always holds a value, which is what
// makes the `*timeout` dereferences below safe.
279 timeout = timeout.value_or(GetReader().GetOptions().timeout);
// Remember when we started so the time spent in the first read can be
// measured.
280 const absl::Time started_at = absl::Now();
// First read: a non-OK status from Next<T>() is returned to the caller by
// ASSIGN_OR_RETURN.
283 ASSIGN_OR_RETURN(std::optional<T> item,
Next<T>(*timeout));
// NOTE(review): the condition guarding this return is not in the listing;
// from the message it is presumably `!item.has_value()`.
285 return absl::FailedPreconditionError(
286 "AsyncNode is empty at current offset, "
287 "cannot consume item as type T.");
// The read above already succeeded at this point; if it nevertheless took
// longer than the timeout, the item is dropped and DeadlineExceeded is
// returned instead.
290 const absl::Duration elapsed = absl::Now() - started_at;
291 if (elapsed > *timeout) {
292 return absl::DeadlineExceededError(
293 absl::StrCat(
"Timed out after ", absl::FormatDuration(elapsed),
294 " while consuming item as type T."));
// Second read: its result must be std::nullopt, i.e. nothing may follow the
// consumed item; otherwise FailedPrecondition is returned. NOTE(review): the
// expression being read (and the timeout passed to it) is on a line missing
// from this listing.
298 ASSIGN_OR_RETURN(
const std::optional<Chunk> must_be_nullopt,
300 if (must_be_nullopt.has_value()) {
301 return absl::FailedPreconditionError(
302 "AsyncNode must be empty after consuming the item.");
// Move the item out of the optional and return it.
305 return *std::move(item);
316 std::optional<Chunk> chunk;
319 if (!chunk.has_value()) {
320 value = std::nullopt;
324 auto status_or_value = FromChunkAs<T>(*std::move(chunk));
325 if (!status_or_value.ok()) {
326 LOG(FATAL) <<
"Failed to convert chunk to value: "
327 << status_or_value.status();
330 value = std::move(status_or_value.value());
337 return node << *ToChunk(std::move(value));
356std::unique_ptr<AsyncNode>& operator>>(std::unique_ptr<AsyncNode>& node,
363std::shared_ptr<AsyncNode>& operator>>(std::shared_ptr<AsyncNode>& node,
382 for (
auto& element : std::move(value)) {
383 auto status = node.Put(std::move(element));
385 LOG(ERROR) <<
"Failed to put element: " << status;
396 *node << std::move(value);
401std::unique_ptr<AsyncNode>& operator<<(std::unique_ptr<AsyncNode>& node,
403 *node << std::move(value);
408std::shared_ptr<AsyncNode>& operator<<(std::shared_ptr<AsyncNode>& node,
410 *node << std::move(value);
Concurrency utilities for ActionEngine.
Definition async_node.h:70
auto GetWriterStatus() const -> absl::Status
Definition async_node.cc:126
AsyncNode(std::string_view id="", NodeMap *absl_nullable node_map=nullptr, std::unique_ptr< ChunkStore > chunk_store=nullptr)
Definition async_node.cc:37
auto Put(Chunk value, int seq=-1, bool final=false) -> absl::Status
Definition async_node.cc:198
ChunkStoreWriter & GetWriter()
Definition async_node.cc:121
absl::StatusOr< std::optional< Chunk > > Next(std::optional< absl::Duration > timeout=std::nullopt)
Definition async_node.cc:138
A utility for reading chunks from a ChunkStore.
Definition chunk_store_reader.h:77
absl::StatusOr< std::optional< Chunk > > Next(std::optional< absl::Duration > timeout=std::nullopt)
Reads the next chunk from the ChunkStore.
Definition chunk_store_reader.cc:86
A writer for the ChunkStore that allows writing chunks to the store in a buffered manner.
Definition chunk_store_writer.h:48
An abstract interface for raw data storage and retrieval for ActionEngine nodes.
Options for the ChunkStoreReader.
Definition chunk_store_reader.h:41
ActionEngine data structures used to implement actions and nodes (data streams).