15#ifndef ACTIONENGINE_REDIS_PUBSUB_H_ 
   16#define ACTIONENGINE_REDIS_PUBSUB_H_ 
   18#include <thread/channel.h> 
   21#include "actionengine/redis/reply.h" 
   // Default value of the `capacity` argument of the Subscription(size_t)
   // constructor declared below.
   26  static constexpr size_t kDefaultChannelCapacity = 16;
 
   // Constructs a Subscription from a capacity, which defaults to
   // kDefaultChannelCapacity. `explicit` prevents implicit conversion from
   // size_t.
   // NOTE(review): presumably `capacity` sizes channel_ -- confirm against the
   // definition, which is not visible in this header.
   28  explicit Subscription(
size_t capacity = kDefaultChannelCapacity);
 
   // Constructs a Subscription from a callable that accepts a Reply by value.
   // NOTE(review): presumably the callable is stored in on_message_ and
   // invoked for each incoming message -- confirm against the definition.
   29  explicit Subscription(absl::AnyInvocable<
void(Reply)> on_message);
 
   // Returns a thread::Case; const member function.
   // NOTE(review): presumably the case is tied to subscribe_event_ so callers
   // can wait for the subscription to be established -- confirm against the
   // definition.
   33  thread::Case OnSubscribe() 
const;
 
   // Returns a thread::Case; const member function.
   // NOTE(review): presumably the case is tied to unsubscribe_event_ so
   // callers can wait for the unsubscription to complete -- confirm against
   // the definition.
   34  thread::Case OnUnsubscribe() 
const;
 
   // Returns the reader obtained from channel_.reader().
   // NOTE(review): the returned pointer is presumably owned by channel_ and
   // valid only while this Subscription is alive -- confirm.
   36  thread::Reader<Reply>* GetReader() { 
return channel_.reader(); }
 
   // Accepts a Reply by value.
   // NOTE(review): presumably delivers the reply to the subscriber (through
   // channel_ or on_message_) -- confirm against the definition.
   41  void Message(Reply reply);
 
   // Must be called with mu_ held exclusively, as stated by the thread-safety
   // annotation.
   // NOTE(review): presumably closes the writer side of channel_ and records
   // that in writer_closed_ -- confirm against the definition.
   45  void CloseWriter() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
 
   // Mutex that guards writer_closed_. Declared `mutable` so that it can be
   // locked from const member functions.
   47  mutable act::Mutex mu_;
 
   // Flag guarded by mu_; starts out false.
   // NOTE(review): presumably set once the channel writer has been closed
   // (see CloseWriter()) -- confirm against the definition.
   48  bool writer_closed_ ABSL_GUARDED_BY(mu_) = false;
 
   // Channel carrying Reply values; GetReader() exposes its reader.
   50  thread::Channel<Reply> channel_;
 
   // Callable that accepts a Reply by value.
   // NOTE(review): presumably set by the callback-taking constructor and left
   // empty otherwise -- confirm against the definition.
   51  absl::AnyInvocable<
void(Reply)> on_message_;
 
   // NOTE(review): presumably notified once the subscription is established
   // and observed through OnSubscribe() -- confirm against the definition.
   52  thread::PermanentEvent subscribe_event_;
 
   // NOTE(review): presumably notified once the unsubscription completes and
   // observed through OnUnsubscribe() -- confirm against the definition.
   53  thread::PermanentEvent unsubscribe_event_;
 
Concurrency utilities for ActionEngine.