Action Engine
Loading...
Searching...
No Matches
chunk_store.h
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#ifndef ACTIONENGINE_REDIS_CHUNK_STORE_H_
16#define ACTIONENGINE_REDIS_CHUNK_STORE_H_
17
18#include <array>
19#include <cstddef>
20#include <cstdint>
21#include <memory>
22#include <optional>
23#include <string>
24#include <string_view>
25#include <utility>
26#include <variant>
27#include <vector>
28
29#include <absl/base/nullability.h>
30#include <absl/base/optimization.h>
31#include <absl/base/thread_annotations.h>
32#include <absl/container/flat_hash_map.h>
33#include <absl/hash/hash.h>
34#include <absl/log/check.h>
35#include <absl/log/log.h>
36#include <absl/status/status.h>
37#include <absl/status/statusor.h>
38#include <absl/strings/numbers.h>
39#include <absl/strings/str_cat.h>
40#include <absl/strings/str_replace.h>
41#include <absl/strings/str_split.h>
42#include <absl/time/clock.h>
43#include <absl/time/time.h>
44
46#include "actionengine/data/conversion.h"
48#include "actionengine/redis/chunk_store_ops/close_writes.lua.h"
49#include "actionengine/redis/pubsub.h"
50#include "actionengine/redis/redis.h"
51#include "actionengine/redis/reply.h"
52#include "actionengine/redis/reply_converters.h"
53#include "actionengine/redis/streams.h"
55#include "actionengine/util/status_macros.h"
56
57namespace act::redis {
58
59struct ChunkStoreEvent {
60 std::string type;
61 int seq = -1;
62 int arrival_offset = -1;
63 std::string stream_message_id;
64
65 static absl::StatusOr<ChunkStoreEvent> FromString(const std::string& message);
66};
67
76class ChunkStore final : public act::ChunkStore {
77 // For detailed documentation, see the base class, ChunkStore.
78 public:
79 explicit ChunkStore(std::shared_ptr<Redis> redis, std::string_view id,
80 absl::Duration ttl = absl::InfiniteDuration());
81
82 // No copy or move semantics allowed.
83 ChunkStore(const ChunkStore&) = delete;
84 ChunkStore& operator=(const ChunkStore& other) = delete;
85
86 ~ChunkStore() override;
87
88 absl::StatusOr<Chunk> Get(int64_t seq, absl::Duration timeout) override;
89
90 absl::StatusOr<Chunk> GetByArrivalOrder(int64_t arrival_offset,
91 absl::Duration timeout) override;
92
93 absl::StatusOr<std::optional<Chunk>> Pop(int64_t seq) override;
94
95 absl::Status Put(int64_t seq, Chunk chunk, bool final) override;
96
97 absl::Status CloseWritesWithStatus(absl::Status status) override;
98
99 absl::StatusOr<size_t> Size() override;
100
101 absl::StatusOr<bool> Contains(int64_t seq) override;
102
103 absl::Status SetId(std::string_view id) override;
104
105 [[nodiscard]] std::string_view GetId() const override { return id_; }
106
107 absl::StatusOr<int64_t> GetSeqForArrivalOffset(
108 int64_t arrival_offset) override;
109
110 absl::StatusOr<int64_t> GetFinalSeq() override;
111
112 private:
113 std::string GetKey(std::string_view key = "") const {
114 if (key.empty()) {
115 return redis_->GetKey(absl::StrCat("streams:", id_));
116 }
117 return redis_->GetKey(absl::StrCat("streams:", id_, ":", key));
118 }
119
120 absl::StatusOr<std::optional<Chunk>> TryGet(int64_t seq);
121
122 mutable act::Mutex mu_;
123 act::CondVar cv_ ABSL_GUARDED_BY(mu_);
124
125 bool allow_new_gets_ ABSL_GUARDED_BY(mu_) = true;
126 size_t num_pending_gets_ ABSL_GUARDED_BY(mu_) = 0;
127 absl::Duration ttl_ = absl::InfiniteDuration();
128
129 absl::flat_hash_map<int, std::string> seq_to_stream_id_ ABSL_GUARDED_BY(mu_);
130 absl::flat_hash_map<int, std::string> arrival_offset_to_stream_id_
131 ABSL_GUARDED_BY(mu_);
132
133 std::shared_ptr<Redis> redis_;
134 const std::string id_;
135 RedisStream stream_;
136 std::shared_ptr<Subscription> subscription_;
137};
138
139} // namespace act::redis
140
141#endif // ACTIONENGINE_REDIS_CHUNK_STORE_H_
Concurrency utilities for ActionEngine.
An abstract interface for raw data storage and retrieval for ActionEngine nodes.
Definition chunk_store.h:84
absl::StatusOr< std::optional< Chunk > > Pop(int64_t seq) override
Pop a chunk from the store by its sequence number.
Definition chunk_store.cc:170
absl::Status CloseWritesWithStatus(absl::Status status) override
Closes the store for writes, allowing for finalization of the store.
Definition chunk_store.cc:238
absl::Status Put(int64_t seq, Chunk chunk, bool final) override
Put a chunk into the store with the specified sequence number.
Definition chunk_store.cc:197
absl::StatusOr< Chunk > Get(int64_t seq, absl::Duration timeout) override
Get a chunk by its sequence number from the represented store.
Definition chunk_store.cc:116
absl::StatusOr< Chunk > GetByArrivalOrder(int64_t arrival_offset, absl::Duration timeout) override
Same as Get(), but retrieves the chunk by its arrival order (rank by arrival time) instead of sequenc...
Definition chunk_store.cc:165
An abstract interface for raw data storage and retrieval for ActionEngine nodes.
Definition types.h:104
ActionEngine data structures used to implement actions and nodes (data streams).