Action Engine
Loading...
Searching...
No Matches
fiber.h
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#ifndef THREAD_FIBER_FIBER_H_
16#define THREAD_FIBER_FIBER_H_
17
18#include <atomic>
19#include <concepts>
20#include <cstdint>
21#include <memory>
22#include <type_traits>
23#include <utility>
24
25#include <absl/log/check.h>
26
27#include "thread/boost_primitives.h"
28#include "thread/selectables.h"
29
30namespace thread {
31class TreeOptions {};
32
33using InvocableWork = absl::AnyInvocable<void() &&>;
34
35template <typename F>
36concept InvocableWithNoArgsAndReturnsVoid =
37 std::is_invocable_v<std::decay_t<F>> &&
38 std::is_same_v<std::invoke_result_t<F>, void>;
39
40template <typename F>
41InvocableWork MakeInvocable(F&& f) requires
42 InvocableWithNoArgsAndReturnsVoid<F> {
43 return InvocableWork(std::forward<F>(f));
44}
45
46class Fiber;
47
48namespace internal {
49std::unique_ptr<Fiber> CreateTree(InvocableWork f, TreeOptions&& tree_options);
50}
51
52Fiber* absl_nullable GetPerThreadFiberPtr();
53
54class FiberProperties;
55
56class Fiber {
57 public:
58 friend class FiberProperties;
59
60 template <typename F>
61 explicit Fiber(F&& f) requires InvocableWithNoArgsAndReturnsVoid<F>
62 : Fiber(Unstarted{}, {std::forward<F>(f)}) {
63 Start();
64 }
65
66 Fiber(const Fiber&) = delete;
67 Fiber& operator=(const Fiber&) = delete;
68
69 // REQUIRES: Join() must have been called.
70 ~Fiber();
71
72 // Return a pointer to the currently running fiber.
73 static Fiber* absl_nonnull Current();
74
75 void Cancel();
76 void Join();
77
78 bool Cancelled() const { return cancellation_.HasBeenNotified(); }
79
80 Case OnCancel() const { return cancellation_.OnEvent(); }
81
82 Case OnJoinable() const { return joinable_.OnEvent(); }
83
84 private:
85 // No internal constructor starts the fiber. It is the caller's responsibility
86 // to call Start() on the fiber.
87 struct Unstarted {};
88
89 // Internal constructor for root fibers.
90 explicit Fiber(Unstarted, InvocableWork work, TreeOptions&& tree_options);
91 // Internal constructor for child fibers.
92 explicit Fiber(Unstarted, InvocableWork work,
93 Fiber* absl_nonnull parent = Current());
94
95 void Start();
96 bool MarkFinished();
97 void MarkJoined();
98 void InternalJoin();
99
100 void PushBackChild(Fiber* absl_nonnull child)
101
102 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
103 if (first_child_ == nullptr) {
104 first_child_ = child;
105 } else {
106 child->prev_sibling_ = child;
107 child->next_sibling_ = first_child_;
108 first_child_->prev_sibling_ = child;
109 }
110 }
111
112 void UnlinkChild(const Fiber* absl_nonnull child)
113
114 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
115 if (child->next_sibling_ == child) {
116 DCHECK(first_child_ == child)
117 << "Unlinking a child that's the \"only\" sibling on its level, but "
118 "is not the first child.";
119 first_child_ = nullptr;
120 return;
121 }
122
123 child->next_sibling_->prev_sibling_ = child->prev_sibling_;
124 child->prev_sibling_->next_sibling_ = child->next_sibling_;
125 if (first_child_ == child) {
126 first_child_ = child->next_sibling_;
127 }
128 }
129
130 mutable act::concurrency::impl::Mutex mu_;
131
132 InvocableWork work_;
133 FiberProperties* absl_nullable properties_ ABSL_GUARDED_BY(mu_) = nullptr;
134
135 // Whether this Fiber is self-joining. This is always set under lock, but is
136 // an atomic to allow for reads during stats collection which cannot acquire
137 // mutexes.
138 std::atomic<bool> detached_ ABSL_GUARDED_BY(mu_) = false;
139
140 enum State : uint8_t { RUNNING, FINISHED, JOINED };
141
142 State state_ ABSL_GUARDED_BY(mu_) = RUNNING;
143
144 Fiber* absl_nullable const parent_;
145 Fiber* absl_nullable first_child_ ABSL_GUARDED_BY(mu_) = nullptr;
146 Fiber* absl_nullable next_sibling_;
147 Fiber* absl_nullable prev_sibling_;
148
149 PermanentEvent cancellation_;
150 PermanentEvent joinable_;
151
152 friend std::unique_ptr<Fiber> internal::CreateTree(
153 InvocableWork f, TreeOptions&& tree_options);
154
155 friend class act::concurrency::impl::CondVar;
156
157 friend struct ThreadLocalFiber;
158 friend bool IsFiberDetached(const Fiber* absl_nonnull fiber);
159 friend void Detach(std::unique_ptr<Fiber> fiber);
160};
161
162class FiberProperties final : public boost::fibers::fiber_properties {
163 public:
164 friend class Fiber;
165 friend class act::concurrency::impl::CondVar;
166
167 explicit FiberProperties(boost::fibers::context* absl_nonnull ctx) = delete;
168
169 explicit FiberProperties(Fiber* absl_nonnull fiber)
170 : boost::fibers::fiber_properties(nullptr), fiber_(fiber) {}
171
172 [[nodiscard]] Fiber* absl_nullable GetFiber() const { return fiber_; }
173
174 private:
175 Fiber* absl_nullable fiber_ = nullptr;
176 act::concurrency::impl::CondVar* absl_nullable waiting_on_
177 ABSL_GUARDED_BY(fiber_->mu_) = nullptr;
178};
179
180namespace internal {
181inline std::unique_ptr<Fiber> CreateTree(InvocableWork f,
182 TreeOptions&& tree_options) {
183 const auto fiber =
184 new Fiber(Fiber::Unstarted{}, std::move(f), std::move(tree_options));
185 fiber->Start();
186 return absl::WrapUnique(fiber);
187}
188} // namespace internal
189
190template <typename F>
191[[nodiscard]] std::unique_ptr<Fiber> NewTree(TreeOptions tree_options, F&& f) {
192 return internal::CreateTree(MakeInvocable(std::forward<F>(f)),
193 std::move(tree_options));
194}
195
196inline void Detach(std::unique_ptr<Fiber> fiber) {
197 {
198 act::concurrency::impl::MutexLock lock(&fiber->mu_);
199 DCHECK(!fiber->detached_.load(std::memory_order_relaxed))
200 << "Detach() called on already detached fiber, this should not be "
201 "possible without calling WrapUnique or similar on a Fiber* you do "
202 "not own.";
203 // If the fiber is FINISHED, we need to join it since it has passed the
204 // point where it would be self joined and deleted if detached.
205 if (fiber->state_ != Fiber::FINISHED) {
206 fiber->detached_.store(true, std::memory_order_relaxed);
207 fiber.release(); // Fiber will delete itself.
208 }
209 }
210 if (ABSL_PREDICT_FALSE(fiber != nullptr)) {
211 fiber->InternalJoin();
212 }
213}
214
215FiberProperties* absl_nullable GetCurrentFiberProperties();
216
217template <typename F>
218void Detach(TreeOptions tree_options, F&& f) {
219 Detach(NewTree(std::move(tree_options), std::forward<F>(f)));
220}
221
222inline bool Cancelled() {
223 const Fiber* fiber_ptr = GetPerThreadFiberPtr();
224 if (fiber_ptr == nullptr) {
225 // Only threads which are already fibers could be cancelled.
226 return false;
227 }
228 return fiber_ptr->Cancelled();
229}
230
231inline Case OnCancel() {
232 const Fiber* current_fiber = Fiber::Current();
233 if (current_fiber == nullptr) {
234 return NonSelectableCase();
235 }
236 return current_fiber->OnCancel();
237}
238} // namespace thread
239
240#endif // THREAD_FIBER_FIBER_H_
A Case represents a selectable case in a Select statement.
Definition cases.h:80