Kea 3.2.0-git
io_service_thread_pool.cc
Go to the documentation of this file.
1// Copyright (C) 2022-2026 Internet Systems Consortium, Inc. ("ISC")
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#include <config.h>
8
11#include <asiolink/io_service.h>
15#include <util/unlock_guard.h>
16
17#include <boost/shared_ptr.hpp>
18
19#include <atomic>
20#include <functional>
21#include <iostream>
22#include <list>
23#include <mutex>
24#include <thread>
25
26using namespace isc;
27using namespace isc::asiolink;
28using namespace isc::util;
29
31 bool defer_start /* = false */)
32 : pool_size_(pool_size), io_service_(io_service),
33 run_state_(State::STOPPED), mutex_(), thread_cv_(),
34 main_cv_(), paused_(0), running_(0), exited_(0) {
35 if (!pool_size) {
36 isc_throw(BadValue, "pool_size must be non 0");
37 }
38
39 // If we weren't given an IOService, create our own.
40 if (!io_service_) {
41 io_service_.reset(new IOService());
42 }
43
44 // If we're not deferring the start, do it now.
45 if (!defer_start) {
46 run();
47 }
48}
49
50// This destructor can throw (although a destructor must not) if it is called
51// by a thread other than the main one, e.g. a worker thread. This cannot
52// happen, but static analyzers such as cppcheck do not know that and complain.
53// cppcheck-suppress throwInNoexceptFunction
57
58void
62
63void
67
68void
72
74IoServiceThreadPool::getState() {
75 std::lock_guard<std::mutex> lck(mutex_);
76 return (run_state_);
77}
78
79bool
80IoServiceThreadPool::validateStateChange(State state) const {
81 switch (run_state_) {
82 case State::STOPPED:
83 return (state == State::RUNNING);
84 case State::RUNNING:
85 return (state != State::RUNNING);
86 case State::PAUSED:
87 return (state != State::PAUSED);
88 }
89 return (false);
90}
91
92std::string
93IoServiceThreadPool::stateToText(State state) {
94 switch (state) {
95 case State::STOPPED:
96 return (std::string("stopped"));
97 case State::RUNNING:
98 return (std::string("running"));
99 case State::PAUSED:
100 return (std::string("paused"));
101 }
102 return (std::string("unknown-state"));
103}
104
105void
109
110void
111IoServiceThreadPool::checkPermissions(State state) {
112 auto id = std::this_thread::get_id();
113 if (checkThreadId(id)) {
114 isc_throw(MultiThreadingInvalidOperation, "invalid thread pool state change to "
115 << IoServiceThreadPool::stateToText(state) << " performed by worker thread");
116 }
117}
118
119bool
120IoServiceThreadPool::checkThreadId(std::thread::id id) {
121 for (auto const& thread : threads_) {
122 if (id == thread->get_id()) {
123 return (true);
124 }
125 }
126 return (false);
127}
128
// Moves the pool to the requested state and blocks the caller until the
// worker threads have reached it.
//
// - checkPermissions() runs first, before mutex_ is taken; it throws
//   MultiThreadingInvalidOperation when the caller is one of the pool's
//   own threads.
// - If validateStateChange() rejects the transition the function returns
//   without changing anything and without reporting an error.
// - Otherwise run_state_ is updated under mutex_, every thread waiting on
//   thread_cv_ is notified, and the per-state work below is carried out:
//     RUNNING: restart the IOService, create threads until pool_size_ of
//              them exist, wait until running_ equals the thread count,
//              then reset exited_ to 0.
//     PAUSED:  poll and stop the IOService (if not already stopped), then
//              wait until paused_ equals the thread count.
//     STOPPED: poll and stop the IOService (if not already stopped), wait
//              until exited_ equals the thread count, join every thread and
//              empty threads_.
//
// NOTE(review): the waits on main_cv_ pass main_lck; assuming main_cv_ is a
// std::condition_variable, mutex_ is released while the caller is blocked,
// which is what lets the workers update the counters - confirm in the header.
129void
130IoServiceThreadPool::setState(State state) {
131 checkPermissions(state);
132
133 std::unique_lock<std::mutex> main_lck(mutex_);
134
135 // Bail if the transition is invalid.
136 if (!validateStateChange(state)) {
137 return;
138 }
139
140 run_state_ = state;
141 // Notify threads of state change.
142 thread_cv_.notify_all();
143
144 switch (state) {
145 case State::RUNNING: {
146 // Restart the IOService.
147 io_service_->restart();
148
149 // While we have fewer threads than we should, make more.
// Each new thread executes threadWork() on this pool object.
150 while (threads_.size() < pool_size_) {
151 boost::shared_ptr<std::thread> thread(new std::thread(
152 std::bind(&IoServiceThreadPool::threadWork, this)));
153
154 // Add thread to the pool.
155 threads_.push_back(thread);
156 }
157
158 // Main thread waits here until all threads are running.
159 main_cv_.wait(main_lck,
160 [&]() {
161 return (running_ == threads_.size());
162 });
163
// Clear the exit counter so that a later transition to STOPPED starts
// counting exited threads from zero.
164 exited_ = 0;
165 break;
166 }
167
168 case State::PAUSED: {
169 // Stop IOService.
// poll() is called before stop(); whatever poll() throws is discarded
// on purpose, so stop() is still reached.
170 if (!io_service_->stopped()) {
171 try {
172 io_service_->poll();
173 } catch (...) {
174 // Catch all exceptions.
175 // Logging is not available.
176 }
177 io_service_->stop();
178 }
179
180 // Main thread waits here until all threads are paused.
181 main_cv_.wait(main_lck,
182 [&]() {
183 return (paused_ == threads_.size());
184 });
185
186 break;
187 }
188
189 case State::STOPPED: {
190 // Stop IOService.
// Same poll-then-stop sequence as for PAUSED; skipped entirely when the
// IOService already reports itself stopped.
191 if (!io_service_->stopped()) {
192 try {
193 io_service_->poll();
194 } catch (...) {
195 // Catch all exceptions.
196 // Logging is not available.
197 }
198 io_service_->stop();
199 }
200
201 // Main thread waits here until all threads have exited.
202 main_cv_.wait(main_lck,
203 [&]() {
204 return (exited_ == threads_.size());
205 });
206
// Every worker has already counted itself in exited_ at this point; join
// them, then drop the thread objects.
207 for (auto const& thread : threads_) {
208 thread->join();
209 }
210
211 threads_.clear();
212 break;
213 }}
214}
215
// Function executed by each thread of the pool.
//
// Loops, acting on the state returned by getState(), until that state is
// STOPPED:
//   RUNNING: increments running_ (notifying main_cv_ once it reaches
//            pool_size_), calls io_service_->run() with all exceptions
//            swallowed, and decrements running_ once run() has returned.
//   PAUSED:  increments paused_ (notifying main_cv_ once every thread is
//            paused), blocks on thread_cv_ until run_state_ is no longer
//            PAUSED, then decrements paused_.
//   STOPPED: leaves the loop.
// After the loop it increments exited_ and notifies main_cv_ once every
// thread has exited. All counter updates are made while holding mutex_.
216void
217IoServiceThreadPool::threadWork() {
218 bool done = false;
219 while (!done) {
220 switch (getState()) {
221 case State::RUNNING: {
// The lock is confined to this inner scope so that it is released before
// the call to run() below.
222 {
223 std::unique_lock<std::mutex> lck(mutex_);
224 running_++;
225
226 // If We're all running notify main thread.
// NOTE(review): this compares against pool_size_ whereas the paused and
// exited checks compare against threads_.size(); setState() fills threads_
// up to pool_size_ before it starts waiting, so both should be equal here.
227 if (running_ == pool_size_) {
228 main_cv_.notify_all();
229 }
230 }
231
232 try {
233 // Run the IOService.
234 io_service_->run();
235 } catch (...) {
236 // Catch all exceptions.
237 // Logging is not available.
238 }
239
// run() has returned (or thrown): this thread no longer counts as running.
240 {
241 std::unique_lock<std::mutex> lck(mutex_);
242 running_--;
243 }
244
245 break;
246 }
247
248 case State::PAUSED: {
249 std::unique_lock<std::mutex> lck(mutex_);
250 paused_++;
251
252 // If we're all paused notify main.
253 if (paused_ == threads_.size()) {
254 main_cv_.notify_all();
255 }
256
257 // Wait here till I'm released.
// The predicate reads run_state_ directly; lck is passed to wait().
258 thread_cv_.wait(lck,
259 [&]() {
260 return (run_state_ != State::PAUSED);
261 });
262
263 paused_--;
264 break;
265 }
266
267 case State::STOPPED: {
268 done = true;
269 break;
270 }}
271 }
272
// Record this thread's exit under the lock.
273 std::unique_lock<std::mutex> lck(mutex_);
274 exited_++;
275
276 // If we've all exited, notify main.
277 if (exited_ == threads_.size()) {
278 main_cv_.notify_all();
279 }
280}
281
284 return (io_service_);
285}
286
287uint16_t
289 return (pool_size_);
290}
291
292uint16_t
294 return (threads_.size());
295}
A generic exception that is thrown if a parameter given to a method is considered invalid in that con...
Exception thrown when a worker thread is trying to stop or pause the respective thread pool (which wo...
Defines a State within the State Model.
Definition state_model.h:61
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
Defines the logger used by the top-level component of kea-lfc.