15#ifndef ACTIONENGINE_REDIS_CHUNK_STORE_H_
16#define ACTIONENGINE_REDIS_CHUNK_STORE_H_
29#include <absl/base/nullability.h>
30#include <absl/base/optimization.h>
31#include <absl/base/thread_annotations.h>
32#include <absl/container/flat_hash_map.h>
33#include <absl/hash/hash.h>
34#include <absl/log/check.h>
35#include <absl/log/log.h>
36#include <absl/status/status.h>
37#include <absl/status/statusor.h>
38#include <absl/strings/numbers.h>
39#include <absl/strings/str_cat.h>
40#include <absl/strings/str_replace.h>
41#include <absl/strings/str_split.h>
42#include <absl/time/clock.h>
43#include <absl/time/time.h>
46#include "actionengine/data/conversion.h"
48#include "actionengine/redis/chunk_store_ops/close_writes.lua.h"
49#include "actionengine/redis/pubsub.h"
50#include "actionengine/redis/redis.h"
51#include "actionengine/redis/reply.h"
52#include "actionengine/redis/reply_converters.h"
53#include "actionengine/redis/streams.h"
55#include "actionengine/util/status_macros.h"
59struct ChunkStoreEvent {
62 int arrival_offset = -1;
63 std::string stream_message_id;
65 static absl::StatusOr<ChunkStoreEvent> FromString(
const std::string& message);
79 explicit ChunkStore(std::shared_ptr<Redis> redis, std::string_view
id,
80 absl::Duration ttl = absl::InfiniteDuration());
83 ChunkStore(
const ChunkStore&) =
delete;
84 ChunkStore& operator=(
const ChunkStore& other) =
delete;
86 ~ChunkStore()
override;
88 absl::StatusOr<Chunk>
Get(int64_t seq, absl::Duration timeout)
override;
91 absl::Duration timeout)
override;
93 absl::StatusOr<std::optional<Chunk>>
Pop(int64_t seq)
override;
95 absl::Status
Put(int64_t seq,
Chunk chunk,
bool final)
override;
99 absl::StatusOr<size_t> Size()
override;
101 absl::StatusOr<bool> Contains(int64_t seq)
override;
103 absl::Status SetId(std::string_view
id)
override;
105 [[nodiscard]] std::string_view GetId()
const override {
return id_; }
107 absl::StatusOr<int64_t> GetSeqForArrivalOffset(
108 int64_t arrival_offset)
override;
110 absl::StatusOr<int64_t> GetFinalSeq()
override;
113 std::string GetKey(std::string_view key =
"")
const {
115 return redis_->GetKey(absl::StrCat(
"streams:", id_));
117 return redis_->GetKey(absl::StrCat(
"streams:", id_,
":", key));
120 absl::StatusOr<std::optional<Chunk>> TryGet(int64_t seq);
122 mutable act::Mutex mu_;
123 act::CondVar cv_ ABSL_GUARDED_BY(mu_);
125 bool allow_new_gets_ ABSL_GUARDED_BY(mu_) =
true;
126 size_t num_pending_gets_ ABSL_GUARDED_BY(mu_) = 0;
127 absl::Duration ttl_ = absl::InfiniteDuration();
129 absl::flat_hash_map<int, std::string> seq_to_stream_id_ ABSL_GUARDED_BY(mu_);
130 absl::flat_hash_map<int, std::string> arrival_offset_to_stream_id_
131 ABSL_GUARDED_BY(mu_);
133 std::shared_ptr<Redis> redis_;
134 const std::string id_;
136 std::shared_ptr<Subscription> subscription_;
Concurrency utilities for ActionEngine.
An abstract interface for raw data storage and retrieval for ActionEngine nodes.
Definition chunk_store.h:84
absl::StatusOr< std::optional< Chunk > > Pop(int64_t seq) override
Pop a chunk from the store by its sequence number.
Definition chunk_store.cc:170
absl::Status CloseWritesWithStatus(absl::Status status) override
Closes the store for writes, allowing for finalization of the store.
Definition chunk_store.cc:238
absl::Status Put(int64_t seq, Chunk chunk, bool final) override
Put a chunk into the store with the specified sequence number.
Definition chunk_store.cc:197
absl::StatusOr< Chunk > Get(int64_t seq, absl::Duration timeout) override
Get a chunk by its sequence number from the represented store.
Definition chunk_store.cc:116
absl::StatusOr< Chunk > GetByArrivalOrder(int64_t arrival_offset, absl::Duration timeout) override
Same as Get(), but retrieves the chunk by its arrival order (rank by arrival time) instead of its sequence number.
Definition chunk_store.cc:165
An abstract interface for raw data storage and retrieval for ActionEngine nodes.
ActionEngine data structures used to implement actions and nodes (data streams).