42class LocalChunkStore final :
public ChunkStore {
45 LocalChunkStore() : ChunkStore() {}
47 explicit LocalChunkStore(std::string_view
id);
50 LocalChunkStore(
const LocalChunkStore& other) =
delete;
51 LocalChunkStore& operator=(
const LocalChunkStore& other) =
delete;
53 ~LocalChunkStore()
override;
55 void Notify()
override;
57 absl::StatusOr<std::reference_wrapper<const Chunk>>
GetRef(
58 int64_t seq, absl::Duration timeout)
override;
61 int64_t arrival_offset, absl::Duration timeout)
override;
63 absl::StatusOr<std::optional<Chunk>>
Pop(int64_t seq)
override;
65 absl::Status
Put(int64_t seq,
Chunk chunk,
bool final)
override;
69 absl::StatusOr<size_t> Size()
override;
71 absl::StatusOr<bool> Contains(int64_t seq)
override;
73 absl::Status SetId(std::string_view
id)
override;
75 std::string_view GetId()
const override;
77 absl::StatusOr<int64_t> GetSeqForArrivalOffset(
78 int64_t arrival_offset)
override;
80 absl::StatusOr<int64_t> GetFinalSeq()
override;
83 void ClosePutsAndAwaitPendingOperations() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
85 mutable act::Mutex mu_;
89 absl::flat_hash_map<int64_t, int64_t> seq_to_arrival_order_
91 absl::flat_hash_map<int64_t, int64_t> arrival_order_to_seq_
93 absl::flat_hash_map<int64_t, Chunk> chunks_ ABSL_GUARDED_BY(mu_);
95 int64_t final_seq_ = -1;
96 int64_t max_seq_ = -1;
97 int64_t total_chunks_put_ ABSL_GUARDED_BY(mu_) = 0;
99 bool no_further_puts_ ABSL_GUARDED_BY(mu_) =
false;
100 mutable act::CondVar cv_ ABSL_GUARDED_BY(mu_);
102 size_t num_pending_ops_ ABSL_GUARDED_BY(mu_) = 0;
absl::StatusOr< std::reference_wrapper< const Chunk > > GetRef(int64_t seq, absl::Duration timeout) override
Same as Get(), but returns a reference to the chunk instead of copying it.
Definition local_chunk_store.cc:45
absl::StatusOr< std::reference_wrapper< const Chunk > > GetRefByArrivalOrder(int64_t arrival_offset, absl::Duration timeout) override
Same as GetByArrivalOrder(), but returns a reference to the chunk instead of copying it.
Definition local_chunk_store.cc:86
absl::StatusOr< std::optional< Chunk > > Pop(int64_t seq) override
Pop a chunk from the store by its sequence number.
Definition local_chunk_store.cc:130
absl::Status CloseWritesWithStatus(absl::Status) override
Closes the store for writes, allowing for finalization of the store.
Definition local_chunk_store.cc:162
absl::Status Put(int64_t seq, Chunk chunk, bool final) override
Put a chunk into the store with the specified sequence number.
Definition local_chunk_store.cc:142