Qt 任务调度器

近期刚刚完成的C/S端远程监控软件,架构为1个TcpClt对应N个TcpSvr。Clt可向Svr发送指令,Svr接收到指令执行,并通过报文反馈进度。

过去,软件管理的个体是每个Svr;现在需要将若干Svr合并为一个群体,各Svr之间执行指令有先后顺序。

因此过去只需单线程一次性向各Svr发送报文,只需在界面所在线程处理。现在需要有一个调度(等待)的过程,要在子线程里处理。

需要解决的问题如下:

  1. 定时调度
  2. 跨线程更新界面。

首先看问题2:跨线程更新界面。

Qt可以用signal/slot机制,跨线程传递信号。

1 Schedler *pSchd;
2 QDialog *pDlg;
3 //...
4 connect(pSchd, SIGNAL(notify(const QString &, const QString &)), pDlg, SLOT(updateUI(const QString &, const QString &)), Qt::QueuedConnection);

其中最后一个参数Qt::QueuedConnection适用于跨线程调用,将信号传递到消息队列中,由QAppliction.exec()或QThread.exec()等函数调用。

该参数可缺省,缺省为Qt::AutoConnection。Qt::AutoConnection根据emitter/receiver是否在同一线程内,自动决定连接方式:当位于同一线程时,采用Qt::DirectConnection(直连方式,立即执行);位于不同线程时,采用Qt::QueuedConnection。

有关Qt::connectionType,QAssistant解释如下:

enum Qt::ConnectionType

This enum describes the types of connection that can be used between signals and slots. In particular, it determines whether a particular signal is delivered to a slot immediately or queued for delivery at a later time.

ConstantValueDescription
Qt::AutoConnection 0 (default) Same as DirectConnection, if the emitter and receiver are in the same thread. Same as QueuedConnection, if the emitter and receiver are in different threads.
Qt::DirectConnection 1 The slot is invoked immediately, when the signal is emitted.
Qt::QueuedConnection 2 The slot is invoked when control returns to the event loop of the receiver's thread. The slot is executed in the receiver's thread.
Qt::BlockingQueuedConnection 4 Same as QueuedConnection, except the current thread blocks until the slot returns. This connection type should only be used where the emitter and receiver are in different threads.Note: Violating this rule can cause your application to deadlock.
Qt::UniqueConnection 0x80 Same as AutoConnection, but the connection is made only if it does not duplicate an existing connection. i.e., if the same signal is already connected to the same slot for the same pair of objects, then the connection will fail. This connection type was introduced in Qt 4.6.
Qt::AutoCompatConnection 3 The default type when Qt 3 support is enabled. Same as AutoConnection but will also cause warnings to be output in certain situations. See Compatibility Signals and Slots for further information.

With queued connections, the parameters must be of types that are known to Qt's meta-object system, because Qt needs to copy the arguments to store them in an event behind the scenes. If you try to use a queued connection and get the error message:

 QObject::connect: Cannot queue arguments of type 'MyType'

Call qRegisterMetaType() to register the data type before you establish the connection.

When using signals and slots with multiple threads, see Signals and Slots Across Threads.

See also Thread Support in Qt, QObject::connect(), and qRegisterMetaType().

回到问题1:调度器需要满足以下条件:

  1. 前一个任务未执行到关键点,禁止下一个任务开始。
  2. 前一个任务执行失败,中断剩余任务。
  3. 所有任务都开始执行后,需等待任务执行结束。

根据以上原则,定义调度器如下:

class AberScheduler : public QObject
{
    Q_OBJECT
public:
    AberScheduler();
    ~AberScheduler();

    int process();    //执行调度
    int init(QList<AberInfo> infoList, const QString &cmdType);

protected:
    void sortMachine();
    int goFirst();
    int goNext();
    bool end();

    void TryGoNext();
    bool IsAllDone();

protected:
    void OnBegin();
    void OnEnd();
    void UpdateAllStatus();    //从通讯线程获取状态
    void UpdateAllUI();    //更新界面线程

signals:
   int sendAberOrder(const QString &machineName, const QString &orderType);
   int notify(const QString &machineName, const QString &progressInfo);
   int updateSingleStatus(const QString &machineName, int *next, int *status, double *progress);
   int setUIDisabled(const QString &machineName, bool bDisabled); 

private:
    QList<AberInfo> m_aberInfoList;
    int m_index;
    QString m_cmdType;
};

机器信息结构体定义如下:

struct AberInfo
{
    QString machineName;    
    int machineType;
    int next;    //下一台机器是否可以开始: -1:叵;0:可;1:中断所有
    int status;    //命令执行状态: 0:未开始;1:正在执行;2:成功;3:失败
    double progress;    //进度,范围: [0.00, 1.00]
};

 核心调度代码如下:

int AberScheduler::process()
{
    int ret_next = -1;
    bool bAllDone = false;

    if(this->m_infoList.count() <= 0)
    {
        return 0;
    }

    OnBegin();

    goFirst();
    UpdateAllUI();

    const AberInfo &info = this.m_infoList[this->m_index];
    emit sendOrder(info.machineName, this->m_cmdType);
    Sleep(5000);

    while(!end())
    {
        Sleep(1000);
        UpdateAllStatus();
        UpdateAllUI();

        ret_next = TryGoNext();
        if(ret_next == 0)  //可以进行下一个
        {
            int ret_go_next = goNext();
            if(ret_go_next == -1)
            {
                onEnd();
                return -1;
            }
            if(!end())
            {
                const AberInfo &info = this.m_infoList[this->m_index];
                emit sendOrder(info.machineName, this->cmd_type);
                Sleep(5000);
            }
            else
            {
                break;
            }
        }
        else if(ret_next == -1)  //中断全部
        {
            UpdateAllStatus();
            UpdateAllUI();
            onEnd();
            return 1;
        }
        bAllDone = IsAllDone();
        if(bAllDone)
        {
            break;
        }
    }

    while(!bAllDone)
    {
        Sleep(1000);
        UpdateAllStatus();
        UpdateAllUI();
        bAllDone = IsAllDone();
    }

    UpdateAllStatus();
    UpdateAllUI();
    onEnd();

}

与发送信号有关的函数:

void AberScheduler::UpdateAllStatus()
{
    //只更新index及之前的机器
    for(int i=0; i <= this->m_index && i < this->m_infoList.count(); i++)
    {
        AberInfo &info = this->m_infoList[i];
        emit UpdateStatus(this->m_cmd_type, info.machineName, &info.next, &info.status, &info.progress);
    }
}

void AberScheduler::UpdateAllUI()
{
    foreach(AberInfo info, this->m_infoList)
    {
        emit notify(info.machineInfo, info.progress, info.status);
    }
}

void AberScheduler::OnBegin()
{
  foreach(AberInfo info, this->m_infoList)
  {
    emit setDisabled(info.machineInfo, true);
  }
}


void AberScheduler::OnEnd()
{
  foreach(AberInfo info, this->m_infoList)
  {
    emit setDisabled(info.machineInfo, false);
  }
}

int AberScheduler::TryGoNext() { AberInfo curInfo = this->m_infoList[m_index]; //??? return curInfo.next; }

使用线程包装AberScheduler,使其能适用于跨线程更新界面。

class AberThread : public QRunnable
{
public:
    AberThread();
    ~AberThread();
    void init(const QList<AberInfo> &infoList, const QString &cmdType);

public:
    void run();
    AberScheduler *schd();

private:
    AberScheduler *m_shcd;
};

void void AberThread::run()
{
  m_schd->process();
}

界面连接signal/slot如下:

void AberDialog::doSth()
{
    QList<AberInfo> infoList;
    //...
    AberThread *bThread = new AberThread;
    bThread->init(infoList, "cmdType1");

    AberScheduler *p_schd = bThread->schd();
    connect(p_schd, SIGNAL(sendAberOrder(const QString &, const QString &)), this, SLOT(sendAberOrder(const QString &, const QString &)));
    connect(p_schd, SIGNAL(notify(const QString &, const QString &)), this, SLOT(updateSingleUI(const QString &, const QString &)));
    connect(p_schd, SIGNAL(updateSingleStatus(const QString &, int *, int *, double *)), this, SLOT(getSingleStatus(const QString &, int *, int *, double *)));
connect(p_schd, SIGNAL(setUIDisabled(const QString &, bool)), this, SLOT(setUIDisabled(const QString &, bool)));


QThreadPool::globalInstance()->start(bThread); }

 QTheadPool是Qt自带的线程池,只能处理QRunnable,而不能处理QThread,可设置同时运行任务的数量,超过数量的线程会在队列中排队。并且无法强制终止QRunnable。如果要实现调度的强制中断,可以采用类似全局变量的方式,在process循环中读取g_flag,若为false,则终止调度(代码就不贴了)。


总结:

1. QRunnable包装AberScheduler,利用多线程防止界面卡死。

2. AberScheduler继承QObject,并使用signal/slot机制,对处理逻辑与界面解耦。

3. 利用QThreadPool实现多个任务同时调度。

P.S.

1. VS2008无法对QObject::connect()函数、SIGNAL宏、SLOT宏做语法检查。即使写了不存在的函数也能编译通过,但信号无法传递,此处极易出错。

原文地址:https://www.cnblogs.com/daxia319/p/4797314.html