15#ifndef ACTIONENGINE_NET_WEBSOCKETS_WIRE_STREAM_H_ 
   16#define ACTIONENGINE_NET_WEBSOCKETS_WIRE_STREAM_H_ 
   24#define BOOST_ASIO_NO_DEPRECATED 
   26#include <absl/base/nullability.h> 
   27#include <absl/base/thread_annotations.h> 
   28#include <absl/log/check.h> 
   29#include <absl/status/status.h> 
   30#include <absl/status/statusor.h> 
   31#include <absl/strings/str_format.h> 
   32#include <boost/asio/ip/tcp.hpp> 
   36#include "actionengine/net/stream.h" 
   37#include "actionengine/net/websockets/fiber_aware_websocket_stream.h" 
   38#include "actionengine/service/service.h" 
   55  explicit WebsocketWireStream(std::unique_ptr<BoostWebsocketStream> stream,
 
   56                               std::string_view 
id = 
"");
 
   58  explicit WebsocketWireStream(FiberAwareWebsocketStream stream,
 
   59                               std::string_view 
id = 
"");
 
   61  ~WebsocketWireStream() 
override;
 
   65  absl::StatusOr<std::optional<WireMessage>> 
Receive(
 
   66      absl::Duration timeout) 
override;
 
   68  absl::Status 
Start() 
override;
 
   70  absl::Status 
Accept() 
override;
 
   74  void Abort() 
override;
 
   76  absl::Status 
GetStatus()
 const override { 
return status_; }
 
   78  [[nodiscard]] std::string 
GetId()
 const override { 
return id_; }
 
   80  [[nodiscard]] 
const void* absl_nonnull 
GetImpl()
 const override {
 
 
   84  template <
typename Sink>
 
   86    absl::Format(&sink, 
"WebsocketWireStream(id: %s, status: %v)", stream.id_,
 
   92      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
 
   94  absl::Status HalfCloseInternal() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
 
   96  mutable act::Mutex mu_;
 
   98  FiberAwareWebsocketStream stream_;
 
   99  bool half_closed_ ABSL_GUARDED_BY(mu_) = false;
 
  100  bool closed_ ABSL_GUARDED_BY(mu_) = false;
 
  103  absl::Status status_;
 
 
  106absl::StatusOr<std::unique_ptr<WebsocketWireStream>> MakeWebsocketWireStream(
 
  107    std::string_view address = 
"127.0.0.1", uint16_t port = 20000,
 
  108    std::string_view target = 
"/", std::string_view 
id = 
"",
 
  109    PrepareStreamFn prepare_stream = PrepareClientStream);
 
Concurrency utilities for ActionEngine.
Definition wire_stream.h:53
absl::StatusOr< std::optional< WireMessage > > Receive(absl::Duration timeout) override
Definition wire_stream.cc:73
void HalfClose() override
Definition wire_stream.cc:134
absl::Status Accept() override
Definition wire_stream.cc:129
absl::Status GetStatus() const override
Definition wire_stream.h:76
absl::Status Send(WireMessage message) override
Definition wire_stream.cc:50
std::string GetId() const override
Definition wire_stream.h:78
const void *absl_nonnull GetImpl() const override
Definition wire_stream.h:80
absl::Status Start() override
Definition wire_stream.cc:124
void Abort() override
Definition wire_stream.cc:139
ActionEngine data structures used to implement actions and nodes (data streams).