15#ifndef ACTIONENGINE_NET_WEBRTC_WIRE_STREAM_H_ 
   16#define ACTIONENGINE_NET_WEBRTC_WIRE_STREAM_H_ 
   26#include <absl/base/nullability.h> 
   27#include <absl/base/thread_annotations.h> 
   28#include <absl/container/flat_hash_map.h> 
   29#include <absl/hash/hash.h> 
   30#include <absl/status/status.h> 
   31#include <absl/status/statusor.h> 
   32#include <absl/time/time.h> 
   33#include <rtc/configuration.hpp> 
   34#include <rtc/datachannel.hpp> 
   35#include <rtc/peerconnection.hpp> 
   39#include "actionengine/net/stream.h" 
   40#include "actionengine/stores/byte_chunking.h" 
   45  static absl::StatusOr<TurnServer> FromString(std::string_view url);
 
   47  bool operator==(
const TurnServer& other) 
const;
 
   55bool AbslParseFlag(std::string_view text, TurnServer* absl_nonnull server,
 
   56                   std::string* absl_nonnull error);
 
   57std::string AbslUnparseFlag(
const TurnServer& server);
 
   59bool AbslParseFlag(std::string_view text,
 
   60                   std::vector<act::net::TurnServer>* absl_nonnull servers,
 
   61                   std::string* absl_nonnull error);
 
   62std::string AbslUnparseFlag(
const std::vector<act::net::TurnServer>& servers);
 
   65  static constexpr int kDefaultMaxMessageSize =
 
   68  [[nodiscard]] rtc::Configuration BuildLibdatachannelConfig() 
const;
 
   70  std::optional<size_t> max_message_size = kDefaultMaxMessageSize;
 
   72  uint16_t port_range_begin = 19002;
 
   73  uint16_t port_range_end = 19002;
 
   74  bool enable_ice_udp_mux = 
true;
 
   76  std::vector<std::string> stun_servers = {
 
   77      "stun.l.google.com:19302",  
 
   79  std::vector<TurnServer> turn_servers;
 
   82struct WebRtcDataChannelConnection {
 
   83  std::shared_ptr<rtc::PeerConnection> connection;
 
   84  std::shared_ptr<rtc::DataChannel> data_channel;
 
   87absl::StatusOr<WebRtcDataChannelConnection> StartWebRtcDataChannel(
 
   88    std::string_view identity, std::string_view peer_identity = 
"server",
 
   89    std::string_view signalling_address = 
"localhost",
 
   90    uint16_t signalling_port = 80,
 
   91    std::optional<RtcConfig> rtc_config = std::nullopt);
 
  105  static constexpr int kBufferSize = 256;
 
  106  static constexpr absl::Duration kHalfCloseTimeout = absl::Seconds(5);
 
  108  explicit WebRtcWireStream(
 
  109      std::shared_ptr<rtc::DataChannel> data_channel,
 
  110      std::shared_ptr<rtc::PeerConnection> connection = 
nullptr);
 
  112  ~WebRtcWireStream() 
override;
 
  116  absl::StatusOr<std::optional<WireMessage>> 
Receive(
 
  117      absl::Duration timeout) 
override;
 
  119  absl::Status 
Start()
 override { 
return absl::OkStatus(); }
 
  121  absl::Status 
Accept()
 override { 
return absl::OkStatus(); }
 
  124    act::MutexLock lock(&mu_);
 
  125    HalfCloseInternal().IgnoreError();
 
 
  128  void Abort() 
override;
 
  132  [[nodiscard]] std::string 
GetId()
 const override { 
return id_; }
 
  134  [[nodiscard]] 
const void* absl_nullable 
GetImpl()
 const override {
 
  135    return data_channel_.get();
 
 
  140      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
 
  142  absl::Status HalfCloseInternal() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
 
  144  void CloseOnError(absl::Status status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
 
  146  mutable act::Mutex mu_;
 
  147  mutable act::CondVar cv_ ABSL_GUARDED_BY(mu_);
 
  149  absl::Status status_ ABSL_GUARDED_BY(mu_);
 
  151  const std::
string id_;
 
  152  std::shared_ptr<rtc::PeerConnection> connection_;
 
  153  std::shared_ptr<rtc::DataChannel> data_channel_;
 
  154  thread::Channel<
WireMessage> recv_channel_{kBufferSize};
 
  156  absl::flat_hash_map<uint64_t, std::unique_ptr<data::ChunkedBytes>>
 
  157      chunked_messages_ ABSL_GUARDED_BY(mu_) = {};
 
  158  uint64_t next_transient_id_ ABSL_GUARDED_BY(mu_) = 0;
 
  160  bool opened_ ABSL_GUARDED_BY(mu_) = 
false;
 
  161  bool closed_ ABSL_GUARDED_BY(mu_) = 
false;
 
  163  bool half_closed_ ABSL_GUARDED_BY(mu_) = 
false;
 
 
  166absl::StatusOr<std::unique_ptr<WebRtcWireStream>> StartStreamWithSignalling(
 
  167    std::string_view identity = 
"client",
 
  168    std::string_view peer_identity = 
"server",
 
  169    std::string_view address = 
"localhost", uint16_t port = 80);
 
Concurrency utilities for ActionEngine.
absl::StatusOr< std::optional< WireMessage > > Receive(absl::Duration timeout) override
Definition wire_stream.cc:341
const void *absl_nullable GetImpl() const override
Definition wire_stream.h:134
void Abort() override
Definition wire_stream.cc:397
absl::Status Send(WireMessage message) override
Definition wire_stream.cc:292
absl::Status Start() override
Definition wire_stream.h:119
void HalfClose() override
Definition wire_stream.h:123
absl::Status GetStatus() const override
Definition wire_stream.cc:407
absl::Status Accept() override
Definition wire_stream.h:121
std::string GetId() const override
Definition wire_stream.h:132
ActionEngine data structures used to implement actions and nodes (data streams).