corosio


Directory: ../corosio_lcov_PR-138/
Coverage: low: ≥ 0% medium: ≥ 75.0% high: ≥ 90.0%
Coverage Exec / Excl / Total
Lines: 80.9% 402 / 0 / 497
Functions: 89.6% 43 / 0 / 48
Branches: 68.6% 208 / 0 / 303

src/corosio/src/detail/epoll/scheduler.cpp
Line Branch Exec Source
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/scheduler.hpp"
15 #include "src/detail/epoll/op.hpp"
16 #include "src/detail/timer_service.hpp"
17 #include "src/detail/make_err.hpp"
18 #include "src/detail/posix/resolver_service.hpp"
19 #include "src/detail/posix/signals.hpp"
20
21 #include <boost/corosio/detail/except.hpp>
22 #include <boost/corosio/detail/thread_local_ptr.hpp>
23
24 #include <chrono>
25 #include <limits>
26 #include <utility>
27
28 #include <errno.h>
29 #include <fcntl.h>
30 #include <sys/epoll.h>
31 #include <sys/eventfd.h>
32 #include <sys/socket.h>
33 #include <sys/timerfd.h>
34 #include <unistd.h>
35
36 /*
37 epoll Scheduler - Single Reactor Model
38 ======================================
39
40 This scheduler uses a single-reactor coordination strategy to provide
41 handler parallelism while avoiding the thundering herd problem.
42 Instead of all threads blocking in epoll_wait(), one thread at a time
43 is the "reactor" while the others wait on a condition variable for handler work.
44
45 Thread Model
46 ------------
47 - ONE thread runs epoll_wait() at a time (the reactor thread)
48 - OTHER threads wait on cond_ (condition variable) for handlers
49 - When work is posted, exactly one waiting thread wakes via notify_one()
50 - This matches Windows IOCP semantics where N posted items wake N threads
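
A minimal sketch of this model in use. Illustrative only: sched stands
for an epoll_scheduler reference obtained from its owning execution
context, and setup and error handling are elided.

    #include <thread>
    #include <vector>

    std::vector<std::thread> workers;
    for (int i = 0; i < 4; ++i)                       // four worker threads
        workers.emplace_back([&sched] { sched.run(); });
    // At any instant at most one worker sits in epoll_wait(); the rest
    // block on cond_ until a handler is queued for them.
    for (auto& t : workers)
        t.join();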
51
52 Event Loop Structure (do_one)
53 -----------------------------
54 1. Lock mutex, try to pop handler from queue
55 2. If got handler: execute it (unlocked), return
56 3. If queue empty and no reactor running: become reactor
57 - Run epoll_wait (unlocked), queue I/O completions, loop back
58 4. If queue empty and reactor running: wait on condvar for work
59
60 The task_running_ flag ensures only one thread owns epoll_wait().
61 After the reactor queues I/O completions, it loops back to try getting
62 a handler, giving priority to handler execution over more I/O polling.
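
In outline, eliding stop checks, locking details, and work accounting
(the full do_one() with coverage counts appears later in this listing):

    for (;;) {
        scheduler_op* op = completed_ops_.pop();
        if (op == &task_op_) {                  // popped the reactor sentinel
            task_running_ = true;
            run_task(lock, ctx);                // epoll_wait, mutex released
            task_running_ = false;
            completed_ops_.push(&task_op_);     // re-arm sentinel, loop back
        } else if (op) {
            lock.unlock();
            (*op)();                            // execute handler unlocked
            return 1;
        } else {
            wait_for_signal(lock);              // queue empty: block on cond_
        }
    }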
63
64 Signaling State (state_)
65 ------------------------
66 The state_ variable encodes two pieces of information:
67 - Bit 0: signaled flag (1 = signaled, persists until cleared)
68 - Upper bits: waiter count (each waiter adds 2 before blocking)
69
70 This allows efficient coordination:
71 - Signalers only call notify when waiters exist (state_ > 1)
72 - Waiters check if already signaled before blocking (fast-path)
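
Worked examples of the encoding:

    state_ == 0    not signaled, no waiters
    state_ == 1    signaled, no waiters: the next waiter returns at once
    state_ == 4    not signaled, two waiters blocked (4 / 2 == 2)
    state_ == 5    signaled, with two waiters yet to observe the signal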
73
74 Wake Coordination (wake_one_thread_and_unlock)
75 ----------------------------------------------
76 When posting work:
77 - If waiters exist (state_ > 1): signal and notify_one()
78 - Else if reactor running: interrupt via eventfd write
79 - Else: no-op (thread will find work when it checks queue)
80
81 This avoids waking threads unnecessarily. With cascading wakes,
82 each handler execution wakes at most one additional thread if
83 more work exists in the queue.
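
The resulting decision, in outline (it matches wake_one_thread_and_unlock()
later in this listing):

    state_ |= 1;                                // publish the signal
    if (state_ > 1) {                           // somebody is blocked
        lock.unlock();
        cond_.notify_one();
    } else if (task_running_ && !task_interrupted_) {
        task_interrupted_ = true;
        lock.unlock();
        interrupt_reactor();                    // eventfd write
    } else {
        lock.unlock();                          // nothing to wake
    }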
84
85 Work Counting
86 -------------
87 outstanding_work_ tracks pending operations. When it hits zero, run()
88 returns. Each operation increments on start, decrements on completion.
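
An illustrative trace of a cross-thread post and its completion:

    post(op);            outstanding_work_: 0 -> 1  (slow path, cross-thread)
    (*op)();             handler runs, posts nothing further
    work_finished();     outstanding_work_: 1 -> 0, every thread is woken
                         and run() returns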
89
90 Timer Integration
91 -----------------
92 Timers are handled by timer_service. The reactor adjusts epoll_wait
93 timeout to wake for the nearest timer expiry. When a new timer is
94 scheduled earlier than the current nearest expiry, timer_service calls
95 interrupt_reactor() so the reactor re-evaluates its timeout.
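
For instance, if the nearest expiry is 250us away, update_timerfd() arms
timer_fd_ with a 250000ns one-shot; when it fires, epoll_wait() reports
the timerfd readable, process_expired() runs, and the timerfd is re-armed
for the next expiry. (An illustrative walk-through of the code below.)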
96 */
97
98 namespace boost::corosio::detail {
99
100 struct scheduler_context
101 {
102 epoll_scheduler const* key;
103 scheduler_context* next;
104 op_queue private_queue;
105 long private_outstanding_work;
106 int inline_budget;
107
108 183 scheduler_context(epoll_scheduler const* k, scheduler_context* n)
109 183 : key(k)
110 183 , next(n)
111 183 , private_outstanding_work(0)
112 183 , inline_budget(0)
113 {
114 183 }
115 };
116
117 namespace {
118
119 corosio::detail::thread_local_ptr<scheduler_context> context_stack;
120
121 struct thread_context_guard
122 {
123 scheduler_context frame_;
124
125 183 explicit thread_context_guard(
126 epoll_scheduler const* ctx) noexcept
127 183 : frame_(ctx, context_stack.get())
128 {
129 183 context_stack.set(&frame_);
130 183 }
131
132 183 ~thread_context_guard() noexcept
133 {
134
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 183 times.
183 if (!frame_.private_queue.empty())
135 frame_.key->drain_thread_queue(frame_.private_queue, frame_.private_outstanding_work);
136 183 context_stack.set(frame_.next);
137 183 }
138 };
139
140 scheduler_context*
141 472968 find_context(epoll_scheduler const* self) noexcept
142 {
143
2/2
✓ Branch 1 taken 471289 times.
✓ Branch 2 taken 1679 times.
472968 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
144
1/2
✓ Branch 0 taken 471289 times.
✗ Branch 1 not taken.
471289 if (c->key == self)
145 471289 return c;
146 1679 return nullptr;
147 }
148
149 } // namespace
150
151 void
152 88844 epoll_scheduler::
153 reset_inline_budget() const noexcept
154 {
155
1/2
✓ Branch 1 taken 88844 times.
✗ Branch 2 not taken.
88844 if (auto* ctx = find_context(this))
156 88844 ctx->inline_budget = max_inline_budget_;
157 88844 }
158
159 bool
160 237761 epoll_scheduler::
161 try_consume_inline_budget() const noexcept
162 {
163
1/2
✓ Branch 1 taken 237761 times.
✗ Branch 2 not taken.
237761 if (auto* ctx = find_context(this))
164 {
165
2/2
✓ Branch 0 taken 158564 times.
✓ Branch 1 taken 79197 times.
237761 if (ctx->inline_budget > 0)
166 {
167 158564 --ctx->inline_budget;
168 158564 return true;
169 }
170 }
171 79197 return false;
172 }
173
174 void
175 64641 descriptor_state::
176 operator()()
177 {
178 64641 is_enqueued_.store(false, std::memory_order_relaxed);
179
180 // Take ownership of impl ref set by close_socket() to prevent
181 // the owning impl from being freed while we're executing
182 64641 auto prevent_impl_destruction = std::move(impl_ref_);
183
184 64641 std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
185
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 64641 times.
64641 if (ev == 0)
186 {
187 scheduler_->compensating_work_started();
188 return;
189 }
190
191 64641 op_queue local_ops;
192
193 64641 int err = 0;
194
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 64640 times.
64641 if (ev & EPOLLERR)
195 {
196 1 socklen_t len = sizeof(err);
197
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
198 err = errno;
199
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 if (err == 0)
200 1 err = EIO;
201 }
202
203 {
204
1/1
✓ Branch 1 taken 64641 times.
64641 std::lock_guard lock(mutex);
205
2/2
✓ Branch 0 taken 20218 times.
✓ Branch 1 taken 44423 times.
64641 if (ev & EPOLLIN)
206 {
207
2/2
✓ Branch 0 taken 4719 times.
✓ Branch 1 taken 15499 times.
20218 if (read_op)
208 {
209 4719 auto* rd = read_op;
210
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4719 times.
4719 if (err)
211 rd->complete(err, 0);
212 else
213 4719 rd->perform_io();
214
215
2/4
✓ Branch 0 taken 4719 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 4719 times.
4719 if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
216 {
217 rd->errn = 0;
218 }
219 else
220 {
221 4719 read_op = nullptr;
222 4719 local_ops.push(rd);
223 }
224 }
225 else
226 {
227 15499 read_ready = true;
228 }
229 }
230
2/2
✓ Branch 0 taken 59926 times.
✓ Branch 1 taken 4715 times.
64641 if (ev & EPOLLOUT)
231 {
232
3/4
✓ Branch 0 taken 55208 times.
✓ Branch 1 taken 4718 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 55208 times.
59926 bool had_write_op = (connect_op || write_op);
233
2/2
✓ Branch 0 taken 4718 times.
✓ Branch 1 taken 55208 times.
59926 if (connect_op)
234 {
235 4718 auto* cn = connect_op;
236
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4718 times.
4718 if (err)
237 cn->complete(err, 0);
238 else
239 4718 cn->perform_io();
240 4718 connect_op = nullptr;
241 4718 local_ops.push(cn);
242 }
243
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 59926 times.
59926 if (write_op)
244 {
245 auto* wr = write_op;
246 if (err)
247 wr->complete(err, 0);
248 else
249 wr->perform_io();
250
251 if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
252 {
253 wr->errn = 0;
254 }
255 else
256 {
257 write_op = nullptr;
258 local_ops.push(wr);
259 }
260 }
261
2/2
✓ Branch 0 taken 55208 times.
✓ Branch 1 taken 4718 times.
59926 if (!had_write_op)
262 55208 write_ready = true;
263 }
264
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 64640 times.
64641 if (err)
265 {
266
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if (read_op)
267 {
268 read_op->complete(err, 0);
269 local_ops.push(std::exchange(read_op, nullptr));
270 }
271
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if (write_op)
272 {
273 write_op->complete(err, 0);
274 local_ops.push(std::exchange(write_op, nullptr));
275 }
276
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if (connect_op)
277 {
278 connect_op->complete(err, 0);
279 local_ops.push(std::exchange(connect_op, nullptr));
280 }
281 }
282 64641 }
283
284 // Execute first handler inline — the scheduler's work_cleanup
285 // accounts for this as the "consumed" work item
286 64641 scheduler_op* first = local_ops.pop();
287
2/2
✓ Branch 0 taken 9437 times.
✓ Branch 1 taken 55204 times.
64641 if (first)
288 {
289
1/1
✓ Branch 1 taken 9437 times.
9437 scheduler_->post_deferred_completions(local_ops);
290
1/1
✓ Branch 1 taken 9437 times.
9437 (*first)();
291 }
292 else
293 {
294 55204 scheduler_->compensating_work_started();
295 }
296 64641 }
297
298 203 epoll_scheduler::
299 epoll_scheduler(
300 capy::execution_context& ctx,
301 203 int)
302 203 : epoll_fd_(-1)
303 203 , event_fd_(-1)
304 203 , timer_fd_(-1)
305 203 , outstanding_work_(0)
306 203 , stopped_(false)
307 203 , shutdown_(false)
308 203 , task_running_{false}
309 203 , task_interrupted_(false)
310 406 , state_(0)
311 {
312 203 epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
313
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 203 times.
203 if (epoll_fd_ < 0)
314 detail::throw_system_error(make_err(errno), "epoll_create1");
315
316 203 event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
317
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 203 times.
203 if (event_fd_ < 0)
318 {
319 int errn = errno;
320 ::close(epoll_fd_);
321 detail::throw_system_error(make_err(errn), "eventfd");
322 }
323
324 203 timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
325
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 203 times.
203 if (timer_fd_ < 0)
326 {
327 int errn = errno;
328 ::close(event_fd_);
329 ::close(epoll_fd_);
330 detail::throw_system_error(make_err(errn), "timerfd_create");
331 }
332
333 203 epoll_event ev{};
334 203 ev.events = EPOLLIN | EPOLLET;
335 203 ev.data.ptr = nullptr;
336
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 203 times.
203 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
337 {
338 int errn = errno;
339 ::close(timer_fd_);
340 ::close(event_fd_);
341 ::close(epoll_fd_);
342 detail::throw_system_error(make_err(errn), "epoll_ctl");
343 }
344
345 203 epoll_event timer_ev{};
346 203 timer_ev.events = EPOLLIN | EPOLLERR;
347 203 timer_ev.data.ptr = &timer_fd_;
348
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 203 times.
203 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
349 {
350 int errn = errno;
351 ::close(timer_fd_);
352 ::close(event_fd_);
353 ::close(epoll_fd_);
354 detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
355 }
356
357
1/1
✓ Branch 1 taken 203 times.
203 timer_svc_ = &get_timer_service(ctx, *this);
358
1/1
✓ Branch 3 taken 203 times.
203 timer_svc_->set_on_earliest_changed(
359 timer_service::callback(
360 this,
361 [](void* p) {
362 4933 auto* self = static_cast<epoll_scheduler*>(p);
363 4933 self->timerfd_stale_.store(true, std::memory_order_release);
364
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4933 times.
4933 if (self->task_running_.load(std::memory_order_acquire))
365 self->interrupt_reactor();
366 4933 }));
367
368 // Initialize resolver service
369
1/1
✓ Branch 1 taken 203 times.
203 get_resolver_service(ctx, *this);
370
371 // Initialize signal service
372
1/1
✓ Branch 1 taken 203 times.
203 get_signal_service(ctx, *this);
373
374 // Push task sentinel to interleave reactor runs with handler execution
375 203 completed_ops_.push(&task_op_);
376 203 }
377
378 406 epoll_scheduler::
379 203 ~epoll_scheduler()
380 {
381
1/2
✓ Branch 0 taken 203 times.
✗ Branch 1 not taken.
203 if (timer_fd_ >= 0)
382 203 ::close(timer_fd_);
383
1/2
✓ Branch 0 taken 203 times.
✗ Branch 1 not taken.
203 if (event_fd_ >= 0)
384 203 ::close(event_fd_);
385
1/2
✓ Branch 0 taken 203 times.
✗ Branch 1 not taken.
203 if (epoll_fd_ >= 0)
386 203 ::close(epoll_fd_);
387 406 }
388
389 void
390 203 epoll_scheduler::
391 shutdown()
392 {
393 {
394
1/1
✓ Branch 1 taken 203 times.
203 std::unique_lock lock(mutex_);
395 203 shutdown_ = true;
396
397
2/2
✓ Branch 1 taken 203 times.
✓ Branch 2 taken 203 times.
406 while (auto* h = completed_ops_.pop())
398 {
399
1/2
✓ Branch 0 taken 203 times.
✗ Branch 1 not taken.
203 if (h == &task_op_)
400 203 continue;
401 lock.unlock();
402 h->destroy();
403 lock.lock();
404 203 }
405
406 203 signal_all(lock);
407 203 }
408
409 203 outstanding_work_.store(0, std::memory_order_release);
410
411
1/2
✓ Branch 0 taken 203 times.
✗ Branch 1 not taken.
203 if (event_fd_ >= 0)
412 203 interrupt_reactor();
413 203 }
414
415 void
416 6767 epoll_scheduler::
417 post(std::coroutine_handle<> h) const
418 {
419 struct post_handler final
420 : scheduler_op
421 {
422 std::coroutine_handle<> h_;
423
424 explicit
425 6767 post_handler(std::coroutine_handle<> h)
426 6767 : h_(h)
427 {
428 6767 }
429
430 13534 ~post_handler() = default;
431
432 6767 void operator()() override
433 {
434 6767 auto h = h_;
435
1/2
✓ Branch 0 taken 6767 times.
✗ Branch 1 not taken.
6767 delete this;
436
1/1
✓ Branch 1 taken 6767 times.
6767 h.resume();
437 6767 }
438
439 void destroy() override
440 {
441 delete this;
442 }
443 };
444
445
1/1
✓ Branch 1 taken 6767 times.
6767 auto ph = std::make_unique<post_handler>(h);
446
447 // Fast path: same thread posts to private queue
448 // Only count locally; work_cleanup batches to global counter
449
2/2
✓ Branch 1 taken 5114 times.
✓ Branch 2 taken 1653 times.
6767 if (auto* ctx = find_context(this))
450 {
451 5114 ++ctx->private_outstanding_work;
452 5114 ctx->private_queue.push(ph.release());
453 5114 return;
454 }
455
456 // Slow path: cross-thread post requires mutex
457 1653 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
458
459
1/1
✓ Branch 1 taken 1653 times.
1653 std::unique_lock lock(mutex_);
460 1653 completed_ops_.push(ph.release());
461
1/1
✓ Branch 1 taken 1653 times.
1653 wake_one_thread_and_unlock(lock);
462 6767 }
463
464 void
465 84392 epoll_scheduler::
466 post(scheduler_op* h) const
467 {
468 // Fast path: same thread posts to private queue
469 // Only count locally; work_cleanup batches to global counter
470
2/2
✓ Branch 1 taken 84366 times.
✓ Branch 2 taken 26 times.
84392 if (auto* ctx = find_context(this))
471 {
472 84366 ++ctx->private_outstanding_work;
473 84366 ctx->private_queue.push(h);
474 84366 return;
475 }
476
477 // Slow path: cross-thread post requires mutex
478 26 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
479
480
1/1
✓ Branch 1 taken 26 times.
26 std::unique_lock lock(mutex_);
481 26 completed_ops_.push(h);
482
1/1
✓ Branch 1 taken 26 times.
26 wake_one_thread_and_unlock(lock);
483 26 }
484
485 void
486 5660 epoll_scheduler::
487 on_work_started() noexcept
488 {
489 5660 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
490 5660 }
491
492 void
493 5628 epoll_scheduler::
494 on_work_finished() noexcept
495 {
496
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5628 times.
11256 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
497 stop();
498 5628 }
499
500 bool
501 159263 epoll_scheduler::
502 running_in_this_thread() const noexcept
503 {
504
2/2
✓ Branch 1 taken 159023 times.
✓ Branch 2 taken 240 times.
159263 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
505
1/2
✓ Branch 0 taken 159023 times.
✗ Branch 1 not taken.
159023 if (c->key == this)
506 159023 return true;
507 240 return false;
508 }
509
510 void
511 43 epoll_scheduler::
512 stop()
513 {
514
1/1
✓ Branch 1 taken 43 times.
43 std::unique_lock lock(mutex_);
515
2/2
✓ Branch 0 taken 22 times.
✓ Branch 1 taken 21 times.
43 if (!stopped_)
516 {
517 22 stopped_ = true;
518 22 signal_all(lock);
519
1/1
✓ Branch 1 taken 22 times.
22 interrupt_reactor();
520 }
521 43 }
522
523 bool
524 18 epoll_scheduler::
525 stopped() const noexcept
526 {
527 18 std::unique_lock lock(mutex_);
528 36 return stopped_;
529 18 }
530
531 void
532 49 epoll_scheduler::
533 restart()
534 {
535
1/1
✓ Branch 1 taken 49 times.
49 std::unique_lock lock(mutex_);
536 49 stopped_ = false;
537 49 }
538
539 std::size_t
540 183 epoll_scheduler::
541 run()
542 {
543
2/2
✓ Branch 1 taken 32 times.
✓ Branch 2 taken 151 times.
366 if (outstanding_work_.load(std::memory_order_acquire) == 0)
544 {
545
1/1
✓ Branch 1 taken 32 times.
32 stop();
546 32 return 0;
547 }
548
549 151 thread_context_guard ctx(this);
550
1/1
✓ Branch 1 taken 151 times.
151 std::unique_lock lock(mutex_);
551
552 151 std::size_t n = 0;
553 for (;;)
554 {
555
3/3
✓ Branch 1 taken 155918 times.
✓ Branch 3 taken 151 times.
✓ Branch 4 taken 155767 times.
155918 if (!do_one(lock, -1, &ctx.frame_))
556 151 break;
557
1/2
✓ Branch 1 taken 155767 times.
✗ Branch 2 not taken.
155767 if (n != (std::numeric_limits<std::size_t>::max)())
558 155767 ++n;
559
2/2
✓ Branch 1 taken 71326 times.
✓ Branch 2 taken 84441 times.
155767 if (!lock.owns_lock())
560
1/1
✓ Branch 1 taken 71326 times.
71326 lock.lock();
561 }
562 151 return n;
563 151 }
564
565 std::size_t
566 2 epoll_scheduler::
567 run_one()
568 {
569
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
570 {
571 stop();
572 return 0;
573 }
574
575 2 thread_context_guard ctx(this);
576
1/1
✓ Branch 1 taken 2 times.
2 std::unique_lock lock(mutex_);
577
1/1
✓ Branch 1 taken 2 times.
2 return do_one(lock, -1, &ctx.frame_);
578 2 }
579
580 std::size_t
581 34 epoll_scheduler::
582 wait_one(long usec)
583 {
584
2/2
✓ Branch 1 taken 7 times.
✓ Branch 2 taken 27 times.
68 if (outstanding_work_.load(std::memory_order_acquire) == 0)
585 {
586
1/1
✓ Branch 1 taken 7 times.
7 stop();
587 7 return 0;
588 }
589
590 27 thread_context_guard ctx(this);
591
1/1
✓ Branch 1 taken 27 times.
27 std::unique_lock lock(mutex_);
592
1/1
✓ Branch 1 taken 27 times.
27 return do_one(lock, usec, &ctx.frame_);
593 27 }
594
595 std::size_t
596 2 epoll_scheduler::
597 poll()
598 {
599
2/2
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
600 {
601
1/1
✓ Branch 1 taken 1 time.
1 stop();
602 1 return 0;
603 }
604
605 1 thread_context_guard ctx(this);
606
1/1
✓ Branch 1 taken 1 time.
1 std::unique_lock lock(mutex_);
607
608 1 std::size_t n = 0;
609 for (;;)
610 {
611
3/3
✓ Branch 1 taken 3 times.
✓ Branch 3 taken 1 time.
✓ Branch 4 taken 2 times.
3 if (!do_one(lock, 0, &ctx.frame_))
612 1 break;
613
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (n != (std::numeric_limits<std::size_t>::max)())
614 2 ++n;
615
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (!lock.owns_lock())
616
1/1
✓ Branch 1 taken 2 times.
2 lock.lock();
617 }
618 1 return n;
619 1 }
620
621 std::size_t
622 4 epoll_scheduler::
623 poll_one()
624 {
625
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2 times.
8 if (outstanding_work_.load(std::memory_order_acquire) == 0)
626 {
627
1/1
✓ Branch 1 taken 2 times.
2 stop();
628 2 return 0;
629 }
630
631 2 thread_context_guard ctx(this);
632
1/1
✓ Branch 1 taken 2 times.
2 std::unique_lock lock(mutex_);
633
1/1
✓ Branch 1 taken 2 times.
2 return do_one(lock, 0, &ctx.frame_);
634 2 }
635
636 void
637 9508 epoll_scheduler::
638 register_descriptor(int fd, descriptor_state* desc) const
639 {
640 9508 epoll_event ev{};
641 9508 ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
642 9508 ev.data.ptr = desc;
643
644
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9508 times.
9508 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
645 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
646
647 9508 desc->registered_events = ev.events;
648 9508 desc->fd = fd;
649 9508 desc->scheduler_ = this;
650
651
1/1
✓ Branch 1 taken 9508 times.
9508 std::lock_guard lock(desc->mutex);
652 9508 desc->read_ready = false;
653 9508 desc->write_ready = false;
654 9508 }
655
656 void
657 9508 epoll_scheduler::
658 deregister_descriptor(int fd) const
659 {
660 9508 ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
661 9508 }
662
663 void
664 9645 epoll_scheduler::
665 work_started() const noexcept
666 {
667 9645 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
668 9645 }
669
670 void
671 16384 epoll_scheduler::
672 work_finished() const noexcept
673 {
674
2/2
✓ Branch 0 taken 158 times.
✓ Branch 1 taken 16226 times.
32768 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
675 {
676 // Last work item completed - wake all threads so they can exit.
677 // signal_all() wakes threads waiting on the condvar.
678 // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
679 // Both are needed because they target different blocking mechanisms.
680 158 std::unique_lock lock(mutex_);
681 158 signal_all(lock);
682
5/6
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 157 times.
✓ Branch 3 taken 1 time.
✗ Branch 4 not taken.
✓ Branch 5 taken 1 time.
✓ Branch 6 taken 157 times.
158 if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
683 {
684 1 task_interrupted_ = true;
685 1 lock.unlock();
686 1 interrupt_reactor();
687 }
688 158 }
689 16384 }
690
691 void
692 55204 epoll_scheduler::
693 compensating_work_started() const noexcept
694 {
695 55204 auto* ctx = find_context(this);
696
1/2
✓ Branch 0 taken 55204 times.
✗ Branch 1 not taken.
55204 if (ctx)
697 55204 ++ctx->private_outstanding_work;
698 55204 }
699
700 void
701 epoll_scheduler::
702 drain_thread_queue(op_queue& queue, long count) const
703 {
704 // Note: outstanding_work_ was already incremented when posting
705 std::unique_lock lock(mutex_);
706 completed_ops_.splice(queue);
707 if (count > 0)
708 maybe_unlock_and_signal_one(lock);
709 }
710
711 void
712 9437 epoll_scheduler::
713 post_deferred_completions(op_queue& ops) const
714 {
715
1/2
✓ Branch 1 taken 9437 times.
✗ Branch 2 not taken.
9437 if (ops.empty())
716 9437 return;
717
718 // Fast path: if on scheduler thread, use private queue
719 if (auto* ctx = find_context(this))
720 {
721 ctx->private_queue.splice(ops);
722 return;
723 }
724
725 // Slow path: add to global queue and wake a thread
726 std::unique_lock lock(mutex_);
727 completed_ops_.splice(ops);
728 wake_one_thread_and_unlock(lock);
729 }
730
731 void
732 252 epoll_scheduler::
733 interrupt_reactor() const
734 {
735 // Only write if not already armed to avoid redundant writes
736 252 bool expected = false;
737
2/2
✓ Branch 1 taken 236 times.
✓ Branch 2 taken 16 times.
252 if (eventfd_armed_.compare_exchange_strong(expected, true,
738 std::memory_order_release, std::memory_order_relaxed))
739 {
740 236 std::uint64_t val = 1;
741
1/1
✓ Branch 1 taken 236 times.
236 [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
742 }
743 252 }
744
745 void
746 383 epoll_scheduler::
747 signal_all(std::unique_lock<std::mutex>&) const
748 {
749 383 state_ |= 1;
750 383 cond_.notify_all();
751 383 }
752
753 bool
754 1679 epoll_scheduler::
755 maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
756 {
757 1679 state_ |= 1;
758
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1679 times.
1679 if (state_ > 1)
759 {
760 lock.unlock();
761 cond_.notify_one();
762 return true;
763 }
764 1679 return false;
765 }
766
767 void
768 198024 epoll_scheduler::
769 unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
770 {
771 198024 state_ |= 1;
772 198024 bool have_waiters = state_ > 1;
773 198024 lock.unlock();
774
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 198024 times.
198024 if (have_waiters)
775 cond_.notify_one();
776 198024 }
777
778 void
779 epoll_scheduler::
780 clear_signal() const
781 {
782 state_ &= ~std::size_t(1);
783 }
784
785 void
786 epoll_scheduler::
787 wait_for_signal(std::unique_lock<std::mutex>& lock) const
788 {
789 while ((state_ & 1) == 0)
790 {
791 state_ += 2;
792 cond_.wait(lock);
793 state_ -= 2;
794 }
795 }
796
797 void
798 epoll_scheduler::
799 wait_for_signal_for(
800 std::unique_lock<std::mutex>& lock,
801 long timeout_us) const
802 {
803 if ((state_ & 1) == 0)
804 {
805 state_ += 2;
806 cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
807 state_ -= 2;
808 }
809 }
810
811 void
812 1679 epoll_scheduler::
813 wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
814 {
815
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1679 times.
1679 if (maybe_unlock_and_signal_one(lock))
816 return;
817
818
5/6
✓ Branch 1 taken 26 times.
✓ Branch 2 taken 1653 times.
✓ Branch 3 taken 26 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 26 times.
✓ Branch 6 taken 1653 times.
1679 if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
819 {
820 26 task_interrupted_ = true;
821 26 lock.unlock();
822 26 interrupt_reactor();
823 }
824 else
825 {
826 1653 lock.unlock();
827 }
828 }
829
830 /** RAII guard for handler execution work accounting.
831
832 Handler consumes 1 work item, may produce N new items via fast-path posts.
833 Net change = N - 1:
834 - If N > 1: add (N-1) to global (more work produced than consumed)
835 - If N == 1: net zero, do nothing
836 - If N < 1: call work_finished() (work consumed, may trigger stop)
837
838 Also drains private queue to global for other threads to process.
839 */
840 struct work_cleanup
841 {
842 epoll_scheduler const* scheduler;
843 std::unique_lock<std::mutex>* lock;
844 scheduler_context* ctx;
845
846 155800 ~work_cleanup()
847 {
848
1/2
✓ Branch 0 taken 155800 times.
✗ Branch 1 not taken.
155800 if (ctx)
849 {
850 155800 long produced = ctx->private_outstanding_work;
851
2/2
✓ Branch 0 taken 96 times.
✓ Branch 1 taken 155704 times.
155800 if (produced > 1)
852 96 scheduler->outstanding_work_.fetch_add(produced - 1, std::memory_order_relaxed);
853
2/2
✓ Branch 0 taken 16144 times.
✓ Branch 1 taken 139560 times.
155704 else if (produced < 1)
854 16144 scheduler->work_finished();
855 // produced == 1: net zero, handler consumed what it produced
856 155800 ctx->private_outstanding_work = 0;
857
858
2/2
✓ Branch 1 taken 84452 times.
✓ Branch 2 taken 71348 times.
155800 if (!ctx->private_queue.empty())
859 {
860 84452 lock->lock();
861 84452 scheduler->completed_ops_.splice(ctx->private_queue);
862 }
863 }
864 else
865 {
866 // No thread context - slow-path op was already counted globally
867 scheduler->work_finished();
868 }
869 155800 }
870 };
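
// A worked trace of ~work_cleanup() above (illustrative values, not part
// of the measured source):
//   produced == 3: fetch_add(2)      three items posted, one consumed
//   produced == 1: no-op             the handler replaced what it consumed
//   produced == 0: work_finished()   net -1; may drop the count to zero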
871
872 /** RAII guard for reactor work accounting.
873
874 Reactor only produces work via timer/signal callbacks posting handlers.
875 Unlike handler execution which consumes 1, the reactor consumes nothing.
876 All produced work must be flushed to global counter.
877 */
878 struct task_cleanup
879 {
880 epoll_scheduler const* scheduler;
881 std::unique_lock<std::mutex>* lock;
882 scheduler_context* ctx;
883
884 51894 ~task_cleanup()
885 51894 {
886
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 51894 times.
51894 if (!ctx)
887 return;
888
889
2/2
✓ Branch 0 taken 4927 times.
✓ Branch 1 taken 46967 times.
51894 if (ctx->private_outstanding_work > 0)
890 {
891 4927 scheduler->outstanding_work_.fetch_add(
892 4927 ctx->private_outstanding_work, std::memory_order_relaxed);
893 4927 ctx->private_outstanding_work = 0;
894 }
895
896
2/2
✓ Branch 1 taken 4927 times.
✓ Branch 2 taken 46967 times.
51894 if (!ctx->private_queue.empty())
897 {
898
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4927 times.
4927 if (!lock->owns_lock())
899 lock->lock();
900 4927 scheduler->completed_ops_.splice(ctx->private_queue);
901 }
902 51894 }
903 };
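
// By contrast with ~work_cleanup(), the reactor consumes nothing: a
// run_task() pass that queued, say, five timer completions ends with
// fetch_add(5) here (illustrative).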
904
905 void
906 9848 epoll_scheduler::
907 update_timerfd() const
908 {
909 9848 auto nearest = timer_svc_->nearest_expiry();
910
911 9848 itimerspec ts{};
912 9848 int flags = 0;
913
914
3/3
✓ Branch 2 taken 9848 times.
✓ Branch 4 taken 9803 times.
✓ Branch 5 taken 45 times.
9848 if (nearest == timer_service::time_point::max())
915 {
916 // No timers - disarm by setting to 0 (relative)
917 }
918 else
919 {
920 9803 auto now = std::chrono::steady_clock::now();
921
3/3
✓ Branch 1 taken 9803 times.
✓ Branch 4 taken 127 times.
✓ Branch 5 taken 9676 times.
9803 if (nearest <= now)
922 {
923 // Use 1ns instead of 0 - zero disarms the timerfd
924 127 ts.it_value.tv_nsec = 1;
925 }
926 else
927 {
928 9676 auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
929
1/1
✓ Branch 1 taken 9676 times.
19352 nearest - now).count();
930 9676 ts.it_value.tv_sec = nsec / 1000000000;
931 9676 ts.it_value.tv_nsec = nsec % 1000000000;
932 // Ensure non-zero to avoid disarming if duration rounds to 0
933
3/4
✓ Branch 0 taken 9665 times.
✓ Branch 1 taken 11 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 9665 times.
9676 if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
934 ts.it_value.tv_nsec = 1;
935 }
936 }
937
938
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9848 times.
9848 if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
939 detail::throw_system_error(make_err(errno), "timerfd_settime");
940 9848 }
941
942 void
943 51894 epoll_scheduler::
944 run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx)
945 {
946
2/2
✓ Branch 0 taken 42224 times.
✓ Branch 1 taken 9670 times.
51894 int timeout_ms = task_interrupted_ ? 0 : -1;
947
948
2/2
✓ Branch 1 taken 9670 times.
✓ Branch 2 taken 42224 times.
51894 if (lock.owns_lock())
949
1/1
✓ Branch 1 taken 9670 times.
9670 lock.unlock();
950
951 51894 task_cleanup on_exit{this, &lock, ctx};
952
953 // Flush deferred timerfd programming before blocking
954
2/2
✓ Branch 1 taken 4921 times.
✓ Branch 2 taken 46973 times.
51894 if (timerfd_stale_.exchange(false, std::memory_order_acquire))
955
1/1
✓ Branch 1 taken 4921 times.
4921 update_timerfd();
956
957 // Event loop runs without mutex held
958 epoll_event events[128];
959
1/1
✓ Branch 1 taken 51894 times.
51894 int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
960
961
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 51894 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
51894 if (nfds < 0 && errno != EINTR)
962 detail::throw_system_error(make_err(errno), "epoll_wait");
963
964 51894 bool check_timers = false;
965 51894 op_queue local_ops;
966
967 // Process events without holding the mutex
968
2/2
✓ Branch 0 taken 69601 times.
✓ Branch 1 taken 51894 times.
121495 for (int i = 0; i < nfds; ++i)
969 {
970
2/2
✓ Branch 0 taken 33 times.
✓ Branch 1 taken 69568 times.
69601 if (events[i].data.ptr == nullptr)
971 {
972 std::uint64_t val;
973
1/1
✓ Branch 1 taken 33 times.
33 [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
974 33 eventfd_armed_.store(false, std::memory_order_relaxed);
975 33 continue;
976 33 }
977
978
2/2
✓ Branch 0 taken 4927 times.
✓ Branch 1 taken 64641 times.
69568 if (events[i].data.ptr == &timer_fd_)
979 {
980 std::uint64_t expirations;
981
1/1
✓ Branch 1 taken 4927 times.
4927 [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
982 4927 check_timers = true;
983 4927 continue;
984 4927 }
985
986 // Deferred I/O: just set ready events and enqueue descriptor
987 // No per-descriptor mutex locking in reactor hot path!
988 64641 auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
989 64641 desc->add_ready_events(events[i].events);
990
991 // Only enqueue if not already enqueued
992 64641 bool expected = false;
993
1/2
✓ Branch 1 taken 64641 times.
✗ Branch 2 not taken.
64641 if (desc->is_enqueued_.compare_exchange_strong(expected, true,
994 std::memory_order_release, std::memory_order_relaxed))
995 {
996 64641 local_ops.push(desc);
997 }
998 }
999
1000 // Process timers only when timerfd fires
1001
2/2
✓ Branch 0 taken 4927 times.
✓ Branch 1 taken 46967 times.
51894 if (check_timers)
1002 {
1003
1/1
✓ Branch 1 taken 4927 times.
4927 timer_svc_->process_expired();
1004
1/1
✓ Branch 1 taken 4927 times.
4927 update_timerfd();
1005 }
1006
1007
1/1
✓ Branch 1 taken 51894 times.
51894 lock.lock();
1008
1009
2/2
✓ Branch 1 taken 41765 times.
✓ Branch 2 taken 10129 times.
51894 if (!local_ops.empty())
1010 41765 completed_ops_.splice(local_ops);
1011 51894 }
1012
1013 std::size_t
1014 155952 epoll_scheduler::
1015 do_one(std::unique_lock<std::mutex>& lock, long timeout_us, scheduler_context* ctx)
1016 {
1017 for (;;)
1018 {
1019
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 207845 times.
207846 if (stopped_)
1020 1 return 0;
1021
1022 207845 scheduler_op* op = completed_ops_.pop();
1023
1024 // Handle reactor sentinel - time to poll for I/O
1025
2/2
✓ Branch 0 taken 52044 times.
✓ Branch 1 taken 155801 times.
207845 if (op == &task_op_)
1026 {
1027 52044 bool more_handlers = !completed_ops_.empty();
1028
1029 // Nothing to run the reactor for: no pending work to wait on,
1030 // or caller requested a non-blocking poll
1031
4/4
✓ Branch 0 taken 9820 times.
✓ Branch 1 taken 42224 times.
✓ Branch 2 taken 150 times.
✓ Branch 3 taken 51894 times.
61864 if (!more_handlers &&
1032
3/4
✓ Branch 1 taken 9670 times.
✓ Branch 2 taken 150 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 9670 times.
19640 (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1033 timeout_us == 0))
1034 {
1035 150 completed_ops_.push(&task_op_);
1036 150 return 0;
1037 }
1038
1039
3/4
✓ Branch 0 taken 9670 times.
✓ Branch 1 taken 42224 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 9670 times.
51894 task_interrupted_ = more_handlers || timeout_us == 0;
1040 51894 task_running_.store(true, std::memory_order_release);
1041
1042
2/2
✓ Branch 0 taken 42224 times.
✓ Branch 1 taken 9670 times.
51894 if (more_handlers)
1043 42224 unlock_and_signal_one(lock);
1044
1045 51894 run_task(lock, ctx);
1046
1047 51894 task_running_.store(false, std::memory_order_relaxed);
1048 51894 completed_ops_.push(&task_op_);
1049 51894 continue;
1050 51894 }
1051
1052 // Handle operation
1053
2/2
✓ Branch 0 taken 155800 times.
✓ Branch 1 taken 1 time.
155801 if (op != nullptr)
1054 {
1055
1/2
✓ Branch 1 taken 155800 times.
✗ Branch 2 not taken.
155800 if (!completed_ops_.empty())
1056
1/1
✓ Branch 1 taken 155800 times.
155800 unlock_and_signal_one(lock);
1057 else
1058 lock.unlock();
1059
1060 155800 work_cleanup on_exit{this, &lock, ctx};
1061
1062
1/1
✓ Branch 1 taken 155800 times.
155800 (*op)();
1063 155800 return 1;
1064 155800 }
1065
1066 // No pending work to wait on, or caller requested non-blocking poll
1067
2/6
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 1 time.
✗ Branch 6 not taken.
2 if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1068 timeout_us == 0)
1069 1 return 0;
1070
1071 clear_signal();
1072 if (timeout_us < 0)
1073 wait_for_signal(lock);
1074 else
1075 wait_for_signal_for(lock, timeout_us);
1076 51894 }
1077 }
1078
1079 } // namespace boost::corosio::detail
1080
1081 #endif
1082