Action Engine
An abstract interface for raw data storage and retrieval for ActionEngine nodes.
This class provides a generic interface for storing and retrieving chunks of data, identified by a sequence number. It supports both ordered and unordered access to the chunks, as well as various operations such as putting, getting, and popping chunks.
Importantly, Get() and GetByArrivalOrder() block until the requested chunk is available or the timeout expires, which gives the store synchronous, channel-like behavior. Callers in asynchronous workflows can therefore wait for data to become available without managing the underlying storage details or streaming semantics themselves.
ChunkStore can be implemented in various ways, such as in-memory or file-based storage, in relational or key-value databases, or even in distributed storage systems. The specific implementation will determine how the chunks are stored and retrieved, but the interface remains consistent across implementations.
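The blocking contract described above can be illustrated with a minimal, dependency-free sketch. It is not the ActionEngine implementation: ToyBlockingStore, ToyChunk, and ReadWhileWriting are hypothetical names, std::optional stands in for absl::StatusOr, and std::chrono stands in for absl::Duration.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <map>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <utility>

// Hypothetical stand-in for a chunk; the real Chunk type is not shown here.
using ToyChunk = std::string;

// Toy in-memory store that mimics the blocking Get() contract.
class ToyBlockingStore {
 public:
  void Put(int64_t seq, ToyChunk chunk) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      chunks_.emplace(seq, std::move(chunk));
    }
    cv_.notify_all();  // Wake any reader waiting for this chunk.
  }

  // Blocks until `seq` is present or `timeout` elapses.
  // Returns std::nullopt on timeout (assumption: the real API returns a status).
  std::optional<ToyChunk> Get(int64_t seq, std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lock(mu_);
    const bool ready = cv_.wait_for(
        lock, timeout, [&] { return chunks_.count(seq) > 0; });
    if (!ready) return std::nullopt;
    return chunks_.at(seq);
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::map<int64_t, ToyChunk> chunks_;
};

// A reader waits for chunk 0 while a writer thread delivers it later.
inline std::optional<ToyChunk> ReadWhileWriting() {
  ToyBlockingStore store;
  std::thread writer([&store] {
    std::this_thread::sleep_for(std::chrono::milliseconds(20));
    store.Put(0, "hello");
  });
  auto result = store.Get(0, std::chrono::seconds(5));
  writer.join();
  return result;
}
```

The reader never polls: it sleeps on a condition variable until the writer signals or the timeout expires, which is the channel-like behavior the interface describes.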
#include <actionengine/stores/chunk_store.h>
Public Member Functions
| virtual absl::StatusOr< Chunk > | Get (int64_t seq, absl::Duration timeout) |
| Get a chunk by its sequence number from the represented store. | |
| virtual absl::StatusOr< Chunk > | GetByArrivalOrder (int64_t arrival_order, absl::Duration timeout) |
| Same as Get(), but retrieves the chunk by its arrival order (rank by arrival time) instead of sequence number. | |
| virtual absl::StatusOr< std::reference_wrapper< const Chunk > > | GetRef (int64_t seq, absl::Duration timeout) |
| Same as Get(), but returns a reference to the chunk instead of copying it. | |
| virtual absl::StatusOr< std::reference_wrapper< const Chunk > > | GetRefByArrivalOrder (int64_t seq, absl::Duration timeout) |
| Same as GetByArrivalOrder(), but returns a reference to the chunk instead of copying it. | |
| virtual absl::Status | Put (int64_t seq, Chunk chunk, bool final)=0 |
| Put a chunk into the store with the specified sequence number. | |
| virtual absl::StatusOr< std::optional< Chunk > > | Pop (int64_t seq)=0 |
| Pop a chunk from the store by its sequence number. | |
| virtual absl::Status | CloseWritesWithStatus (absl::Status status)=0 |
| Closes the store for writes, allowing for finalization of the store. | |
virtual absl::Status CloseWritesWithStatus (absl::Status status)
pure virtual
Closes the store for writes, allowing for finalization of the store.
Call this method to indicate that no further writes will be made to the store. Once the store is closed, new writes are no longer allowed, and any pending read operations that are waiting for new writes are notified that none will occur.
| status | The status to set for the store, indicating whether it was closed successfully or with an error. |
Returns an absl::Status indicating the success or failure of the operation.
Implemented in act::LocalChunkStore and act::redis::ChunkStore.
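A rough sketch of the close semantics, under stated assumptions: ToyStatus, ToyGetResult, and ToyClosableStore are hypothetical stand-ins, and the choice to surface the closing status to readers that ask for a missing chunk is an assumption of this sketch, not documented behavior.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <map>
#include <mutex>
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-in for absl::Status, to keep the sketch dependency-free.
struct ToyStatus {
  bool ok = true;
  std::string message;
};

// Hypothetical result type: a chunk, or the status explaining its absence.
struct ToyGetResult {
  ToyStatus status;
  std::optional<std::string> chunk;
};

class ToyClosableStore {
 public:
  ToyStatus Put(int64_t seq, std::string chunk) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      if (closed_) return {false, "store is closed for writes"};
      chunks_.emplace(seq, std::move(chunk));
    }
    cv_.notify_all();
    return {};
  }

  ToyStatus CloseWritesWithStatus(ToyStatus status) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      closed_ = true;
      close_status_ = std::move(status);
    }
    cv_.notify_all();  // Wake readers so they stop waiting for new writes.
    return {};
  }

  ToyGetResult Get(int64_t seq, std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lock(mu_);
    // Wait until the chunk arrives, the store closes, or the timeout expires.
    cv_.wait_for(lock, timeout,
                 [&] { return closed_ || chunks_.count(seq) > 0; });
    auto it = chunks_.find(seq);
    if (it != chunks_.end()) return {ToyStatus{}, it->second};
    if (closed_) {
      return {ToyStatus{false, "closed: " + close_status_.message},
              std::nullopt};
    }
    return {ToyStatus{false, "timed out"}, std::nullopt};
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::map<int64_t, std::string> chunks_;
  bool closed_ = false;
  ToyStatus close_status_;
};
```

In this sketch, chunks written before the close remain readable, while a read for a chunk that will never arrive returns promptly instead of waiting out its full timeout.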
virtual absl::StatusOr< Chunk > Get (int64_t seq, absl::Duration timeout)
virtual
Get a chunk by its sequence number from the represented store.
This method blocks until the chunk with the specified sequence number is available in the store, or until the specified timeout expires. As storage may be non-local, any errors encountered while retrieving the chunk will be returned as an absl::Status.
| seq | The sequence number of the chunk to retrieve. |
| timeout | The maximum duration to wait for the chunk to become available. |
Returns the requested chunk on success, or an absl::Status indicating an error if the chunk could not be retrieved due to timeout or other issues.
Reimplemented in act::redis::ChunkStore.
virtual absl::StatusOr< Chunk > GetByArrivalOrder (int64_t arrival_order, absl::Duration timeout)
virtual
Same as Get(), but retrieves the chunk by its arrival order (rank by arrival time) instead of sequence number.
| arrival_order | The parameter n to retrieve the n-th chunk that has arrived in the store. The first chunk that arrives has an arrival order of 0, the second has an arrival order of 1, and so on. |
| timeout | The maximum duration to wait for the chunk to become available. |
Returns the requested chunk on success, or an absl::Status indicating an error if the chunk could not be retrieved due to timeout or other issues.
Reimplemented in act::redis::ChunkStore.
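The difference between sequence number and arrival order is easiest to see when chunks arrive out of order. The following is a minimal sketch with a hypothetical ToyOrderedStore; blocking and timeouts are omitted, and std::optional replaces absl::StatusOr.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Toy store that tracks both the sequence number and the arrival rank.
class ToyOrderedStore {
 public:
  void Put(int64_t seq, std::string chunk) {
    // Record the arrival rank only if the chunk was actually inserted.
    if (chunks_.emplace(seq, std::move(chunk)).second) {
      arrival_.push_back(seq);
    }
  }

  // Lookup by sequence number.
  std::optional<std::string> Get(int64_t seq) const {
    auto it = chunks_.find(seq);
    if (it == chunks_.end()) return std::nullopt;
    return it->second;
  }

  // Lookup by arrival rank: 0 is the first chunk that arrived.
  std::optional<std::string> GetByArrivalOrder(int64_t arrival_order) const {
    if (arrival_order < 0 ||
        arrival_order >= static_cast<int64_t>(arrival_.size())) {
      return std::nullopt;
    }
    return Get(arrival_[static_cast<std::size_t>(arrival_order)]);
  }

 private:
  std::map<int64_t, std::string> chunks_;
  std::vector<int64_t> arrival_;  // Sequence numbers in order of arrival.
};
```

If chunks with sequence numbers 2, 0, 1 arrive in that order, arrival order 0 refers to the chunk with sequence number 2, while Get(0) still returns the chunk with sequence number 0.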
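virtual absl::StatusOr< std::reference_wrapper< const Chunk > > GetRef (int64_t seq, absl::Duration timeout)
virtual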
Same as Get(), but returns a reference to the chunk instead of copying it.
This allows for more efficient access to the chunk data in cases where data lives in-process, such as in-memory storage.
| seq | The sequence number of the chunk to retrieve. |
| timeout | The maximum duration to wait for the chunk to become available. |
Returns a reference to the requested chunk on success, or an absl::Status indicating an error if the chunk could not be retrieved due to timeout or other issues.
Reimplemented in act::LocalChunkStore.
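To show why a reference-returning accessor helps for in-process data, here is a small sketch using std::reference_wrapper. ToyRefStore is a hypothetical name, blocking is omitted, and std::optional replaces absl::StatusOr.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <optional>
#include <string>
#include <utility>

class ToyRefStore {
 public:
  void Put(int64_t seq, std::string chunk) {
    chunks_.emplace(seq, std::move(chunk));
  }

  // Copying accessor: every call duplicates the stored payload.
  std::optional<std::string> Get(int64_t seq) const {
    auto it = chunks_.find(seq);
    if (it == chunks_.end()) return std::nullopt;
    return it->second;
  }

  // Reference accessor: hands out a view of the stored chunk, no copy.
  std::optional<std::reference_wrapper<const std::string>> GetRef(
      int64_t seq) const {
    auto it = chunks_.find(seq);
    if (it == chunks_.end()) return std::nullopt;
    return std::cref(it->second);
  }

 private:
  std::map<int64_t, std::string> chunks_;
};
```

In this toy version, a returned reference stays valid only while the chunk remains in the map, so a caller that may remove the chunk should take a copy instead.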
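virtual absl::StatusOr< std::reference_wrapper< const Chunk > > GetRefByArrivalOrder (int64_t seq, absl::Duration timeout)
virtual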
Same as GetByArrivalOrder(), but returns a reference to the chunk instead of copying it.
This allows for more efficient access to the chunk data in cases where data lives in-process, such as in-memory storage.
| seq | The arrival order of the chunk to retrieve (despite the parameter name), as in GetByArrivalOrder(). |
| timeout | The maximum duration to wait for the chunk to become available. |
Returns a reference to the requested chunk on success, or an absl::Status indicating an error if the chunk could not be retrieved due to timeout or other issues.
Reimplemented in act::LocalChunkStore.
virtual absl::StatusOr< std::optional< Chunk > > Pop (int64_t seq)
pure virtual
Pop a chunk from the store by its sequence number.
This method removes the chunk associated with the specified sequence number from the store and returns it. If no chunk with that sequence number exists, it returns std::nullopt.
| seq | The sequence number of the chunk to pop. |
Returns an absl::StatusOr<std::optional<Chunk>> containing the popped chunk, or std::nullopt if no chunk with that sequence number exists. If an error occurs during the operation, it returns an absl::Status indicating the error.
Implemented in act::LocalChunkStore and act::redis::ChunkStore.
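The remove-and-return behavior can be sketched as follows. ToyPopStore is a hypothetical in-memory stand-in; the error dimension of absl::StatusOr is omitted, so only the "chunk" and "no such chunk" outcomes are modeled.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <optional>
#include <string>
#include <utility>

class ToyPopStore {
 public:
  void Put(int64_t seq, std::string chunk) {
    chunks_.emplace(seq, std::move(chunk));
  }

  // Removes the chunk for `seq` and returns it; std::nullopt if absent.
  // Unlike Get(), this sketch never waits for the chunk to arrive.
  std::optional<std::string> Pop(int64_t seq) {
    auto it = chunks_.find(seq);
    if (it == chunks_.end()) return std::nullopt;
    std::string chunk = std::move(it->second);
    chunks_.erase(it);
    return chunk;
  }

  std::size_t size() const { return chunks_.size(); }

 private:
  std::map<int64_t, std::string> chunks_;
};
```

A missing chunk is reported as std::nullopt rather than as an error, so popping the same sequence number twice is harmless: the second call simply returns an empty optional.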
virtual absl::Status Put (int64_t seq, Chunk chunk, bool final)
pure virtual
Put a chunk into the store with the specified sequence number.
This method stores a chunk of data in the store and associates it with the given sequence number. Setting the final parameter to true marks this chunk as the last one in the sequence: no further chunks with a higher sequence number will be added.
| seq | The sequence number to associate with the chunk. Putting the same seq twice is an error. |
| chunk | The Chunk of data to put in the store. |
| final | A boolean indicating whether this is the final chunk in a sequence. If true, it indicates that no further chunks with a higher sequence number will be added to the store. |
Implemented in act::LocalChunkStore and act::redis::ChunkStore.
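The Put() rules above can be sketched with a toy store. ToyStatus and ToySequencedStore are hypothetical names. Rejecting a duplicate sequence number follows the parameter description; rejecting a sequence number above the final one, and still accepting lower ones that arrive late, are assumptions of this sketch.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-in for absl::Status.
struct ToyStatus {
  bool ok = true;
  std::string message;
};

class ToySequencedStore {
 public:
  ToyStatus Put(int64_t seq, std::string chunk, bool final) {
    if (chunks_.count(seq) > 0) {
      return {false, "duplicate sequence number"};
    }
    // Assumption: once a final chunk is known, higher seqs are rejected.
    if (final_seq_.has_value() && seq > *final_seq_) {
      return {false, "sequence number is past the final chunk"};
    }
    chunks_.emplace(seq, std::move(chunk));
    if (final) final_seq_ = seq;
    return {};
  }

  // Sequence number of the final chunk, if one has been put.
  std::optional<int64_t> final_seq() const { return final_seq_; }

 private:
  std::map<int64_t, std::string> chunks_;
  std::optional<int64_t> final_seq_;
};
```

Because chunks may arrive out of order, the final chunk can be stored before earlier ones; the flag only bounds the highest sequence number, not the arrival time.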