Action Engine
Loading...
Searching...
No Matches
service.h
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#ifndef ACTIONENGINE_SERVICE_SERVICE_H_
16#define ACTIONENGINE_SERVICE_SERVICE_H_
17
18#include <functional>
19#include <memory>
20#include <string>
21#include <string_view>
22#include <vector>
23
24#include <absl/base/nullability.h>
25#include <absl/base/thread_annotations.h>
26#include <absl/container/flat_hash_map.h>
27#include <absl/container/flat_hash_set.h>
28#include <absl/status/status.h>
29#include <absl/status/statusor.h>
30
34#include "actionengine/net/stream.h"
36#include "actionengine/service/session.h"
38
39namespace act {
40
53 std::shared_ptr<WireStream> stream = nullptr;
54 Session* absl_nullable session = nullptr;
55
56 std::string session_id; // dead sessions may lose their id.
57 std::string stream_id; // dead streams may lose their id.
58
59 absl::Status status;
60};
61
// Builds a new Action for use within `connection` and hands ownership of it
// to the caller (the result is a std::unique_ptr<Action>).
//
// `connection`  - the stream/session pairing the action is created for; it is
//                 taken by const reference and is not modified through this
//                 parameter.
// `action_name` - name of the action to create. Presumably resolved through an
//                 ActionRegistry reachable from the connection's session --
//                 TODO confirm against the definition.
// `action_id`   - defaults to the empty string. NOTE(review): an empty id
//                 presumably asks the implementation to choose one -- confirm.
62std::unique_ptr<Action> MakeActionInConnection(
63 const StreamToSessionConnection& connection, std::string_view action_name,
64 std::string_view action_id = "");
65
// Signature of the callable that services one stream/session pairing. It
// receives the wire stream (a shared_ptr passed by const reference) and a
// non-null Session pointer, and reports its outcome as an absl::Status.
// Service accepts a ConnectionHandler both in its constructor and per call in
// EstablishConnection().
82using ConnectionHandler = std::function<absl::Status(
83 const std::shared_ptr<WireStream>&, Session* absl_nonnull)>;
84
// The connection handler that Service uses when none is supplied to its
// constructor (it is the default value of the `connection_handler` argument).
//
// `stream`  - taken by value, so the callee holds its own shared_ptr reference
//             for the duration of the call.
// `session` - must not be null (absl_nonnull).
//
// Returns the absl::Status produced by the handler; the conditions under which
// it returns are defined in service.cc and are not visible from this header.
103absl::Status RunSimpleSession(std::shared_ptr<WireStream> stream,
104 Session* absl_nonnull session);
105
// Holds the streams, node maps, sessions, connections and connection fibers
// of a service in maps keyed by string id; every one of those maps is guarded
// by `mu_`.
//
// The class derives from std::enable_shared_from_this, so any code path that
// calls shared_from_this() requires the instance to be owned by a
// std::shared_ptr.
131class Service : public std::enable_shared_from_this<Service> {
132 public:
  // All arguments are optional; `connection_handler` defaults to
  // RunSimpleSession and `chunk_store_factory` to a value-initialized factory.
  // NOTE(review): `action_registry` is a nullable raw pointer while the member
  // `action_registry_` is a std::unique_ptr<ActionRegistry>. Whether the
  // pointee is copied or adopted is decided in service.cc -- confirm before
  // passing a pointer you still own.
133 explicit Service(ActionRegistry* absl_nullable action_registry = nullptr,
134 ConnectionHandler connection_handler = RunSimpleSession,
135 ChunkStoreFactory chunk_store_factory = {});
136
137 ~Service();
138
139 // This class is not copyable or movable.
  // (No move operations are declared: the user-declared copy operations below
  // suppress the implicit ones, so a move attempt resolves to the deleted
  // copy.)
140 Service(const Service& other) = delete;
141 Service& operator=(const Service& other) = delete;
142
  // Looks up a stream by id. The result is nullable; presumably nullptr means
  // no stream with that id is registered -- confirm in service.cc. The raw
  // pointer is non-owning.
151 auto GetStream(std::string_view stream_id) const -> WireStream* absl_nullable;
  // Looks up a session by id. Same nullable, non-owning contract as
  // GetStream().
160 auto GetSession(std::string_view session_id) const -> Session* absl_nullable;
  // Returns session ids by value; presumably the keys of `sessions_` at the
  // time of the call -- confirm.
161 auto GetSessionKeys() const -> std::vector<std::string>;
162
  // Registers `stream` with the service and returns the resulting connection,
  // or an error status. `stream` is an rvalue reference, so callers pass a
  // temporary or std::move() their shared_ptr.
  // `connection_handler` defaults to an empty std::function. NOTE(review):
  // presumably an empty handler falls back to `connection_handler_` -- confirm.
185 auto EstablishConnection(std::shared_ptr<WireStream>&& stream,
186 ConnectionHandler connection_handler = nullptr)
187 -> absl::StatusOr<std::shared_ptr<StreamToSessionConnection>>;
  // `connection` must not be null. Presumably waits for the connection's
  // handler to finish and returns its status -- confirm in service.cc.
209 auto JoinConnection(StreamToSessionConnection* absl_nonnull connection)
210 -> absl::Status;
211
  // NOTE(review): this setter is declared const. That can compile because
  // `action_registry_` is a std::unique_ptr, whose constness does not extend
  // to the pointee; presumably the registry is assigned through it. Confirm
  // that the const qualifier is intended.
231 auto SetActionRegistry(const ActionRegistry& action_registry) const -> void;
232
  // Must be called without `mu_` held (ABSL_LOCKS_EXCLUDED). `cancel` defaults
  // to false; presumably true cancels the running connections instead of only
  // waiting for them -- confirm.
233 void JoinConnectionsAndCleanUp(bool cancel = false) ABSL_LOCKS_EXCLUDED(mu_);
234
235 private:
  // Caller must already hold `mu_` exclusively.
236 void CleanupConnection(const StreamToSessionConnection& connection)
237 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
238
  // Configuration captured at construction; none of these three members is
  // annotated as guarded by `mu_`.
239 std::unique_ptr<ActionRegistry> action_registry_;
240 ConnectionHandler connection_handler_;
241 ChunkStoreFactory chunk_store_factory_;
242
  // Declared mutable so that const member functions can lock it.
243 mutable act::Mutex mu_;
244 absl::flat_hash_map<std::string, std::shared_ptr<WireStream>> streams_
245 ABSL_GUARDED_BY(mu_);
246 // For now only a one-to-one session-stream mapping is supported, so the
247 // stream id is also used as the session id.
248 absl::flat_hash_map<std::string, std::unique_ptr<NodeMap>> node_maps_
249 ABSL_GUARDED_BY(mu_);
250 absl::flat_hash_map<std::string, std::unique_ptr<Session>> sessions_
251 ABSL_GUARDED_BY(mu_);
252 absl::flat_hash_map<std::string, std::shared_ptr<StreamToSessionConnection>>
253 connections_ ABSL_GUARDED_BY(mu_);
254 absl::flat_hash_map<std::string, absl::flat_hash_set<std::string>>
255 streams_per_session_ ABSL_GUARDED_BY(mu_);
256 absl::flat_hash_map<std::string, std::unique_ptr<thread::Fiber>>
257 connection_fibers_ ABSL_GUARDED_BY(mu_);
258
259 bool cleanup_started_ ABSL_GUARDED_BY(mu_) = false;
  // NOTE(review): not annotated with ABSL_GUARDED_BY; presumably
  // thread::PermanentEvent synchronizes internally -- confirm.
260 thread::PermanentEvent cleanup_done_;
261};
262
263} // namespace act
264
265#endif // ACTIONENGINE_SERVICE_SERVICE_H_
An interface for ActionEngine Action launch helper / handler context.
Concurrency utilities for ActionEngine.
Definition registry.h:47
auto GetStream(std::string_view stream_id) const -> WireStream *absl_nullable
Definition service.cc:103
auto JoinConnection(StreamToSessionConnection *absl_nonnull connection) -> absl::Status
Definition service.cc:211
auto SetActionRegistry(const ActionRegistry &action_registry) const -> void
Definition service.cc:242
auto EstablishConnection(std::shared_ptr< WireStream > &&stream, ConnectionHandler connection_handler=nullptr) -> absl::StatusOr< std::shared_ptr< StreamToSessionConnection > >
Definition service.cc:130
auto GetSession(std::string_view session_id) const -> Session *absl_nullable
Definition service.cc:111
Definition session.h:87
Definition stream.h:44
Provides the NodeMap class for managing ActionEngine nodes.
A registry for ActionEngine actions.
An abstract interface for raw data storage and retrieval for ActionEngine nodes.
Definition service.h:52