nndeploy C++ API  0.2.0
nndeploy C++ API
local_thread.h
Go to the documentation of this file.
1 
2 #ifndef _NNDEPLOY_THREAD_POOL_LOCAL_THREAD_H_
3 #define _NNDEPLOY_THREAD_POOL_LOCAL_THREAD_H_
4 
#include <atomic>
#include <memory>
#include <thread>
7 
8 #include "nndeploy/base/status.h"
11 
12 namespace nndeploy {
13 namespace thread_pool {
14 
15 class LocalThread {
16  public:
17  explicit LocalThread() {
18  done_ = true;
19  pool_threads_ = nullptr;
20  index_ = -1;
22  }
23 
25 
31  void destroy() {
32  done_ = false;
33  if (thread_.joinable()) {
34  thread_.join(); // 等待线程结束
35  }
36  }
37 
38  void init() {
39  done_ = true;
40  steal_targets_.clear();
41  for (int i = 0; i < total_thread_size_ - 1; i++) {
42  auto target = (index_ + i + 1) % total_thread_size_;
43  steal_targets_.push_back(target);
44  }
45  steal_targets_.shrink_to_fit();
46  thread_ = std::move(std::thread(&LocalThread::run, this));
47  }
48 
49  void setThreadPoolInfo(int index, std::vector<LocalThread *> *pool_threads,
50  int total_thread_size) {
51  index_ = index;
52  pool_threads_ = pool_threads;
53  total_thread_size_ = total_thread_size;
54  }
55 
61  if (std::any_of(pool_threads_->begin(), pool_threads_->end(),
62  [](LocalThread *thd) { return nullptr == thd; })) {
64  }
65 
66  while (done_) {
67  RTask task;
68  if (popTask(task) || stealTask(task)) {
69  task();
70  } else {
71  std::unique_lock<std::mutex> lk(mutex_);
72  cv_.wait_for(lk, std::chrono::milliseconds(100));
73  }
74  }
75  return base::Status();
76  }
77 
83  void pushTask(RTask &&task) {
84  while (!(primary_queue_.tryPush(std::forward<RTask>(task)))) {
85  std::this_thread::yield();
86  }
87  cv_.notify_one();
88  }
89 
95  bool popTask(RTask &task) { return primary_queue_.tryPop(task); }
96 
102  bool stealTask(RTask &task) {
103  if (pool_threads_->size() < total_thread_size_) {
104  return false;
105  }
106 
107  for (auto &target : steal_targets_) {
108  if (((*pool_threads_)[target]) &&
109  ((*pool_threads_)[target])->primary_queue_.trySteal(task)) {
110  return true;
111  }
112  }
113 
114  return false;
115  }
116 
117  protected:
// Run flag for the worker loop: set to true by the constructor and init(),
// cleared by destroy(), and polled by the worker thread's loop. It is atomic
// because destroy() writes it from the caller's thread while the worker thread
// reads it, which would be a data race on a plain bool.
std::atomic<bool> done_;
119  std::thread thread_;
120  int index_;
123  std::vector<LocalThread *> *pool_threads_;
124  std::vector<int> steal_targets_;
125  std::mutex mutex_;
126  std::condition_variable cv_;
127 };
128 
129 } // namespace thread_pool
130 } // namespace nndeploy
131 
132 #endif //_NNDEPLOY_THREAD_POOL_LOCAL_THREAD_H_
void setThreadPoolInfo(int index, std::vector< LocalThread * > *pool_threads, int total_thread_size)
Definition: local_thread.h:49
SafeWSQueue< RTask > primary_queue_
Definition: local_thread.h:122
std::condition_variable cv_
Definition: local_thread.h:126
std::vector< LocalThread * > * pool_threads_
Definition: local_thread.h:123
@ kStatusCodeErrorThreadPool
Definition: status.h:23