Action Engine
Loading...
Searching...
No Matches
singleflight.h
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#ifndef ACTIONENGINE_DISTRIBUTED_SINGLEFLIGHT_H_
16#define ACTIONENGINE_DISTRIBUTED_SINGLEFLIGHT_H_
17
18#include <any>
19#include <vector>
20
21#include <absl/container/flat_hash_map.h>
22#include <absl/status/statusor.h>
23
25
26namespace act::distributed {
27
28class WaitGroup {
29 public:
30 WaitGroup() = default;
31
32 WaitGroup(const WaitGroup&) = delete;
33 WaitGroup& operator=(const WaitGroup&) = delete;
34
35 WaitGroup(WaitGroup&& other) noexcept;
36 WaitGroup& operator=(WaitGroup&& other) noexcept;
37
38 ~WaitGroup();
39
40 void Add(int delta);
41
42 void Done();
43
44 void Wait();
45
46 private:
47 mutable act::Mutex mu_;
48 act::CondVar cv_ ABSL_GUARDED_BY(mu_);
49 int counter_ ABSL_GUARDED_BY(mu_) = 0;
50};
51
52struct Call {
53 absl::StatusOr<std::any> val;
54 WaitGroup wg;
55};
56
57class FlightGroup {
58 public:
59 ~FlightGroup() { act::MutexLock lock(&mu_); }
60
61 absl::StatusOr<std::any> Do(
62 std::string_view key, absl::AnyInvocable<absl::StatusOr<std::any>()> fn);
63
64 template <typename T>
65 absl::StatusOr<T> Do(std::string_view key,
66 absl::AnyInvocable<absl::StatusOr<T>()> fn);
67
68 private:
69 mutable act::Mutex mu_;
70 absl::flat_hash_map<std::string, std::shared_ptr<Call>> calls_
71 ABSL_GUARDED_BY(mu_);
72};
73
74template <typename T>
75absl::StatusOr<T> FlightGroup::Do(std::string_view key,
76 absl::AnyInvocable<absl::StatusOr<T>()> fn) {
77 absl::StatusOr<std::any> result =
78 Do(key, [fn = std::move(fn)]() mutable -> absl::StatusOr<std::any> {
79 absl::StatusOr<T> val = std::move(fn)();
80 if (!val.ok()) {
81 return val.status();
82 }
83 LOG(INFO) << "SingleFlight: computed value: " << val.value();
84 return std::any(std::move(val.value()));
85 });
86 if (!result.ok()) {
87 return result.status();
88 }
89 if (std::any_cast<T>(&result.value()) == nullptr) {
90 return absl::InternalError("Type assertion failed in SingleFlight::Do");
91 }
92 return std::any_cast<T>(std::move(result.value()));
93}
94
95} // namespace act::distributed
96
97#endif // ACTIONENGINE_DISTRIBUTED_SINGLEFLIGHT_H_
Concurrency utilities for ActionEngine.