corosio


Directory: ../corosio_lcov_PR-138/
Coverage: low: ≥ 0% medium: ≥ 75.0% high: ≥ 90.0%
Coverage Exec / Excl / Total
Lines: 73.4% 268 / 0 / 365
Functions: 87.9% 29 / 0 / 33
Branches: 62.2% 166 / 0 / 267

src/corosio/src/detail/select/scheduler.cpp
Line Branch Exec Source
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_SELECT
13
14 #include "src/detail/select/scheduler.hpp"
15 #include "src/detail/select/op.hpp"
16 #include "src/detail/timer_service.hpp"
17 #include "src/detail/make_err.hpp"
18 #include "src/detail/posix/resolver_service.hpp"
19 #include "src/detail/posix/signals.hpp"
20
21 #include <boost/corosio/detail/except.hpp>
22 #include <boost/corosio/detail/thread_local_ptr.hpp>
23
24 #include <chrono>
25 #include <limits>
26
27 #include <errno.h>
28 #include <fcntl.h>
29 #include <sys/select.h>
30 #include <sys/socket.h>
31 #include <unistd.h>
32
33 /*
34 select Scheduler - Single Reactor Model
35 =======================================
36
37 This scheduler mirrors the epoll_scheduler design but uses select() instead
38 of epoll for I/O multiplexing. The thread coordination strategy is identical:
39 one thread becomes the "reactor" while others wait on a condition variable.
40
41 Thread Model
42 ------------
43 - ONE thread runs select() at a time (the reactor thread)
44 - OTHER threads wait on wakeup_event_ (condition variable) for handlers
45 - When work is posted, exactly one waiting thread wakes via notify_one()
46
47 Key Differences from epoll
48 --------------------------
49 - Uses self-pipe instead of eventfd for interruption (more portable)
50 - fd_set rebuilding each iteration (O(n) vs O(1) for epoll)
51 - FD_SETSIZE limit (~1024 fds on most systems)
52 - Level-triggered only (no edge-triggered mode)
53
54 Self-Pipe Pattern
55 -----------------
56 To interrupt a blocking select() call (e.g., when work is posted or a timer
57 expires), we write a byte to pipe_fds_[1]. The read end pipe_fds_[0] is
58 always in the read_fds set, so select() returns immediately. We drain the
59 pipe to clear the readable state.
60
61 fd-to-op Mapping
62 ----------------
63 We use an unordered_map<int, fd_state> to track which operations are
64 registered for each fd. This allows O(1) lookup when select() returns
65 ready fds. Each fd can have at most one read op and one write op registered.
66 */
67
68 namespace boost::corosio::detail {
69
70 namespace {
71
72 struct scheduler_context
73 {
74 select_scheduler const* key;
75 scheduler_context* next;
76 };
77
78 corosio::detail::thread_local_ptr<scheduler_context> context_stack;
79
80 struct thread_context_guard
81 {
82 scheduler_context frame_;
83
84 120 explicit thread_context_guard(
85 select_scheduler const* ctx) noexcept
86 120 : frame_{ctx, context_stack.get()}
87 {
88 120 context_stack.set(&frame_);
89 120 }
90
91 120 ~thread_context_guard() noexcept
92 {
93 120 context_stack.set(frame_.next);
94 120 }
95 };
96
97 } // namespace
98
99 133 select_scheduler::
100 select_scheduler(
101 capy::execution_context& ctx,
102 133 int)
103 133 : pipe_fds_{-1, -1}
104 133 , outstanding_work_(0)
105 133 , stopped_(false)
106 133 , shutdown_(false)
107 133 , max_fd_(-1)
108 133 , reactor_running_(false)
109 133 , reactor_interrupted_(false)
110 266 , idle_thread_count_(0)
111 {
112 // Create self-pipe for interrupting select()
113
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 133 times.
133 if (::pipe(pipe_fds_) < 0)
114 detail::throw_system_error(make_err(errno), "pipe");
115
116 // Set both ends to non-blocking and close-on-exec
117
2/2
✓ Branch 0 taken 266 times.
✓ Branch 1 taken 133 times.
399 for (int i = 0; i < 2; ++i)
118 {
119
1/1
✓ Branch 1 taken 266 times.
266 int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
120
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 266 times.
266 if (flags == -1)
121 {
122 int errn = errno;
123 ::close(pipe_fds_[0]);
124 ::close(pipe_fds_[1]);
125 detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
126 }
127
2/3
✓ Branch 1 taken 266 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 266 times.
266 if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
128 {
129 int errn = errno;
130 ::close(pipe_fds_[0]);
131 ::close(pipe_fds_[1]);
132 detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
133 }
134
2/3
✓ Branch 1 taken 266 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 266 times.
266 if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
135 {
136 int errn = errno;
137 ::close(pipe_fds_[0]);
138 ::close(pipe_fds_[1]);
139 detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
140 }
141 }
142
143
1/1
✓ Branch 1 taken 133 times.
133 timer_svc_ = &get_timer_service(ctx, *this);
144
1/1
✓ Branch 3 taken 133 times.
133 timer_svc_->set_on_earliest_changed(
145 timer_service::callback(
146 this,
147 3741 [](void* p) { static_cast<select_scheduler*>(p)->interrupt_reactor(); }));
148
149 // Initialize resolver service
150
1/1
✓ Branch 1 taken 133 times.
133 get_resolver_service(ctx, *this);
151
152 // Initialize signal service
153
1/1
✓ Branch 1 taken 133 times.
133 get_signal_service(ctx, *this);
154
155 // Push task sentinel to interleave reactor runs with handler execution
156 133 completed_ops_.push(&task_op_);
157 133 }
158
159 266 select_scheduler::
160 133 ~select_scheduler()
161 {
162
1/2
✓ Branch 0 taken 133 times.
✗ Branch 1 not taken.
133 if (pipe_fds_[0] >= 0)
163 133 ::close(pipe_fds_[0]);
164
1/2
✓ Branch 0 taken 133 times.
✗ Branch 1 not taken.
133 if (pipe_fds_[1] >= 0)
165 133 ::close(pipe_fds_[1]);
166 266 }
167
168 void
169 133 select_scheduler::
170 shutdown()
171 {
172 {
173
1/1
✓ Branch 1 taken 133 times.
133 std::unique_lock lock(mutex_);
174 133 shutdown_ = true;
175
176
2/2
✓ Branch 1 taken 133 times.
✓ Branch 2 taken 133 times.
266 while (auto* h = completed_ops_.pop())
177 {
178
1/2
✓ Branch 0 taken 133 times.
✗ Branch 1 not taken.
133 if (h == &task_op_)
179 133 continue;
180 lock.unlock();
181 h->destroy();
182 lock.lock();
183 133 }
184 133 }
185
186 133 outstanding_work_.store(0, std::memory_order_release);
187
188
1/2
✓ Branch 0 taken 133 times.
✗ Branch 1 not taken.
133 if (pipe_fds_[1] >= 0)
189 133 interrupt_reactor();
190
191 133 wakeup_event_.notify_all();
192 133 }
193
194 void
195 4078 select_scheduler::
196 post(std::coroutine_handle<> h) const
197 {
198 struct post_handler final
199 : scheduler_op
200 {
201 std::coroutine_handle<> h_;
202
203 explicit
204 4078 post_handler(std::coroutine_handle<> h)
205 4078 : h_(h)
206 {
207 4078 }
208
209 8156 ~post_handler() = default;
210
211 4078 void operator()() override
212 {
213 4078 auto h = h_;
214
1/2
✓ Branch 0 taken 4078 times.
✗ Branch 1 not taken.
4078 delete this;
215
1/1
✓ Branch 1 taken 4078 times.
4078 h.resume();
216 4078 }
217
218 void destroy() override
219 {
220 delete this;
221 }
222 };
223
224
1/1
✓ Branch 1 taken 4078 times.
4078 auto ph = std::make_unique<post_handler>(h);
225 4078 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
226
227
1/1
✓ Branch 1 taken 4078 times.
4078 std::unique_lock lock(mutex_);
228 4078 completed_ops_.push(ph.release());
229
1/1
✓ Branch 1 taken 4078 times.
4078 wake_one_thread_and_unlock(lock);
230 4078 }
231
232 void
233 170965 select_scheduler::
234 post(scheduler_op* h) const
235 {
236 170965 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
237
238
1/1
✓ Branch 1 taken 170965 times.
170965 std::unique_lock lock(mutex_);
239 170965 completed_ops_.push(h);
240
1/1
✓ Branch 1 taken 170965 times.
170965 wake_one_thread_and_unlock(lock);
241 170965 }
242
243 void
244 4315 select_scheduler::
245 on_work_started() noexcept
246 {
247 4315 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
248 4315 }
249
250 void
251 4309 select_scheduler::
252 on_work_finished() noexcept
253 {
254
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4309 times.
8618 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
255 stop();
256 4309 }
257
258 bool
259 552 select_scheduler::
260 running_in_this_thread() const noexcept
261 {
262
2/2
✓ Branch 1 taken 368 times.
✓ Branch 2 taken 184 times.
552 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
263
1/2
✓ Branch 0 taken 368 times.
✗ Branch 1 not taken.
368 if (c->key == this)
264 368 return true;
265 184 return false;
266 }
267
268 void
269 3 select_scheduler::
270 stop()
271 {
272 3 bool expected = false;
273
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 if (stopped_.compare_exchange_strong(expected, true,
274 std::memory_order_release, std::memory_order_relaxed))
275 {
276 // Wake all threads so they notice stopped_ and exit
277 {
278
1/1
✓ Branch 1 taken 3 times.
3 std::lock_guard lock(mutex_);
279 3 wakeup_event_.notify_all();
280 3 }
281
1/1
✓ Branch 1 taken 3 times.
3 interrupt_reactor();
282 }
283 3 }
284
285 bool
286 3 select_scheduler::
287 stopped() const noexcept
288 {
289 3 return stopped_.load(std::memory_order_acquire);
290 }
291
292 void
293 34 select_scheduler::
294 restart()
295 {
296 34 stopped_.store(false, std::memory_order_release);
297 34 }
298
299 std::size_t
300 96 select_scheduler::
301 run()
302 {
303
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 96 times.
96 if (stopped_.load(std::memory_order_acquire))
304 return 0;
305
306
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 96 times.
192 if (outstanding_work_.load(std::memory_order_acquire) == 0)
307 {
308 stop();
309 return 0;
310 }
311
312 96 thread_context_guard ctx(this);
313
314 96 std::size_t n = 0;
315
3/3
✓ Branch 1 taken 182285 times.
✓ Branch 3 taken 182189 times.
✓ Branch 4 taken 96 times.
182285 while (do_one(-1))
316
1/2
✓ Branch 1 taken 182189 times.
✗ Branch 2 not taken.
182189 if (n != (std::numeric_limits<std::size_t>::max)())
317 182189 ++n;
318 96 return n;
319 96 }
320
321 std::size_t
322 select_scheduler::
323 run_one()
324 {
325 if (stopped_.load(std::memory_order_acquire))
326 return 0;
327
328 if (outstanding_work_.load(std::memory_order_acquire) == 0)
329 {
330 stop();
331 return 0;
332 }
333
334 thread_context_guard ctx(this);
335 return do_one(-1);
336 }
337
338 std::size_t
339 27 select_scheduler::
340 wait_one(long usec)
341 {
342
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 27 times.
27 if (stopped_.load(std::memory_order_acquire))
343 return 0;
344
345
2/2
✓ Branch 1 taken 3 times.
✓ Branch 2 taken 24 times.
54 if (outstanding_work_.load(std::memory_order_acquire) == 0)
346 {
347
1/1
✓ Branch 1 taken 3 times.
3 stop();
348 3 return 0;
349 }
350
351 24 thread_context_guard ctx(this);
352
1/1
✓ Branch 1 taken 24 times.
24 return do_one(usec);
353 24 }
354
355 std::size_t
356 select_scheduler::
357 poll()
358 {
359 if (stopped_.load(std::memory_order_acquire))
360 return 0;
361
362 if (outstanding_work_.load(std::memory_order_acquire) == 0)
363 {
364 stop();
365 return 0;
366 }
367
368 thread_context_guard ctx(this);
369
370 std::size_t n = 0;
371 while (do_one(0))
372 if (n != (std::numeric_limits<std::size_t>::max)())
373 ++n;
374 return n;
375 }
376
377 std::size_t
378 select_scheduler::
379 poll_one()
380 {
381 if (stopped_.load(std::memory_order_acquire))
382 return 0;
383
384 if (outstanding_work_.load(std::memory_order_acquire) == 0)
385 {
386 stop();
387 return 0;
388 }
389
390 thread_context_guard ctx(this);
391 return do_one(0);
392 }
393
394 void
395 7332 select_scheduler::
396 register_fd(int fd, select_op* op, int events) const
397 {
398 // Validate fd is within select() limits
399
2/4
✓ Branch 0 taken 7332 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 7332 times.
7332 if (fd < 0 || fd >= FD_SETSIZE)
400 detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
401
402 {
403
1/1
✓ Branch 1 taken 7332 times.
7332 std::lock_guard lock(mutex_);
404
405
1/1
✓ Branch 1 taken 7332 times.
7332 auto& state = registered_fds_[fd];
406
2/2
✓ Branch 0 taken 3807 times.
✓ Branch 1 taken 3525 times.
7332 if (events & event_read)
407 3807 state.read_op = op;
408
2/2
✓ Branch 0 taken 3525 times.
✓ Branch 1 taken 3807 times.
7332 if (events & event_write)
409 3525 state.write_op = op;
410
411
2/2
✓ Branch 0 taken 227 times.
✓ Branch 1 taken 7105 times.
7332 if (fd > max_fd_)
412 227 max_fd_ = fd;
413 7332 }
414
415 // Wake the reactor so a thread blocked in select() rebuilds its fd_sets
416 // with the newly registered fd.
417 7332 interrupt_reactor();
418 7332 }
419
420 void
421 7264 select_scheduler::
422 deregister_fd(int fd, int events) const
423 {
424
1/1
✓ Branch 1 taken 7264 times.
7264 std::lock_guard lock(mutex_);
425
426
1/1
✓ Branch 1 taken 7264 times.
7264 auto it = registered_fds_.find(fd);
427
2/2
✓ Branch 2 taken 7102 times.
✓ Branch 3 taken 162 times.
7264 if (it == registered_fds_.end())
428 7102 return;
429
430
1/2
✓ Branch 0 taken 162 times.
✗ Branch 1 not taken.
162 if (events & event_read)
431 162 it->second.read_op = nullptr;
432
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 162 times.
162 if (events & event_write)
433 it->second.write_op = nullptr;
434
435 // Remove entry if both are null
436
3/6
✓ Branch 1 taken 162 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 162 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 162 times.
✗ Branch 7 not taken.
162 if (!it->second.read_op && !it->second.write_op)
437 {
438
1/1
✓ Branch 1 taken 162 times.
162 registered_fds_.erase(it);
439
440 // Recalculate max_fd_ if needed
441
2/2
✓ Branch 0 taken 161 times.
✓ Branch 1 taken 1 time.
162 if (fd == max_fd_)
442 {
443 161 max_fd_ = pipe_fds_[0]; // At minimum, the pipe read end
444
1/2
✗ Branch 7 not taken.
✓ Branch 8 taken 161 times.
161 for (auto& [registered_fd, state] : registered_fds_)
445 {
446 if (registered_fd > max_fd_)
447 max_fd_ = registered_fd;
448 }
449 }
450 }
451 7264 }
452
453 void
454 7332 select_scheduler::
455 work_started() const noexcept
456 {
457 7332 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
458 7332 }
459
460 void
461 182381 select_scheduler::
462 work_finished() const noexcept
463 {
464
2/2
✓ Branch 0 taken 99 times.
✓ Branch 1 taken 182282 times.
364762 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
465 {
466 // Last work item completed - wake all threads so they can exit.
467 99 std::unique_lock lock(mutex_);
468 99 wakeup_event_.notify_all();
469
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 99 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
99 if (reactor_running_ && !reactor_interrupted_)
470 {
471 reactor_interrupted_ = true;
472 lock.unlock();
473 interrupt_reactor();
474 }
475 99 }
476 182381 }
477
478 void
479 14940 select_scheduler::
480 interrupt_reactor() const
481 {
482 14940 char byte = 1;
483
1/1
✓ Branch 1 taken 14940 times.
14940 [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
484 14940 }
485
486 void
487 175043 select_scheduler::
488 wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
489 {
490
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 175043 times.
175043 if (idle_thread_count_ > 0)
491 {
492 // Idle worker exists - wake it via condvar
493 wakeup_event_.notify_one();
494 lock.unlock();
495 }
496
4/4
✓ Branch 0 taken 3737 times.
✓ Branch 1 taken 171306 times.
✓ Branch 2 taken 3731 times.
✓ Branch 3 taken 6 times.
175043 else if (reactor_running_ && !reactor_interrupted_)
497 {
498 // No idle workers but reactor is running - interrupt it
499 3731 reactor_interrupted_ = true;
500 3731 lock.unlock();
501 3731 interrupt_reactor();
502 }
503 else
504 {
505 // No one to wake
506 171312 lock.unlock();
507 }
508 175043 }
509
510 struct work_guard
511 {
512 select_scheduler const* self;
513 182213 ~work_guard() { self->work_finished(); }
514 };
515
516 long
517 10839 select_scheduler::
518 calculate_timeout(long requested_timeout_us) const
519 {
520
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 10839 times.
10839 if (requested_timeout_us == 0)
521 return 0;
522
523 10839 auto nearest = timer_svc_->nearest_expiry();
524
3/3
✓ Branch 2 taken 10839 times.
✓ Branch 4 taken 35 times.
✓ Branch 5 taken 10804 times.
10839 if (nearest == timer_service::time_point::max())
525 35 return requested_timeout_us;
526
527 10804 auto now = std::chrono::steady_clock::now();
528
3/3
✓ Branch 1 taken 10804 times.
✓ Branch 4 taken 62 times.
✓ Branch 5 taken 10742 times.
10804 if (nearest <= now)
529 62 return 0;
530
531
1/1
✓ Branch 1 taken 10742 times.
10742 auto timer_timeout_us = std::chrono::duration_cast<std::chrono::microseconds>(
532
1/1
✓ Branch 1 taken 10742 times.
21484 nearest - now).count();
533
534
1/2
✓ Branch 0 taken 10742 times.
✗ Branch 1 not taken.
10742 if (requested_timeout_us < 0)
535 10742 return static_cast<long>(timer_timeout_us);
536
537 return static_cast<long>((std::min)(
538 static_cast<long long>(requested_timeout_us),
539 static_cast<long long>(timer_timeout_us)));
540 }
541
542 void
543 90776 select_scheduler::
544 run_reactor(std::unique_lock<std::mutex>& lock)
545 {
546 // Calculate timeout considering timers, use 0 if interrupted
547
3/3
✓ Branch 0 taken 79937 times.
✓ Branch 1 taken 10839 times.
✓ Branch 3 taken 10839 times.
90776 long effective_timeout_us = reactor_interrupted_ ? 0 : calculate_timeout(-1);
548
549 // Build fd_sets from registered_fds_
550 fd_set read_fds, write_fds, except_fds;
551
2/2
✓ Branch 0 taken 1452416 times.
✓ Branch 1 taken 90776 times.
1543192 FD_ZERO(&read_fds);
552
2/2
✓ Branch 0 taken 1452416 times.
✓ Branch 1 taken 90776 times.
1543192 FD_ZERO(&write_fds);
553
2/2
✓ Branch 0 taken 1452416 times.
✓ Branch 1 taken 90776 times.
1543192 FD_ZERO(&except_fds);
554
555 // Always include the interrupt pipe
556 90776 FD_SET(pipe_fds_[0], &read_fds);
557 90776 int nfds = pipe_fds_[0];
558
559 // Add registered fds
560
2/2
✓ Branch 7 taken 17622 times.
✓ Branch 8 taken 90776 times.
108398 for (auto& [fd, state] : registered_fds_)
561 {
562
2/2
✓ Branch 0 taken 14097 times.
✓ Branch 1 taken 3525 times.
17622 if (state.read_op)
563 14097 FD_SET(fd, &read_fds);
564
2/2
✓ Branch 0 taken 3525 times.
✓ Branch 1 taken 14097 times.
17622 if (state.write_op)
565 {
566 3525 FD_SET(fd, &write_fds);
567 // Also monitor for errors on connect operations
568 3525 FD_SET(fd, &except_fds);
569 }
570
2/2
✓ Branch 0 taken 14100 times.
✓ Branch 1 taken 3522 times.
17622 if (fd > nfds)
571 14100 nfds = fd;
572 }
573
574 // Convert timeout to timeval
575 struct timeval tv;
576 90776 struct timeval* tv_ptr = nullptr;
577
2/2
✓ Branch 0 taken 90741 times.
✓ Branch 1 taken 35 times.
90776 if (effective_timeout_us >= 0)
578 {
579 90741 tv.tv_sec = effective_timeout_us / 1000000;
580 90741 tv.tv_usec = effective_timeout_us % 1000000;
581 90741 tv_ptr = &tv;
582 }
583
584
1/1
✓ Branch 1 taken 90776 times.
90776 lock.unlock();
585
586
1/1
✓ Branch 1 taken 90776 times.
90776 int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
587 90776 int saved_errno = errno;
588
589 // Process timers outside the lock
590
1/1
✓ Branch 1 taken 90776 times.
90776 timer_svc_->process_expired();
591
592
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 90776 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
90776 if (ready < 0 && saved_errno != EINTR)
593 detail::throw_system_error(make_err(saved_errno), "select");
594
595 // Re-acquire lock before modifying completed_ops_
596
1/1
✓ Branch 1 taken 90776 times.
90776 lock.lock();
597
598 // Drain the interrupt pipe if readable
599
3/4
✓ Branch 0 taken 11169 times.
✓ Branch 1 taken 79607 times.
✓ Branch 2 taken 11169 times.
✗ Branch 3 not taken.
90776 if (ready > 0 && FD_ISSET(pipe_fds_[0], &read_fds))
600 {
601 char buf[256];
602
3/3
✓ Branch 1 taken 22338 times.
✓ Branch 3 taken 11169 times.
✓ Branch 4 taken 11169 times.
22338 while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0) {}
603 }
604
605 // Process I/O completions
606 90776 int completions_queued = 0;
607
2/2
✓ Branch 0 taken 11169 times.
✓ Branch 1 taken 79607 times.
90776 if (ready > 0)
608 {
609 // Iterate over registered fds (copy keys to avoid iterator invalidation)
610 11169 std::vector<int> fds_to_check;
611
1/1
✓ Branch 2 taken 11169 times.
11169 fds_to_check.reserve(registered_fds_.size());
612
2/2
✓ Branch 7 taken 14130 times.
✓ Branch 8 taken 11169 times.
25299 for (auto& [fd, state] : registered_fds_)
613
1/1
✓ Branch 1 taken 14130 times.
14130 fds_to_check.push_back(fd);
614
615
2/2
✓ Branch 5 taken 14130 times.
✓ Branch 6 taken 11169 times.
25299 for (int fd : fds_to_check)
616 {
617
1/1
✓ Branch 1 taken 14130 times.
14130 auto it = registered_fds_.find(fd);
618
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 14130 times.
14130 if (it == registered_fds_.end())
619 continue;
620
621 14130 auto& state = it->second;
622
623 // Check for errors (especially for connect operations)
624 14130 bool has_error = FD_ISSET(fd, &except_fds);
625
626 // Process read readiness
627
5/6
✓ Branch 0 taken 10605 times.
✓ Branch 1 taken 3525 times.
✓ Branch 2 taken 6960 times.
✓ Branch 3 taken 3645 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 6960 times.
14130 if (state.read_op && (FD_ISSET(fd, &read_fds) || has_error))
628 {
629 3645 auto* op = state.read_op;
630 // Claim the op by exchanging to unregistered. Both registering and
631 // registered states mean the op is ours to complete.
632 3645 auto prev = op->registered.exchange(
633 select_registration_state::unregistered, std::memory_order_acq_rel);
634
1/2
✓ Branch 0 taken 3645 times.
✗ Branch 1 not taken.
3645 if (prev != select_registration_state::unregistered)
635 {
636 3645 state.read_op = nullptr;
637
638
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3645 times.
3645 if (has_error)
639 {
640 int errn = 0;
641 socklen_t len = sizeof(errn);
642 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
643 errn = errno;
644 if (errn == 0)
645 errn = EIO;
646 op->complete(errn, 0);
647 }
648 else
649 {
650 3645 op->perform_io();
651 }
652
653 3645 completed_ops_.push(op);
654 3645 ++completions_queued;
655 }
656 }
657
658 // Process write readiness
659
3/6
✓ Branch 0 taken 3525 times.
✓ Branch 1 taken 10605 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 3525 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
14130 if (state.write_op && (FD_ISSET(fd, &write_fds) || has_error))
660 {
661 3525 auto* op = state.write_op;
662 // Claim the op by exchanging to unregistered. Both registering and
663 // registered states mean the op is ours to complete.
664 3525 auto prev = op->registered.exchange(
665 select_registration_state::unregistered, std::memory_order_acq_rel);
666
1/2
✓ Branch 0 taken 3525 times.
✗ Branch 1 not taken.
3525 if (prev != select_registration_state::unregistered)
667 {
668 3525 state.write_op = nullptr;
669
670
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3525 times.
3525 if (has_error)
671 {
672 int errn = 0;
673 socklen_t len = sizeof(errn);
674 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
675 errn = errno;
676 if (errn == 0)
677 errn = EIO;
678 op->complete(errn, 0);
679 }
680 else
681 {
682 3525 op->perform_io();
683 }
684
685 3525 completed_ops_.push(op);
686 3525 ++completions_queued;
687 }
688 }
689
690 // Clean up empty entries
691
3/4
✓ Branch 0 taken 7170 times.
✓ Branch 1 taken 6960 times.
✓ Branch 2 taken 7170 times.
✗ Branch 3 not taken.
14130 if (!state.read_op && !state.write_op)
692
1/1
✓ Branch 1 taken 7170 times.
7170 registered_fds_.erase(it);
693 }
694 11169 }
695
696
2/2
✓ Branch 0 taken 3648 times.
✓ Branch 1 taken 87128 times.
90776 if (completions_queued > 0)
697 {
698
2/2
✓ Branch 0 taken 126 times.
✓ Branch 1 taken 3522 times.
3648 if (completions_queued == 1)
699 126 wakeup_event_.notify_one();
700 else
701 3522 wakeup_event_.notify_all();
702 }
703 90776 }
704
705 std::size_t
706 182309 select_scheduler::
707 do_one(long timeout_us)
708 {
709
1/1
✓ Branch 1 taken 182309 times.
182309 std::unique_lock lock(mutex_);
710
711 for (;;)
712 {
713
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 273085 times.
273085 if (stopped_.load(std::memory_order_acquire))
714 return 0;
715
716 273085 scheduler_op* op = completed_ops_.pop();
717
718
2/2
✓ Branch 0 taken 90872 times.
✓ Branch 1 taken 182213 times.
273085 if (op == &task_op_)
719 {
720 90872 bool more_handlers = !completed_ops_.empty();
721
722
2/2
✓ Branch 0 taken 10935 times.
✓ Branch 1 taken 79937 times.
90872 if (!more_handlers)
723 {
724
2/2
✓ Branch 1 taken 96 times.
✓ Branch 2 taken 10839 times.
21870 if (outstanding_work_.load(std::memory_order_acquire) == 0)
725 {
726 96 completed_ops_.push(&task_op_);
727 96 return 0;
728 }
729
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 10839 times.
10839 if (timeout_us == 0)
730 {
731 completed_ops_.push(&task_op_);
732 return 0;
733 }
734 }
735
736
3/4
✓ Branch 0 taken 10839 times.
✓ Branch 1 taken 79937 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 10839 times.
90776 reactor_interrupted_ = more_handlers || timeout_us == 0;
737 90776 reactor_running_ = true;
738
739
3/4
✓ Branch 0 taken 79937 times.
✓ Branch 1 taken 10839 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 79937 times.
90776 if (more_handlers && idle_thread_count_ > 0)
740 wakeup_event_.notify_one();
741
742
1/1
✓ Branch 1 taken 90776 times.
90776 run_reactor(lock);
743
744 90776 reactor_running_ = false;
745 90776 completed_ops_.push(&task_op_);
746 90776 continue;
747 90776 }
748
749
1/2
✓ Branch 0 taken 182213 times.
✗ Branch 1 not taken.
182213 if (op != nullptr)
750 {
751
1/1
✓ Branch 1 taken 182213 times.
182213 lock.unlock();
752 182213 work_guard g{this};
753
1/1
✓ Branch 1 taken 182213 times.
182213 (*op)();
754 182213 return 1;
755 182213 }
756
757 if (outstanding_work_.load(std::memory_order_acquire) == 0)
758 return 0;
759
760 if (timeout_us == 0)
761 return 0;
762
763 ++idle_thread_count_;
764 if (timeout_us < 0)
765 wakeup_event_.wait(lock);
766 else
767 wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us));
768 --idle_thread_count_;
769 90776 }
770 182309 }
771
772 } // namespace boost::corosio::detail
773
774 #endif // BOOST_COROSIO_HAS_SELECT
775