263  explicit Writer(
Channel<T>* absl_nonnull channel_state);
 
  278requires std::is_move_assignable_v<T> 
class Channel {
 
  288      : capacity_(capacity), closed_(false), rd_(this), wr_(this) {
 
  289    DCHECK(Invariants());
 
 
  298    act::concurrency::impl::MutexLock lock(&mu_);
 
  299    DCHECK(Invariants());
 
  // Public accessor for the number of items currently held in the channel's
  // queue. Simply forwards to Length(), which reads queue_.size() under mu_.
  333  [[nodiscard]] 
size_t length()
 const { 
return Length(); }
 
  336  friend struct internal::ReadSelectable<T>;
 
  337  friend struct internal::WriteSelectable<T>;
 
  343  bool Get(T* absl_nonnull dst);
 
  // Returns the current number of queued items.
  345  size_t Length()
 const {
 
  // queue_ is annotated as guarded by mu_, so take the mutex before reading
  // its size (mu_ is declared mutable, which is what allows locking it from
  // this const member).
  346    act::concurrency::impl::MutexLock lock(&mu_);
 
  347    return queue_.size();
 
  // Builds a select Case for reading from this channel.
  //
  // The case is bound to the channel's read selectable (rd_) and carries two
  // arguments: `dst` (argument 0, where the received item is stored) and `ok`
  // (argument 1). ReadSelectable<T>::Handle retrieves them by those indices.
  350  Case OnRead(T* absl_nonnull dst, 
bool* absl_nonnull ok) {
 
  351    return {&rd_, dst, ok};
 
  // Builds a select Case for writing `item` to this channel by copy.
  //
  // Only available when T is copy-constructible. The case is bound to the
  // channel's write selectable (wr_) and carries a pointer to `item`
  // (argument 0) plus a pointer to internal::kCopy (argument 1); the handlers
  // read argument 1 back as a CopyOrMove value and pass it to CopyOrMoveOut.
  // Only the address of `item` is stored, so `item` must stay alive until the
  // case has been handled.
  354  Case OnWrite(
const T& item) 
requires(std::is_copy_constructible_v<T>) {
 
  355    return {&wr_, &item, &internal::kCopy};
 
  // Builds a select Case for writing `item` to this channel by move.
  //
  // Same layout as the copying overload, but argument 1 points at
  // internal::kMove. Only the address of `item` is stored, so the referenced
  // object must stay alive until the case has been handled.
  358  Case OnWrite(T&& item) { 
return {&wr_, &item, &internal::kMove}; }
 
  // Upper bound on queue_.size(). Set once by the constructor; Invariants()
  // CHECKs that the queue never exceeds it, and WriteSelectable<T>::Handle
  // compares against it before buffering an item.
  360  const size_t capacity_;
 
  // Protects queue_ and closed_ (see the ABSL_GUARDED_BY annotations below).
  // Declared mutable so that const members such as Length() can lock it.
  362  mutable act::concurrency::impl::Mutex mu_;
 
  // Buffered items: writers push_back, readers take from the front.
  363  std::deque<T> queue_ ABSL_GUARDED_BY(mu_);
 
  // Whether the channel has been closed. Initialized to false by the
  // constructor; Close() and the write handler CHECK that it is still false.
  364  bool closed_ ABSL_GUARDED_BY(mu_);
 
  // Bookkeeping for select cases waiting on this channel; exposes the
  // `readers` and `writers` lists used by the Handle/Unregister functions.
  366  internal::ChannelWaiterState waiters_;
 
  // Reader handle constructed with a pointer back to this channel.
  368  Reader<T> reader_{
this};
 
  // Selectable that cases built by OnRead() are bound to; constructed with
  // `this` in the constructor's initializer list.
  369  internal::ReadSelectable<T> rd_;
 
  // Writer handle constructed with a pointer back to this channel.
  371  Writer<T> writer_{
this};
 
  // Selectable that cases built by OnWrite() are bound to; constructed with
  // `this` in the constructor's initializer list.
  372  internal::WriteSelectable<T> wr_;
 
  374  bool Invariants() const ABSL_EXCLUSIVE_LOCKS_REQUIRED
 
  377    CHECK_LE(queue_.size(), capacity_) << 
"Channel queue size exceeds capacity";
 
 
  383requires std::is_move_assignable_v<T> 
void Channel<T>::Close() {
 
  // Closes the channel. The whole operation runs with mu_ held, and the class
  // invariants are DCHECKed on entry and again at the end.
  384  act::concurrency::impl::MutexLock lock(&mu_);
 
  385  DCHECK(Invariants());
 
  // Preconditions (fatal if violated): the channel must not already be
  // closed, and no writer may be waiting on it.
  386  CHECK(!closed_) << 
"Calling Close() on closed channel";
 
  387  CHECK(waiters_.writers == 
nullptr)
 
  388      << 
"Calling Close() on a channel with blocked writers";
 
  // NOTE(review): original line 389 is not visible in this excerpt;
  // presumably it sets closed_ -- confirm against the full file.
  // Close the waiter state and release the readers registered in it.
  390  this->waiters_.CloseAndReleaseReaders();
 
  391  DCHECK(Invariants());
 
  395requires std::is_move_assignable_v<T> 
bool Channel<T>::Get(
 
  396    T* absl_nonnull dst) {
 
  // Performs the read as a one-case Select over OnRead(dst, &result), so the
  // received item is stored through `dst`.
  // NOTE(review): the declaration of `result` and the return statement are on
  // lines not visible in this excerpt; presumably `result` is the bool `ok`
  // flag that this function returns -- confirm against the full file.
  398  Select({OnRead(dst, &result)});
 
  // Constructs a Reader that stores `channel` in channel_; the Reader's
  // visible member functions forward to Get() / OnRead() through that pointer.
  403Reader<T>::Reader(
Channel<T>* absl_nonnull channel) : channel_(channel) {}
 
  407  return channel_->Get(item);
 
 
  412  return channel_->OnRead(item, ok);
 
 
  // Constructs a Writer that stores `channel_state` in channel_; the Writer's
  // visible member functions forward to OnWrite() through that pointer.
  416Writer<T>::Writer(
Channel<T>* absl_nonnull channel_state)
 
  417    : channel_(channel_state) {}
 
  436  return channel_->OnWrite(item);
 
 
  441  return channel_->OnWrite(std::move(item));
 
 
  446  return !Cancelled() && 
Select({OnCancel(), 
OnWrite(item)}) == 1;
 
 
  451  return !Cancelled() && 
Select({OnCancel(), 
OnWrite(std::move(item))}) == 1;
 
 
 
  455namespace thread::internal {
 
  457bool ReadSelectable<T>::Handle(CaseInSelectClause* absl_nonnull reader,
 
  // Handles a read case (`reader`) against this selectable's channel.
  //
  // NOTE(review): this excerpt omits several original lines (the rest of the
  // signature, return statements, closing braces and the writes through
  // dst_ok), so the comments below describe only the statements shown.
  //
  // The channel mutex is taken first and the class invariants are DCHECKed at
  // every visible exit point.
  460  act::concurrency::impl::MutexLock lock(&channel->mu_);
 
  461  DCHECK(channel->Invariants());
 
  // Case arguments as packed by Channel::OnRead: 0 = destination for the
  // item, 1 = destination for the `ok` flag.
  463  T* absl_nonnull dst_item = reader->GetCase()->GetArgPtr<T>(0);
 
  464  bool* absl_nonnull dst_ok = reader->GetCase()->GetArgPtr<
bool>(1);
 
  // Path 1: the queue already holds an item.
  467  if (!channel->queue_.empty()) {
 
  468    DVLOG(2) << 
"Get from buffer";
 
  // Take the selector's own mutex before inspecting picked_case_index.
  469    reader->selector->mu.Lock();
 
  // Only consume an item if the selector has not already picked a case.
  470    if (reader->selector->picked_case_index == Selector::kNonePicked) {
 
  // Move the oldest queued item into the reader's destination and drop it
  // from the queue.
  475      *dst_item = std::move(channel->queue_.front());
 
  476      channel->queue_.pop_front();
 
  // NOTE(review): judging by its name, this also releases
  // reader->selector->mu taken above (there is no explicit Unlock() on this
  // branch) -- confirm.
  478      channel->waiters_.UnlockAndReleaseReader(reader);
 
  // A queue slot was just freed: if a writer is waiting, push its item
  // (copied or moved according to its case argument 1) and release it.
  481      if (CaseInSelectClause * absl_nonnull unblocked_writer;
 
  482          channel->waiters_.GetWaitingWriter(&unblocked_writer)) {
 
  483        auto* absl_nonnull item = unblocked_writer->GetCase()->GetArgPtr<T>(0);
 
  485            *unblocked_writer->GetCase()->GetArgPtr<CopyOrMove>(1);
 
  487        channel->queue_.push_back(CopyOrMoveOut(item, copy_or_move));
 
  488        channel->waiters_.UnlockAndReleaseWriter(unblocked_writer);
 
  // Explicit unlock of the selector mutex taken at the top of this path.
  // NOTE(review): presumably this is the branch where another case had
  // already been picked; the enclosing `else` is not visible -- confirm.
  493      reader->selector->mu.Unlock();
 
  495    DCHECK(channel->Invariants());
 
  // Path 2: a matching writer was found. Transfer its item straight into the
  // reader's destination without going through queue_, then release both
  // sides.
  500  if (CaseInSelectClause * absl_nonnull writer;
 
  501      channel->waiters_.GetMatchingWriter(reader, &writer)) {
 
  502    auto* absl_nonnull item = writer->GetCase()->GetArgPtr<T>(0);
 
  503    auto copy_or_move = *writer->GetCase()->GetArgPtr<CopyOrMove>(1);
 
  505    *dst_item = CopyOrMoveOut(item, copy_or_move);
 
  508    channel->waiters_.UnlockAndReleaseReader(reader);
 
  509    channel->waiters_.UnlockAndReleaseWriter(writer);
 
  510    DCHECK(channel->Invariants());
 
  // Path 3: nothing could be read immediately. Take the selector mutex to
  // decide between giving up, failing on a closed channel, or waiting.
  513  reader->selector->mu.Lock();
 
  // Another case of the same selector was already picked: unlock and stop.
  516  if (reader->selector->picked_case_index != Selector::kNonePicked) {
 
  517    reader->selector->mu.Unlock();
 
  519    DVLOG(2) << 
"Read cancelled since another selector case done";
 
  520    DCHECK(channel->Invariants());
 
  // Channel is closed: the read fails and the reader is released.
  524  if (channel->closed_) {
 
  525    DVLOG(2) << 
"Read failing because channel closed";
 
  527    channel->waiters_.UnlockAndReleaseReader(reader);
 
  // Otherwise append the reader to the channel's list of waiting readers.
  533    DVLOG(2) << 
"Read waiting";
 
  534    internal::PushBack(&channel->waiters_.readers, reader);
 
  537  reader->selector->mu.Unlock();
 
  538  DCHECK(channel->Invariants());
 
  // Removes case `c` from the channel's list of waiting readers
  // (waiters_.readers), holding the channel mutex while the list is modified.
  543void ReadSelectable<T>::Unregister(CaseInSelectClause* absl_nonnull c) {
 
  544  act::concurrency::impl::MutexLock lock(&channel->mu_);
 
  545  internal::UnlinkFromList(&channel->waiters_.readers, c);
 
  549bool WriteSelectable<T>::Handle(CaseInSelectClause* absl_nonnull writer,
 
  // Handles a write case (`writer`) against this selectable's channel.
  //
  // NOTE(review): this excerpt omits several original lines (the rest of the
  // signature, return statements, closing braces and the writes through
  // reader_ok), so the comments below describe only the statements shown.
  //
  // The channel mutex is taken first and the class invariants are DCHECKed at
  // every visible exit point.
  552  act::concurrency::impl::MutexLock lock(&channel->mu_);
 
  553  DCHECK(channel->Invariants());
 
  // Writing to a closed channel is a fatal programming error.
  554  CHECK(!channel->closed_) << 
"Calling Write() on closed channel";
 
  // Path 1: a matching reader was found. Hand the item directly to it.
  557  if (CaseInSelectClause * absl_nonnull reader;
 
  558      channel->waiters_.GetMatchingReader(writer, &reader)) {
 
  // Writer case arguments as packed by Channel::OnWrite: 0 = pointer to the
  // item, 1 = pointer to the CopyOrMove selector.
  559    auto* absl_nonnull writer_item = writer->GetCase()->GetArgPtr<T>(0);
 
  560    auto copy_or_move = *writer->GetCase()->GetArgPtr<CopyOrMove>(1);
 
  // Reader case arguments as packed by Channel::OnRead: 0 = destination for
  // the item, 1 = destination for the `ok` flag.
  562    auto* absl_nonnull reader_item = reader->GetCase()->GetArgPtr<T>(0);
 
  563    bool* absl_nonnull reader_ok = reader->GetCase()->GetArgPtr<
bool>(1);
 
  // Copy or move the written item into the reader's destination, then
  // release both sides.
  565    *reader_item = CopyOrMoveOut(writer_item, copy_or_move);
 
  568    channel->waiters_.UnlockAndReleaseReader(reader);
 
  569    channel->waiters_.UnlockAndReleaseWriter(writer);
 
  571    DCHECK(channel->Invariants());
 
  // No direct handoff happened: take the selector mutex to decide between
  // giving up, buffering the item, or waiting.
  575  writer->selector->mu.Lock();
 
  // Another case of the same selector was already picked: unlock and stop.
  578  if (writer->selector->picked_case_index != Selector::kNonePicked) {
 
  579    writer->selector->mu.Unlock();
 
  581    DVLOG(2) << 
"Write cancelled since another selector case done";
 
  582    DCHECK(channel->Invariants());
 
  // Path 2: the queue has spare capacity, so append the item (copied or
  // moved according to case argument 1) and release the writer.
  587  if (channel->queue_.size() < channel->capacity_) {
 
  588    DVLOG(2) << 
"Add to buffer";
 
  590    T* absl_nonnull item = writer->GetCase()->GetArgPtr<T>(0);
 
  591    const CopyOrMove copy_or_move =
 
  592        *writer->GetCase()->GetArgPtr<CopyOrMove>(1);
 
  594    channel->queue_.push_back(CopyOrMoveOut(item, copy_or_move));
 
  // NOTE(review): judging by its name, this also releases
  // writer->selector->mu taken above (there is no explicit Unlock() on this
  // branch) -- confirm.
  595    channel->waiters_.UnlockAndReleaseWriter(writer);
 
  597    DCHECK(channel->Invariants());
 
  // Path 3: the queue is full, so append the writer to the channel's list of
  // waiting writers.
  603    DVLOG(2) << 
"Write waiting";
 
  604    internal::PushBack(&channel->waiters_.writers, writer);
 
  607  writer->selector->mu.Unlock();
 
  608  DCHECK(channel->Invariants());
 
  // Removes case `c` from the channel's list of waiting writers
  // (waiters_.writers), holding the channel mutex while the list is modified.
  613void WriteSelectable<T>::Unregister(CaseInSelectClause* absl_nonnull c) {
 
  614  act::concurrency::impl::MutexLock lock(&channel->mu_);
 
  615  internal::UnlinkFromList(&channel->waiters_.writers, c);