Action Engine
Loading...
Searching...
No Matches
chunk_store_writer.h
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#ifndef ACTIONENGINE_STORES_CHUNK_STORE_WRITER_H_
16#define ACTIONENGINE_STORES_CHUNK_STORE_WRITER_H_
17
18#include <algorithm>
19#include <memory>
20#include <optional>
21#include <utility>
22#include <vector>
23
24#include <absl/base/nullability.h>
25#include <absl/base/thread_annotations.h>
26#include <absl/log/check.h>
27#include <absl/status/status.h>
28#include <absl/status/statusor.h>
29
31#include "actionengine/data/serialization.h"
33#include "actionengine/net/stream.h"
35
36namespace act {
37
49 public:
// Constructs a writer for the given ChunkStore. `chunk_store` is annotated
// absl_nonnull, so callers must not pass a null pointer.
// `n_chunks_to_buffer` defaults to -1.
// NOTE(review): the meaning of a negative buffer size is not visible in this
// header -- confirm against the constructor definition.
60 explicit ChunkStoreWriter(ChunkStore* absl_nonnull chunk_store,
61 int n_chunks_to_buffer = -1);
62
63 // Copying is disabled. Because the copy operations are user-declared, no implicit move operations are generated, so the class is not movable either.
64 ChunkStoreWriter(const ChunkStoreWriter&) = delete;
65 ChunkStoreWriter& operator=(const ChunkStoreWriter&) = delete;
66
68
// Chunk overload of Put(): takes an already-built Chunk, a sequence number
// and a `final` flag. The templated Put() and the generic operator<< both
// forward to this overload, passing seq = -1 when the caller supplies no
// sequence number. Declared here only.
// NOTE(review): the meaning of the returned int is not visible in this
// header -- confirm against the implementation.
69 absl::StatusOr<int> Put(Chunk value, int seq, bool final);
70
71 template <typename T>
72 absl::StatusOr<int> Put(T value, int seq = -1, bool final = false) {
73 auto chunk = ToChunk(std::move(value));
74 if (!chunk.ok()) {
75 return chunk.status();
76 }
77 return Put(*std::move(chunk), seq, final);
78 }
79
80 template <typename T>
81 friend ChunkStoreWriter& operator<<(ChunkStoreWriter& writer, T value) {
82 absl::StatusOr<Chunk> chunk = ToChunk(std::move(value));
83 const bool final = chunk->IsNull();
84 writer.Put(*std::move(chunk), -1, final).IgnoreError();
85 return writer;
86 }
87
// Returns an absl::Status for this writer. Declared here only.
// NOTE(review): presumably reports the mu_-guarded status_ member -- confirm
// against the implementation.
88 absl::Status GetStatus() const;
89
// Accepts a map from string keys to raw WireStream pointers, taken by value.
// Declared here only.
// NOTE(review): presumably stored in peers_; the required lifetime of the
// pointed-to streams is not visible here -- confirm with the implementation.
90 void BindPeers(absl::flat_hash_map<std::string, WireStream*> peers);
91
92 void Cancel() {
93 act::MutexLock lock(&mu_);
94 CancelInternal();
95 }
96
97 void WaitForBufferToDrain() {
98 act::MutexLock lock(&mu_);
99 WaitForBufferToDrainInternal();
100 }
101
102 void Join() {
103 act::MutexLock lock(&mu_);
104 JoinInternal();
105 }
106
107 private:
// Private helpers declared here and defined out of line. Each one is
// annotated ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_), so the caller must already
// hold mu_.
//
// EnsureWriteLoop: NOTE(review): presumably starts the write fiber if it is
// not running yet -- confirm against the implementation.
108 void EnsureWriteLoop() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
109
// SafelyCloseBuffer: called first by both CancelInternal() and
// WaitForBufferToDrainInternal().
110 void SafelyCloseBuffer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
111
// RunWriteLoop: returns an absl::Status. NOTE(review): presumably the body
// executed by the write fiber -- confirm.
112 absl::Status RunWriteLoop() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
113
114 void WaitForBufferToDrainInternal() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
115 SafelyCloseBuffer();
116 JoinInternal();
117 }
118
119 void CancelInternal() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
120 SafelyCloseBuffer();
121 if (fiber_ != nullptr) {
122 fiber_->Cancel();
123 }
124 }
125
126 void JoinInternal() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
127 if (fiber_ != nullptr) {
128 thread::Fiber* fiber = fiber_.get();
129 mu_.unlock();
130 fiber->Join();
131 mu_.lock();
132 fiber_ = nullptr;
133 }
134 }
135
// Target store; the pointer itself is const.
// NOTE(review): annotated absl_nonnull yet default-initialised to nullptr;
// presumably always overwritten by the constructor -- confirm.
136 ChunkStore* absl_nonnull const chunk_store_ = nullptr;
// Immutable buffering parameter. NOTE(review): presumably the constructor's
// n_chunks_to_buffer argument -- confirm.
137 const int n_chunks_to_buffer_;
138
// Counters guarded by mu_; -1 is the initial value of final_seq_.
139 int final_seq_ ABSL_GUARDED_BY(mu_) = -1;
140 int total_chunks_put_ ABSL_GUARDED_BY(mu_) = 0;
141
// State flags guarded by mu_.
142 bool accepts_puts_ ABSL_GUARDED_BY(mu_) = true;
143 bool buffer_writer_closed_ ABSL_GUARDED_BY(mu_) = false;
144
// NOTE(review): unlike the counters above, this one carries no
// ABSL_GUARDED_BY annotation; presumably touched only by the write loop --
// confirm.
145 int total_chunks_written_ = 0;
146
// Write fiber handle, guarded by mu_; null when there is no fiber to
// cancel or join.
147 std::unique_ptr<thread::Fiber> fiber_ ABSL_GUARDED_BY(mu_);
// Channel of optional NodeFragment values; not annotated as guarded by mu_.
148 thread::Channel<std::optional<NodeFragment>> buffer_;
// Writer status, guarded by mu_.
149 absl::Status status_ ABSL_GUARDED_BY(mu_);
// String-keyed map of raw WireStream pointers, guarded by mu_.
150 absl::flat_hash_map<std::string, WireStream*> peers_ ABSL_GUARDED_BY(mu_);
151
// Protects the members annotated ABSL_GUARDED_BY(mu_). Declared mutable so
// that const member functions can lock it.
152 mutable act::Mutex mu_;
153};
154
// Explicit specialization of the generic operator<< for Chunk values.
// Declared here only; the definition lives elsewhere.
155template <>
156ChunkStoreWriter& operator<<(ChunkStoreWriter& writer, Chunk value);
157
// Explicit specialization of the generic operator<< for a (Chunk, int) pair.
// Declared here only; the definition lives elsewhere.
// NOTE(review): the int is presumably the sequence number to write the chunk
// at -- confirm against the definition.
158template <>
159ChunkStoreWriter& operator<<(ChunkStoreWriter& writer,
160 std::pair<Chunk, int> value);
161
162template <typename T>
163ChunkStoreWriter& operator<<(ChunkStoreWriter& writer, std::vector<T> value) {
164 for (auto& element : std::move(value)) {
165 writer << std::move(element);
166 }
167 return writer;
168}
169
170template <typename T>
171ChunkStoreWriter* absl_nonnull operator<<(ChunkStoreWriter* absl_nonnull writer,
172 T value) {
173 *writer << std::move(value);
174 return writer;
175}
176
177template <typename T>
178std::unique_ptr<ChunkStoreWriter>& operator<<(
179 std::unique_ptr<ChunkStoreWriter>& writer, T value) {
180 *writer << std::move(value);
181 return writer;
182}
183
184template <typename T>
185std::shared_ptr<ChunkStoreWriter>& operator<<(
186 std::shared_ptr<ChunkStoreWriter>& writer, T value) {
187 *writer << std::move(value);
188 return writer;
189}
190
191} // namespace act
192
193#endif // ACTIONENGINE_STORES_CHUNK_STORE_WRITER_H_
Concurrency utilities for ActionEngine.
An abstract interface for raw data storage and retrieval for ActionEngine nodes.
Definition chunk_store.h:84
A writer for the ChunkStore that allows writing chunks to the store in a buffered manner.
Definition chunk_store_writer.h:48
ChunkStoreWriter(ChunkStore *absl_nonnull chunk_store, int n_chunks_to_buffer=-1)
Constructs a ChunkStoreWriter for the given ChunkStore, setting n_chunks_to_buffer if provided.
An abstract interface for raw data storage and retrieval for ActionEngine nodes.
Definition types.h:104
ActionEngine data structures used to implement actions and nodes (data streams).