Action Engine
Loading...
Searching...
No Matches
session.h
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15#ifndef ACTIONENGINE_SERVICE_SESSION_H_
16#define ACTIONENGINE_SERVICE_SESSION_H_
17
18#include <memory>
19#include <string_view>
20
21#include <absl/base/nullability.h>
22#include <absl/base/thread_annotations.h>
23#include <absl/container/flat_hash_map.h>
24#include <absl/status/status.h>
25#include <absl/time/time.h>
26
31#include "actionengine/net/stream.h"
35
36namespace act {
37
38class ActionContext {
39 public:
40 static constexpr absl::Duration kFiberCancellationTimeout = absl::Seconds(3);
41 static constexpr absl::Duration kActionDetachTimeout = absl::Seconds(10);
42
43 ActionContext() = default;
44 ~ActionContext();
45
46 absl::Status Dispatch(std::shared_ptr<Action> action);
47
48 void CancelContext();
49
50 void WaitForActionsToDetach(
51 absl::Duration cancel_timeout = kFiberCancellationTimeout,
52 absl::Duration detach_timeout = kActionDetachTimeout);
53
54 private:
55 void CancelContextInternal() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
56
57 std::unique_ptr<thread::Fiber> ExtractActionFiber(Action* absl_nonnull action)
58 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
59
60 void WaitForActionsToDetachInternal(
61 absl::Duration cancel_timeout = kFiberCancellationTimeout,
62 absl::Duration detach_timeout = kActionDetachTimeout)
63 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
64
65 act::Mutex mu_;
66 absl::flat_hash_map<Action*, std::unique_ptr<thread::Fiber>> running_actions_
67 ABSL_GUARDED_BY(mu_);
68 bool cancelled_ ABSL_GUARDED_BY(mu_) = false;
69 act::CondVar cv_ ABSL_GUARDED_BY(mu_);
70};
71
72struct StreamDispatchTask {
73 std::shared_ptr<WireStream> stream;
74 std::unique_ptr<thread::Fiber> fiber;
75 absl::Status status;
76 thread::PermanentEvent done;
77};
78
87class Session {
88 public:
101 explicit Session(NodeMap* absl_nonnull node_map,
102 ActionRegistry* absl_nullable action_registry = nullptr,
103 ChunkStoreFactory chunk_store_factory = {});
104 ~Session();
105
106 // This class is not copyable or movable.
107 Session(const Session& other) = delete;
108 Session& operator=(const Session& other) = delete;
109
110 [[nodiscard]] AsyncNode* absl_nonnull GetNode(
111 std::string_view id,
112 const ChunkStoreFactory& chunk_store_factory = {}) const;
113
114 void DispatchFrom(const std::shared_ptr<WireStream>& stream,
115 absl::AnyInvocable<void()> on_done = {});
116 absl::Status DispatchMessage(WireMessage message,
117 WireStream* absl_nullable stream = nullptr);
118
119 void StopDispatchingFrom(WireStream* absl_nonnull stream);
120 void StopDispatchingFromAll();
121
122 [[nodiscard]] absl::Duration GetRecvTimeout() const { return recv_timeout_; }
123
124 [[nodiscard]] NodeMap* absl_nullable GetNodeMap() const { return node_map_; }
125
126 [[nodiscard]] ActionRegistry* absl_nullable GetActionRegistry() const {
127 return action_registry_;
128 }
129
130 void SetActionRegistry(ActionRegistry* absl_nullable action_registry) {
131 action_registry_ = action_registry;
132 }
133
134 private:
135 void JoinDispatchers(bool cancel = false) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
136
137 act::Mutex mu_;
138 bool joined_ ABSL_GUARDED_BY(mu_) = false;
139 absl::flat_hash_map<WireStream*, std::unique_ptr<thread::Fiber>>
140 dispatch_tasks_ ABSL_GUARDED_BY(mu_){};
141 const absl::Duration recv_timeout_ = absl::Seconds(3600000);
142
143 NodeMap* absl_nonnull const node_map_;
144 ActionRegistry* absl_nullable action_registry_ = nullptr;
145 ChunkStoreFactory chunk_store_factory_;
146
147 std::unique_ptr<ActionContext> action_context_ = nullptr;
148};
149
150} // namespace act
151
152#endif // ACTIONENGINE_SERVICE_SESSION_H_
An interface for ActionEngine Action launch helper / handler context.
Concurrency utilities for ActionEngine.
Provides the AsyncNode class for handling asynchronous data streams.
Definition registry.h:47
Definition async_node.h:70
Definition node_map.h:53
Session(NodeMap *absl_nonnull node_map, ActionRegistry *absl_nullable action_registry=nullptr, ChunkStoreFactory chunk_store_factory={})
Definition session.cc:143
Definition stream.h:44
Provides the NodeMap class for managing ActionEngine nodes.
A registry for ActionEngine actions.
An abstract interface for raw data storage and retrieval for ActionEngine nodes.
Definition types.h:274
ActionEngine data structures used to implement actions and nodes (data streams).