Action Engine
Loading...
Searching...
No Matches
act::ChunkStore Class Referenceabstract

Detailed Description

An abstract interface for raw data storage and retrieval for ActionEngine nodes.

This class provides a generic interface for storing and retrieving chunks of data, identified by a sequence number. It supports both ordered and unordered access to the chunks, as well as various operations such as putting, getting, and popping chunks.

Note
A key feature of this interface is its flexibility, allowing it to be used in a wide range of scenarios, from simple in-memory storage to complex distributed systems.
However, this places a limitation: note that ChunkStore itself does not care about streaming semantics or, for example, range-based retrieval or writes. Streaming semantics are tackled by ChunkStoreReader and ChunkStoreWriter, which are built on top of this interface and are used together by AsyncNode to provide channel-like functionality.

Importantly, the Get() and GetByArrivalOrder() methods are designed to block until the requested chunk is available, allowing for synchronous channel-like behavior. This is particularly useful in asynchronous workflows by providing a way to wait for data to become available without having to manage the underlying storage details or streaming semantics directly.

ChunkStore can be implemented in various ways, such as in-memory or file-based storage, in relational or key-value databases, or even in distributed storage systems. The specific implementation will determine how the chunks are stored and retrieved, but the interface remains consistent across implementations.

Note
Efficiency and performance optimizations for specific storage backends, such as range-based queries, are expected to be implemented in concrete ChunkStore subclasses. This may change in the future as this interface evolves. For now, it is based, essentially, around keys and values.

#include <actionengine/stores/chunk_store.h>

Inheritance diagram for act::ChunkStore:

Public Member Functions

virtual absl::StatusOr< ChunkGet (int64_t seq, absl::Duration timeout)
 Get a chunk by its sequence number from the represented store.
 
virtual absl::StatusOr< ChunkGetByArrivalOrder (int64_t arrival_order, absl::Duration timeout)
 Same as Get(), but retrieves the chunk by its arrival order (rank by arrival time) instead of sequence number.
 
virtual absl::StatusOr< std::reference_wrapper< const Chunk > > GetRef (int64_t seq, absl::Duration timeout)
 Same as Get(), but returns a reference to the chunk instead of copying it.
 
virtual absl::StatusOr< std::reference_wrapper< const Chunk > > GetRefByArrivalOrder (int64_t seq, absl::Duration timeout)
 Same as GetByArrivalOrder(), but returns a reference to the chunk instead of copying it.
 
virtual absl::Status Put (int64_t seq, Chunk chunk, bool final)=0
 Put a chunk into the store with the specified sequence number.
 
virtual absl::StatusOr< std::optional< Chunk > > Pop (int64_t seq)=0
 Pop a chunk from the store by its sequence number.
 
virtual absl::Status CloseWritesWithStatus (absl::Status status)=0
 Closes the store for writes, allowing for finalization of the store.
 

Member Function Documentation

◆ CloseWritesWithStatus()

virtual absl::Status act::ChunkStore::CloseWritesWithStatus ( absl::Status status)
pure virtual

Closes the store for writes, allowing for finalization of the store.

This method is used to indicate that no further writes will be made to the store. It can be used to finalize the store, ensuring that no new writes are allowed. Any pending read operations that are waiting for new writes will be notified that no further writes will occur.

Parameters
statusThe status to set for the store, indicating whether it was closed successfully or with an error.
Returns
An absl::Status indicating the success or failure of the operation.

Implemented in act::LocalChunkStore, and act::redis::ChunkStore.

◆ Get()

absl::StatusOr< Chunk > act::ChunkStore::Get ( int64_t seq,
absl::Duration timeout )
virtual

Get a chunk by its sequence number from the represented store.

This method blocks until the chunk with the specified sequence number is available in the store, or until the specified timeout expires. As storage may be non-local, any errors encountered while retrieving the chunk will be returned as an absl::Status.

Parameters
seqThe sequence number of the chunk to retrieve.
timeoutThe maximum duration to wait for the chunk to become available.
Returns
A Chunk object containing the data associated with the specified sequence number, or an absl::Status indicating an error if the chunk could not be retrieved due to timeout or other issues.
Note
This method may react to cancellation of the calling fiber, but it is not guaranteed to do so: that depends on the specific implementation. However, in cases when it becomes evident that the chunk can never be retrieved (e.g. writes were closed or final sequence number is less than the requested sequence number), correct implementations should return errors and not block indefinitely, even if the state only becomes evident after the method is called.

Reimplemented in act::redis::ChunkStore.

◆ GetByArrivalOrder()

absl::StatusOr< Chunk > act::ChunkStore::GetByArrivalOrder ( int64_t arrival_order,
absl::Duration timeout )
virtual

Same as Get(), but retrieves the chunk by its arrival order (rank by arrival time) instead of sequence number.

Parameters
arrival_orderThe parameter n to retrieve the n-th chunk that has arrived in the store. The first chunk that arrives has an arrival order of 0, the second has an arrival order of 1, and so on.
timeoutThe maximum duration to wait for the chunk to become available.
Returns
A Chunk object containing the data associated with the specified sequence number, or an absl::Status indicating an error if the chunk could not be retrieved due to timeout or other issues.

Reimplemented in act::redis::ChunkStore.

◆ GetRef()

absl::StatusOr< std::reference_wrapper< const Chunk > > act::ChunkStore::GetRef ( int64_t seq,
absl::Duration timeout )
virtual

Same as Get(), but returns a reference to the chunk instead of copying it.

This allows for more efficient access to the chunk data in cases where data lives in-process, such as in-memory storage.

Parameters
seqThe sequence number of the chunk to retrieve.
timeoutThe maximum duration to wait for the chunk to become available.
Returns
A reference to a Chunk object containing the data associated with the specified sequence number, or an absl::Status indicating an error if the chunk could not be retrieved due to timeout or other issues.

Reimplemented in act::LocalChunkStore.

◆ GetRefByArrivalOrder()

absl::StatusOr< std::reference_wrapper< const Chunk > > act::ChunkStore::GetRefByArrivalOrder ( int64_t seq,
absl::Duration timeout )
virtual

Same as GetByArrivalOrder(), but returns a reference to the chunk instead of copying it.

This allows for more efficient access to the chunk data in cases where data lives in-process, such as in-memory storage.

Parameters
seqThe sequence number of the chunk to retrieve.
timeoutThe maximum duration to wait for the chunk to become available.
Returns
A reference to a Chunk object containing the data associated with the specified sequence number, or an absl::Status indicating an error if the chunk could not be retrieved due to timeout or other issues.

Reimplemented in act::LocalChunkStore.

◆ Pop()

virtual absl::StatusOr< std::optional< Chunk > > act::ChunkStore::Pop ( int64_t seq)
pure virtual

Pop a chunk from the store by its sequence number.

This method removes the chunk associated with the specified sequence number from the store and returns it. If no chunk with that sequence number exists, it returns std::nullopt.

Parameters
seqThe sequence number of the chunk to pop.
Returns
An absl::StatusOr<std::optional<Chunk>> containing the popped chunk, or std::nullopt if no chunk with that sequence number exists. If an error occurs during the operation, it returns an absl::Status indicating the error.

Implemented in act::LocalChunkStore, and act::redis::ChunkStore.

◆ Put()

virtual absl::Status act::ChunkStore::Put ( int64_t seq,
Chunk chunk,
bool final )
pure virtual

Put a chunk into the store with the specified sequence number.

This method allows you to store a chunk of data in the store, associating it with a specific sequence number. If the final parameter is set to true, it indicates that this is the last chunk in a sequence, and no further chunks with a higher sequence number will be added.

Parameters
seqThe sequence number to associate with the chunk. Putting the same seq twice is an error.
chunkThe Chunk of data to put in the store.
finalA boolean indicating whether this is the final chunk in a sequence. If true, it indicates that no further chunks with a higher sequence number will be added to the store.
Note
Complete and correct implementations of this method may become quite complex, as they need to handle various edge cases, such as:
  • ensuring that the sequence number is unique,
  • managing the finality of chunks and communicating that to readers,
  • handling concurrent writes and reads without data corruption or loss.
LocalChunkStore is a good example of such an implementation. redis::ChunkStore is another example, which implements this interface on top of Redis, leveraging its atomicity within Lua scripts to ensure that writes are safe and consistent.

ChunkStore::Put

Implemented in act::LocalChunkStore, and act::redis::ChunkStore.


The documentation for this class was generated from the following files: