Action Engine
Loading...
Searching...
No Matches
redis.h
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#ifndef ACTIONENGINE_REDIS_REDIS_H_
16#define ACTIONENGINE_REDIS_REDIS_H_
17
18#include <cstddef>
19#include <cstdint>
20#include <memory>
21#include <optional>
22#include <string>
23#include <string_view>
24#include <thread>
25#include <utility>
26#include <vector>
27
28#include <absl/base/nullability.h>
29#include <absl/base/thread_annotations.h>
30#include <absl/container/flat_hash_map.h>
31#include <absl/container/flat_hash_set.h>
32#include <absl/functional/any_invocable.h>
33#include <absl/status/status.h>
34#include <absl/status/statusor.h>
35#include <hiredis/async.h>
36#include <hiredis/hiredis.h>
37#include <uvw/async.h>
38#include <uvw/loop.h>
39
41#include "actionengine/data/conversion.h"
42#include "actionengine/redis/pubsub.h"
43#include "actionengine/redis/reply.h"
44#include "actionengine/util/status_macros.h"
45
46namespace act::redis {
47
// Argument list type used by the command and script entry points declared in
// this header (ExecuteCommand, ExecuteScript). The elements are non-owning
// std::string_view values, so the character data they refer to must stay
// alive for as long as the vector is in use.
using CommandArgs = std::vector<std::string_view>;
49
50namespace internal {
51// A custom deleter for redisContext to ensure proper cleanup of C structures
52// when the Redis object is destroyed.
53struct RedisContextDeleter {
54 void operator()(redisAsyncContext* absl_nullable context) const {
55 if (context != nullptr) {
56 redisAsyncFree(context);
57 }
58 }
59};
60
// Empty tag type accepted by the tag-taking `Redis` constructor.
// NOTE(review): presumably this exists so that instances are created through
// `Redis::Connect` while the constructor itself can stay public — confirm.
struct PrivateConstructorTag {};
62
63class EventLoop {
64 public:
65 EventLoop();
66
67 ~EventLoop();
68
69 [[nodiscard]] uvw::loop* absl_nonnull Get() const;
70
71 void Wakeup() const;
72
73 private:
74 std::shared_ptr<uvw::async_handle> handle_;
75 std::shared_ptr<uvw::loop> loop_;
76 std::unique_ptr<std::thread> thread_;
77};
78
79} // namespace internal
80
81struct HelloReply {
82 static absl::StatusOr<HelloReply> From(Reply reply);
83
84 std::string server;
85 std::string version;
86 int protocol_version;
87 int id;
88 std::string mode;
89 std::string role;
90 std::vector<absl::flat_hash_map<std::string, std::string>> modules;
91};
92
// A script record as stored in `Redis::scripts_`.
struct Script {
  // NOTE(review): presumably the SHA1 digest under which the server caches the
  // script — confirm against RegisterScript.
  std::string sha1;
  // Source text of the script.
  std::string code;
};
97
98namespace internal {
99struct ReplyFuture {
100 Reply reply;
101 absl::Status status;
102 thread::PermanentEvent event;
103};
104} // namespace internal
105
106class Redis {
107 // A Redis client that binds hiredis for asynchronous communication with a Redis
108 // server. It supports pub/sub, command execution, Lua script execution and
109 // a subset of Redis streams. It should NOT be used as a general-purpose
110 // Redis client, but rather as a specialized client for Action Engine's needs
111 // consistent with its threading model and code style, for example, its
112 // use of `act::Mutex` and `act::CondVar` for fiber-aware synchronization,
113 // and exception-free error handling using `absl::Status` and `absl::StatusOr`
114 // for error propagation.
115 public:
116 // Static callbacks for hiredis async context events. They resolve to
117 // instance methods to allow access to the instance state.
118 static void ConnectCallback(const redisAsyncContext* absl_nonnull context,
119 int status);
120
121 static void DisconnectCallback(const redisAsyncContext* absl_nonnull context,
122 int status);
123
124 static void PubsubCallback(redisAsyncContext* absl_nonnull context,
125 void* absl_nonnull hiredis_reply,
126 void* absl_nullable);
127
128 static void ReplyCallback(redisAsyncContext* absl_nonnull context,
129 void* absl_nonnull hiredis_reply,
130 void* absl_nonnull privdata);
131
132 // ReSharper disable once CppParameterMayBeConstPtrOrRef
133 static void PushReplyCallback(redisAsyncContext* absl_nonnull context,
134 void* absl_nonnull hiredis_reply);
135
136 // Deleted default constructor to prevent instantiation without connection.
137 Redis() = delete;
138
139 static absl::StatusOr<std::unique_ptr<Redis>> Connect(std::string_view host,
140 int port = 6379);
141
142 // Non-copyable, non-moveable.
143 Redis(const Redis&) = delete;
144 Redis& operator=(const Redis&) = delete;
145
146 ~Redis();
147
148 void SetKeyPrefix(std::string_view prefix);
149
150 [[nodiscard]] std::string_view GetKeyPrefix() const;
151
152 std::string GetKey(std::string_view key) const;
153
154 absl::StatusOr<Reply> Get(std::string_view key);
155 template <typename T>
156 absl::StatusOr<T> Get(std::string_view key);
157
158 absl::Status Set(std::string_view key, std::string_view value);
159
160 template <typename T>
161 absl::Status Set(std::string_view key, T&& value) {
162 ASSIGN_OR_RETURN(const std::string value_str,
163 ConvertTo<std::string>(std::forward<T>(value)));
164 return Set(key, std::string_view(value_str));
165 }
166
167 absl::StatusOr<std::string> RegisterScript(std::string_view name,
168 std::string_view code,
169 bool overwrite_existing = true);
170
171 absl::StatusOr<Reply> ExecuteScript(std::string_view name,
172 CommandArgs script_keys = {},
173 CommandArgs script_args = {});
174
175 absl::StatusOr<absl::flat_hash_map<std::string, std::optional<int64_t>>>
176 ZRange(std::string_view key, int64_t start, int64_t end,
177 bool withscores = false);
178
179 absl::StatusOr<std::shared_ptr<Subscription>> Subscribe(
180 std::string_view channel,
181 absl::AnyInvocable<void(Reply)> on_message = {});
182
183 absl::Status Unsubscribe(std::string_view channel);
184
185 void RemoveSubscription(std::string_view channel,
186 const std::shared_ptr<Subscription>& subscription);
187
188 absl::StatusOr<HelloReply> Hello(int protocol_version = 3,
189 std::string_view client_name = "",
190 std::string_view username = "",
191 std::string_view password = "");
192
193 absl::StatusOr<Reply> ExecuteCommand(std::string_view command,
194 const CommandArgs& args = {});
195
196 explicit Redis(internal::PrivateConstructorTag _) {}
197
198 private:
199 bool ParseReply(redisReply* absl_nonnull hiredis_reply,
200 Reply* absl_nonnull reply_out)
201 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
202
203 absl::Status CheckConnected() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
204
205 absl::Status UnsubscribeInternal(std::string_view channel);
206
207 absl::StatusOr<Reply> ExecuteCommandWithGuards(std::string_view command,
208 const CommandArgs& args = {})
209 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
210
211 absl::StatusOr<Reply> ExecuteCommandInternal(std::string_view command,
212 const CommandArgs& args = {})
213 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
214
215 // Instance versions of the static callbacks.
216 void OnConnect(int status) ABSL_LOCKS_EXCLUDED(mu_);
217
218 void OnDisconnect(int status) ABSL_LOCKS_EXCLUDED(mu_);
219
220 void OnPubsubReply(void* absl_nonnull hiredis_reply) ABSL_LOCKS_EXCLUDED(mu_);
221
222 void OnPushReply(redisReply* absl_nonnull hiredis_reply)
223 ABSL_LOCKS_EXCLUDED(mu_);
224
225 mutable act::Mutex mu_;
226 act::CondVar cv_ ABSL_GUARDED_BY(mu_);
227
228 internal::EventLoop event_loop_ ABSL_GUARDED_BY(mu_);
229
230 std::string key_prefix_ ABSL_GUARDED_BY(mu_) = "act:";
231 size_t num_pending_commands_ ABSL_GUARDED_BY(mu_) = 0;
232 thread::PermanentEvent disconnect_event_ ABSL_GUARDED_BY(mu_);
233 bool connected_ ABSL_GUARDED_BY(mu_) = false;
234 absl::Status status_ ABSL_GUARDED_BY(mu_) = absl::OkStatus();
235
236 absl::flat_hash_map<std::string,
237 absl::flat_hash_set<std::shared_ptr<Subscription>>>
238 subscriptions_ ABSL_GUARDED_BY(mu_);
239 absl::flat_hash_map<std::string, Script> scripts_ ABSL_GUARDED_BY(mu_);
240 std::unique_ptr<redisAsyncContext, internal::RedisContextDeleter> context_;
241};
242
243template <typename T>
244absl::StatusOr<T> Redis::Get(std::string_view key) {
245 ASSIGN_OR_RETURN(Reply reply, Get(key));
246 return ConvertTo<T>(reply);
247}
248
249template <>
250inline absl::Status Redis::Set(std::string_view key, const std::string& value) {
251 return Set(key, std::string_view(value));
252}
253
254} // namespace act::redis
255
256#endif // ACTIONENGINE_REDIS_REDIS_H_
Concurrency utilities for ActionEngine.