15#ifndef ACTIONENGINE_NET_WEBSOCKETS_FIBER_AWARE_WEBSOCKET_STREAM_H_ 
   16#define ACTIONENGINE_NET_WEBSOCKETS_FIBER_AWARE_WEBSOCKET_STREAM_H_ 
   18#define BOOST_ASIO_NO_DEPRECATED 
   20#include <absl/status/status.h> 
   21#include <absl/status/statusor.h> 
   22#include <boost/asio/cancellation_signal.hpp> 
   23#include <boost/asio/ip/tcp.hpp> 
   24#include <boost/asio/strand.hpp> 
   25#include <boost/asio/thread_pool.hpp> 
   26#include <boost/beast/core.hpp> 
   27#include <boost/beast/websocket.hpp> 
   33using BoostWebsocketStream =
 
   34    boost::beast::websocket::stream<boost::asio::ip::tcp::socket>;
 
   36using PrepareStreamFn =
 
   37    std::function<absl::Status(BoostWebsocketStream* absl_nonnull)>;
 
   39absl::Status PrepareClientStream(BoostWebsocketStream* absl_nonnull stream);
 
   40absl::Status PrepareServerStream(BoostWebsocketStream* absl_nonnull stream);
 
   42using PerformHandshakeFn =
 
   43    std::function<absl::Status(BoostWebsocketStream* absl_nonnull)>;
 
   45class FiberAwareWebsocketStream {
 
   47  explicit FiberAwareWebsocketStream(
 
   48      std::unique_ptr<BoostWebsocketStream> stream = 
nullptr,
 
   49      PerformHandshakeFn handshake_fn = {});
 
   51  FiberAwareWebsocketStream(
const FiberAwareWebsocketStream&) = 
delete;
 
   52  FiberAwareWebsocketStream& operator=(
const FiberAwareWebsocketStream&) =
 
   55  FiberAwareWebsocketStream(FiberAwareWebsocketStream&&) noexcept;
 
   56  FiberAwareWebsocketStream& operator=(FiberAwareWebsocketStream&&) noexcept;
 
   58  ~FiberAwareWebsocketStream();
 
   60  template <typename ExecutionContext>
 
   61  static absl::StatusOr<FiberAwareWebsocketStream> Connect(
 
   62      ExecutionContext& context, std::string_view address, uint16_t port,
 
   63      std::string_view target = 
"/",
 
   64      PrepareStreamFn prepare_stream_fn = PrepareClientStream);
 
   66  static absl::StatusOr<FiberAwareWebsocketStream> Connect(
 
   67      std::string_view address, uint16_t port, std::string_view target = 
"/",
 
   68      PrepareStreamFn prepare_stream_fn = PrepareClientStream);
 
   70  BoostWebsocketStream& GetStream() const;
 
   72  absl::Status Accept() const noexcept;
 
   73  absl::Status Close() const noexcept;
 
   74  absl::Status Read(absl::Duration timeout,
 
   75                    std::vector<uint8_t>* absl_nonnull buffer,
 
   76                    bool* absl_nullable got_text = 
nullptr) noexcept;
 
   77  absl::Status ReadText(absl::Duration timeout,
 
   78                        std::
string* absl_nonnull buffer) noexcept;
 
   79  absl::Status Start() const noexcept;
 
   80  absl::Status Write(const std::vector<uint8_t>& message_bytes) noexcept;
 
   81  absl::Status WriteText(const std::
string& message) noexcept;
 
   83  void CancelRead() noexcept {
 
   84    act::MutexLock lock(&mu_);
 
   85    cancel_signal_.emit(boost::asio::cancellation_type::total);
 
   88  template <
typename Sink>
 
   89  friend void AbslStringify(Sink& sink,
 
   90                            const FiberAwareWebsocketStream& stream) {
 
   91    const auto endpoint = stream.stream_->next_layer().remote_endpoint();
 
   92    sink.Append(absl::StrFormat(
"FiberAwareWebsocketStream: %s:%d",
 
   93                                endpoint.address().to_string(),
 
   98  absl::Status CloseInternal() const noexcept
 
   99      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
 
  101  std::unique_ptr<BoostWebsocketStream> stream_;
 
  102  PerformHandshakeFn handshake_fn_;
 
  104  mutable act::Mutex mu_;
 
  105  mutable act::CondVar cv_ ABSL_GUARDED_BY(mu_);
 
  106  bool write_pending_ ABSL_GUARDED_BY(mu_) = false;
 
  107  bool read_pending_ ABSL_GUARDED_BY(mu_) = false;
 
  108  boost::asio::cancellation_signal cancel_signal_ ABSL_GUARDED_BY(mu_);
 
  111template <typename ExecutionContext>
 
  112absl::Status ResolveAndConnect(ExecutionContext& context,
 
  113                               BoostWebsocketStream* absl_nonnull stream,
 
  114                               std::string_view address, uint16_t port) {
 
  115  boost::system::error_code error;
 
  116  boost::asio::ip::tcp::resolver resolver(context);
 
  118  const auto endpoints =
 
  119      resolver.resolve(address, std::to_string(port),
 
  120                       boost::asio::ip::resolver_query_base::flags(), error);
 
  123    return absl::InternalError(error.message());
 
  126  boost::asio::connect(stream->next_layer(), endpoints, error);
 
  128  if (thread::Cancelled()) {
 
  129    stream->next_layer().cancel();
 
  130    stream->next_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both,
 
  132    return absl::CancelledError(
"FiberAwareWebsocketStream Connect cancelled");
 
  136    return absl::InternalError(error.message());
 
  139  return absl::OkStatus();
 
  142absl::Status ResolveAndConnect(BoostWebsocketStream* absl_nonnull stream,
 
  143                               std::string_view address, uint16_t port);
 
  145absl::Status DoHandshake(BoostWebsocketStream* absl_nonnull stream,
 
  146                         std::string_view host, std::string_view target = 
"/");
 
  148template <
typename ExecutionContext>
 
  149absl::StatusOr<FiberAwareWebsocketStream> FiberAwareWebsocketStream::Connect(
 
  150    ExecutionContext& context, std::string_view address, uint16_t port,
 
  151    std::string_view target, PrepareStreamFn prepare_stream_fn) {
 
  152  auto ws_stream = std::make_unique<BoostWebsocketStream>(context);
 
  154  if (absl::Status resolve_status =
 
  155          ResolveAndConnect(context, ws_stream.get(), address, port);
 
  156      !resolve_status.ok()) {
 
  157    return resolve_status;
 
  160  if (prepare_stream_fn) {
 
  161    if (absl::Status prepare_status =
 
  162            std::move(prepare_stream_fn)(ws_stream.get());
 
  163        !prepare_status.ok()) {
 
  164      return prepare_status;
 
  168  auto do_handshake = [host = absl::StrFormat(
"%s:%d", address, port),
 
  169                       target = std::string(target)](
 
  170                          BoostWebsocketStream* absl_nonnull stream) {
 
  171    return DoHandshake(stream, host, target);
 
  174  return FiberAwareWebsocketStream(std::move(ws_stream),
 
  175                                   std::move(do_handshake));
 
Concurrency utilities for ActionEngine.