Action Engine
Loading...
Searching...
No Matches
channel.h
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#ifndef THREAD_FIBER_CHANNEL_H_
16#define THREAD_FIBER_CHANNEL_H_
17
18#include "thread/boost_primitives.h"
19#include "thread/cases.h"
20#include "thread/channel/waiter_state.h"
21#include "thread/fiber.h"
22#include "thread/select.h"
23
24namespace thread::internal {
25enum class CopyOrMove {
26 Copy,
27 Move,
28};
29
30static constexpr auto kCopy = CopyOrMove::Copy;
31static constexpr auto kMove = CopyOrMove::Move;
32
33template <typename T>
34T CopyOrMoveOut(T* absl_nonnull item, CopyOrMove strategy) {
35 if (strategy == kCopy) {
36 return *static_cast<const T*>(item);
37 }
38
39 if (strategy == kMove) {
40 return std::move(*item);
41 }
42
43 LOG(FATAL) << "Invalid CopyOrMove strategy: " << static_cast<int>(strategy);
44 ABSL_ASSUME(false);
45}
46} // namespace thread::internal
47
48namespace thread {
49template <class T>
50requires std::is_move_assignable_v<T> class Channel;
51}
52
53namespace thread::internal {
54template <typename T>
55struct ReadSelectable final : Selectable {
56 explicit ReadSelectable(Channel<T>* absl_nonnull channel)
57 : channel(channel) {}
58
59 bool Handle(CaseInSelectClause* absl_nonnull reader, bool enqueue) override;
60 void Unregister(CaseInSelectClause* absl_nonnull c) override;
61
62 Channel<T>* absl_nonnull channel;
63};
64
65template <typename T>
66struct WriteSelectable final : Selectable {
67 explicit WriteSelectable(Channel<T>* absl_nonnull channel)
68 : channel(channel) {}
69
70 bool Handle(CaseInSelectClause* absl_nonnull writer, bool enqueue) override;
71 void Unregister(CaseInSelectClause* absl_nonnull c) override;
72
73 Channel<T>* absl_nonnull channel;
74};
75} // namespace thread::internal
76
77namespace thread {
84template <class T>
85class Reader {
86 public:
87 // This class is not copyable or movable.
88 Reader(const Reader&) = delete;
89 Reader& operator=(const Reader&) = delete;
90
101 bool Read(T* absl_nonnull item);
102
132 Case OnRead(T* absl_nonnull item, bool* absl_nonnull ok);
133
134 private:
135 friend class Channel<T>;
136 explicit Reader(Channel<T>* absl_nonnull channel);
137
138 Channel<T>* absl_nonnull channel_;
139};
140
150template <class T>
151class Writer {
152 public:
153 // This class is not copyable or movable.
154 Writer(const Writer&) = delete;
155 Writer& operator=(const Writer&) = delete;
156
164 void Write(const T& item);
165
173 void Write(T&& item);
174
179 void Close();
180
204 Case OnWrite(const T& item);
205
229 Case OnWrite(T&& item);
230
242 bool WriteUnlessCancelled(const T& item);
243
259 bool WriteUnlessCancelled(T&& item);
260
261 private:
262 friend class Channel<T>;
263 explicit Writer(Channel<T>* absl_nonnull channel_state);
264
265 Channel<T>* absl_nonnull channel_;
266};
267
277template <typename T>
278requires std::is_move_assignable_v<T> class Channel {
279 public:
287 explicit Channel(size_t capacity)
288 : capacity_(capacity), closed_(false), rd_(this), wr_(this) {
289 DCHECK(Invariants());
290 }
291
292 // This class is not copyable or movable.
293 Channel(const Channel&) = delete;
294 Channel& operator=(const Channel&) = delete;
295
296 ~Channel() {
297 // Ensure exclusive access (to e.g. prevent concurrent Close()).
298 act::concurrency::impl::MutexLock lock(&mu_);
299 DCHECK(Invariants());
300 }
301
311 Reader<T>* absl_nonnull reader() { return &reader_; }
312
324 Writer<T>* absl_nonnull writer() { return &writer_; }
325
333 [[nodiscard]] size_t length() const { return Length(); }
334
335 private:
336 friend struct internal::ReadSelectable<T>;
337 friend struct internal::WriteSelectable<T>;
338 friend class Reader<T>;
339 friend class Writer<T>;
340
341 void Close();
342
343 bool Get(T* absl_nonnull dst);
344
345 size_t Length() const {
346 act::concurrency::impl::MutexLock lock(&mu_);
347 return queue_.size();
348 }
349
350 Case OnRead(T* absl_nonnull dst, bool* absl_nonnull ok) {
351 return {&rd_, dst, ok};
352 }
353
354 Case OnWrite(const T& item) requires(std::is_copy_constructible_v<T>) {
355 return {&wr_, &item, &internal::kCopy};
356 }
357
358 Case OnWrite(T&& item) { return {&wr_, &item, &internal::kMove}; }
359
360 const size_t capacity_;
361
362 mutable act::concurrency::impl::Mutex mu_;
363 std::deque<T> queue_ ABSL_GUARDED_BY(mu_);
364 bool closed_ ABSL_GUARDED_BY(mu_);
365
366 internal::ChannelWaiterState waiters_;
367
368 Reader<T> reader_{this};
369 internal::ReadSelectable<T> rd_;
370
371 Writer<T> writer_{this};
372 internal::WriteSelectable<T> wr_;
373
374 bool Invariants() const ABSL_EXCLUSIVE_LOCKS_REQUIRED
375
376 (mu_) {
377 CHECK_LE(queue_.size(), capacity_) << "Channel queue size exceeds capacity";
378 return true;
379 }
380};
381
382template <typename T>
383requires std::is_move_assignable_v<T> void Channel<T>::Close() {
384 act::concurrency::impl::MutexLock lock(&mu_);
385 DCHECK(Invariants());
386 CHECK(!closed_) << "Calling Close() on closed channel";
387 CHECK(waiters_.writers == nullptr)
388 << "Calling Close() on a channel with blocked writers";
389 closed_ = true;
390 this->waiters_.CloseAndReleaseReaders();
391 DCHECK(Invariants());
392}
393
394template <typename T>
395requires std::is_move_assignable_v<T> bool Channel<T>::Get(
396 T* absl_nonnull dst) {
397 bool result;
398 Select({OnRead(dst, &result)});
399 return result;
400}
401
402template <typename T>
403Reader<T>::Reader(Channel<T>* absl_nonnull channel) : channel_(channel) {}
404
405template <typename T>
406bool Reader<T>::Read(T* absl_nonnull item) {
407 return channel_->Get(item);
408}
409
410template <typename T>
411Case Reader<T>::OnRead(T* absl_nonnull item, bool* absl_nonnull ok) {
412 return channel_->OnRead(item, ok);
413}
414
415template <typename T>
416Writer<T>::Writer(Channel<T>* absl_nonnull channel_state)
417 : channel_(channel_state) {}
418
419template <typename T>
420void Writer<T>::Write(const T& item) {
421 Select({OnWrite(item)});
422}
423
424template <typename T>
425void Writer<T>::Write(T&& item) {
426 Select({OnWrite(std::move(item))});
427}
428
429template <typename T>
431 channel_->Close();
432}
433
434template <typename T>
435Case Writer<T>::OnWrite(const T& item) {
436 return channel_->OnWrite(item);
437}
438
439template <typename T>
441 return channel_->OnWrite(std::move(item));
442}
443
444template <typename T>
446 return !Cancelled() && Select({OnCancel(), OnWrite(item)}) == 1;
447}
448
449template <typename T>
451 return !Cancelled() && Select({OnCancel(), OnWrite(std::move(item))}) == 1;
452}
453} // namespace thread
454
455namespace thread::internal {
456template <typename T>
457bool ReadSelectable<T>::Handle(CaseInSelectClause* absl_nonnull reader,
458
459 bool enqueue) {
460 act::concurrency::impl::MutexLock lock(&channel->mu_);
461 DCHECK(channel->Invariants());
462
463 T* absl_nonnull dst_item = reader->GetCase()->GetArgPtr<T>(0);
464 bool* absl_nonnull dst_ok = reader->GetCase()->GetArgPtr<bool>(1);
465
466 // Is there a buffered item to read?
467 if (!channel->queue_.empty()) {
468 DVLOG(2) << "Get from buffer";
469 reader->selector->mu.Lock();
470 if (reader->selector->picked_case_index == Selector::kNonePicked) {
471 // Move out of the buffer. Explicitly destruct behind for types that don't
472 // have a move-assignment operator and where it may be harmful to leave
473 // around a copy. (For example, a shared_ptr-like object with only a copy
474 // assignment operator.)
475 *dst_item = std::move(channel->queue_.front());
476 channel->queue_.pop_front();
477 *dst_ok = true;
478 channel->waiters_.UnlockAndReleaseReader(reader);
479
480 // Potentially admit a waiting writer.
481 if (CaseInSelectClause * absl_nonnull unblocked_writer;
482 channel->waiters_.GetWaitingWriter(&unblocked_writer)) {
483 auto* absl_nonnull item = unblocked_writer->GetCase()->GetArgPtr<T>(0);
484 auto copy_or_move =
485 *unblocked_writer->GetCase()->GetArgPtr<CopyOrMove>(1);
486
487 channel->queue_.push_back(CopyOrMoveOut(item, copy_or_move));
488 channel->waiters_.UnlockAndReleaseWriter(unblocked_writer);
489 }
490 } else {
491 // While we weren't technically able to proceed, there's no point in
492 // Select() processing further cases, so we'll still return true below.
493 reader->selector->mu.Unlock();
494 }
495 DCHECK(channel->Invariants());
496 return true;
497 }
498
499 // Try to transfer directly from waiting writer to reader
500 if (CaseInSelectClause * absl_nonnull writer;
501 channel->waiters_.GetMatchingWriter(reader, &writer)) {
502 auto* absl_nonnull item = writer->GetCase()->GetArgPtr<T>(0);
503 auto copy_or_move = *writer->GetCase()->GetArgPtr<CopyOrMove>(1);
504
505 *dst_item = CopyOrMoveOut(item, copy_or_move);
506 *dst_ok = true;
507
508 channel->waiters_.UnlockAndReleaseReader(reader);
509 channel->waiters_.UnlockAndReleaseWriter(writer);
510 DCHECK(channel->Invariants());
511 }
512
513 reader->selector->mu.Lock();
514 // We must guarantee that this case is eligible to proceed before any
515 // side effects can occur.
516 if (reader->selector->picked_case_index != Selector::kNonePicked) {
517 reader->selector->mu.Unlock();
518 // Already handled item
519 DVLOG(2) << "Read cancelled since another selector case done";
520 DCHECK(channel->Invariants());
521 return true;
522 }
523
524 if (channel->closed_) {
525 DVLOG(2) << "Read failing because channel closed";
526 *dst_ok = false;
527 channel->waiters_.UnlockAndReleaseReader(reader);
528 return true;
529 }
530
531 if (enqueue) {
532 // Register with waiting readers
533 DVLOG(2) << "Read waiting";
534 internal::PushBack(&channel->waiters_.readers, reader);
535 }
536
537 reader->selector->mu.Unlock();
538 DCHECK(channel->Invariants());
539 return false;
540}
541
542template <typename T>
543void ReadSelectable<T>::Unregister(CaseInSelectClause* absl_nonnull c) {
544 act::concurrency::impl::MutexLock lock(&channel->mu_);
545 internal::UnlinkFromList(&channel->waiters_.readers, c);
546}
547
548template <typename T>
549bool WriteSelectable<T>::Handle(CaseInSelectClause* absl_nonnull writer,
550
551 bool enqueue) {
552 act::concurrency::impl::MutexLock lock(&channel->mu_);
553 DCHECK(channel->Invariants());
554 CHECK(!channel->closed_) << "Calling Write() on closed channel";
555
556 // First try to transfer directly from writer to a waiting reader
557 if (CaseInSelectClause * absl_nonnull reader;
558 channel->waiters_.GetMatchingReader(writer, &reader)) {
559 auto* absl_nonnull writer_item = writer->GetCase()->GetArgPtr<T>(0);
560 auto copy_or_move = *writer->GetCase()->GetArgPtr<CopyOrMove>(1);
561
562 auto* absl_nonnull reader_item = reader->GetCase()->GetArgPtr<T>(0);
563 bool* absl_nonnull reader_ok = reader->GetCase()->GetArgPtr<bool>(1);
564
565 *reader_item = CopyOrMoveOut(writer_item, copy_or_move);
566 *reader_ok = true;
567
568 channel->waiters_.UnlockAndReleaseReader(reader);
569 channel->waiters_.UnlockAndReleaseWriter(writer);
570
571 DCHECK(channel->Invariants());
572 return true;
573 }
574
575 writer->selector->mu.Lock();
576 // We must guarantee that this case is eligible to proceed before any
577 // side effects can occur.
578 if (writer->selector->picked_case_index != Selector::kNonePicked) {
579 writer->selector->mu.Unlock();
580 // Already handled item
581 DVLOG(2) << "Write cancelled since another selector case done";
582 DCHECK(channel->Invariants());
583 return true;
584 }
585
586 // Is there room to buffer item?
587 if (channel->queue_.size() < channel->capacity_) {
588 DVLOG(2) << "Add to buffer";
589
590 T* absl_nonnull item = writer->GetCase()->GetArgPtr<T>(0);
591 const CopyOrMove copy_or_move =
592 *writer->GetCase()->GetArgPtr<CopyOrMove>(1);
593
594 channel->queue_.push_back(CopyOrMoveOut(item, copy_or_move));
595 channel->waiters_.UnlockAndReleaseWriter(writer);
596
597 DCHECK(channel->Invariants());
598 return true;
599 }
600
601 if (enqueue) {
602 // Register with waiting writers
603 DVLOG(2) << "Write waiting";
604 internal::PushBack(&channel->waiters_.writers, writer);
605 }
606
607 writer->selector->mu.Unlock();
608 DCHECK(channel->Invariants());
609 return false;
610}
611
612template <typename T>
613void WriteSelectable<T>::Unregister(CaseInSelectClause* absl_nonnull c) {
614 act::concurrency::impl::MutexLock lock(&channel->mu_);
615 internal::UnlinkFromList(&channel->waiters_.writers, c);
616}
617} // namespace thread::internal
618
619#endif // THREAD_FIBER_CHANNEL_H_
A Channel is a bounded FIFO queue that allows multiple readers and writers to communicate with each o...
Definition channel.h:278
Writer< T > *absl_nonnull writer()
Definition channel.h:324
Channel(size_t capacity)
Constructs a Channel with the given capacity.
Definition channel.h:287
Reader< T > *absl_nonnull reader()
Definition channel.h:311
size_t length() const
Definition channel.h:333
A Reader is used to read items from a Channel.
Definition channel.h:85
bool Read(T *absl_nonnull item)
Reads an item from the channel, blocking until it can be read. Returns false if and only if the chann...
Definition channel.h:406
Case OnRead(T *absl_nonnull item, bool *absl_nonnull ok)
Returns a Case that can be used to wait for and synchronise on reading an item from the channel.
Definition channel.h:411
A Writer is used to write items to a Channel.
Definition channel.h:151
Case OnWrite(T &&item)
Returns a Case that can be used to write the given item to the channel. The item is not mutated (move...
Definition channel.h:440
void Close()
Marks the channel as closed, notifying any waiting readers. See Reader::Read(). May be called at most...
Definition channel.h:430
Case OnWrite(const T &item)
Returns a Case that can be used to write the given item to the channel.
Definition channel.h:435
bool WriteUnlessCancelled(const T &item)
Returns false iff the calling fiber is cancelled before the value can be written.
Definition channel.h:445
void Write(T &&item)
Writes the given item to the channel, blocking until it can be written. If the channel is closed,...
Definition channel.h:425
void Write(const T &item)
Writes the given item to the channel, blocking until it can be written. If the channel is closed,...
Definition channel.h:420
bool WriteUnlessCancelled(T &&item)
Returns false iff the calling fiber is cancelled before the value can be written.
Definition channel.h:450
Provides the Select and SelectUntil functions for selecting a fiber to proceed if one of several case...
int Select(const CaseArray &cases)
Returns the index of the first case that is ready, blocking until one is.
Definition select.h:75
A Case represents a selectable case in a Select statement.
Definition cases.h:80