My go-to C++ code for asynchronous work processing on a separate thread

Published: 17-12-2024 23:59 | Author: Remy van Elst


You probably recognise this situation. You're working on your code and realise that the thing you're writing might take a long time, block, or run in batches. It might be resizing images, calling some API, or processing hardware inputs or a stream of incoming messages. Tasks like these, if not handled efficiently, can halt your application and leave it unresponsive. One solution is to offload these time-consuming operations to a separate thread, so the main application can continue executing without interruptions. In this article, I'll show you how to implement asynchronous work processing in C++ using a worker thread. This example class is my go-to for this situation and is easily adapted to handle more complex use cases. It has a queue of work items and uses a std::thread and a std::mutex combined with a std::condition_variable to manage work asynchronously, processing items one by one.

You can easily customise and template this class so that it has a generic queue and accepts a callback. By doing so, you can inherit from it in any of your own classes that need to do some batch work. For this example I've chosen not to do so since that would make it more complex.
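To give an idea of what such a generic variant could look like, here is a minimal sketch. It is not the class from this article: the names genericWorker, add and sumOnWorker are made up for this illustration, and it uses a callback passed to the constructor rather than inheritance. Like the class below, it drops items that are still queued when it is destroyed.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical generic worker: T is the work item type, the callback does the work.
template <typename T>
class genericWorker {
public:
    explicit genericWorker(std::function<void(T&)> callback)
        : _callback(std::move(callback))
    {
        assert(_callback && "genericWorker needs a callable");
        _thread = std::thread(&genericWorker::run, this);
    }

    genericWorker(const genericWorker&) = delete;
    genericWorker& operator=(const genericWorker&) = delete;

    ~genericWorker()
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _running = false;
        }
        _cv.notify_one();
        if (_thread.joinable())
            _thread.join();
    }

    void add(T item)
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _items.push(std::move(item));
        }
        _cv.notify_one();
    }

private:
    void run()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        while (true) {
            _cv.wait(lock, [this] { return !_items.empty() || !_running; });
            if (!_running)
                return; // pending items are dropped on shutdown
            T item = std::move(_items.front());
            _items.pop();
            lock.unlock(); // do not hold the mutex while the callback runs
            _callback(item);
            lock.lock();
        }
    }

    std::function<void(T&)> _callback;
    std::queue<T> _items;
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _running = true; // guarded by _mutex
    std::thread _thread;
};

// Usage example: sums 1 + 2 + 3 on the worker thread and waits for the result.
int sumOnWorker()
{
    std::mutex resultMutex;
    std::condition_variable resultCV;
    int sum = 0;
    int seen = 0;

    genericWorker<int> worker([&](int& value) {
        std::lock_guard<std::mutex> lock(resultMutex);
        sum += value;
        ++seen;
        resultCV.notify_one();
    });
    for (int i = 1; i <= 3; ++i)
        worker.add(i);

    // Wait until the callback has seen all three items before returning.
    std::unique_lock<std::mutex> lock(resultMutex);
    resultCV.wait(lock, [&] { return seen == 3; });
    return sum;
}
```

Calling sumOnWorker() from your own main() runs the three items through the worker thread. Whether a callback or inheritance fits better depends on your code base; the callback keeps this sketch short.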

For even more complex stuff I have had great experiences with concurrentqueue.

The class explained

This class, classWithWorkerThread, is a multi-threaded worker model that processes a queue of work items one by one using a dedicated thread. The class has a std::queue to store work items. For this example those work items are of type exampleWorkItem, which has a name and a wait time (in seconds). The items to work on and the method that does the work are completely up to you. It could be resizing images, processing messages, whatever you fancy.

The constructor starts a worker thread, which runs the method functionThatProcessesWorkOnThread(). The worker thread uses a std::unique_lock with a std::condition_variable to wait for new work to be added to the queue or for a shutdown signal.

The unique_lock is released during the condition_variable wait, allowing other threads to modify the queue. Once notified, two things happen in the worker thread. First it rules out a spurious wakeup by checking that either the work queue is not empty or the thread is shutting down. Then, unless it is shutting down, the thread proceeds to process the next item. If no work is available, it waits again, preventing unnecessary CPU consumption.
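The spurious wakeup check is what the predicate passed to wait() is for. As a small illustration, the sketch below writes that predicate loop out by hand. The struct waitDemo and the functions waitForWorkOrStop and popOneFromOtherThread are made up for this example and are not part of the class in this article.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical helper to show what cv.wait(lock, predicate) does internally.
struct waitDemo {
    std::mutex mutex;
    std::condition_variable cv;
    std::queue<int> items;
    bool running = true; // guarded by mutex

    // Hand-written equivalent of:
    //   cv.wait(lock, [&] { return !items.empty() || !running; });
    // Returns true if work should be processed, false when shutting down.
    bool waitForWorkOrStop(std::unique_lock<std::mutex>& lock)
    {
        while (items.empty() && running) {
            // Predicate is false: block. The mutex is released while
            // blocked and locked again before wait() returns.
            cv.wait(lock);
        }
        assert(lock.owns_lock()); // the caller holds the mutex again here
        return running;
    }
};

// Usage example: a consumer thread waits for one item pushed by this thread.
int popOneFromOtherThread()
{
    waitDemo demo;
    int result = -1;

    std::thread consumer([&] {
        std::unique_lock<std::mutex> lock(demo.mutex);
        if (demo.waitForWorkOrStop(lock)) {
            result = demo.items.front();
            demo.items.pop();
        }
    });

    {
        std::lock_guard<std::mutex> lock(demo.mutex);
        demo.items.push(42);
    }
    demo.cv.notify_one();

    consumer.join();
    return result;
}
```

Because the condition is checked before the first wait, it does not matter whether the item is pushed before or after the consumer starts waiting. A notification that arrives early is not lost.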

The addSomethingToProcess method is used to add work items to the queue. It locks the mutex to ensure thread safety, adds the new work item to the queue, and releases the lock again. It then notifies the worker thread via the condition_variable that new work is available, so the worker thread can pick up the item.

When an object of this class is destroyed, the destructor ensures that the worker thread is stopped gracefully. It sets the _threadRunning flag to false, notifies the worker thread, and joins the worker thread to wait for its completion before exiting, ensuring that all resources are cleaned up properly. The worker finishes the item it is currently processing; items still waiting in the queue are not processed.
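If you would rather have the destructor wait until the queue is empty, the exit condition of the worker loop has to change. The following is a minimal sketch of that alternative, not the behaviour of the class in this article. The names drainingWorker, add and drainDemo are made up, and the "work" is simply adding integers to a total owned by the caller.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical variant that processes everything still queued before stopping.
class drainingWorker {
public:
    explicit drainingWorker(std::atomic<int>& total)
        : _total(total), _thread(&drainingWorker::run, this) {}

    drainingWorker(const drainingWorker&) = delete;
    drainingWorker& operator=(const drainingWorker&) = delete;

    ~drainingWorker()
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _stopRequested = true;
        }
        _cv.notify_one();
        _thread.join();
        assert(_items.empty()); // the worker only exits once the queue is drained
    }

    void add(int value)
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _items.push(value);
        }
        _cv.notify_one();
    }

private:
    void run()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        while (true) {
            _cv.wait(lock, [this] { return !_items.empty() || _stopRequested; });
            if (_items.empty())
                return; // only reachable when stop was requested and nothing is left
            int value = _items.front();
            _items.pop();
            lock.unlock(); // do not hold the mutex while working
            _total += value;
            lock.lock();
        }
    }

    std::atomic<int>& _total;
    std::queue<int> _items;
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _stopRequested = false; // guarded by _mutex
    std::thread _thread;         // declared last so it starts after the other members exist
};

// Usage example: the destructor returns only after all 100 items were processed.
int drainDemo()
{
    std::atomic<int> total{0};
    {
        drainingWorker worker(total);
        for (int i = 1; i <= 100; ++i)
            worker.add(i);
    }
    return total.load();
}
```

The trade-off is that destruction can take as long as the remaining work. For long-running items, dropping the queue as the class in this article does may be the better default.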

The worker thread itself waits on the condition_variable and processes the work items one by one. It first pops an item from the queue, then the mutex is temporarily released while the item is processed so that others can add more items to the queue. Processing work items can be time consuming and we don't want others to wait before adding items. Once the work is completed, the worker thread starts again, waiting on the condition_variable.

Note that a std::queue itself is not thread safe. Because every access to the queue in this class happens while holding the mutex, there are no race conditions. If you add other methods that read or write the queue without taking the same lock, that could result in race conditions.
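As an illustration of that last point, any extra accessor has to take the same mutex, even a read-only one. The sketch below is a made-up wrapper (lockedQueue, tryPop and countAfterConcurrentPushes are hypothetical names, not part of the class in this article) where every method locks before touching the std::queue.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical wrapper: every access to the std::queue happens under the mutex.
class lockedQueue {
public:
    void push(std::string item)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _items.push(std::move(item));
    }

    // Read-only, but still needs the lock: another thread may be writing.
    std::size_t size() const
    {
        std::lock_guard<std::mutex> lock(_mutex);
        return _items.size();
    }

    // Checks and pops under one lock, so no other thread can empty the
    // queue between the check and the pop.
    bool tryPop(std::string& out)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        if (_items.empty())
            return false;
        out = std::move(_items.front());
        _items.pop();
        return true;
    }

private:
    mutable std::mutex _mutex; // mutable so const methods can lock it
    std::queue<std::string> _items;
};

// Usage example: four threads push 1000 items each.
std::size_t countAfterConcurrentPushes()
{
    lockedQueue queue;
    std::vector<std::thread> threads;
    for (int t = 0; t < 4; ++t) {
        threads.emplace_back([&queue] {
            for (int i = 0; i < 1000; ++i)
                queue.push("item");
        });
    }
    for (auto& thread : threads)
        thread.join();

    const std::size_t total = queue.size();
    assert(total == 4000); // no push was lost to a race
    return total;
}
```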

The code

This is the class. For this article example it is header only and the work processing is based on example items.

#pragma once

/*
 * Copyright (c) 2024 Remy van Elst <raymii.org>
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, version 3.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 */

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <queue>
#include <string>
#include <iostream>

using namespace std::chrono_literals;

namespace raymii {

struct exampleWorkItem {
    std::string name;
    int waitSec = 5;
};

class classWithWorkerThread {
public:

    classWithWorkerThread()
    {
        // Start worker thread
        _threadRunning = true;
        _workerThread = std::thread(&classWithWorkerThread::functionThatProcessesWorkOnThread, this);
    }

    classWithWorkerThread(const classWithWorkerThread&) = delete;
    classWithWorkerThread& operator=(const classWithWorkerThread&) = delete;
    classWithWorkerThread(classWithWorkerThread&&) = delete;
    classWithWorkerThread& operator=(classWithWorkerThread&&) = delete;

    ~classWithWorkerThread()
    {
        // Stop worker thread
        {
            std::lock_guard<std::mutex> lock(_threadMutex);
            _threadRunning = false;
        }
        _threadCV.notify_one();
        if (_workerThread.joinable()) {
            _workerThread.join();
        }
    }

    void addSomethingToProcess(const exampleWorkItem &work)
    {
        { // scoped lock mutex before adding work item
            std::lock_guard<std::mutex> lock(_threadMutex);
            // add item to queue
            _itemsToProcess.emplace(work);
        } // lock is out of scope here

        // notify worker thread that new work is available
        _threadCV.notify_one();
    }

private:
    // methods related to Worker Thread
    void functionThatProcessesWorkOnThread()
    {
        // only process work while running. Otherwise, exit method.
        while (_threadRunning) {
            // Acquire mutex. It is unlocked during the cv wait call. When woken up, it's locked again
            std::unique_lock<std::mutex> lock(_threadMutex);

            // wait until notified that new work is available.
            _threadCV.wait(lock, [&] {
                // When woken, check if it's not a spurious wakeup.
                // Either there is work to process: !_itemsToProcess.empty()
                // or the thread is stopping: !_threadRunning
                return !_itemsToProcess.empty() || !_threadRunning;
            });

            if (!_threadRunning) // extra check to see if we're stopped after wakeup
                break; // if so, gracefully do an early exit.

            // Start processing work item. Make sure queue is not empty
            if (!_itemsToProcess.empty())
            {
                // Retrieve item from queue
                exampleWorkItem workItem = std::move(_itemsToProcess.front());
                _itemsToProcess.pop();

                // Unlock mutex because processing can be time-consuming
                lock.unlock();

                // Process Item
                std::cout << "Start Processing item " << workItem.name << ", sleeping " << std::to_string(workItem.waitSec) << " seconds " ;
                for (int i = 0; i < workItem.waitSec; ++i)
                {
                    std::cout << ".";
                    std::this_thread::sleep_for(1s);
                }
                std::cout << "; Finished Processing item " << workItem.name << ";" << std::endl;

                // Lock mutex again after finishing time-consuming work
                lock.lock();
            }
            // End processing work item.
        }
    }

    std::queue<exampleWorkItem> _itemsToProcess;
    std::thread _workerThread;
    std::mutex _threadMutex;
    std::condition_variable _threadCV;
    std::atomic<bool> _threadRunning = false;
};

} //namespace raymii

Here is an example main.cpp program that uses this class. Each time you press ENTER, 5 work items are added to the queue and processed. You can keep entering text while work is being done. If you type exit, the program finishes the current work item, discards whatever is still queued, and quits without crashing.

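#include <iostream>
#include <string>
// Also include the header that contains raymii::classWithWorkerThread here.

int main() {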

    raymii::classWithWorkerThread exampleClass;

    std::cout << "Press ENTER to add 5 work items. "
                 "Type 'exit' and press ENTER to quit.\n";

    std::string input;
    int taskCounter = 1;

    while (true) {
        std::getline(std::cin, input);
        if (input == "exit") break;

        std::cout << "Adding 5 work items..." << std::endl;
        for (int i = 0; i < 5; ++i) {
            raymii::exampleWorkItem item;
            item.name = "Task_" + std::to_string(taskCounter++);
            item.waitSec = 5 + (i % 10);

            exampleClass.addSomethingToProcess(item);
        }
    }

    std::cout << "Exiting program.\n";

    return 0;
}

Example output:

Press ENTER to add 5 work items. Type 'exit' and press ENTER to quit.
** PRESSES ENTER**

Adding 5 work items...
Start Processing item Task_1, sleeping 5 seconds .....; Finished Processing item Task_1;
Start Processing item Task_2, sleeping 6 seconds ......; Finished Processing item Task_2;
Start Processing item Task_3, sleeping 7 seconds .......; Finished Processing item Task_3;
Start Processing item Task_4, sleeping 8 seconds ........; Finished Processing item Task_4;
Start Processing item Task_5, sleeping 9 seconds .........; Finished Processing item Task_5;

** PRESSES ENTER AGAIN**
Adding 5 work items...
Start Processing item Task_6, sleeping 5 seconds .....; Finished Processing item Task_6;
** TYPES exit**
Start Processing item Task_7, sleeping 6 seconds ..Exiting program.
....; Finished Processing item Task_7;

Process finished with exit code 0