Action Engine
Loading...
Searching...
No Matches
service_pybind11.h
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#ifndef ACTIONENGINE_PYBIND11_ACTIONENGINE_SERVICE_H_
16#define ACTIONENGINE_PYBIND11_ACTIONENGINE_SERVICE_H_
17
18#include <optional>
19#include <string>
20#include <string_view>
21#include <utility>
22
23#include <absl/base/optimization.h>
24#include <absl/log/log.h>
25#include <absl/status/status.h>
26#include <absl/status/statusor.h>
27#include <absl/time/time.h>
28#include <pybind11/cast.h>
29#include <pybind11/gil.h>
30#include <pybind11/pybind11.h>
31#include <pybind11/pytypes.h>
32#include <pybind11_abseil/absl_casters.h>
33#include <pybind11_abseil/status_caster.h>
34#include <pybind11_abseil/statusor_caster.h>
35
37#include "actionengine/net/stream.h"
38#include "actionengine/stores/chunk_store.h" // IWYU pragma: keep
39#include "actionengine/stores/chunk_store_pybind11.h" // IWYU pragma: keep
40#include "actionengine/util/utils_pybind11.h"
41
42namespace act::pybindings {
43
44namespace py = ::pybind11;
45
// Binding entry points for the service-related types. Only the declarations
// are visible in this header; the definitions live elsewhere.
//
// Each function takes the pybind11 handle `scope` and a `name` whose default
// is the string literal shown in the declaration.
// NOTE(review): presumably each call registers the corresponding class on
// `scope` under `name` — confirm against the definitions.
46void BindStream(py::handle scope, std::string_view name = "WireStream");
47void BindSession(py::handle scope, std::string_view name = "Session");
48void BindService(py::handle scope, std::string_view name = "Service");
49void BindStreamToSessionConnection(
50 py::handle scope, std::string_view name = "StreamToSessionConnection");
51
69class PyWireStream final : public WireStream {
70 public:
71 using WireStream::WireStream;
72
73 PyWireStream() : WireStream() {}
74
75 absl::Status Send(WireMessage message) override {
76 py::gil_scoped_acquire gil;
77 const py::function function = py::get_override(this, "send");
78
79 if (!function) {
80 return absl::UnimplementedError(
81 "send is not implemented in the Python subclass of "
82 "WireStream.");
83 }
84 const py::object py_result = function(message);
85
86 const absl::StatusOr<py::object> result =
87 pybindings::RunThreadsafeIfCoroutine(py_result);
88
89 if (!result.ok()) {
90 return result.status();
91 }
92 return absl::OkStatus();
93 }
94
95 absl::StatusOr<std::optional<WireMessage>> Receive(
96 absl::Duration timeout) override {
97 py::gil_scoped_acquire gil;
98 const py::function function = py::get_override(this, "receive");
99
100 if (!function) {
101 LOG(FATAL) << "receive is not implemented in the Python subclass of "
102 "WireStream.";
103 ABSL_ASSUME(false);
104 }
105 const py::object py_result = function(absl::ToDoubleSeconds(timeout));
106
107 absl::StatusOr<py::object> result =
108 pybindings::RunThreadsafeIfCoroutine(py_result);
109
110 if (!result.ok()) {
111 return result.status();
112 }
113
114 if (result->is_none()) {
115
116 return std::nullopt;
117 }
118 return std::move(result)->cast<WireMessage>();
119 }
120
121 absl::Status Accept() override {
122 py::gil_scoped_acquire gil;
123 const py::function function = py::get_override(this, "accept");
124
125 if (!function) {
126 return absl::UnimplementedError(
127 "accept is not implemented in the Python subclass of "
128 "WireStream.");
129 }
130 try {
131 const py::object py_result = function();
132 const absl::StatusOr<py::object> result =
133 pybindings::RunThreadsafeIfCoroutine(py_result);
134
135 if (!result.ok()) {
136 return result.status();
137 }
138 } catch (const py::error_already_set& e) {
139 return absl::InternalError(e.what());
140 }
141
142 return absl::OkStatus();
143 }
144
145 absl::Status Start() override {
146 py::gil_scoped_acquire gil;
147 const py::function function = py::get_override(this, "start");
148
149 if (!function) {
150 return absl::UnimplementedError(
151 "start is not implemented in the Python subclass of "
152 "WireStream.");
153 }
154 try {
155 const py::object py_result = function();
156 const absl::StatusOr<py::object> result =
157 pybindings::RunThreadsafeIfCoroutine(py_result);
158
159 if (!result.ok()) {
160 return result.status();
161 }
162 } catch (const py::error_already_set& e) {
163 return absl::InternalError(e.what());
164 }
165
166 return absl::OkStatus();
167 }
168
169 void HalfClose() override {
170 py::gil_scoped_acquire gil;
171 const py::function function = py::get_override(this, "half_close");
172
173 if (!function) {
174 LOG(FATAL) << "half_close is not implemented in the Python subclass of "
175 "WireStream.";
176 ABSL_ASSUME(false);
177 }
178 try {
179 const py::object py_result = function();
180 const absl::StatusOr<py::object> result =
181 pybindings::RunThreadsafeIfCoroutine(py_result);
182
183 if (!result.ok()) {
184 LOG(ERROR) << "Error in half_close: " << result.status();
185 }
186 } catch (const py::error_already_set& e) {
187 LOG(ERROR) << "Error in half_close: " << e.what();
188 }
189 }
190
191 void Abort() override {
192 py::gil_scoped_acquire gil;
193 const py::function function = py::get_override(this, "abort");
194
195 if (!function) {
196 LOG(FATAL) << "abort is not implemented in the Python subclass of "
197 "WireStream.";
198 ABSL_ASSUME(false);
199 }
200 try {
201 const py::object py_result = function();
202 const absl::StatusOr<py::object> result =
203 pybindings::RunThreadsafeIfCoroutine(py_result);
204
205 if (!result.ok()) {
206 LOG(ERROR) << "Error in abort: " << result.status();
207 }
208 } catch (const py::error_already_set& e) {
209 LOG(ERROR) << "Error in abort: " << e.what();
210 }
211 }
212
213 absl::Status GetStatus() const override {
214 PYBIND11_OVERRIDE_PURE_NAME(absl::Status, PyWireStream, "get_status",
215 GetStatus, );
216 }
217
218 [[nodiscard]] py::object GetLoop() const {
219 PYBIND11_OVERRIDE_PURE_NAME(py::object, PyWireStream, "get_loop",
220 GetLoop, );
221 }
222
223 [[nodiscard]] std::string GetId() const override {
224 PYBIND11_OVERRIDE_PURE_NAME(std::string, PyWireStream, "get_id", GetId, );
225 }
226};
227
// Declaration only; the definition is not part of this header. Takes a
// py::module_ `scope` and returns a py::module_; `module_name` defaults to
// "service".
// NOTE(review): presumably creates a submodule called `module_name` under
// `scope` and registers the service bindings in it — confirm against the
// definition.
228py::module_ MakeServiceModule(py::module_ scope,
229 std::string_view module_name = "service");
230} // namespace act::pybindings
231
232#endif // ACTIONENGINE_PYBIND11_ACTIONENGINE_SERVICE_H_
Definition stream.h:44
Definition service_pybind11.h:69
void HalfClose() override
Definition service_pybind11.h:169
std::string GetId() const override
Definition service_pybind11.h:223
void Abort() override
Definition service_pybind11.h:191
absl::Status Start() override
Definition service_pybind11.h:145
absl::Status Send(WireMessage message) override
Definition service_pybind11.h:75
absl::Status GetStatus() const override
Definition service_pybind11.h:213
absl::Status Accept() override
Definition service_pybind11.h:121
absl::StatusOr< std::optional< WireMessage > > Receive(absl::Duration timeout) override
Definition service_pybind11.h:95
An abstract interface for raw data storage and retrieval for ActionEngine nodes.
Definition types.h:274
ActionEngine data structures used to implement actions and nodes (data streams).