Action Engine
Loading...
Searching...
No Matches
wire_stream.h
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#ifndef ACTIONENGINE_NET_WEBRTC_WIRE_STREAM_H_
16#define ACTIONENGINE_NET_WEBRTC_WIRE_STREAM_H_
17
18#include <cstddef>
19#include <cstdint>
20#include <memory>
21#include <optional>
22#include <string>
23#include <string_view>
24#include <vector>
25
26#include <absl/base/nullability.h>
27#include <absl/base/thread_annotations.h>
28#include <absl/container/flat_hash_map.h>
29#include <absl/hash/hash.h>
30#include <absl/status/status.h>
31#include <absl/status/statusor.h>
32#include <absl/time/time.h>
33#include <rtc/configuration.hpp>
34#include <rtc/datachannel.hpp>
35#include <rtc/peerconnection.hpp>
36
39#include "actionengine/net/stream.h"
40#include "actionengine/stores/byte_chunking.h"
41
42namespace act::net {
43
44struct TurnServer {
45 static absl::StatusOr<TurnServer> FromString(std::string_view url);
46
47 bool operator==(const TurnServer& other) const;
48
49 std::string hostname;
50 uint16_t port = 3478;
51 std::string username;
52 std::string password;
53};
54
55bool AbslParseFlag(std::string_view text, TurnServer* absl_nonnull server,
56 std::string* absl_nonnull error);
57std::string AbslUnparseFlag(const TurnServer& server);
58
59bool AbslParseFlag(std::string_view text,
60 std::vector<act::net::TurnServer>* absl_nonnull servers,
61 std::string* absl_nonnull error);
62std::string AbslUnparseFlag(const std::vector<act::net::TurnServer>& servers);
63
64struct RtcConfig {
65 static constexpr int kDefaultMaxMessageSize =
66 65536; // 64 KiB to match the defaults of several browsers
67
68 [[nodiscard]] rtc::Configuration BuildLibdatachannelConfig() const;
69
70 std::optional<size_t> max_message_size = kDefaultMaxMessageSize;
71
72 uint16_t port_range_begin = 19002;
73 uint16_t port_range_end = 19002;
74 bool enable_ice_udp_mux = true;
75
76 std::vector<std::string> stun_servers = {
77 "stun.l.google.com:19302", // Google's public STUN server
78 };
79 std::vector<TurnServer> turn_servers;
80};
81
82struct WebRtcDataChannelConnection {
83 std::shared_ptr<rtc::PeerConnection> connection;
84 std::shared_ptr<rtc::DataChannel> data_channel;
85};
86
87absl::StatusOr<WebRtcDataChannelConnection> StartWebRtcDataChannel(
88 std::string_view identity, std::string_view peer_identity = "server",
89 std::string_view signalling_address = "localhost",
90 uint16_t signalling_port = 80,
91 std::optional<RtcConfig> rtc_config = std::nullopt);
92
103class WebRtcWireStream final : public WireStream {
104 public:
105 static constexpr int kBufferSize = 256;
106 static constexpr absl::Duration kHalfCloseTimeout = absl::Seconds(5);
107
108 explicit WebRtcWireStream(
109 std::shared_ptr<rtc::DataChannel> data_channel,
110 std::shared_ptr<rtc::PeerConnection> connection = nullptr);
111
112 ~WebRtcWireStream() override;
113
114 absl::Status Send(WireMessage message) override;
115
116 absl::StatusOr<std::optional<WireMessage>> Receive(
117 absl::Duration timeout) override;
118
119 absl::Status Start() override { return absl::OkStatus(); }
120
121 absl::Status Accept() override { return absl::OkStatus(); }
122
123 void HalfClose() override {
124 act::MutexLock lock(&mu_);
125 HalfCloseInternal().IgnoreError();
126 }
127
128 void Abort() override;
129
130 absl::Status GetStatus() const override;
131
132 [[nodiscard]] std::string GetId() const override { return id_; }
133
134 [[nodiscard]] const void* absl_nullable GetImpl() const override {
135 return data_channel_.get();
136 }
137
138 private:
139 absl::Status SendInternal(WireMessage message)
140 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
141
142 absl::Status HalfCloseInternal() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
143
144 void CloseOnError(absl::Status status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
145
146 mutable act::Mutex mu_;
147 mutable act::CondVar cv_ ABSL_GUARDED_BY(mu_);
148
149 absl::Status status_ ABSL_GUARDED_BY(mu_);
150
151 const std::string id_;
152 std::shared_ptr<rtc::PeerConnection> connection_;
153 std::shared_ptr<rtc::DataChannel> data_channel_;
154 thread::Channel<WireMessage> recv_channel_{kBufferSize};
155
156 absl::flat_hash_map<uint64_t, std::unique_ptr<data::ChunkedBytes>>
157 chunked_messages_ ABSL_GUARDED_BY(mu_) = {};
158 uint64_t next_transient_id_ ABSL_GUARDED_BY(mu_) = 0;
159
160 bool opened_ ABSL_GUARDED_BY(mu_) = false;
161 bool closed_ ABSL_GUARDED_BY(mu_) = false;
162
163 bool half_closed_ ABSL_GUARDED_BY(mu_) = false;
164};
165
166absl::StatusOr<std::unique_ptr<WebRtcWireStream>> StartStreamWithSignalling(
167 std::string_view identity = "client",
168 std::string_view peer_identity = "server",
169 std::string_view address = "localhost", uint16_t port = 80);
170
171} // namespace act::net
172
173#endif // ACTIONENGINE_NET_WEBRTC_WIRE_STREAM_H_
Concurrency utilities for ActionEngine.
Definition stream.h:44
absl::StatusOr< std::optional< WireMessage > > Receive(absl::Duration timeout) override
Definition wire_stream.cc:341
const void *absl_nullable GetImpl() const override
Definition wire_stream.h:134
void Abort() override
Definition wire_stream.cc:397
absl::Status Send(WireMessage message) override
Definition wire_stream.cc:292
absl::Status Start() override
Definition wire_stream.h:119
void HalfClose() override
Definition wire_stream.h:123
absl::Status GetStatus() const override
Definition wire_stream.cc:407
absl::Status Accept() override
Definition wire_stream.h:121
std::string GetId() const override
Definition wire_stream.h:132
Definition types.h:274
ActionEngine data structures used to implement actions and nodes (data streams).