Action Engine
Loading...
Searching...
No Matches
fiber_aware_websocket_stream.h
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#ifndef ACTIONENGINE_NET_WEBSOCKETS_FIBER_AWARE_WEBSOCKET_STREAM_H_
16#define ACTIONENGINE_NET_WEBSOCKETS_FIBER_AWARE_WEBSOCKET_STREAM_H_
17
18#define BOOST_ASIO_NO_DEPRECATED
19
20#include <absl/status/status.h>
21#include <absl/status/statusor.h>
22#include <boost/asio/cancellation_signal.hpp>
23#include <boost/asio/ip/tcp.hpp>
24#include <boost/asio/strand.hpp>
25#include <boost/asio/thread_pool.hpp>
26#include <boost/beast/core.hpp>
27#include <boost/beast/websocket.hpp>
28
30
31namespace act::net {
32
33using BoostWebsocketStream =
34 boost::beast::websocket::stream<boost::asio::ip::tcp::socket>;
35
36using PrepareStreamFn =
37 std::function<absl::Status(BoostWebsocketStream* absl_nonnull)>;
38
39absl::Status PrepareClientStream(BoostWebsocketStream* absl_nonnull stream);
40absl::Status PrepareServerStream(BoostWebsocketStream* absl_nonnull stream);
41
42using PerformHandshakeFn =
43 std::function<absl::Status(BoostWebsocketStream* absl_nonnull)>;
44
45class FiberAwareWebsocketStream {
46 public:
47 explicit FiberAwareWebsocketStream(
48 std::unique_ptr<BoostWebsocketStream> stream = nullptr,
49 PerformHandshakeFn handshake_fn = {});
50
51 FiberAwareWebsocketStream(const FiberAwareWebsocketStream&) = delete;
52 FiberAwareWebsocketStream& operator=(const FiberAwareWebsocketStream&) =
53 delete;
54
55 FiberAwareWebsocketStream(FiberAwareWebsocketStream&&) noexcept;
56 FiberAwareWebsocketStream& operator=(FiberAwareWebsocketStream&&) noexcept;
57
58 ~FiberAwareWebsocketStream();
59
60 template <typename ExecutionContext>
61 static absl::StatusOr<FiberAwareWebsocketStream> Connect(
62 ExecutionContext& context, std::string_view address, uint16_t port,
63 std::string_view target = "/",
64 PrepareStreamFn prepare_stream_fn = PrepareClientStream);
65
66 static absl::StatusOr<FiberAwareWebsocketStream> Connect(
67 std::string_view address, uint16_t port, std::string_view target = "/",
68 PrepareStreamFn prepare_stream_fn = PrepareClientStream);
69
70 BoostWebsocketStream& GetStream() const;
71
72 absl::Status Accept() const noexcept;
73 absl::Status Close() const noexcept;
74 absl::Status Read(absl::Duration timeout,
75 std::vector<uint8_t>* absl_nonnull buffer,
76 bool* absl_nullable got_text = nullptr) noexcept;
77 absl::Status ReadText(absl::Duration timeout,
78 std::string* absl_nonnull buffer) noexcept;
79 absl::Status Start() const noexcept;
80 absl::Status Write(const std::vector<uint8_t>& message_bytes) noexcept;
81 absl::Status WriteText(const std::string& message) noexcept;
82
83 void CancelRead() noexcept {
84 act::MutexLock lock(&mu_);
85 cancel_signal_.emit(boost::asio::cancellation_type::total);
86 }
87
88 template <typename Sink>
89 friend void AbslStringify(Sink& sink,
90 const FiberAwareWebsocketStream& stream) {
91 const auto endpoint = stream.stream_->next_layer().remote_endpoint();
92 sink.Append(absl::StrFormat("FiberAwareWebsocketStream: %s:%d",
93 endpoint.address().to_string(),
94 endpoint.port()));
95 }
96
97 private:
98 absl::Status CloseInternal() const noexcept
99 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
100
101 std::unique_ptr<BoostWebsocketStream> stream_;
102 PerformHandshakeFn handshake_fn_;
103
104 mutable act::Mutex mu_;
105 mutable act::CondVar cv_ ABSL_GUARDED_BY(mu_);
106 bool write_pending_ ABSL_GUARDED_BY(mu_) = false;
107 bool read_pending_ ABSL_GUARDED_BY(mu_) = false;
108 boost::asio::cancellation_signal cancel_signal_ ABSL_GUARDED_BY(mu_);
109};
110
111template <typename ExecutionContext>
112absl::Status ResolveAndConnect(ExecutionContext& context,
113 BoostWebsocketStream* absl_nonnull stream,
114 std::string_view address, uint16_t port) {
115 boost::system::error_code error;
116 boost::asio::ip::tcp::resolver resolver(context);
117
118 const auto endpoints =
119 resolver.resolve(address, std::to_string(port),
120 boost::asio::ip::resolver_query_base::flags(), error);
121
122 if (error) {
123 return absl::InternalError(error.message());
124 }
125
126 boost::asio::connect(stream->next_layer(), endpoints, error);
127
128 if (thread::Cancelled()) {
129 stream->next_layer().cancel();
130 stream->next_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both,
131 error);
132 return absl::CancelledError("FiberAwareWebsocketStream Connect cancelled");
133 }
134
135 if (error) {
136 return absl::InternalError(error.message());
137 }
138
139 return absl::OkStatus();
140}
141
142absl::Status ResolveAndConnect(BoostWebsocketStream* absl_nonnull stream,
143 std::string_view address, uint16_t port);
144
145absl::Status DoHandshake(BoostWebsocketStream* absl_nonnull stream,
146 std::string_view host, std::string_view target = "/");
147
148template <typename ExecutionContext>
149absl::StatusOr<FiberAwareWebsocketStream> FiberAwareWebsocketStream::Connect(
150 ExecutionContext& context, std::string_view address, uint16_t port,
151 std::string_view target, PrepareStreamFn prepare_stream_fn) {
152 auto ws_stream = std::make_unique<BoostWebsocketStream>(context);
153
154 if (absl::Status resolve_status =
155 ResolveAndConnect(context, ws_stream.get(), address, port);
156 !resolve_status.ok()) {
157 return resolve_status;
158 }
159
160 if (prepare_stream_fn) {
161 if (absl::Status prepare_status =
162 std::move(prepare_stream_fn)(ws_stream.get());
163 !prepare_status.ok()) {
164 return prepare_status;
165 }
166 }
167
168 auto do_handshake = [host = absl::StrFormat("%s:%d", address, port),
169 target = std::string(target)](
170 BoostWebsocketStream* absl_nonnull stream) {
171 return DoHandshake(stream, host, target);
172 };
173
174 return FiberAwareWebsocketStream(std::move(ws_stream),
175 std::move(do_handshake));
176}
177
178} // namespace act::net
179
180#endif // ACTIONENGINE_NET_WEBSOCKETS_FIBER_AWARE_WEBSOCKET_STREAM_H_
Concurrency utilities for ActionEngine.