10分钟阅读

关于C ++中的Qt Multithreading的遗失文章

Andrei在Moderty,EMC,摩托罗拉和Deutsche Bank上有15年以上的移动,桌面和Web使用C ++,C#和JS。

C ++开发人员 努力构建强大的多线程QT应用,但多线程对于所有这些竞争条件,同步和死锁和Livelocks绝不容易。为了你的信用,你不会放弃并发现自己彻底冲洗堆叠流。尽管如此,从十几个不同的答案中挑选正确和工作的解决方案是相当不琐碎的,特别是每个解决方案都有自己的缺点。

多线程是一个广泛的编程和执行模型,允许在一个过程的上下文中存在多个线程。这些线程共享过程资源,但能够独立执行。线程编程模型为开发人员提供了具有同时执行的有用抽象的开发人员。多线程也可以应用于一个过程以在多处理系统上启用并行执行。

维基百科

本文的目标是通过QT框架汇总关于并发编程的基本知识,特别是最误解的主题。预计读者将在Qt和C ++中具有以前的背景以了解内容。

Choosing between using qthreadpool. qthread.

Qt. Framework提供了许多用于多线程的工具。挑选合适的工具首先可以具有挑战性,但实际上,决策树由两个选项组成:您要么想要Qt要为您管理线程,或者您要自行管理线程。但是,还有其他重要标准:

  1. 不需要事件循环的任务。具体地,在任务执行期间不使用信号/插槽机制的任务。
    用: QTConcurrent. qthreadpool. + qrunnable. .

  2. 使用信号/插槽的任务,因此需要事件循环。
    使用:Worker对象移动到+ qthread. .

The great flexibility of the Qt framework allows you to work around the “missing event loop” problem and to add one to qrunnable. :

class MyTask : public QObject, public QRunnable
{
    Q_OBJECT
    
public:
    void MyTask::run() {
        _loop.exec();  
    }
    
public slots:
    // you need a signal connected to this slot to exit the loop,
    // otherwise the thread running the loop would remain blocked...
    void finishTask() {
        _loop.exit();
    }
    
private:
    QEventLoop _loop;
}

尽管如此,试图避免这种“解决方法”,因为这些是危险的,而且没有高效:如果由于等待信号而阻止来自线程池(运行MyTask)的线程,则无法执行池的其他任务。

 alt text.

You can also run a qthread. without any event loop by overriding qthread. ::run() method and this is perfectly fine as long as you know what you are doing. For example, do not expect method quit() to work in such case.

一次运行一个任务实例

想象一下,您需要确保只能执行一次一个任务实例,并且所有待处理的运行请求都在某个队列等待。当任务访问独占资源时,通常需要这一点,例如使用TCP套接字写入同一文件或发送数据包时。

让我们忘记电脑科学和生产者 - 消费者模式一会儿,考虑一些微不足道的东西;在实际项目中可以轻松找到的东西。

A naïve solution to this problem could be using a QMutex. Inside the task function, you could simply acquire the mutex effectively serializing all threads attempting to run the task. This would guarantee that only one thread at a time could be running the function. However, this solution impacts the performance by introducing 高争用 问题是因为所有这些线程都将被阻止(在互斥锁中)之前。如果您有许多线程使用这样的任务并在介于两者之间进行一些有用的工作,那么所有这些线程都会刚刚睡觉。

void logEvent(const QString & event) {  
    static QMutex lock;  
    QMutexLocker locker(& lock);   // high contention!  
    logStream << event;            // exclusive resource  
}  

为避免争用,我们需要一个队列和一个生活在自己的线程中并处理队列的工作者。这几乎是经典的 生产者消费者 图案。工人 ( 消费者 )将逐个逐个挑选队列的请求 生产者 can simply add its requests into the queue. Sounds simple at first and you may think of using QQueueQWaitCondition, but hold on and let’s see if we can achieve the goal without these primitives:

  • We can use qthreadpool. as it has a queue of pending tasks

或者

  • We can use default qthread. ::run() because it has QEventLoop

The first option is to use qthreadpool. . We can create a qthreadpool. instance and use qthreadpool. ::setMaxThreadCount(1). Then we can be using QTConcurrent.::run() to schedule requests:

class Logger: public QObject
{
public:
    explicit Logger(QObject *parent = nullptr) : QObject(parent) {
        threadPool.setMaxThreadCount(1);
    }

    void logEvent(const QString &event) {
        QtConcurrent::run(&threadPool, [this, event]{
            logEventCore(event);
        });
    }

private:
    void logEventCore(const QString &event) {
        logStream << event;
    }

    QThreadPool threadPool;
};

This solution has one benefit: qthreadpool. ::clear() allows you to instantly 取消 所有待处理请求,例如当您的应用程序需要快速关闭时。但是,还有一个重要的缺点是连接的 螺纹亲和力: logEventCore function will be likely executing in different threads from call to call. And we know Qt has some classes that require 螺纹亲和力: QTimer, QTcpSocket 和 possibly some others.

Qt. 规范关于线程 - 关联:计时器在一个线程中开始,不能从另一个线程停止。并且只有拥有套接字实例的线程可以使用此套接字。这意味着您必须在启动它们的线程中停止任何运行的计时器,并且必须在拥有套接字的线程中调用qtcpsocket :: close()。这两个例子通常在析构函数中执行。

The better solution relies on using QEventLoop provided by qthread. . The idea is simple: we use a signal/slot mechanism to issue requests, and the event loop running inside the thread will serve as a queue allowing just one slot at a time to be executed.

// the worker that will be moved to a thread
class LogWorker: public QObject
{
    Q_OBJECT

public:
    explicit LogWorker(QObject *parent = nullptr);

public slots:
    // this slot will be executed by event loop (one call at a time)
    void logEvent(const QString &event);
};

Implementation of LogWorker constructor and logEvent is straightforward and therefore not provided here. Now we need a service that will be managing the thread and the worker instance:

// interface
class LogService : public QObject
{
    Q_OBJECT
    
public:
    explicit LogService(QObject *parent = nullptr);
    ~LogService();

signals:
    // to use the service, just call this signal to send a request:
    // logService->logEvent("event");
    void logEvent(const QString &event);

private:
    QThread *thread;
    LogWorker *worker;
};

// implementation
LogService::LogService(QObject *parent) : QObject(parent) {
    thread = new QThread(this);
    worker = new LogWorker;
    worker->moveToThread(thread);
    connect(this, &LogService::logEvent, worker, &LogWorker::logEvent);
    connect(thread, &QThread::finished, worker, &QObject::deleteLater);
    thread->start();
}

LogService::~LogService() {
    thread->quit();
    thread->wait();
}

 alt text.

让我们讨论这个代码如何工作:

  • In the constructor, we create a thread and worker instance. Notice that the worker does not receive a parent, because it will be moved to the new thread. Because of this, Qt won’t be able to release the worker’s memory automatically, and therefore, we need to do this by connecting qthread. ::finished signal to deleteLater slot. We also connect the proxy method LogService::logEvent() to LogWorker::logEvent() which will be using Qt. ::QueuedConnection mode because of different threads.
  • In the destructor, we put the quit event into the event loop’s queue. This event will be handled all other events are handled. For example, if we have made hundreds of logEvent() calls just prior to the destructor call, the logger will handle them all before it fetches the quit event. This takes time, of course, so we must wait() until the event loop exits. It is worth mentioniong that all future logging requests posted 永远不会处理退出事件。
  • The logging itself (LogWorker::logEvent) will always be done in the same thread, therefore this approach is working well for classes requiring 螺纹亲和力. At the same time, LogWorker constructor and destructor are executed in the main thread (specifically the thread LogService is running in), and therefore, you need to be very careful about what code you are running there. Specifically, do not stop timers or use sockets in the worker’s destructor unless you could be running the destructor in the same thread!

在同一线程中执行工作者的析构函数

If your worker is dealing with timers or sockets, you need to ensure the destructor is executed in the same thread (the thread you created for the worker and where you moved the worker to). The obvious way to support this is to subclass qthread. delete worker inside qthread. ::run() method. Consider the following template:

模板<typename TWorker>
类线程:qthread
{
public:
    explicit Thread(TWorker *worker, QObject *parent = nullptr)
        : QThread(parent), _worker(worker) {
        _worker->moveToThread(this);
        start();
    }

    ~Thread() {
        quit();
        wait();
    }

    TWorker worker() const {
        return _worker;
    }

protected:
    void run() override {
        QThread::run();
        delete _worker;
    }

private:
    TWorker *_worker;
};

Using this template, we redefine LogService from the previous example:

// interface
class LogService : public Thread<LogWorker>
{
    Q_OBJECT

public:
    explicit LogService(QObject *parent = nullptr);

signals:
    void **logEvent**(const QString &event);
};

// implementation
LogService::**LogService**(QObject *parent)
    : Thread<LogWorker>(new LogWorker, parent) {
    connect(this, &LogService::logEvent, worker(), &LogWorker::logEvent);
}

让我们讨论这应该是如何工作的:

  • We made LogService to be the qthread. object because we needed to implement the custom run() function. We used private subclassing to prevent accessing qthread. ’s functions since we want to control thread’s lifecycle internally.
  • In Thread::run() function we run the event loop by calling the default qthread. ::run() implementation, and destroy the worker instance right after the event loop has quit. Note that the worker’s destructor is executed in the same thread.
  • LogService::logEvent() 是将日志记录事件发布到线程的事件队列的代理函数(信号)。

暂停和恢复线程

另一个有趣的机会是能够暂停和恢复自定义线程。想象一下,当应用程序最小化,锁定或丢失网络连接时,您的应用程序正在进行一些需要暂停的处理。这可以通过构建一个自定义异步队列来实现,该队列将持有所有挂起请求,直到恢复工作者。但是,由于我们正在寻找最简单的解决方案,我们将使用(再次)事件循环的队列以获取相同的目的。

To suspend a thread, we clearly need it to wait on a certain waiting condition. If the thread is blocked this way, its event loop is not handling any events and Qt has to put keep in the queue. Once resumed, the event loop will be processing all accumulated requests. For the waiting condition, we simply use QWaitCondition object that also requires a QMutex. To design a generic solution that could be reused by any worker, we need to put all suspend/resume logic into a reusable base class. Let’s call it SuspendableWorker. Such a class shall support two methods:

  • suspend() would be a blocking call that sets the thread waiting on a waiting condition. This would be done by posting a suspend request into the queue and waiting until it is handled. Pretty much similar to qthread. ::quit() + wait().
  • resume() 会发出等待条件唤醒睡眠线程以继续执行。

让我们查看界面和实现:

// interface
class SuspendableWorker : public QObject
{
    Q_OBJECT

public:
    explicit SuspendableWorker(QObject *parent = nullptr);
    ~SuspendableWorker();

    // resume() must be called from the outer thread.
    void resume();

    // suspend() must be called from the outer thread.
    // the function would block the caller's thread until
    // the worker thread is suspended.
    void suspend();

private slots:
    void suspendImpl();

private:
    QMutex _waitMutex;
    QWaitCondition _waitCondition;
};
// implementation
SuspendableWorker::SuspendableWorker(QObject *parent) : QObject(parent) {
    _waitMutex.lock();
}

SuspendableWorker::~SuspendableWorker() {
    _waitCondition.wakeAll();
    _waitMutex.unlock();
}

void SuspendableWorker::resume() {
    _waitCondition.wakeAll();
}

void SuspendableWorker::suspend() {
    QMetaObject::invokeMethod(this, &SuspendableWorker::suspendImpl);
    // acquiring mutex to block the calling thread
    _waitMutex.lock();
    _waitMutex.unlock();
}

void SuspendableWorker::suspendImpl() {
    _waitCondition.wait(&_waitMutex);
}

Remember that a suspended thread will never receive a quit event. For this reason, we cannot use this safely with vanilla qthread. unless we resume the thread before posting quit. Let’s integrate this into our custom Thread<T> template to make it bulletproof.

 alt text.

模板<typename TWorker>
类线程:qthread
{
public:
    explicit Thread(TWorker *worker, QObject *parent = nullptr)
        : QThread(parent), _worker(worker) {
        _worker->moveToThread(this);
        start();
    }

    ~Thread() {
        resume();
        quit();
        wait();
    }

    void suspend() {
        auto worker = qobject_cast<SuspendableWorker*>(_worker);
        if (worker != nullptr) {
            worker->suspend();
        }
    }

    void resume() {
        auto worker = qobject_cast<SuspendableWorker*>(_worker);
        if (worker != nullptr) {
            worker->resume();
        }
    }

    TWorker worker() const {
        return _worker;
    }

protected:
    void run() override {
        QThread::*run*();
        delete _worker;
    }

private:
    TWorker *_worker;
};

With these changes, we will resume the thread before posting the quit event. Also, Thread<TWorker> still allows any kind of worker to be passed in regardless if it is a SuspendableWorker or not.

使用情况如下:

LogService logService;
logService.logEvent("processed event");
logService.suspend();
logService.logEvent("queued event");
logService.resume();
// "queued event" is now processed.

挥发性VS原子

This is a commonly misunderstood topic. Most people believe that volatile variables can be used to serve certain flags accessed by multiple threads and that this preserves from data race conditions. That is false, and QAtomic* classes (or std::atomic) must be used for this purpose.

Let’s consider a realistic example: a TcpConnection connection class that works in a dedicated thread, and we want this class to export a thread-safe method: bool isConnected(). Internally, the class will be listening to socket events: connecteddisconnected to maintain an internal boolean flag:

// pseudo-code, won't compile
class TcpConnection : QObject
{
    Q_OBJECT 

public:
    // this is not thread-safe!
    bool isConnected() const {
        return _connected;
    }
    
private slots:
    void handleSocketConnected() {
        _connected = true;
    }
    
    void handleSocketDisconnected() {
        _connected = false;
    }
    
private:
    bool _connected;
}

Making _connected member volatile is not going to solve the problem and is not going to make isConnected() thread-safe. This solution will work 99% of the time, but the remaining 1% will make your life a nightmare. To fix this, we need to protect the variable access from multiple threads. Let’s use QReadWriteLocker for this purpose:

// pseudo-code, won't compile
class TcpConnection : QObject
{
    Q_OBJECT 

public:
    bool isConnected() const {
        QReadLocker locker(&_lock);
        return _connected;
    }
    
private slots:
    void handleSocketConnected() {
        QWriteLocker locker(&_lock);
        _connected = true;
    }
    
    void handleSocketDisconnected() {
        QWriteLocker locker(&_lock);
        _connected = false;
    }
    
private:
    QReadWriteLocker _lock;
    bool _connected;
}

This works reliably, but not as fast as using “lock-free” atomic operations. The third solution is both fast and thread-safe (the example is using std::atomic instead of QAtomicInt, but semantically these are identical):

// pseudo-code, won't compile
class TcpConnection : QObject
{
    Q_OBJECT 

public:
    bool isConnected() const {
        return _connected;
    }
    
private slots:
    void handleSocketConnected() {
        _connected = true;
    }
    
    void handleSocketDisconnected() {
        _connected = false;
    }
    
private:
    std::atomic<bool> _connected;
}

结论

在本文中,我们讨论了对QT框架和设计解决方案的同时编程的几个重要问题,以解决特定用例。我们还没有考虑许多简单的主题,如使用原子基元,读写锁以及许多其他人,但如果您对这些感兴趣,请在下面留下您的评论并询问此类教程。

如果您有兴趣探索QMake,我最近也发表了 qmake的重要指南。这是一个伟大的阅读!

理解基础知识

多线程如何有用?

此多线程模型为开发人员提供了一个有用的抽象,以便并发执行。但是,它在应用于单个进程时确实闪烁:在多处理器系统上启用并行执行。

多线程是什么?

多线程是一个编程和执行模型,允许在单个进程的上下文中存在多个线程。这些线程共享过程资源,但能够独立执行。

什么是QT应用?

用Qt框架构建的应用程序。 Qt应用程序通常使用C ++构建,因为框架本身是用C ++构建的。但是,还有其他语言绑定,例如Python-Qt。

Qt. 支持哪些并发模型?

任务基于QThreadPool和QRUnnable和QRUNNABLES使用QThread类进行线程编程的并发性。

QT开发人员可以使用哪些同步基元?

最常用的是QMutex,qsemaphore和QReadWriteLock。 Qatomic *类别提供免费原子操作。