Raymii.org
My go-to C++ code for asynchronous work processing on a separate thread
Published: 17-12-2024 23:59 | Author: Remy van Elst
You probably recognise this situation. You're working on your code and realise that the thing you're writing might take a long time, block, or need to run in batches. It might be resizing images, calling some API, or processing hardware inputs or a stream of incoming messages. These tasks, if not handled efficiently, can halt your application, leaving it unresponsive. To avoid this, one solution is to offload these time-consuming operations to a separate thread, allowing the main application to continue executing without interruptions.
In this article, I'll show you how you can implement asynchronous work processing in C++ using a worker thread. This example class is my go-to for this situation and is easily adapted to handle more complex use cases. It has a queue of work items and uses a std::thread and a std::mutex combined with a std::condition_variable to manage work asynchronously, processing items one by one.
You can easily customise and template this class so that it has a generic queue and accepts a callback. By doing so, you can inherit from it in any of your own classes that need to do some batch work. For this example I've chosen not to do so since that would make it more complex.
For even more complex use cases I have had great experiences with concurrentqueue.
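To give an idea of what such a templated variant could look like, here is a minimal sketch. It is not the class from this article: the names genericWorker, add, run and sumOnWorker are made up for illustration, and it uses a plain bool guarded by the mutex instead of an atomic flag. Like the class below, it drops items that are still queued when it is destroyed.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical generic variant: T is the work item type and the
// callback passed to the constructor does the actual work.
template <typename T>
class genericWorker {
public:
    explicit genericWorker(std::function<void(T&)> callback)
        : _callback(std::move(callback))
    {
        // Started in the body so all other members exist before the thread runs
        _workerThread = std::thread(&genericWorker::run, this);
    }
    genericWorker(const genericWorker&) = delete;
    genericWorker& operator=(const genericWorker&) = delete;

    ~genericWorker()
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _running = false;
        }
        _cv.notify_one();
        if (_workerThread.joinable()) {
            _workerThread.join();
        }
    }

    void add(T item)
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _items.push(std::move(item));
        }
        _cv.notify_one();
    }

private:
    void run()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        while (true) {
            _cv.wait(lock, [this] { return !_items.empty() || !_running; });
            if (!_running) {
                return; // items still queued are dropped on shutdown
            }
            T item = std::move(_items.front());
            _items.pop();
            lock.unlock(); // do not hold the mutex while working
            _callback(item);
            lock.lock();
        }
    }

    std::function<void(T&)> _callback;
    std::queue<T> _items;
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _running = true; // guarded by _mutex
    std::thread _workerThread;
};

// Usage sketch: sum integers on the worker thread and wait for the result.
int sumOnWorker(const std::vector<int>& values)
{
    std::mutex m;
    std::condition_variable done;
    int sum = 0;
    std::size_t handled = 0;
    genericWorker<int> worker([&](int& v) {
        std::lock_guard<std::mutex> lock(m);
        sum += v;
        ++handled;
        done.notify_one();
    });
    for (int v : values) {
        worker.add(v);
    }
    std::unique_lock<std::mutex> lock(m);
    done.wait(lock, [&] { return handled == values.size(); });
    return sum;
}
```

The sketch uses composition with a std::function callback. Inheriting from the worker and overriding a virtual processing method would be another way to get the same result.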
The class explained
This class, classWithWorkerThread, is a worker model that processes a queue of work items one by one using a dedicated thread. The class has a std::queue to store work items. For this example those work items are of type exampleWorkItem, which has a name and a wait time (in seconds). The items to work on and the work method are completely up to you. Could be resizing images, processing messages, whatever you fancy.
The constructor starts a worker thread, which runs the method functionThatProcessesWorkOnThread(). The worker thread uses a std::unique_lock with a std::condition_variable to wait for new work to be added to the queue or for a shutdown signal.
The unique_lock is released during the condition_variable wait, allowing other threads to modify the queue. Once notified, two things happen in the worker thread. First it rules out a spurious wakeup by checking that either the work queue is not empty or the thread is shutting down. Then, unless it is shutting down, the thread proceeds to process the next item. If no work is available, it waits again, preventing unnecessary CPU consumption.
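The wait with a predicate can be read as a loop around the plain wait. The sketch below is a standalone illustration of that idea, not code from the class. The function waitForValue and its variables are made up for this example.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: waits until another thread has produced a value.
int waitForValue()
{
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;
    int value = 0;

    std::thread producer([&] {
        {
            std::lock_guard<std::mutex> lock(m);
            value = 42;
            ready = true;
        }
        cv.notify_one();
    });

    std::unique_lock<std::mutex> lock(m);
    // Same effect as: cv.wait(lock, [&] { return ready; });
    while (!ready) {
        // Releases m while blocked and locks it again before returning.
        // After a spurious wakeup ready is still false, so we wait again.
        cv.wait(lock);
    }
    int result = value;
    lock.unlock();
    producer.join();
    return result;
}
```

Because the condition is checked under the mutex before blocking, it does not matter whether the producer runs before or after the waiting thread reaches the loop.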
The addSomethingToProcess method is used to add work items to the queue. It locks the mutex to ensure thread safety and adds the new work item to the queue. The lock is released as soon as the item is added, and the method then notifies the worker thread via the condition_variable that new work is available, allowing the worker thread to process the item.
When the class is destroyed, the destructor ensures that the worker thread is stopped gracefully. It sets the _threadRunning flag to false, notifies the worker thread, and joins the worker thread to wait for its completion before exiting, ensuring that all resources are cleaned up properly. Items that are still waiting in the queue at that point are not processed.
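If you would rather finish all queued work before the destructor returns, one possible variation is to let the worker exit only once the queue is empty. The sketch below assumes that behaviour is what you want. The class drainingWorker, its int work items and the helper doubleAll are hypothetical stand-ins, not part of the class in this article.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical variant: processes every queued item before shutting down.
class drainingWorker {
public:
    explicit drainingWorker(std::vector<int>& results) : _results(results)
    {
        _workerThread = std::thread(&drainingWorker::run, this);
    }

    ~drainingWorker()
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _running = false;
        }
        _cv.notify_one();
        if (_workerThread.joinable()) {
            _workerThread.join(); // blocks until the queue is drained
        }
    }

    void add(int item)
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _items.push(item);
        }
        _cv.notify_one();
    }

private:
    void run()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        while (true) {
            _cv.wait(lock, [this] { return !_items.empty() || !_running; });
            if (_items.empty()) {
                return; // only reachable when stopping and nothing is left
            }
            int item = _items.front();
            _items.pop();
            lock.unlock();
            // Stand-in for real work. Only the worker thread touches
            // _results until the destructor has joined it.
            _results.push_back(item * 2);
            lock.lock();
        }
    }

    std::vector<int>& _results;
    std::queue<int> _items;
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _running = true; // guarded by _mutex
    std::thread _workerThread;
};

// Usage sketch: the results are complete once the worker is destroyed.
std::vector<int> doubleAll(const std::vector<int>& input)
{
    std::vector<int> results;
    {
        drainingWorker worker(results);
        for (int v : input) {
            worker.add(v);
        }
    } // destructor waits here until all items are processed
    return results;
}
```

The trade-off is that destruction can take as long as the remaining work, which may or may not be acceptable when the application is closing.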
The worker thread itself waits on the condition_variable and processes the work items one by one. It first pops an item from the queue, then the mutex is temporarily released during processing of the item to allow others to add more items to the queue. Processing of work items can be time consuming and we don't want others to wait before adding items. Once the work is completed, the worker thread starts again, waiting on the condition_variable.
Note that a std::queue itself is not thread safe. But due to my use of mutex locks in this class there are no race conditions. Reading or writing via other methods without proper locking could result in race conditions.
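As an illustration of that last point, any extra method that touches the queue, for example one that reports how many items are pending, needs to take the same mutex. The sketch below shows the idea on a small standalone wrapper. The names guardedQueue and pushFromTwoThreads are made up for this example. In the class from this article the mutex would be _threadMutex.

```cpp
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>

// Hypothetical wrapper: every access to the std::queue goes through the mutex.
class guardedQueue {
public:
    void push(std::string item)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _items.push(std::move(item));
    }

    // Reading needs the lock too, otherwise it races with push()
    std::size_t size() const
    {
        std::lock_guard<std::mutex> lock(_mutex);
        return _items.size();
    }

private:
    mutable std::mutex _mutex; // mutable so size() can stay const
    std::queue<std::string> _items;
};

// Usage sketch: two threads push concurrently, no item is lost.
std::size_t pushFromTwoThreads(int perThread)
{
    guardedQueue q;
    auto producer = [&] {
        for (int i = 0; i < perThread; ++i) {
            q.push("item");
        }
    };
    std::thread a(producer);
    std::thread b(producer);
    a.join();
    b.join();
    return q.size();
}
```

Keep in mind that a size read this way is only a snapshot. Another thread can change the queue right after the lock is released.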
The code
This is the class. For this article example it is header only and the work processing is based on example items.
#pragma once
/*
* Copyright (c) 2024 Remy van Elst <raymii.org>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 3.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <queue>
#include <string>
#include <iostream>
using namespace std::chrono_literals;
namespace raymii {
struct exampleWorkItem {
    std::string name;
    int waitSec = 5;
};
class classWithWorkerThread {
public:
    classWithWorkerThread()
    {
        // Start worker thread
        _threadRunning = true;
        _workerThread = std::thread(&classWithWorkerThread::functionThatProcessesWorkOnThread, this);
    }
    classWithWorkerThread(const classWithWorkerThread&) = delete;
    classWithWorkerThread& operator=(const classWithWorkerThread&) = delete;
    classWithWorkerThread(classWithWorkerThread&&) = delete;
    classWithWorkerThread& operator=(classWithWorkerThread&&) = delete;
    ~classWithWorkerThread()
    {
        // Stop worker thread
        {
            std::lock_guard<std::mutex> lock(_threadMutex);
            _threadRunning = false;
        }
        _threadCV.notify_one();
        if (_workerThread.joinable()) {
            _workerThread.join();
        }
    }
    void addSomethingToProcess(const exampleWorkItem &work)
    {
        { // scoped lock mutex before adding work item
            std::lock_guard<std::mutex> lock(_threadMutex);
            // add item to queue
            _itemsToProcess.emplace(work);
        } // lock is out of scope here
        // notify worker thread that new work is available
        _threadCV.notify_one();
    }
private:
    // methods related to Worker Thread
    void functionThatProcessesWorkOnThread()
    {
        // only process work while running. Otherwise, exit method.
        while (_threadRunning) {
            // Acquire mutex. Is unlocked during the cv wait call. When woken up, it's locked again
            std::unique_lock<std::mutex> lock(_threadMutex);
            // wait until notified that new work is available.
            _threadCV.wait(lock, [&] {
                // When woken, check if it's not a spurious wakeup.
                // Either there is work to process: !_itemsToProcess.empty()
                // or the thread is stopping: !_threadRunning
                return !_itemsToProcess.empty() || !_threadRunning;
            });
            if (!_threadRunning) // extra check to see if we're stopped after wakeup
                break; // if so, gracefully do an early exit.
            // Start processing work item. Make sure queue is not empty
            if (!_itemsToProcess.empty())
            {
                // Retrieve item from queue
                exampleWorkItem workItem = std::move(_itemsToProcess.front());
                _itemsToProcess.pop();
                // Unlock mutex because processing can be time-consuming
                lock.unlock();
                // Process Item
                std::cout << "Start Processing item " << workItem.name << ", sleeping " << std::to_string(workItem.waitSec) << " seconds " ;
                for (int i = 0; i < workItem.waitSec; ++i)
                {
                    std::cout << ".";
                    std::this_thread::sleep_for(1s);
                }
                std::cout << "; Finished Processing item " << workItem.name << ";" << std::endl;
                // Lock mutex again after finishing time-consuming work
                lock.lock();
            }
            // End processing work item.
        }
    }
    std::queue<exampleWorkItem> _itemsToProcess;
    std::thread _workerThread;
    std::mutex _threadMutex;
    std::condition_variable _threadCV;
    std::atomic<bool> _threadRunning = false;
};
} //namespace raymii
Here is an example main.cpp program that uses this class. If you press ENTER, 5 work items are added to the queue and processed. You can keep entering text while work is being done. If you type exit, the program finishes the current work item and then quits without crashing.
// Also include the header that contains raymii::classWithWorkerThread (shown above)
#include <iostream>
#include <string>
int main() {
    raymii::classWithWorkerThread exampleClass;
    std::cout << "Press ENTER to add 5 work items. "
                 "Type 'exit' and press ENTER to quit.\n";
    std::string input;
    int taskCounter = 1;
    while (true) {
        std::getline(std::cin, input);
        if (input == "exit") break;
        std::cout << "Adding 5 work items..." << std::endl;
        for (int i = 0; i < 5; ++i) {
            raymii::exampleWorkItem item;
            item.name = "Task_" + std::to_string(taskCounter++);
            item.waitSec = 5 + (i % 10);
            exampleClass.addSomethingToProcess(item);
        }
    }
    std::cout << "Exiting program.\n";
    return 0;
}
Example output:
Press ENTER to add 5 work items. Type 'exit' and press ENTER to quit.
** PRESSES ENTER**
Adding 5 work items...
Start Processing item Task_1, sleeping 5 seconds .....; Finished Processing item Task_1;
Start Processing item Task_2, sleeping 6 seconds ......; Finished Processing item Task_2;
Start Processing item Task_3, sleeping 7 seconds .......; Finished Processing item Task_3;
Start Processing item Task_4, sleeping 8 seconds ........; Finished Processing item Task_4;
Start Processing item Task_5, sleeping 9 seconds .........; Finished Processing item Task_5;
** PRESSES ENTER AGAIN**
Adding 5 work items...
Start Processing item Task_6, sleeping 5 seconds .....; Finished Processing item Task_6;
** TYPES exit**
Start Processing item Task_7, sleeping 6 seconds ..Exiting program.
....; Finished Processing item Task_7;
Process finished with exit code 0
Tags: async, c++, cpp, development, performance, software, thread