Action Engine
Loading...
Searching...
No Matches
async_node.h
Go to the documentation of this file.
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#ifndef ACTIONENGINE_NODES_ASYNC_NODE_H_
16#define ACTIONENGINE_NODES_ASYNC_NODE_H_
17
18#include <memory>
19#include <optional>
20#include <string>
21#include <string_view>
22#include <utility>
23#include <vector>
24
25#include <absl/base/nullability.h>
26#include <absl/base/optimization.h>
27#include <absl/base/thread_annotations.h>
28#include <absl/container/flat_hash_map.h>
29#include <absl/log/check.h>
30#include <absl/log/log.h>
31#include <absl/status/status.h>
32#include <absl/status/statusor.h>
33#include <absl/strings/str_cat.h>
34#include <absl/time/clock.h>
35#include <absl/time/time.h>
36
38#include "actionengine/data/serialization.h"
40#include "actionengine/net/stream.h"
42#include "actionengine/stores/chunk_store_reader.h"
43#include "actionengine/stores/chunk_store_writer.h"
44#include "actionengine/util/status_macros.h"
45
// Forward declaration only: this header refers to NodeMap exclusively through
// pointers, so the full definition is not required here.
namespace act {
class NodeMap;
}  // namespace act
49
59namespace act {
60
70class AsyncNode {
71 public:
83 explicit AsyncNode(std::string_view id = "",
84 NodeMap* absl_nullable node_map = nullptr,
85 std::unique_ptr<ChunkStore> chunk_store = nullptr);
86
87 // This class cannot be copied as each AsyncNode contains non-trivial state
88 // that cannot be shared between copies, such as reader and writer fibers.
89 AsyncNode(AsyncNode& other) = delete;
90 AsyncNode& operator=(AsyncNode& other) = delete;
91
92 // AsyncNode objects can be moved by value.
93 AsyncNode(AsyncNode&& other) noexcept;
94 AsyncNode& operator=(AsyncNode&& other) noexcept;
95
96 ~AsyncNode();
97
116 auto Put(Chunk value, int seq = -1, bool final = false) -> absl::Status;
128 auto Put(NodeFragment value) -> absl::Status;
129
148 template <typename T>
149 auto Put(T value, int seq = -1, bool final = false) -> absl::Status;
150
160 [[nodiscard]] ChunkStoreWriter& GetWriter();
172 [[nodiscard]]
173 auto GetWriterStatus() const -> absl::Status;
174
175 [[nodiscard]] auto GetId() const -> std::string;
176
194 absl::StatusOr<std::optional<Chunk>> Next(
195 std::optional<absl::Duration> timeout = std::nullopt);
196
197 template <typename T>
198 auto Next(std::optional<absl::Duration> timeout = std::nullopt)
199 -> absl::StatusOr<std::optional<T>>;
200
201 std::optional<Chunk> NextOrDie(
202 std::optional<absl::Duration> timeout = std::nullopt);
203
204 template <typename T>
205 auto NextOrDie(std::optional<absl::Duration> timeout = std::nullopt)
206 -> std::optional<T>;
207
208 template <typename T>
209 absl::StatusOr<T> ConsumeAs(
210 std::optional<absl::Duration> timeout = std::nullopt);
211
212 ChunkStoreReader& GetReader() ABSL_LOCKS_EXCLUDED(mu_);
213 auto GetReaderStatus() const -> absl::Status;
214 [[nodiscard]] auto MakeReader(ChunkStoreReaderOptions options) const
215 -> std::unique_ptr<ChunkStoreReader>;
216 auto SetReaderOptions(const ChunkStoreReaderOptions& options) -> AsyncNode&;
217 auto ResetReader() -> AsyncNode&;
218
219 template <typename T>
220 friend AsyncNode& operator>>(AsyncNode& node, std::optional<T>& value);
221
222 template <typename T>
223 friend AsyncNode& operator<<(AsyncNode& node, T value);
224
225 private:
226 ChunkStoreReader* absl_nonnull EnsureReader()
227 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
228
229 ChunkStoreWriter* absl_nonnull EnsureWriter(int n_chunks_to_buffer = -1)
230 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
231
232 absl::Status PutInternal(NodeFragment fragment)
233 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
234 absl::Status PutInternal(Chunk chunk, int seq = -1, bool final = false)
235 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
236
237 NodeMap* absl_nullable node_map_ = nullptr;
238 std::unique_ptr<ChunkStore> chunk_store_;
239
240 mutable act::Mutex mu_;
241 mutable act::CondVar cv_ ABSL_GUARDED_BY(mu_);
242 std::unique_ptr<ChunkStoreReader> default_reader_ ABSL_GUARDED_BY(mu_);
243 std::unique_ptr<ChunkStoreWriter> default_writer_ ABSL_GUARDED_BY(mu_);
244};
245
246template <typename T>
247auto AsyncNode::Put(T value, int seq, bool final) -> absl::Status {
248 auto chunk = ToChunk(std::move(value));
249 if (!chunk.ok()) {
250 return chunk.status();
251 }
252 return Put(*std::move(chunk), seq, final);
253}
254
255template <typename T>
256auto AsyncNode::Next(std::optional<absl::Duration> timeout)
257 -> absl::StatusOr<std::optional<T>> {
258 ChunkStoreReader& reader = GetReader();
259 return reader.Next<T>(timeout);
260}
261
262template <>
263inline auto AsyncNode::Next<NodeFragment>(std::optional<absl::Duration> timeout)
264 -> absl::StatusOr<std::optional<NodeFragment>> {
265 ChunkStoreReader& reader = GetReader();
266 return reader.NextFragment(timeout);
267}
268
269template <typename T>
270auto AsyncNode::NextOrDie(std::optional<absl::Duration> timeout)
271 -> std::optional<T> {
272 auto next = Next<T>(timeout);
273 CHECK_OK(next.status());
274 return *std::move(next);
275}
276
277template <typename T>
278absl::StatusOr<T> AsyncNode::ConsumeAs(std::optional<absl::Duration> timeout) {
279 timeout = timeout.value_or(GetReader().GetOptions().timeout);
280 const absl::Time started_at = absl::Now();
281
282 // The node being consumed must contain an element.
283 ASSIGN_OR_RETURN(std::optional<T> item, Next<T>(*timeout));
284 if (!item) {
285 return absl::FailedPreconditionError(
286 "AsyncNode is empty at current offset, "
287 "cannot consume item as type T.");
288 }
289
290 const absl::Duration elapsed = absl::Now() - started_at;
291 if (elapsed > *timeout) {
292 return absl::DeadlineExceededError(
293 absl::StrCat("Timed out after ", absl::FormatDuration(elapsed),
294 " while consuming item as type T."));
295 }
296
297 // The node must be empty after consuming the item.
298 ASSIGN_OR_RETURN(const std::optional<Chunk> must_be_nullopt,
299 Next<Chunk>(*timeout - elapsed));
300 if (must_be_nullopt.has_value()) {
301 return absl::FailedPreconditionError(
302 "AsyncNode must be empty after consuming the item.");
303 }
304
305 return *std::move(item);
306}
307
// -----------------------------------------------------------------------------
// IO operators for AsyncNode. These templates have explicit specializations for
// Chunk and NodeFragment, and a default overload for all other types, which is
// implemented in terms of ToChunk(T) and FromChunkAs<T>(Chunk) and is therefore
// available only for types for which these functions are defined.
// -----------------------------------------------------------------------------
314template <typename T>
315AsyncNode& operator>>(AsyncNode& node, std::optional<T>& value) {
316 std::optional<Chunk> chunk;
317 node >> chunk;
318
319 if (!chunk.has_value()) {
320 value = std::nullopt;
321 return node;
322 }
323
324 auto status_or_value = FromChunkAs<T>(*std::move(chunk));
325 if (!status_or_value.ok()) {
326 LOG(FATAL) << "Failed to convert chunk to value: "
327 << status_or_value.status();
328 ABSL_ASSUME(false);
329 }
330 value = std::move(status_or_value.value());
331 return node;
332}
333
334template <typename T>
335AsyncNode& operator<<(AsyncNode& node, T value) {
336 node.EnsureWriter();
337 return node << *ToChunk(std::move(value));
338}
339
340// -----------------------------------------------------------------------------
341
342// Concrete instantiation for the operator>> for Chunk.
343template <>
344AsyncNode& operator>>(AsyncNode& node, std::optional<Chunk>& value);
345
346// -----------------------------------------------------------------------------
347
348// Helpers for the operator>> on pointers to AsyncNode.
349template <typename T>
350AsyncNode* absl_nullable operator>>(AsyncNode* absl_nullable node, T& value) {
351 *node >> value;
352 return node;
353}
354
355template <typename T>
356std::unique_ptr<AsyncNode>& operator>>(std::unique_ptr<AsyncNode>& node,
357 T& value) {
358 *node >> value;
359 return node;
360}
361
362template <typename T>
363std::shared_ptr<AsyncNode>& operator>>(std::shared_ptr<AsyncNode>& node,
364 T& value) {
365 *node >> value;
366 return node;
367}
368
369// -----------------------------------------------------------------------------
370// "Concrete" instantiations for the operator<< for Chunk and NodeFragment.
371// -----------------------------------------------------------------------------
372template <>
373AsyncNode& operator<<(AsyncNode& node, NodeFragment value);
374
375template <>
376AsyncNode& operator<<(AsyncNode& node, Chunk value);
377
378// -----------------------------------------------------------------------------
379
380template <typename T>
381AsyncNode& operator<<(AsyncNode& node, std::vector<T> value) {
382 for (auto& element : std::move(value)) {
383 auto status = node.Put(std::move(element));
384 if (!status.ok()) {
385 LOG(ERROR) << "Failed to put element: " << status;
386 break;
387 }
388 }
389 return node;
390}
391
// Convenience operators to write to AsyncNode pointers (such as in the case
// of action->GetOutput("text")).
394template <typename T>
395AsyncNode* absl_nonnull operator<<(AsyncNode* absl_nonnull node, T value) {
396 *node << std::move(value);
397 return node;
398}
399
400template <typename T>
401std::unique_ptr<AsyncNode>& operator<<(std::unique_ptr<AsyncNode>& node,
402 T value) {
403 *node << std::move(value);
404 return node;
405}
406
407template <typename T>
408std::shared_ptr<AsyncNode>& operator<<(std::shared_ptr<AsyncNode>& node,
409 T value) {
410 *node << std::move(value);
411 return node;
412}
413
414} // namespace act
415
416#endif // ACTIONENGINE_NODES_ASYNC_NODE_H_
Concurrency utilities for ActionEngine.
Definition async_node.h:70
auto GetWriterStatus() const -> absl::Status
Definition async_node.cc:126
AsyncNode(std::string_view id="", NodeMap *absl_nullable node_map=nullptr, std::unique_ptr< ChunkStore > chunk_store=nullptr)
Definition async_node.cc:37
auto Put(Chunk value, int seq=-1, bool final=false) -> absl::Status
Definition async_node.cc:198
ChunkStoreWriter & GetWriter()
Definition async_node.cc:121
absl::StatusOr< std::optional< Chunk > > Next(std::optional< absl::Duration > timeout=std::nullopt)
Definition async_node.cc:138
A utility for reading chunks from a ChunkStore.
Definition chunk_store_reader.h:77
absl::StatusOr< std::optional< Chunk > > Next(std::optional< absl::Duration > timeout=std::nullopt)
Reads the next chunk from the ChunkStore.
Definition chunk_store_reader.cc:86
A writer for the ChunkStore that allows writing chunks to the store in a buffered manner.
Definition chunk_store_writer.h:48
Definition node_map.h:53
An abstract interface for raw data storage and retrieval for ActionEngine nodes.
Definition types.h:104
Options for the ChunkStoreReader.
Definition chunk_store_reader.h:41
Definition types.h:184
ActionEngine data structures used to implement actions and nodes (data streams).