15#ifndef ACTIONENGINE_REDIS_STREAMS_H_ 
   16#define ACTIONENGINE_REDIS_STREAMS_H_ 
   22#include "actionengine/data/conversion.h" 
   24#include "actionengine/redis/redis.h" 
   25#include "actionengine/redis/reply_converters.h" 
   26#include "actionengine/util/status_macros.h" 
   31concept StringViewKeyValueIterator =
 
   32    std::input_iterator<It> &&
 
   33    std::convertible_to<typename std::iterator_traits<It>::value_type,
 
   34                        std::pair<std::string_view, std::string_view>>;
 
   37struct StreamMessageId {
 
   40  bool is_wildcard = 
false;
 
   42  StreamMessageId operator+(absl::Duration duration) 
const;
 
   44  bool operator==(
const StreamMessageId& rhs)
 const {
 
   45    return millis == rhs.millis && sequence == rhs.sequence;
 
   48  bool operator>=(
const StreamMessageId& rhs)
 const {
 
   49    return millis > rhs.millis ||
 
   50           (millis == rhs.millis && sequence >= rhs.sequence);
 
   53  static constexpr StreamMessageId Wildcard() {
 
   54    return StreamMessageId{.is_wildcard = 
true};
 
   57  static absl::StatusOr<StreamMessageId> FromString(std::string_view 
id);
 
   59  std::string ToString() 
const;
 
   63  std::optional<std::string> stream_id;
 
   65  absl::flat_hash_map<std::string, std::string> fields;
 
   67  bool operator==(
const StreamMessage& rhs)
 const {
 
   68    return id == rhs.id && fields == rhs.fields;
 
   71  template <
typename Sink>
 
   72  friend void AbslStringify(Sink& sink, 
const StreamMessage& message) {
 
   73    absl::Format(&sink, 
"StreamMessage {\nid: %s\nfields: {\n",
 
   74                 message.id.ToString());
 
   75    for (
const auto& [key, value] : message.fields) {
 
   76      absl::Format(&sink, 
"  %s: %s\n", key, value);
 
   78    absl::Format(&sink, 
"}");
 
   82absl::Status EgltAssignInto(Reply from, StreamMessage* absl_nonnull to);
 
   86  RedisStream(redis::Redis* absl_nonnull redis, std::string_view key);
 
   89  RedisStream(
const RedisStream&) = 
delete;
 
   90  RedisStream& operator=(
const RedisStream&) = 
delete;
 
   92  ~RedisStream() = 
default;
 
   94  template <
typename Iter>
 
   95  requires internal::StringViewKeyValueIterator<Iter>
 
   96      absl::StatusOr<StreamMessageId> XAdd(
 
   97          Iter fields_begin, Iter fields_end,
 
   98          StreamMessageId 
id = StreamMessageId::Wildcard())
 const {
 
   99    const std::string str_id = 
id.ToString();
 
  101    std::vector<std::string_view> args;
 
  102    args.reserve(2 * (fields_end - fields_begin) + 2);
 
  103    args.push_back(key_);
 
  104    args.push_back(str_id);
 
  105    for (
auto it = fields_begin; it != fields_end; ++it) {
 
  106      args.push_back(it->first);
 
  107      args.push_back(it->second);
 
  109    ASSIGN_OR_RETURN(Reply reply, redis_->ExecuteCommand(
"XADD", args));
 
  110    return StreamMessageId::FromString(reply.ConsumeStringContentOrDie());
 
  113  absl::StatusOr<StreamMessageId> XAdd(
 
  114      std::initializer_list<std::pair<std::string_view, std::string_view>>
 
  116      std::string_view 
id = 
"*") 
const;
 
  118  absl::StatusOr<std::vector<StreamMessage>> XRead(
 
  119      std::string_view offset_id = 
"0", 
int count = -1,
 
  120      absl::Duration timeout = absl::ZeroDuration()) 
const;
 
  122  absl::StatusOr<std::vector<StreamMessage>> XRead(
 
  123      StreamMessageId offset_id = {}, 
int count = -1,
 
  124      absl::Duration timeout = absl::ZeroDuration()) 
const;
 
  126  absl::StatusOr<std::vector<StreamMessage>> XRange(
 
  127      const StreamMessageId& start_offset_id = {},
 
  128      const StreamMessageId& end_offset_id = {}, 
int count = -1) 
const;
 
  130  absl::StatusOr<std::vector<StreamMessage>> XRevRange(
 
  131      StreamMessageId start_offset_id = {}, StreamMessageId end_offset_id = {},
 
  132      int count = -1) 
const;
 
  135  Redis* 
const absl_nonnull redis_;
 
  136  const std::string key_;
 
Concurrency utilities for ActionEngine.
ActionEngine data structures used to implement actions and nodes (data streams).