Action Engine
Loading...
Searching...
No Matches
streams.h
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#ifndef ACTIONENGINE_REDIS_STREAMS_H_
16#define ACTIONENGINE_REDIS_STREAMS_H_
17
#include <iterator>
#include <string>
#include <string_view>

#include "actionengine/data/conversion.h"
#include "actionengine/redis/redis.h"
#include "actionengine/redis/reply_converters.h"
#include "actionengine/util/status_macros.h"
27
28namespace act::redis {
29namespace internal {
30template <typename It>
31concept StringViewKeyValueIterator =
32 std::input_iterator<It> &&
33 std::convertible_to<typename std::iterator_traits<It>::value_type,
34 std::pair<std::string_view, std::string_view>>;
35}
36
37struct StreamMessageId {
38 int64_t millis = 0;
39 int64_t sequence = 0;
40 bool is_wildcard = false;
41
42 StreamMessageId operator+(absl::Duration duration) const;
43
44 bool operator==(const StreamMessageId& rhs) const {
45 return millis == rhs.millis && sequence == rhs.sequence;
46 }
47
48 bool operator>=(const StreamMessageId& rhs) const {
49 return millis > rhs.millis ||
50 (millis == rhs.millis && sequence >= rhs.sequence);
51 }
52
53 static constexpr StreamMessageId Wildcard() {
54 return StreamMessageId{.is_wildcard = true};
55 }
56
57 static absl::StatusOr<StreamMessageId> FromString(std::string_view id);
58
59 std::string ToString() const;
60};
61
62struct StreamMessage {
63 std::optional<std::string> stream_id;
64 StreamMessageId id;
65 absl::flat_hash_map<std::string, std::string> fields;
66
67 bool operator==(const StreamMessage& rhs) const {
68 return id == rhs.id && fields == rhs.fields;
69 }
70
71 template <typename Sink>
72 friend void AbslStringify(Sink& sink, const StreamMessage& message) {
73 absl::Format(&sink, "StreamMessage {\nid: %s\nfields: {\n",
74 message.id.ToString());
75 for (const auto& [key, value] : message.fields) {
76 absl::Format(&sink, " %s: %s\n", key, value);
77 }
78 absl::Format(&sink, "}");
79 }
80};
81
// Conversion hook from a Redis `Reply` into a `StreamMessage`; defined out of
// line. Presumably fills `*to` from `from` and reports malformed replies via
// the returned status -- confirm against the implementation.
absl::Status EgltAssignInto(Reply from, StreamMessage* absl_nonnull to);
83
84class RedisStream {
85 public:
86 RedisStream(redis::Redis* absl_nonnull redis, std::string_view key);
87
88 // Non-copyable, non-moveable.
89 RedisStream(const RedisStream&) = delete;
90 RedisStream& operator=(const RedisStream&) = delete;
91
92 ~RedisStream() = default;
93
94 template <typename Iter>
95 requires internal::StringViewKeyValueIterator<Iter>
96 absl::StatusOr<StreamMessageId> XAdd(
97 Iter fields_begin, Iter fields_end,
98 StreamMessageId id = StreamMessageId::Wildcard()) const {
99 const std::string str_id = id.ToString();
100
101 std::vector<std::string_view> args;
102 args.reserve(2 * (fields_end - fields_begin) + 2);
103 args.push_back(key_);
104 args.push_back(str_id);
105 for (auto it = fields_begin; it != fields_end; ++it) {
106 args.push_back(it->first);
107 args.push_back(it->second);
108 }
109 ASSIGN_OR_RETURN(Reply reply, redis_->ExecuteCommand("XADD", args));
110 return StreamMessageId::FromString(reply.ConsumeStringContentOrDie());
111 }
112
113 absl::StatusOr<StreamMessageId> XAdd(
114 std::initializer_list<std::pair<std::string_view, std::string_view>>
115 fields,
116 std::string_view id = "*") const;
117
118 absl::StatusOr<std::vector<StreamMessage>> XRead(
119 std::string_view offset_id = "0", int count = -1,
120 absl::Duration timeout = absl::ZeroDuration()) const;
121
122 absl::StatusOr<std::vector<StreamMessage>> XRead(
123 StreamMessageId offset_id = {}, int count = -1,
124 absl::Duration timeout = absl::ZeroDuration()) const;
125
126 absl::StatusOr<std::vector<StreamMessage>> XRange(
127 const StreamMessageId& start_offset_id = {},
128 const StreamMessageId& end_offset_id = {}, int count = -1) const;
129
130 absl::StatusOr<std::vector<StreamMessage>> XRevRange(
131 StreamMessageId start_offset_id = {}, StreamMessageId end_offset_id = {},
132 int count = -1) const;
133
134 private:
135 Redis* const absl_nonnull redis_;
136 const std::string key_;
137};
138} // namespace act::redis
139
140#endif // ACTIONENGINE_REDIS_STREAMS_H_
Concurrency utilities for ActionEngine.
ActionEngine data structures used to implement actions and nodes (data streams).