61 int n_chunks_to_buffer = -1);
69 absl::StatusOr<int> Put(
Chunk value,
int seq,
bool final);
72 absl::StatusOr<int> Put(T value,
int seq = -1,
bool final =
false) {
73 auto chunk = ToChunk(std::move(value));
75 return chunk.status();
77 return Put(*std::move(chunk), seq,
final);
82 absl::StatusOr<Chunk> chunk = ToChunk(std::move(value));
83 const bool final = chunk->IsNull();
84 writer.Put(*std::move(chunk), -1,
final).IgnoreError();
88 absl::Status GetStatus()
const;
90 void BindPeers(absl::flat_hash_map<std::string, WireStream*> peers);
93 act::MutexLock lock(&mu_);
97 void WaitForBufferToDrain() {
98 act::MutexLock lock(&mu_);
99 WaitForBufferToDrainInternal();
103 act::MutexLock lock(&mu_);
108 void EnsureWriteLoop() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
110 void SafelyCloseBuffer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
112 absl::Status RunWriteLoop() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
114 void WaitForBufferToDrainInternal() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
119 void CancelInternal() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
121 if (fiber_ !=
nullptr) {
126 void JoinInternal() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
127 if (fiber_ !=
nullptr) {
128 thread::Fiber* fiber = fiber_.get();
136 ChunkStore* absl_nonnull
const chunk_store_ =
nullptr;
137 const int n_chunks_to_buffer_;
139 int final_seq_ ABSL_GUARDED_BY(mu_) = -1;
140 int total_chunks_put_ ABSL_GUARDED_BY(mu_) = 0;
142 bool accepts_puts_ ABSL_GUARDED_BY(mu_) =
true;
143 bool buffer_writer_closed_ ABSL_GUARDED_BY(mu_) =
false;
145 int total_chunks_written_ = 0;
147 std::unique_ptr<thread::Fiber> fiber_ ABSL_GUARDED_BY(mu_);
148 thread::Channel<std::optional<NodeFragment>> buffer_;
149 absl::Status status_ ABSL_GUARDED_BY(mu_);
150 absl::flat_hash_map<std::string, WireStream*> peers_ ABSL_GUARDED_BY(mu_);
152 mutable act::Mutex mu_;
ChunkStoreWriter(ChunkStore *absl_nonnull chunk_store, int n_chunks_to_buffer=-1)
Constructs a ChunkStoreWriter for the given ChunkStore, setting n_chunks_to_buffer if provided.