#Win32 API Concurrency signal finished task in thread pool

10 messages · Page 1 of 1 (latest)

vital furnaceBOT
#

When your question is answered use !solved to mark the question as resolved.

Remember to ask specific questions, provide necessary details, and reduce your question to its simplest form. For tips on how to ask a good question use !howto ask.

fallen pagoda
#
template <typename Task>
class ThreadPool {
private:
  std::vector<HANDLE> m_threads;
  BaseTaskStack<Task>& m_taskStack;
  std::function<void(BaseTaskStack<Task>&)> m_callableFunction;

  HANDLE m_cleanPool;

public:
  ThreadPool(BaseTaskStack<Task>& stack, const int maxThreads, std::function<void(BaseTaskStack<Task>&)> callableFunction)
    : m_taskStack(stack),
    m_callableFunction(std::move(callableFunction)),
    m_cleanPool(CreateEvent(nullptr, TRUE, FALSE, nullptr)) {
    for (int i = 0; i < maxThreads; ++i) {
      m_threads.push_back(reinterpret_cast<HANDLE>(_beginthreadex(nullptr, 0, ThreadFunction, this, 0, nullptr)));
    }
  }

  ~ThreadPool() {
    StopThreads();

    WaitForMultipleObjects(static_cast<DWORD>(m_threads.size()), m_threads.data(), TRUE, INFINITE);

    for (const auto& thread : m_threads) {
      CloseHandle(thread);
    }

    CloseHandle(m_cleanPool);
  }

  static unsigned int __stdcall ThreadFunction(void* params) {
    auto* pool = static_cast<ThreadPool*>(params);

    while (WaitForSingleObject(pool->m_cleanPool, 0) != WAIT_OBJECT_0) {
      pool->m_callableFunction(pool->m_taskStack);
    }

    _endthreadex(0);
    return 0;
  }

  void AddTask(Task&& task) {
    m_taskStack.PushTask(std::move(task));
  }

  void StopThreads() const {
    SetEvent(m_cleanPool);
  }

  ThreadPool(const ThreadPool&) = delete;
  ThreadPool& operator=(const ThreadPool& other) = delete;

  ThreadPool(ThreadPool&&) = delete;
  ThreadPool& operator=(ThreadPool&&) = delete;
};
#

The issue resides in the ThreadFunction member function. For now I am trying o implement a quick sort which I have already implemented while reading about future and promise in Concurrency in Action. However the plan is to restrict myself to low level concurrency constructs.

#

Here is the full code after some minor cleanups:

#include <functional>
#include <iostream>
#include <vector>
#include <algorithm>
#include <stack>
#include <windows.h>
#include <process.h>

template <typename Iterator, typename Comparator = std::less<typename std::iterator_traits<Iterator>::value_type>>
class SortingTask {
public:
  Iterator m_begin;
  Iterator m_end;
  Comparator m_comparator;

  SortingTask() = default;

  SortingTask(Iterator begin, Iterator end, Comparator comparator)
    : m_begin(std::move(begin)), m_end(std::move(end)), m_comparator(std::move(comparator)) {}

  SortingTask(SortingTask&& other) noexcept
    : m_begin(std::move(other.m_begin)), m_end(std::move(other.m_end)), m_comparator(std::move(other.m_comparator)) {}

  SortingTask& operator=(SortingTask&& other) noexcept {
    if (this != &other) {
      m_begin = std::move(other.m_begin);
      m_end = std::move(other.m_end);
      m_comparator = std::move(other.m_comparator);
    }
    return *this;
  }

  SortingTask(const SortingTask&) = delete;
  SortingTask& operator=(const SortingTask&) = delete;
};

template <typename Task>
class BaseTaskStack {
public:
  virtual ~BaseTaskStack() = default;

  virtual void PushTask(Task&&) = 0;
  virtual bool PopTask(Task&) = 0;

  BaseTaskStack() = default;

  BaseTaskStack(const BaseTaskStack&) = delete;
  BaseTaskStack& operator=(const BaseTaskStack& other) = delete;

  BaseTaskStack(BaseTaskStack&&) = delete;
  BaseTaskStack& operator=(BaseTaskStack&&) = delete;
};
#
template <typename Iterator, typename Compare = std::less<typename std::iterator_traits<Iterator>::value_type>>
class SortingTaskStack final : public BaseTaskStack<SortingTask<Iterator, Compare>> {
private:
  std::stack<SortingTask<Iterator, Compare>> m_stack;
  CRITICAL_SECTION m_criticalSection{};

public:
  SortingTaskStack() : BaseTaskStack<SortingTask<Iterator, Compare>>() {
    InitializeCriticalSection(&m_criticalSection);
  }

  ~SortingTaskStack() override {
    DeleteCriticalSection(&m_criticalSection);
  }

  void PushTask(SortingTask<Iterator, Compare>&& task) override {
    EnterCriticalSection(&m_criticalSection);

    m_stack.push(std::move(task));

    LeaveCriticalSection(&m_criticalSection);
  }

  bool PopTask(SortingTask<Iterator, Compare>& task) override {
    EnterCriticalSection(&m_criticalSection);

    if (m_stack.empty()) {
      LeaveCriticalSection(&m_criticalSection);
      return false;
    }

    task = std::move(m_stack.top());
    m_stack.pop();

    LeaveCriticalSection(&m_criticalSection);

    return true;
  }

  SortingTaskStack(const SortingTaskStack&) = delete;
  SortingTaskStack& operator=(const SortingTaskStack& other) = delete;

  SortingTaskStack(SortingTaskStack&&) = delete;
  SortingTaskStack& operator=(SortingTaskStack&&) = delete;
};
#
template <typename Task>
class ThreadPool {
private:
  std::vector<HANDLE> m_threads;
  BaseTaskStack<Task>& m_taskStack;
  std::function<void(BaseTaskStack<Task>&)> m_callableFunction;

  HANDLE m_cleanPool;

public:
  ThreadPool(BaseTaskStack<Task>& stack, const int maxThreads, std::function<void(BaseTaskStack<Task>&)> callableFunction)
    : m_taskStack(stack),
    m_callableFunction(std::move(callableFunction)),
    m_cleanPool(CreateEvent(nullptr, TRUE, FALSE, nullptr)) {
    for (int i = 0; i < maxThreads; ++i) {
      m_threads.push_back(reinterpret_cast<HANDLE>(_beginthreadex(nullptr, 0, ThreadFunction, this, 0, nullptr)));
    }
  }

  ~ThreadPool() {
    StopThreads();

    WaitForMultipleObjects(static_cast<DWORD>(m_threads.size()), m_threads.data(), TRUE, INFINITE);

    for (const auto& thread : m_threads) {
      CloseHandle(thread);
    }

    CloseHandle(m_cleanPool);
  }

  static unsigned int __stdcall ThreadFunction(void* params) {
    auto* pool = static_cast<ThreadPool*>(params);

    while (WaitForSingleObject(pool->m_cleanPool, 0) != WAIT_OBJECT_0) {
      pool->m_callableFunction(pool->m_taskStack);
    }

    _endthreadex(0);
    return 0;
  }

  void AddTask(Task&& task) {
    m_taskStack.PushTask(std::move(task));
  }

  void StopThreads() const {
    SetEvent(m_cleanPool);
  }

  ThreadPool(const ThreadPool&) = delete;
  ThreadPool& operator=(const ThreadPool& other) = delete;

  ThreadPool(ThreadPool&&) = delete;
  ThreadPool& operator=(ThreadPool&&) = delete;
};
#
template <typename Iterator, typename Compare>
void QuickSort(BaseTaskStack<SortingTask<Iterator, Compare>>& stack) {
  if (SortingTask<Iterator, Compare> task; stack.PopTask(task)) {
    const auto begin = task.m_begin;
    const auto end = task.m_end;
    const auto compare = task.m_comparator;

    if (begin != end) {
      const auto pivot = *std::next(begin, std::distance(begin, end) / 2);
      const auto middle1 = std::partition(begin, end, [pivot, compare](const auto& element) { return compare(element, pivot); });
      const auto middle2 = std::partition(begin, end, [pivot, compare](const auto& element) { return !compare(pivot, element); });

      SortingTask<Iterator, Compare> leftTask(begin, middle1, compare);
      SortingTask<Iterator, Compare> rightTask(middle2, end, compare);

      stack.PushTask(std::move(leftTask));
      stack.PushTask(std::move(rightTask));
    }
  }
}

int main() {
  std::vector data = { 1, 30, -4, 3, 5, -4, 1, 6, -8, 2, -5, 64, 1, 92 };

  SortingTaskStack<decltype(data.begin())> taskStack;

  ThreadPool<SortingTask<decltype(data.begin())>> threadPool(taskStack, 4, &QuickSort<decltype(data.begin()), decltype(std::less<int>())>);

  SortingTask initialTask(data.begin(), data.end(), std::less<int>());
  threadPool.AddTask(std::move(initialTask));

  Sleep(5000); // Really ugly

  threadPool.StopThreads();

  for (const auto num : data) {
    std::cout << num << " ";
  }

  return 0;
}
#

Another stupid idea was using an atomic integer to keep track of tasks in the stack/queue but that isn't effective since a non empty stack/queue doesn't necessarily mean that some task is still running.

fallen pagoda
#

Some more thinking later, I can probably write a Join function or Get function that waits until the tasks are done. Though the issue is that I don't know when tasks are done.

#

Would it be considered bad design if I were to just call WaitForMultiple on the whole thread pool's stack/queue?