Action Engine
Loading...
Searching...
No Matches
wire_stream.h
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#ifndef ACTIONENGINE_NET_WEBSOCKETS_WIRE_STREAM_H_
16#define ACTIONENGINE_NET_WEBSOCKETS_WIRE_STREAM_H_
17
18#include <cstdint>
19#include <memory>
20#include <optional>
21#include <string>
22#include <string_view>
23
24#define BOOST_ASIO_NO_DEPRECATED
25
26#include <absl/base/nullability.h>
27#include <absl/base/thread_annotations.h>
28#include <absl/log/check.h>
29#include <absl/status/status.h>
30#include <absl/status/statusor.h>
31#include <absl/strings/str_format.h>
32#include <boost/asio/ip/tcp.hpp>
33
36#include "actionengine/net/stream.h"
37#include "actionengine/net/websockets/fiber_aware_websocket_stream.h"
38#include "actionengine/service/service.h"
39
40namespace act::net {
41
53class WebsocketWireStream final : public WireStream {
54 public:
55 explicit WebsocketWireStream(std::unique_ptr<BoostWebsocketStream> stream,
56 std::string_view id = "");
57
58 explicit WebsocketWireStream(FiberAwareWebsocketStream stream,
59 std::string_view id = "");
60
61 ~WebsocketWireStream() override;
62
63 absl::Status Send(WireMessage message) override;
64
65 absl::StatusOr<std::optional<WireMessage>> Receive(
66 absl::Duration timeout) override;
67
68 absl::Status Start() override;
69
70 absl::Status Accept() override;
71
72 void HalfClose() override;
73
74 void Abort() override;
75
76 absl::Status GetStatus() const override { return status_; }
77
78 [[nodiscard]] std::string GetId() const override { return id_; }
79
80 [[nodiscard]] const void* absl_nonnull GetImpl() const override {
81 return &stream_;
82 }
83
84 template <typename Sink>
85 friend void AbslStringify(Sink& sink, const WebsocketWireStream& stream) {
86 absl::Format(&sink, "WebsocketWireStream(id: %s, status: %v)", stream.id_,
87 stream.status_);
88 }
89
90 private:
91 absl::Status SendInternal(WireMessage message)
92 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
93
94 absl::Status HalfCloseInternal() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
95
96 mutable act::Mutex mu_;
97
98 FiberAwareWebsocketStream stream_;
99 bool half_closed_ ABSL_GUARDED_BY(mu_) = false;
100 bool closed_ ABSL_GUARDED_BY(mu_) = false;
101 std::string id_;
102
103 absl::Status status_;
104};
105
106absl::StatusOr<std::unique_ptr<WebsocketWireStream>> MakeWebsocketWireStream(
107 std::string_view address = "127.0.0.1", uint16_t port = 20000,
108 std::string_view target = "/", std::string_view id = "",
109 PrepareStreamFn prepare_stream = PrepareClientStream);
110
111} // namespace act::net
112
113#endif // ACTIONENGINE_NET_WEBSOCKETS_WIRE_STREAM_H_
Concurrency utilities for ActionEngine.
Definition stream.h:44
Definition wire_stream.h:53
absl::StatusOr< std::optional< WireMessage > > Receive(absl::Duration timeout) override
Definition wire_stream.cc:73
void HalfClose() override
Definition wire_stream.cc:134
absl::Status Accept() override
Definition wire_stream.cc:129
absl::Status GetStatus() const override
Definition wire_stream.h:76
absl::Status Send(WireMessage message) override
Definition wire_stream.cc:50
std::string GetId() const override
Definition wire_stream.h:78
const void *absl_nonnull GetImpl() const override
Definition wire_stream.h:80
absl::Status Start() override
Definition wire_stream.cc:124
void Abort() override
Definition wire_stream.cc:139
Definition types.h:274
ActionEngine data structures used to implement actions and nodes (data streams).