15#ifndef ACTIONENGINE_DISTRIBUTED_SINGLEFLIGHT_H_ 
   16#define ACTIONENGINE_DISTRIBUTED_SINGLEFLIGHT_H_ 
   21#include <absl/container/flat_hash_map.h> 
   22#include <absl/status/statusor.h> 
   26namespace act::distributed {
 
   30  WaitGroup() = 
default;
 
   32  WaitGroup(
const WaitGroup&) = 
delete;
 
   33  WaitGroup& operator=(
const WaitGroup&) = 
delete;
 
   35  WaitGroup(WaitGroup&& other) 
noexcept;
 
   36  WaitGroup& operator=(WaitGroup&& other) 
noexcept;
 
   47  mutable act::Mutex mu_;
 
   48  act::CondVar cv_ ABSL_GUARDED_BY(mu_);
 
   49  int counter_ ABSL_GUARDED_BY(mu_) = 0;
 
   53  absl::StatusOr<std::any> val;
 
   59  ~FlightGroup() { act::MutexLock lock(&mu_); }
 
   61  absl::StatusOr<std::any> Do(
 
   62      std::string_view key, absl::AnyInvocable<absl::StatusOr<std::any>()> fn);
 
   65  absl::StatusOr<T> Do(std::string_view key,
 
   66                       absl::AnyInvocable<absl::StatusOr<T>()> fn);
 
   69  mutable act::Mutex mu_;
 
   70  absl::flat_hash_map<std::string, std::shared_ptr<Call>> calls_
 
   75absl::StatusOr<T> FlightGroup::Do(std::string_view key,
 
   76                                  absl::AnyInvocable<absl::StatusOr<T>()> fn) {
 
   77  absl::StatusOr<std::any> result =
 
   78      Do(key, [fn = std::move(fn)]() 
mutable -> absl::StatusOr<std::any> {
 
   79        absl::StatusOr<T> val = std::move(fn)();
 
   83        LOG(INFO) << 
"SingleFlight: computed value: " << val.value();
 
   84        return std::any(std::move(val.value()));
 
   87    return result.status();
 
   89  if (std::any_cast<T>(&result.value()) == 
nullptr) {
 
   90    return absl::InternalError(
"Type assertion failed in SingleFlight::Do");
 
   92  return std::any_cast<T>(std::move(result.value()));
 
Concurrency utilities for ActionEngine.