15#ifndef ACTIONENGINE_REDIS_REDIS_H_ 
   16#define ACTIONENGINE_REDIS_REDIS_H_ 
   28#include <absl/base/nullability.h> 
   29#include <absl/base/thread_annotations.h> 
   30#include <absl/container/flat_hash_map.h> 
   31#include <absl/container/flat_hash_set.h> 
   32#include <absl/functional/any_invocable.h> 
   33#include <absl/status/status.h> 
   34#include <absl/status/statusor.h> 
   35#include <hiredis/async.h> 
   36#include <hiredis/hiredis.h> 
   41#include "actionengine/data/conversion.h" 
   42#include "actionengine/redis/pubsub.h" 
   43#include "actionengine/redis/reply.h" 
   44#include "actionengine/util/status_macros.h" 
   // Argument list type used by the command and script entry points below
   // (ExecuteScript, ExecuteCommand). The elements are std::string_view, so the
   // vector does not own the character data it refers to.
   using CommandArgs = std::vector<std::string_view>;
 
   53struct RedisContextDeleter {
 
   54  void operator()(redisAsyncContext* absl_nullable context)
 const {
 
   55    if (context != 
nullptr) {
 
   56      redisAsyncFree(context);
 
   // Empty tag type. The Redis constructor takes it as its only parameter
   // (spelled internal::PrivateConstructorTag there).
   struct PrivateConstructorTag {};
 
   69  [[nodiscard]] uvw::loop* absl_nonnull Get() 
const;
 
   74  std::shared_ptr<uvw::async_handle> handle_;
 
   75  std::shared_ptr<uvw::loop> loop_;
 
   76  std::unique_ptr<std::thread> thread_;
 
   82  static absl::StatusOr<HelloReply> From(Reply reply);
 
   90  std::vector<absl::flat_hash_map<std::string, std::string>> modules;
 
  102  thread::PermanentEvent event;
 
  // Static callbacks whose first parameter is a const, non-null
  // (absl_nonnull) redisAsyncContext pointer.
  // NOTE(review): both parameter lists end on a trailing comma here; the
  // remaining parameters and the closing parenthesis are not part of this
  // view, so nothing is documented about them.
  118  static void ConnectCallback(
const redisAsyncContext* absl_nonnull context,
 
  121  static void DisconnectCallback(
const redisAsyncContext* absl_nonnull context,
 
  124  static void PubsubCallback(redisAsyncContext* absl_nonnull context,
 
  125                             void* absl_nonnull hiredis_reply,
 
  126                             void* absl_nullable);
 
  128  static void ReplyCallback(redisAsyncContext* absl_nonnull context,
 
  129                            void* absl_nonnull hiredis_reply,
 
  130                            void* absl_nonnull privdata);
 
  133  static void PushReplyCallback(redisAsyncContext* absl_nonnull context,
 
  134                                void* absl_nonnull hiredis_reply);
 
  // Static factory: yields a std::unique_ptr<Redis> on success or a non-OK
  // status on failure.
  // NOTE(review): the parameter list continues after `host,`; the remaining
  // parameters are not part of this view.
  139  static absl::StatusOr<std::unique_ptr<Redis>> Connect(std::string_view host,
 
  143  Redis(
const Redis&) = 
delete;
 
  144  Redis& operator=(
const Redis&) = 
delete;
 
  148  void SetKeyPrefix(std::string_view prefix);
 
  150  [[nodiscard]] std::string_view GetKeyPrefix() 
const;
 
  152  std::string GetKey(std::string_view key) 
const;
 
  154  absl::StatusOr<Reply> Get(std::string_view key);
 
  155  template <
typename T>
 
  156  absl::StatusOr<T> Get(std::string_view key);
 
  158  absl::Status Set(std::string_view key, std::string_view value);
 
  160  template <
typename T>
 
  161  absl::Status Set(std::string_view key, T&& value) {
 
  162    ASSIGN_OR_RETURN(
const std::string value_str,
 
  163                     ConvertTo<std::string>(std::forward<T>(value)));
 
  164    return Set(key, std::string_view(value_str));
 
  167  absl::StatusOr<std::string> RegisterScript(std::string_view name,
 
  168                                             std::string_view code,
 
  169                                             bool overwrite_existing = 
true);
 
  171  absl::StatusOr<Reply> ExecuteScript(std::string_view name,
 
  172                                      CommandArgs script_keys = {},
 
  173                                      CommandArgs script_args = {});
 
  175  absl::StatusOr<absl::flat_hash_map<std::string, std::optional<int64_t>>>
 
  176  ZRange(std::string_view key, int64_t start, int64_t end,
 
  177         bool withscores = 
false);
 
  179  absl::StatusOr<std::shared_ptr<Subscription>> Subscribe(
 
  180      std::string_view channel,
 
  181      absl::AnyInvocable<
void(Reply)> on_message = {});
 
  183  absl::Status Unsubscribe(std::string_view channel);
 
  185  void RemoveSubscription(std::string_view channel,
 
  186                          const std::shared_ptr<Subscription>& subscription);
 
  188  absl::StatusOr<HelloReply> Hello(
int protocol_version = 3,
 
  189                                   std::string_view client_name = 
"",
 
  190                                   std::string_view username = 
"",
 
  191                                   std::string_view password = 
"");
 
  193  absl::StatusOr<Reply> ExecuteCommand(std::string_view command,
 
  194                                       const CommandArgs& args = {});
 
  196  explicit Redis(internal::PrivateConstructorTag _) {}
 
  199  bool ParseReply(redisReply* absl_nonnull hiredis_reply,
 
  200                  Reply* absl_nonnull reply_out)
 
  201      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
 
  203  absl::Status CheckConnected() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
 
  205  absl::Status UnsubscribeInternal(std::string_view channel);
 
  207  absl::StatusOr<Reply> ExecuteCommandWithGuards(std::string_view command,
 
  208                                                 const CommandArgs& args = {})
 
  209      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
 
  211  absl::StatusOr<Reply> ExecuteCommandInternal(std::string_view command,
 
  212                                               const CommandArgs& args = {})
 
  213      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
 
  216  void OnConnect(
int status) ABSL_LOCKS_EXCLUDED(mu_);
 
  218  void OnDisconnect(
int status) ABSL_LOCKS_EXCLUDED(mu_);
 
  220  void OnPubsubReply(
void* absl_nonnull hiredis_reply) ABSL_LOCKS_EXCLUDED(mu_);
 
  222  void OnPushReply(redisReply* absl_nonnull hiredis_reply)
 
  223      ABSL_LOCKS_EXCLUDED(mu_);
 
  225  mutable act::Mutex mu_;
 
  226  act::CondVar cv_ ABSL_GUARDED_BY(mu_);
 
  228  internal::EventLoop event_loop_ ABSL_GUARDED_BY(mu_);
 
  230  std::string key_prefix_ ABSL_GUARDED_BY(mu_) = 
"act:";
 
  231  size_t num_pending_commands_ ABSL_GUARDED_BY(mu_) = 0;
 
  232  thread::PermanentEvent disconnect_event_ ABSL_GUARDED_BY(mu_);
 
  233  bool connected_ ABSL_GUARDED_BY(mu_) = 
false;
 
  234  absl::Status status_ ABSL_GUARDED_BY(mu_) = absl::OkStatus();
 
  236  absl::flat_hash_map<std::string,
 
  237                      absl::flat_hash_set<std::shared_ptr<Subscription>>>
 
  238      subscriptions_ ABSL_GUARDED_BY(mu_);
 
  239  absl::flat_hash_map<std::string, Script> scripts_ ABSL_GUARDED_BY(mu_);
 
  240  std::unique_ptr<redisAsyncContext, internal::RedisContextDeleter> context_;
 
  244absl::StatusOr<T> Redis::Get(std::string_view key) {
 
  245  ASSIGN_OR_RETURN(Reply reply, Get(key));
 
  246  return ConvertTo<T>(reply);
 
  250inline absl::Status Redis::Set(std::string_view key, 
const std::string& value) {
 
  251  return Set(key, std::string_view(value));
 
Concurrency utilities for ActionEngine.