Action Engine
Loading...
Searching...
No Matches
act::redis::ChunkStore Class Referencefinalabstract

Detailed Description

A Redis-based implementation of the ChunkStore interface.

This class provides methods to store and retrieve chunks of data in a Redis database, using Redis streams for ordered storage and retrieval.

#include <actionengine/redis/chunk_store.h>

Inheritance diagram for act::redis::ChunkStore:

Public Member Functions

absl::StatusOr< ChunkGet (int64_t seq, absl::Duration timeout) override
 Get a chunk by its sequence number from the represented store.
 
absl::StatusOr< ChunkGetByArrivalOrder (int64_t arrival_offset, absl::Duration timeout) override
 Same as Get(), but retrieves the chunk by its arrival order (rank by arrival time) instead of sequence number.
 
absl::StatusOr< std::optional< Chunk > > Pop (int64_t seq) override
 Pop a chunk from the store by its sequence number.
 
absl::Status Put (int64_t seq, Chunk chunk, bool final) override
 Put a chunk into the store with the specified sequence number.
 
absl::Status CloseWritesWithStatus (absl::Status status) override
 Closes the store for writes, allowing for finalization of the store.
 
- Public Member Functions inherited from act::ChunkStore
virtual absl::StatusOr< std::reference_wrapper< const Chunk > > GetRef (int64_t seq, absl::Duration timeout)
 Same as Get(), but returns a reference to the chunk instead of copying it.
 
virtual absl::StatusOr< std::reference_wrapper< const Chunk > > GetRefByArrivalOrder (int64_t seq, absl::Duration timeout)
 Same as GetByArrivalOrder(), but returns a reference to the chunk instead of copying it.
 

Member Function Documentation

◆ CloseWritesWithStatus()

absl::Status act::redis::ChunkStore::CloseWritesWithStatus ( absl::Status status)
overridevirtual

Closes the store for writes, allowing for finalization of the store.

This method is used to indicate that no further writes will be made to the store. It can be used to finalize the store, ensuring that no new writes are allowed. Any pending read operations that are waiting for new writes will be notified that no further writes will occur.

Parameters
statusThe status to set for the store, indicating whether it was closed successfully or with an error.
Returns
An absl::Status indicating the success or failure of the operation.

Implements act::ChunkStore.

◆ Get()

absl::StatusOr< Chunk > act::redis::ChunkStore::Get ( int64_t seq,
absl::Duration timeout )
overridevirtual

Get a chunk by its sequence number from the represented store.

This method blocks until the chunk with the specified sequence number is available in the store, or until the specified timeout expires. As storage may be non-local, any errors encountered while retrieving the chunk will be returned as an absl::Status.

Parameters
seqThe sequence number of the chunk to retrieve.
timeoutThe maximum duration to wait for the chunk to become available.
Returns
A Chunk object containing the data associated with the specified sequence number, or an absl::Status indicating an error if the chunk could not be retrieved due to timeout or other issues.
Note
This method may react to cancellation of the calling fiber, but it is not guaranteed to do so: that depends on the specific implementation. However, in cases when it becomes evident that the chunk can never be retrieved (e.g. writes were closed or final sequence number is less than the requested sequence number), correct implementations should return errors and not block indefinitely, even if the state only becomes evident after the method is called.

Reimplemented from act::ChunkStore.

◆ GetByArrivalOrder()

absl::StatusOr< Chunk > act::redis::ChunkStore::GetByArrivalOrder ( int64_t arrival_order,
absl::Duration timeout )
overridevirtual

Same as Get(), but retrieves the chunk by its arrival order (rank by arrival time) instead of sequence number.

Parameters
arrival_orderThe parameter n to retrieve the n-th chunk that has arrived in the store. The first chunk that arrives has an arrival order of 0, the second has an arrival order of 1, and so on.
timeoutThe maximum duration to wait for the chunk to become available.
Returns
A Chunk object containing the data associated with the specified sequence number, or an absl::Status indicating an error if the chunk could not be retrieved due to timeout or other issues.

Reimplemented from act::ChunkStore.

◆ Pop()

absl::StatusOr< std::optional< Chunk > > act::redis::ChunkStore::Pop ( int64_t seq)
overridevirtual

Pop a chunk from the store by its sequence number.

This method removes the chunk associated with the specified sequence number from the store and returns it. If no chunk with that sequence number exists, it returns std::nullopt.

Parameters
seqThe sequence number of the chunk to pop.
Returns
An absl::StatusOr<std::optional<Chunk>> containing the popped chunk, or std::nullopt if no chunk with that sequence number exists. If an error occurs during the operation, it returns an absl::Status indicating the error.

Implements act::ChunkStore.

◆ Put()

absl::Status act::redis::ChunkStore::Put ( int64_t seq,
Chunk chunk,
bool final )
overridevirtual

Put a chunk into the store with the specified sequence number.

This method allows you to store a chunk of data in the store, associating it with a specific sequence number. If the final parameter is set to true, it indicates that this is the last chunk in a sequence, and no further chunks with a higher sequence number will be added.

Parameters
seqThe sequence number to associate with the chunk. Putting the same seq twice is an error.
chunkThe Chunk of data to put in the store.
finalA boolean indicating whether this is the final chunk in a sequence. If true, it indicates that no further chunks with a higher sequence number will be added to the store.
Note
Complete and correct implementations of this method may become quite complex, as they need to handle various edge cases, such as:
  • ensuring that the sequence number is unique,
  • managing the finality of chunks and communicating that to readers,
  • handling concurrent writes and reads without data corruption or loss.
LocalChunkStore is a good example of such an implementation. redis::ChunkStore is another example, which implements this interface on top of Redis, leveraging its atomicity within Lua scripts to ensure that writes are safe and consistent.

ChunkStore::Put

Implements act::ChunkStore.


The documentation for this class was generated from the following files: