Action Engine
Loading...
Searching...
No Matches
local_chunk_store.h
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#ifndef ACTIONENGINE_STORES_LOCAL_CHUNK_STORE_H_
16#define ACTIONENGINE_STORES_LOCAL_CHUNK_STORE_H_
17
18#include <cstddef>
19#include <cstdint>
20#include <string>
21#include <string_view>
22
23#include <absl/base/thread_annotations.h>
24#include <absl/container/flat_hash_map.h>
25#include <absl/hash/hash.h>
26
30
31namespace act {
32
// LocalChunkStore is a concrete, non-derivable (`final`) implementation of the
// ChunkStore interface. Judging by its data members, chunks are held in
// absl::flat_hash_map containers keyed by sequence number, together with a
// two-way mapping between sequence numbers and arrival order, all guarded by
// the single mutex mu_.
// NOTE(review): the digits fused to the start of each line below are listing
// line numbers carried over from the rendered source view, not C++ tokens.
42class LocalChunkStore final : public ChunkStore {
43 // For detailed documentation, see the base class, ChunkStore.
44 public:
   // Default constructor: only invokes the ChunkStore default constructor; all
   // data members take their in-class initializers.
45 LocalChunkStore() : ChunkStore() {}
46
   // Constructs a store from `id`. Defined out of line; presumably `id` becomes
   // the value reported by GetId() -- confirm in the .cc.
47 explicit LocalChunkStore(std::string_view id);
48
   // Only the copy operations are deleted explicitly; because they are
   // user-declared, no implicit move operations are generated either.
49 // Neither copyable nor movable.
50 LocalChunkStore(const LocalChunkStore& other) = delete;
51 LocalChunkStore& operator=(const LocalChunkStore& other) = delete;
52
   // Defined out of line. Presumably uses ClosePutsAndAwaitPendingOperations()
   // to drain in-flight calls before members are destroyed -- confirm.
53 ~LocalChunkStore() override;
54
   // Overrides ChunkStore::Notify(). Presumably wakes threads waiting on cv_ --
   // confirm in the .cc.
55 void Notify() override;
56
   // Returns a reference to the chunk with sequence number `seq` instead of a
   // copy, or an error status. `timeout` presumably bounds how long the call
   // waits for the chunk to become available -- confirm.
   // NOTE(review): if the reference points into chunks_ (an
   // absl::flat_hash_map, which does not guarantee pointer stability), a later
   // Put() or Pop() could invalidate it -- verify lifetime rules in the .cc.
57 absl::StatusOr<std::reference_wrapper<const Chunk>> GetRef(
58 int64_t seq, absl::Duration timeout) override;
59
   // Like GetRef(), but the chunk is selected by `arrival_offset` (its position
   // in arrival order) rather than by sequence number. Returns a reference
   // instead of a copy; the same lifetime caveat presumably applies.
60 absl::StatusOr<std::reference_wrapper<const Chunk>> GetRefByArrivalOrder(
61 int64_t arrival_offset, absl::Duration timeout) override;
62
   // Pops a chunk from the store by its sequence number. The optional in the
   // return type presumably is empty when no chunk with `seq` is stored --
   // confirm.
63 absl::StatusOr<std::optional<Chunk>> Pop(int64_t seq) override;
64
   // Puts `chunk` into the store under sequence number `seq`. `chunk` is taken
   // by value. `final` presumably marks `seq` as the last sequence number of
   // the stream (see final_seq_) -- confirm.
65 absl::Status Put(int64_t seq, Chunk chunk, bool final) override;
66
   // Closes the store for writes, allowing for finalization of the store. How
   // the passed status is used is not visible from this header.
67 absl::Status CloseWritesWithStatus(absl::Status) override;
68
   // Returns a size for the store; presumably the number of chunks currently
   // held in chunks_ -- confirm.
69 absl::StatusOr<size_t> Size() override;
70
   // Reports whether the store holds a chunk for `seq` (presumably a lookup in
   // chunks_ -- confirm).
71 absl::StatusOr<bool> Contains(int64_t seq) override;
72
   // Sets the store identifier; presumably assigns id_ -- confirm.
73 absl::Status SetId(std::string_view id) override;
74
   // Returns the store identifier as a non-owning view.
   // NOTE(review): if this views id_, the result is only valid until id_ is
   // next modified or the store is destroyed -- confirm.
75 std::string_view GetId() const override;
76
   // Translates an arrival offset into the sequence number of the chunk that
   // arrived at that position (presumably via arrival_order_to_seq_ -- confirm).
77 absl::StatusOr<int64_t> GetSeqForArrivalOffset(
78 int64_t arrival_offset) override;
79
   // Returns the final sequence number (presumably final_seq_, which starts at
   // -1 -- confirm what is returned before a final chunk has been put).
80 absl::StatusOr<int64_t> GetFinalSeq() override;
81
82 private:
   // Internal shutdown helper. The annotation requires the caller to already
   // hold mu_ exclusively. By its name it stops further puts and waits for
   // pending operations (see no_further_puts_ / num_pending_ops_) -- confirm.
83 void ClosePutsAndAwaitPendingOperations() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
84
   // Mutex guarding the annotated fields below; `mutable` so it can also be
   // locked from const member functions.
85 mutable act::Mutex mu_;
86
   // Store identifier. Carries no ABSL_GUARDED_BY annotation.
87 std::string id_;
88
   // Two-way mapping between a chunk's sequence number and its arrival order.
89 absl::flat_hash_map<int64_t, int64_t> seq_to_arrival_order_
90 ABSL_GUARDED_BY(mu_);
91 absl::flat_hash_map<int64_t, int64_t> arrival_order_to_seq_
92 ABSL_GUARDED_BY(mu_);
   // Chunk payloads keyed by sequence number.
93 absl::flat_hash_map<int64_t, Chunk> chunks_ ABSL_GUARDED_BY(mu_);
94
   // Both start at -1, presumably meaning "not known yet" -- confirm.
   // NOTE(review): unlike the neighbouring fields, these two carry no
   // ABSL_GUARDED_BY(mu_); confirm whether that omission is intentional.
95 int64_t final_seq_ = -1;
96 int64_t max_seq_ = -1;
   // Counter starting at 0; by its name, the number of chunks put so far.
97 int64_t total_chunks_put_ ABSL_GUARDED_BY(mu_) = 0;
98
   // Starts false; presumably set once writes are closed -- confirm.
99 bool no_further_puts_ ABSL_GUARDED_BY(mu_) = false;
    // Condition variable used together with mu_; `mutable` so const member
    // functions can use it as well.
100 mutable act::CondVar cv_ ABSL_GUARDED_BY(mu_);
101
    // Counter starting at 0; by its name, the number of operations currently in
    // flight, which the shutdown helper presumably waits on -- confirm.
102 size_t num_pending_ops_ ABSL_GUARDED_BY(mu_) = 0;
103};
104
105} // namespace act
106
107#endif // ACTIONENGINE_STORES_LOCAL_CHUNK_STORE_H_
Concurrency utilities for ActionEngine.
absl::StatusOr< std::reference_wrapper< const Chunk > > GetRef(int64_t seq, absl::Duration timeout) override
Same as Get(), but returns a reference to the chunk instead of copying it.
Definition local_chunk_store.cc:45
absl::StatusOr< std::reference_wrapper< const Chunk > > GetRefByArrivalOrder(int64_t arrival_offset, absl::Duration timeout) override
Same as GetByArrivalOrder(), but returns a reference to the chunk instead of copying it.
Definition local_chunk_store.cc:86
absl::StatusOr< std::optional< Chunk > > Pop(int64_t seq) override
Pop a chunk from the store by its sequence number.
Definition local_chunk_store.cc:130
absl::Status CloseWritesWithStatus(absl::Status) override
Closes the store for writes, allowing for finalization of the store.
Definition local_chunk_store.cc:162
absl::Status Put(int64_t seq, Chunk chunk, bool final) override
Put a chunk into the store with the specified sequence number.
Definition local_chunk_store.cc:142
An abstract interface for raw data storage and retrieval for ActionEngine nodes.
Definition types.h:104
ActionEngine data structures used to implement actions and nodes (data streams).