Action Engine
Loading...
Searching...
No Matches
signalling_client.h
Go to the documentation of this file.
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#ifndef ACTIONENGINE_NET_WEBRTC_SIGNALLING_CLIENT_H_
16#define ACTIONENGINE_NET_WEBRTC_SIGNALLING_CLIENT_H_
17
18#include <cstdint>
19#include <functional>
20#include <memory>
21#include <string>
22#include <string_view>
23#include <utility>
24
25#include <absl/base/thread_annotations.h>
26#include <absl/log/log.h>
27#include <absl/status/status.h>
28#include <boost/json/value.hpp>
29
31#include "actionengine/net/websockets/fiber_aware_websocket_stream.h"
32
37namespace act::net {
38
39// void(peer_id, message)
40using PeerJsonHandler =
41 std::function<void(std::string_view, boost::json::value)>;
42
53class SignallingClient {
54 public:
55 explicit SignallingClient(std::string_view address = "localhost",
56 uint16_t port = 80);
57
58 // This class is not copyable or movable
59 SignallingClient(const SignallingClient&) = delete;
60 SignallingClient& operator=(const SignallingClient&) = delete;
61
62 ~SignallingClient();
63
64 void ResetCallbacks();
65
66 void OnOffer(PeerJsonHandler on_offer) { on_offer_ = std::move(on_offer); }
67
68 void OnCandidate(PeerJsonHandler on_candidate) {
69 on_candidate_ = std::move(on_candidate);
70 }
71
72 void OnAnswer(PeerJsonHandler on_answer) {
73 on_answer_ = std::move(on_answer);
74 }
75
76 thread::Case OnError() const { return error_event_.OnEvent(); }
77
78 absl::Status GetStatus() const {
79 act::MutexLock lock(&mu_);
80 return loop_status_;
81 }
82
83 absl::Status ConnectWithIdentity(std::string_view identity);
84
85 absl::Status Send(const std::string& message) {
86 return stream_.WriteText(message);
87 }
88
89 void Cancel() {
90 act::MutexLock lock(&mu_);
91 CancelInternal();
92 }
93
94 void Join() {
95 act::MutexLock lock(&mu_);
96 JoinInternal();
97 }
98
99 private:
100 void CancelInternal() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
101 if (const absl::Status status = stream_.Close(); !status.ok()) {
102 LOG(ERROR) << "SignallingClient::Cancel failed: " << status;
103 }
104 if (loop_ != nullptr) {
105 loop_->Cancel();
106 loop_status_ =
107 absl::CancelledError("WebsocketActionEngineServer cancelled");
108 }
109 }
110
111 void JoinInternal() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
112 if (loop_ != nullptr) {
113 mu_.unlock();
114 loop_->Join();
115 mu_.lock();
116
117 loop_ = nullptr;
118
119 thread_pool_->stop();
120 thread_pool_->join();
121 }
122 }
123
124 void RunLoop() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
125
126 void CloseStreamAndJoinLoop() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
127
128 std::string identity_;
129 const std::string address_;
130 const uint16_t port_;
131
132 PeerJsonHandler on_offer_;
133 PeerJsonHandler on_candidate_;
134 PeerJsonHandler on_answer_;
135
136 std::unique_ptr<boost::asio::thread_pool> thread_pool_;
137 FiberAwareWebsocketStream stream_;
138 std::unique_ptr<thread::Fiber> loop_;
139 absl::Status loop_status_ ABSL_GUARDED_BY(mu_);
140 mutable act::Mutex mu_;
141 thread::PermanentEvent error_event_;
142};
143
144} // namespace act::net
145
146#endif // ACTIONENGINE_NET_WEBRTC_SIGNALLING_CLIENT_H_
Concurrency utilities for ActionEngine.
A Case represents a selectable case in a Select statement.
Definition cases.h:80