Action Engine
Loading...
Searching...
No Matches
action.h
Go to the documentation of this file.
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#ifndef ACTIONENGINE_ACTIONS_ACTION_H_
16#define ACTIONENGINE_ACTIONS_ACTION_H_
17
18#include <memory>
19#include <optional>
20#include <string>
21#include <string_view>
22#include <utility>
23#include <vector>
24
25#include <absl/base/nullability.h>
26#include <absl/base/thread_annotations.h>
27#include <absl/container/flat_hash_map.h>
28#include <absl/container/flat_hash_set.h>
29#include <absl/status/status.h>
30#include <absl/time/time.h>
31
36#include "actionengine/net/stream.h"
39#include "actionengine/stores/chunk_store_reader.h"
40
41namespace act {
42class Session;
43} // namespace act
44
53
54namespace act {
55
63class Action : public std::enable_shared_from_this<Action> {
64 public:
86 explicit Action(ActionSchema schema, std::string_view id = "",
87 std::vector<Port> inputs = {},
88 std::vector<Port> outputs = {});
89
90 ~Action();
91
98 [[nodiscard]] ActionMessage GetActionMessage() const;
99
110 AsyncNode* absl_nullable GetNode(std::string_view id);
111
125 AsyncNode* absl_nullable GetInput(
126 std::string_view name, std::optional<bool> bind_stream = std::nullopt);
127
141 AsyncNode* absl_nullable GetOutput(
142 std::string_view name,
143 const std::optional<bool> bind_stream = std::nullopt) {
144 act::MutexLock lock(&mu_);
145 return GetOutputInternal(name, bind_stream);
146 }
147
155 void BindHandler(ActionHandler handler) { handler_ = std::move(handler); }
156
157 void BindNodeMap(NodeMap* absl_nullable node_map);
158
160 [[nodiscard]] NodeMap* absl_nullable GetNodeMap() const;
161
174 void BindStream(WireStream* absl_nullable stream);
175
177 [[nodiscard]] WireStream* absl_nullable GetStream() const;
178
190 void BindSession(Session* absl_nullable session);
191
193 [[nodiscard]] Session* absl_nullable GetSession() const;
194
209 std::unique_ptr<Action> MakeActionInSameSession(
210 std::string_view name, std::string_view action_id = "") const;
211
213 [[nodiscard]] ActionRegistry* absl_nullable GetRegistry() const;
214
216 [[nodiscard]] std::string GetId() const { return id_; }
217
218 [[nodiscard]] const ActionSchema& GetSchema() const { return schema_; }
219
234 absl::Status Await(absl::Duration timeout = absl::InfiniteDuration());
235
247 absl::Status Call();
248
260 absl::Status Run();
261
266 void ClearInputsAfterRun(bool clear = true);
267
272 void ClearOutputsAfterRun(bool clear = true);
273
291 void Cancel() const {
292 act::MutexLock lock(&mu_);
293 CancelInternal();
294 }
295
304 thread::Case OnCancel() const { return cancelled_->OnEvent(); }
305
312 [[nodiscard]] bool Cancelled() const;
313
314 void BindStreamsOnInputsByDefault(bool bind) {
315 act::MutexLock lock(&mu_);
316 bind_streams_on_inputs_default_ = bind;
317 }
318
319 void BindStreamsOnOutputsByDefault(bool bind) {
320 act::MutexLock lock(&mu_);
321 bind_streams_on_outputs_default_ = bind;
322 }
323
324 void SetUserData(std::shared_ptr<void> data) {
325 act::MutexLock lock(&mu_);
326 user_data_ = std::move(data);
327 }
328
329 [[nodiscard]] void* absl_nullable GetUserData() const {
330 act::MutexLock lock(&mu_);
331 return user_data_.get();
332 }
333
334 private:
335 void CancelInternal() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
336
337 // Implementation detail: gets the input node ID for the given name, unique
338 // to this particular action run/call.
339 std::string GetInputId(std::string_view name) const;
340
341 // Implementation detail: gets the input node ID for the given name, unique
342 // to this particular action run/call.
343 std::string GetOutputId(std::string_view name) const;
344
345 AsyncNode* absl_nonnull GetOutputInternal(
346 std::string_view name, std::optional<bool> bind_stream = std::nullopt)
347 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
348
349 void UnbindStreams() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
350
351 mutable act::Mutex mu_{};
352 act::CondVar cv_ ABSL_GUARDED_BY(mu_);
353
354 ActionSchema schema_;
355 absl::flat_hash_map<std::string, std::string> input_name_to_id_;
356 absl::flat_hash_map<std::string, std::string> output_name_to_id_;
357
358 ActionHandler handler_;
359 bool has_been_called_ ABSL_GUARDED_BY(mu_) = false;
360 bool has_been_run_ ABSL_GUARDED_BY(mu_) = false;
361 std::string id_;
362
363 NodeMap* absl_nullable node_map_ ABSL_GUARDED_BY(mu_) = nullptr;
364 WireStream* absl_nullable stream_ ABSL_GUARDED_BY(mu_) = nullptr;
365 Session* absl_nullable session_ ABSL_GUARDED_BY(mu_) = nullptr;
366
367 absl::flat_hash_set<ChunkStoreReader*> reffed_readers_ ABSL_GUARDED_BY(mu_);
368
369 bool bind_streams_on_inputs_default_ = true;
370 bool bind_streams_on_outputs_default_ = false;
371 absl::flat_hash_set<AsyncNode*> nodes_with_bound_streams_
372 ABSL_GUARDED_BY(mu_);
373
374 std::unique_ptr<thread::PermanentEvent> cancelled_;
375
376 bool clear_inputs_after_run_ ABSL_GUARDED_BY(mu_) = false;
377 bool clear_outputs_after_run_ ABSL_GUARDED_BY(mu_) = false;
378 std::optional<absl::Status> run_status_ ABSL_GUARDED_BY(mu_) = std::nullopt;
379
380 std::shared_ptr<void> user_data_ ABSL_GUARDED_BY(mu_) = nullptr;
381};
382
383} // namespace act
384
385#endif // ACTIONENGINE_ACTIONS_ACTION_H_
Concurrency utilities for ActionEngine.
Provides the AsyncNode class for handling asynchronous data streams.
absl::Status Await(absl::Duration timeout=absl::InfiniteDuration())
Block until the action has been completed, either by being run or called.
Definition action.cc:181
WireStream *absl_nullable GetStream() const
Definition action.cc:166
std::string GetId() const
Definition action.h:216
Action(ActionSchema schema, std::string_view id="", std::vector< Port > inputs={}, std::vector< Port > outputs={})
Constructor. Creates an action in the context given by node_map, stream, and session.
Definition action.cc:34
bool Cancelled() const
Returns whether the action has been cancelled.
Definition action.cc:347
ActionRegistry *absl_nullable GetRegistry() const
Definition action.cc:393
absl::Status Call()
Calls the action by sending an ActionEngine action message to associated stream.
Definition action.cc:215
AsyncNode *absl_nullable GetInput(std::string_view name, std::optional< bool > bind_stream=std::nullopt)
Gets an AsyncNode input with the given name from the node map. If no input with the given name is fou...
Definition action.cc:115
NodeMap *absl_nullable GetNodeMap() const
Definition action.cc:156
void Cancel() const
Cancels the action and all its inputs.
Definition action.h:291
Session *absl_nullable GetSession() const
Definition action.cc:176
AsyncNode *absl_nullable GetNode(std::string_view id)
Gets an AsyncNode with the given id from the node map.
Definition action.cc:104
void BindSession(Session *absl_nullable session)
Binds a session to the action.
Definition action.cc:171
std::unique_ptr< Action > MakeActionInSameSession(std::string_view name, std::string_view action_id="") const
Makes a different action in the same session. Should be used to create nested actions.
Definition action.cc:401
absl::Status Run()
Run the action handler. Clients usually do not call this method directly.
Definition action.cc:230
ActionMessage GetActionMessage() const
Makes an action message to be sent on a WireStream.
Definition action.cc:77
void ClearInputsAfterRun(bool clear=true)
Specifies whether the action should delete its input nodes from its bound node map after it's run.
Definition action.cc:322
void ClearOutputsAfterRun(bool clear=true)
Specifies whether the action should delete its output nodes from its bound node map after it's run.
Definition action.cc:327
AsyncNode *absl_nullable GetOutput(std::string_view name, const std::optional< bool > bind_stream=std::nullopt)
Gets an AsyncNode output with the given name from the node map. If no output with the given name is f...
Definition action.h:141
thread::Case OnCancel() const
Returns a thread::Case that handlers can use to synchronise with cancellation.
Definition action.h:304
void BindStream(WireStream *absl_nullable stream)
Binds a given stream to the action.
Definition action.cc:161
void BindHandler(ActionHandler handler)
Sets the action handler.
Definition action.h:155
Definition registry.h:47
Definition async_node.h:70
Definition node_map.h:53
Definition session.h:87
Definition stream.h:44
Provides the NodeMap class for managing ActionEngine nodes.
A registry for ActionEngine actions.
A type that represents schemas for ActionEngine actions.
Definition types.h:232
Definition schema.h:68
A Case represents a selectable case in a Select statement.
Definition cases.h:80
ActionEngine data structures used to implement actions and nodes (data streams).