89.58% Lines (43/48) 100.00% Functions (7/7)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2026 Steve Gerbino 2   // Copyright (c) 2026 Steve Gerbino
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/corosio 7   // Official repository: https://github.com/cppalliance/corosio
8   // 8   //
9   9  
10   #ifndef BOOST_COROSIO_DETAIL_THREAD_POOL_HPP 10   #ifndef BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
11   #define BOOST_COROSIO_DETAIL_THREAD_POOL_HPP 11   #define BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
12   12  
13   #include <boost/corosio/detail/config.hpp> 13   #include <boost/corosio/detail/config.hpp>
14   #include <boost/corosio/detail/intrusive.hpp> 14   #include <boost/corosio/detail/intrusive.hpp>
15   #include <boost/capy/ex/execution_context.hpp> 15   #include <boost/capy/ex/execution_context.hpp>
16   #include <boost/capy/test/thread_name.hpp> 16   #include <boost/capy/test/thread_name.hpp>
17   17  
18   #include <condition_variable> 18   #include <condition_variable>
19   #include <cstdio> 19   #include <cstdio>
20   #include <mutex> 20   #include <mutex>
21   #include <stdexcept> 21   #include <stdexcept>
22   #include <thread> 22   #include <thread>
23   #include <vector> 23   #include <vector>
24   24  
25   namespace boost::corosio::detail { 25   namespace boost::corosio::detail {
26   26  
27   /** Base class for thread pool work items. 27   /** Base class for thread pool work items.
28   28  
29   Derive from this to create work that can be posted to a 29   Derive from this to create work that can be posted to a
30   @ref thread_pool. Uses static function pointer dispatch, 30   @ref thread_pool. Uses static function pointer dispatch,
31   consistent with the IOCP `op` pattern. 31   consistent with the IOCP `op` pattern.
32   32  
33   @par Example 33   @par Example
34   @code 34   @code
35   struct my_work : pool_work_item 35   struct my_work : pool_work_item
36   { 36   {
37   int* result; 37   int* result;
38   static void execute( pool_work_item* w ) noexcept 38   static void execute( pool_work_item* w ) noexcept
39   { 39   {
40   auto* self = static_cast<my_work*>( w ); 40   auto* self = static_cast<my_work*>( w );
41   *self->result = 42; 41   *self->result = 42;
42   } 42   }
43   }; 43   };
44   44  
45   my_work w; 45   my_work w;
46   w.func_ = &my_work::execute; 46   w.func_ = &my_work::execute;
47   w.result = &r; 47   w.result = &r;
48   pool.post( &w ); 48   pool.post( &w );
49   @endcode 49   @endcode
50   */ 50   */
51   struct pool_work_item : intrusive_queue<pool_work_item>::node 51   struct pool_work_item : intrusive_queue<pool_work_item>::node
52   { 52   {
53   /// Static dispatch function signature. 53   /// Static dispatch function signature.
54   using func_type = void (*)(pool_work_item*) noexcept; 54   using func_type = void (*)(pool_work_item*) noexcept;
55   55  
56   /// Completion handler invoked by the worker thread. 56   /// Completion handler invoked by the worker thread.
57   func_type func_ = nullptr; 57   func_type func_ = nullptr;
58   }; 58   };
59   59  
60   /** Shared thread pool for dispatching blocking operations. 60   /** Shared thread pool for dispatching blocking operations.
61   61  
62   Provides a fixed pool of reusable worker threads for operations 62   Provides a fixed pool of reusable worker threads for operations
63   that cannot be integrated with async I/O (e.g. blocking DNS 63   that cannot be integrated with async I/O (e.g. blocking DNS
64   calls). Registered as an `execution_context::service` so it 64   calls). Registered as an `execution_context::service` so it
65   is a singleton per io_context. 65   is a singleton per io_context.
66   66  
67   Threads are created eagerly in the constructor. The default 67   Threads are created eagerly in the constructor. The default
68   thread count is 1. 68   thread count is 1.
69   69  
70   @par Thread Safety 70   @par Thread Safety
71   All public member functions are thread-safe. 71   All public member functions are thread-safe.
72   72  
73   @par Shutdown 73   @par Shutdown
74   Sets a shutdown flag, notifies all threads, and joins them. 74   Sets a shutdown flag, notifies all threads, and joins them.
75   In-flight blocking calls complete naturally before the thread 75   In-flight blocking calls complete naturally before the thread
76   exits. 76   exits.
77   */ 77   */
78   class thread_pool final : public capy::execution_context::service 78   class thread_pool final : public capy::execution_context::service
79   { 79   {
80   std::mutex mutex_; 80   std::mutex mutex_;
81   std::condition_variable cv_; 81   std::condition_variable cv_;
82   intrusive_queue<pool_work_item> work_queue_; 82   intrusive_queue<pool_work_item> work_queue_;
83   std::vector<std::thread> threads_; 83   std::vector<std::thread> threads_;
84   bool shutdown_ = false; 84   bool shutdown_ = false;
85   85  
86   void worker_loop(unsigned index); 86   void worker_loop(unsigned index);
87   87  
88   public: 88   public:
89   using key_type = thread_pool; 89   using key_type = thread_pool;
90   90  
91   /** Construct the thread pool service. 91   /** Construct the thread pool service.
92   92  
93   Eagerly creates all worker threads. 93   Eagerly creates all worker threads.
94   94  
95   @par Exception Safety 95   @par Exception Safety
96   Strong guarantee. If thread creation fails, all 96   Strong guarantee. If thread creation fails, all
97   already-created threads are shut down and joined 97   already-created threads are shut down and joined
98   before the exception propagates. 98   before the exception propagates.
99   99  
100   @param ctx Reference to the owning execution_context. 100   @param ctx Reference to the owning execution_context.
101   @param num_threads Number of worker threads. Must be 101   @param num_threads Number of worker threads. Must be
102   at least 1. 102   at least 1.
103   103  
104   @throws std::logic_error If `num_threads` is 0. 104   @throws std::logic_error If `num_threads` is 0.
105   */ 105   */
HITCBC 106   607 explicit thread_pool(capy::execution_context& ctx, unsigned num_threads = 1) 106   607 explicit thread_pool(capy::execution_context& ctx, unsigned num_threads = 1)
HITCBC 107   607 { 107   607 {
108   (void)ctx; 108   (void)ctx;
HITCBC 109   607 if (!num_threads) 109   607 if (!num_threads)
HITCBC 110   1 throw std::logic_error("thread_pool requires at least 1 thread"); 110   1 throw std::logic_error("thread_pool requires at least 1 thread");
HITCBC 111   606 threads_.reserve(num_threads); 111   606 threads_.reserve(num_threads);
112   try 112   try
113   { 113   {
HITCBC 114   1215 for (unsigned i = 0; i < num_threads; ++i) 114   1215 for (unsigned i = 0; i < num_threads; ++i)
HITCBC 115   1218 threads_.emplace_back([this, i] { worker_loop(i + 1); }); 115   1218 threads_.emplace_back([this, i] { worker_loop(i + 1); });
116   } 116   }
MISUBC 117   catch (...) 117   catch (...)
118   { 118   {
MISUBC 119   shutdown(); 119   shutdown();
MISUBC 120   throw; 120   throw;
MISUBC 121   } 121   }
HITCBC 122   609 } 122   609 }
123   123  
HITCBC 124   1211 ~thread_pool() override = default; 124   1211 ~thread_pool() override = default;
125   125  
126   thread_pool(thread_pool const&) = delete; 126   thread_pool(thread_pool const&) = delete;
127   thread_pool& operator=(thread_pool const&) = delete; 127   thread_pool& operator=(thread_pool const&) = delete;
128   128  
129   /** Enqueue a work item for execution on the thread pool. 129   /** Enqueue a work item for execution on the thread pool.
130   130  
131   Zero-allocation: the caller owns the work item's storage. 131   Zero-allocation: the caller owns the work item's storage.
132   132  
133   @param w The work item to execute. Must remain valid until 133   @param w The work item to execute. Must remain valid until
134   its `func_` has been called. 134   its `func_` has been called.
135   135  
136   @return `true` if the item was enqueued, `false` if the 136   @return `true` if the item was enqueued, `false` if the
137   pool has already shut down. 137   pool has already shut down.
138   */ 138   */
139   bool post(pool_work_item* w) noexcept; 139   bool post(pool_work_item* w) noexcept;
140   140  
141   /** Shut down the thread pool. 141   /** Shut down the thread pool.
142   142  
143   Signals all threads to exit after draining any 143   Signals all threads to exit after draining any
144   remaining queued work, then joins them. 144   remaining queued work, then joins them.
145   */ 145   */
146   void shutdown() override; 146   void shutdown() override;
147   }; 147   };
148   148  
149   inline void 149   inline void
HITCBC 150   609 thread_pool::worker_loop(unsigned index) 150   609 thread_pool::worker_loop(unsigned index)
151   { 151   {
152   // Name format chosen to fit Linux's 15-char pthread limit: 152   // Name format chosen to fit Linux's 15-char pthread limit:
153   // "tpool-svc-" (10) + up to 4 digit index leaves "tpool-svc-9999". 153   // "tpool-svc-" (10) + up to 4 digit index leaves "tpool-svc-9999".
154   char name[16]; 154   char name[16];
HITCBC 155   609 std::snprintf(name, sizeof(name), "tpool-svc-%u", index); 155   609 std::snprintf(name, sizeof(name), "tpool-svc-%u", index);
HITCBC 156   609 capy::set_current_thread_name(name); 156   609 capy::set_current_thread_name(name);
157   157  
158   for (;;) 158   for (;;)
159   { 159   {
160   pool_work_item* w; 160   pool_work_item* w;
161   { 161   {
HITCBC 162   788 std::unique_lock<std::mutex> lock(mutex_); 162   788 std::unique_lock<std::mutex> lock(mutex_);
HITCBC 163   788 cv_.wait( 163   788 cv_.wait(
HITCBC 164   1416 lock, [this] { return shutdown_ || !work_queue_.empty(); }); 164   1300 lock, [this] { return shutdown_ || !work_queue_.empty(); });
165   165  
HITCBC 166   788 w = work_queue_.pop(); 166   788 w = work_queue_.pop();
HITCBC 167   788 if (!w) 167   788 if (!w)
168   { 168   {
HITCBC 169   609 if (shutdown_) 169   609 if (shutdown_)
HITCBC 170   1218 return; 170   1218 return;
MISUBC 171   continue; 171   continue;
172   } 172   }
HITCBC 173   788 } 173   788 }
HITCBC 174   179 w->func_(w); 174   179 w->func_(w);
HITCBC 175   179 } 175   179 }
176   } 176   }
177   177  
178   inline bool 178   inline bool
HITCBC 179   180 thread_pool::post(pool_work_item* w) noexcept 179   180 thread_pool::post(pool_work_item* w) noexcept
180   { 180   {
181   { 181   {
HITCBC 182   180 std::lock_guard<std::mutex> lock(mutex_); 182   180 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 183   180 if (shutdown_) 183   180 if (shutdown_)
HITCBC 184   1 return false; 184   1 return false;
HITCBC 185   179 work_queue_.push(w); 185   179 work_queue_.push(w);
HITCBC 186   180 } 186   180 }
HITCBC 187   179 cv_.notify_one(); 187   179 cv_.notify_one();
HITCBC 188   179 return true; 188   179 return true;
189   } 189   }
190   190  
191   inline void 191   inline void
HITCBC 192   610 thread_pool::shutdown() 192   610 thread_pool::shutdown()
193   { 193   {
194   { 194   {
HITCBC 195   610 std::lock_guard<std::mutex> lock(mutex_); 195   610 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 196   610 shutdown_ = true; 196   610 shutdown_ = true;
HITCBC 197   610 } 197   610 }
HITCBC 198   610 cv_.notify_all(); 198   610 cv_.notify_all();
199   199  
HITCBC 200   1219 for (auto& t : threads_) 200   1219 for (auto& t : threads_)
201   { 201   {
HITCBC 202   609 if (t.joinable()) 202   609 if (t.joinable())
HITCBC 203   609 t.join(); 203   609 t.join();
204   } 204   }
HITCBC 205   610 threads_.clear(); 205   610 threads_.clear();
206   206  
207   { 207   {
HITCBC 208   610 std::lock_guard<std::mutex> lock(mutex_); 208   610 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 209   610 while (work_queue_.pop()) 209   610 while (work_queue_.pop())
210   ; 210   ;
HITCBC 211   610 } 211   610 }
HITCBC 212   610 } 212   610 }
213   213  
214   } // namespace boost::corosio::detail 214   } // namespace boost::corosio::detail
215   215  
216   #endif // BOOST_COROSIO_DETAIL_THREAD_POOL_HPP 216   #endif // BOOST_COROSIO_DETAIL_THREAD_POOL_HPP