多线程网络文件传输
效果
【】sender 文件发送
可以选择多个文件,每个文件路径动态生成一个进度条和一个runner,并且将runner加入manager。
void AndySender::on_pbt_selectFile_clicked() { QFileDialog fileDialog(this);
//设置窗口的标题 fileDialog.setWindowTitle("选择要发送的文件"); fileDialog.setNameFilter("所有文件(*.*)"); //设置一个过滤器 //这个标志用来设置选择的类型,比如默认是单个文件。QFileDialog::ExistingFiles 多个文件,还可以用来选择文件夹QFileDialog::Directory。QFileDialog::ExistingFile 单个文件。注意这个ExistingFile,单词后面多了一个s 表示选择多个文件。要看清楚了。 fileDialog.setFileMode(QFileDialog::ExistingFiles); //弹出对话框 if (fileDialog.exec() == QDialog::Accepted) {
TaskManager *manager =TaskManager::getInstance(); //strPathList 返回值是一个list,如果是单个文件选择的话,只要取出第一个来就行了。 QStringList strPathList = fileDialog.selectedFiles(); foreach(QString path , strPathList) { qDebug()<<"添加文件上传任务:"<<path; AndyProgressBar * progressBar =new AndyProgressBar(); TaskRunner * runner =new TaskRunner; QString hostIp =ui->lineEdit->text(); if(hostIp=="") { hostIp ="127.0.0.1"; } runner->setHostIp(hostIp); runner->setTask(path,TaskRunner::uploadFile); connect(runner,SIGNAL(taskFinish(void*)),runner,SLOT(deleteLater())); connect(runner,SIGNAL(UpdatePercent(int)), progressBar,SLOT (on_UpdatePercent(int))); connect(runner,SIGNAL(UpdateMaximum(int)), progressBar,SLOT (on_UpdateMaximum(int))); connect(runner,SIGNAL(UpdateText(QString)), progressBar,SLOT(on_UpdateText(QString)));
progressBar->SetMaxRange(100); progressBar->SetFileName(path.split("/").last()); ui->vl_content->addWidget(progressBar); manager->addTask(runner); } } } |
【】TaskRunner 建立tcpsocket
建立tcpsocket
/**
 * Creates the runner together with its TCP socket and wires the socket's
 * signals to the transfer state machine.
 *
 * The socket is created as a child of this object. All byte counters start
 * at zero and the chunk size is taken from LOADBYTES. The file pointer is
 * explicitly reset so that it never holds an indeterminate value before
 * start_transfer() assigns it.
 */
TaskRunner::TaskRunner()
{
    send = new QTcpSocket(this);
    file = nullptr;   // assigned in start_transfer()
    fileBytes = sentBytes = restBytes = 0;
    loadBytes = LOADBYTES;

    /* connection established -> start sending */
    connect(send, SIGNAL(connected()), this, SLOT(start_transfer()));
    /* data written -> send the next chunk */
    connect(send, SIGNAL(bytesWritten(qint64)),
            this, SLOT(continue_transfer(qint64)));
    /* socket error -> error handling */
    connect(send, SIGNAL(error(QAbstractSocket::SocketError)),
            this, SLOT(show_error(QAbstractSocket::SocketError)));
}
发送文件首部
/*
首部
=
总大小
+
文件名长度
+
文件名
*/
out << qint64(0) << qint64(0) << sfName;
/* 总大小加上首部的大小 */
fileBytes += buf.size();
/**
 * Destructor: hands the socket created in the constructor to SafeDelete.
 */
TaskRunner::~TaskRunner()
{
    SafeDelete(send);
}
void TaskRunner::setHostIp(QString ip) { _ip =ip; } void TaskRunner::on_DoRequest() { if(type ==uploadFile) { // 建立连接 tcp send->connectToHost(QHostAddress(_ip), PORT); if(send->waitForConnected(1000)) { qDebug()<<"连接服务器成功!";
sentBytes = 0; }
} } void TaskRunner::setTask(QString path,TaskType _type) { fileName =path; type =_type; emit UpdateText("任务等待中"); } /*--- 开始传送 ---*/ void TaskRunner::start_transfer() { file = new QFile(fileName); if(!file->open(QFile::ReadOnly)) { emit UpdatePercent(0); emit UpdateText("文件打开失败"); qDebug() << "*** start_transfer(): File-Open-Error"; return; }
fileBytes = file->size(); emit UpdatePercent(0); emit UpdateText("连接已经建立");
QByteArray buf; QDataStream out(&buf, QIODevice::WriteOnly); out.setVersion(DATA_STREAM_VERSION);
/* 无路径文件名 */ QString sfName = fileName.right(fileName.size() - fileName.lastIndexOf('/') - 1); /* 首部 = 总大小 + 文件名长度 + 文件名 */ out << qint64(0) << qint64(0) << sfName; /* 总大小加上首部的大小 */ fileBytes += buf.size(); emit UpdateMaximum(fileBytes); /* 重写首部的前两个长度字段 */ out.device()->seek(0); out << fileBytes << (qint64(buf.size()) - 2 * sizeof(qint64)); /* 发送首部,计算剩余大小 */ restBytes = fileBytes - send->write(buf); }
/*--- continue the transfer ---*/
/**
 * Slot for the socket's bytesWritten(qint64) signal.
 *
 * Adds the written byte count to the running total and reports it, then
 * either sends the next chunk of the file or, when nothing remains, closes
 * the file. Once the total sent equals the total size the socket is closed
 * and completion is signalled.
 */
void TaskRunner::continue_transfer(qint64 sentSize)
{
    sentBytes += sentSize;
    emit UpdatePercent(sentBytes);

    if (restBytes <= 0)
    {
        /* nothing left to read */
        file->close();
    }
    else
    {
        /* read the next chunk from the file and send it */
        const auto chunkSize = qMin(loadBytes, restBytes);
        QByteArray chunk = file->read(chunkSize);
        restBytes -= send->write(chunk);
    }

    /* not everything has been sent yet */
    if (sentBytes != fileBytes)
    {
        return;
    }

    send->close();      // close the socket
    fileName.clear();   // clear the file name
    emit UpdateText("发送完成");
    emit taskFinish(nullptr);
}
/*--- error handling ---*/
/**
 * Slot for the socket's error signal: logs the failure, closes the socket,
 * resets the progress display and reports the task as finished.
 */
void TaskRunner::show_error(QAbstractSocket::SocketError)
{
    qDebug() << "*** Socket Error";

    send->close();

    emit UpdateText("任务失败,稍后重试!");
    emit UpdatePercent(0);

    fileName.clear();
    emit taskFinish(nullptr);
}
【】TaskManager线程管理(单例)
生成线程列表
#pragma once #include "preheader.h"
class TaskRunner; class TaskManager : public QObject { Q_OBJECT
public: static TaskManager* getInstance();
void addTask(TaskRunner*); private: TaskManager(); ~TaskManager(); static TaskManager *m_instance;
QList<QThread*> _threadList; QList<TaskRunner*> _waittingTasks; //等待中的任务 QMap<QThread*,TaskRunner*> _runningTasks; //进行中的任务
private slots: void onUpdateTaskList(); void on_CircleTaskList(); }; |
生成三十个线程(使用匿名函数),每隔200毫秒检查waitlist中有没有新的task;新的task会被分配给线程列表中尚未使用的线程,task结束时更新runningTasks。
#include "TaskManager.h" #include "TaskRunner.h"
/* Number of threads created and started by the TaskManager constructor. */
const int MaxThreadCount =30;

TaskManager* TaskManager::m_instance = nullptr;

/**
 * Creates and starts MaxThreadCount threads and a 200 ms timer that
 * triggers on_CircleTaskList().
 *
 * Both the threads and the timer are parented to the manager, so none of
 * them is left without an owner.
 */
TaskManager::TaskManager()
{
    for (int i = 0; i < MaxThreadCount; i++)
    {
        QThread *thread = new QThread(this);
        thread->start();
        _threadList.push_back(thread);
    }

    // Parented timer: it is released with the manager and cannot fire into a
    // destroyed object.
    QTimer *timer = new QTimer(this);
    connect(timer, SIGNAL(timeout()), this, SLOT(on_CircleTaskList()));
    timer->start(200);
}
/**
 * Stops every thread before the QObject child clean-up destroys them.
 *
 * The threads are children of the manager and are started in the
 * constructor; destroying a QThread that is still running is an error, so
 * each one is asked to quit and then waited for.
 */
TaskManager::~TaskManager()
{
    const QList<QThread*> &threads = _threadList;

    // Ask all threads to leave their event loops first, then wait for each,
    // so that the waits overlap instead of adding up.
    for (QThread *thread : threads)
    {
        thread->quit();
    }
    for (QThread *thread : threads)
    {
        thread->wait();
    }

    // Do not leave a dangling singleton pointer behind.
    if (m_instance == this)
    {
        m_instance = nullptr;
    }
}
TaskManager* TaskManager::getInstance() { if(m_instance == NULL) m_instance = new TaskManager(); return m_instance; } void TaskManager::addTask(TaskRunner* runner) { _waittingTasks.push_front(runner); } void TaskManager::on_CircleTaskList() { auto circleTaskList =[&](QList<TaskRunner*>& _waitTasks, QList<QThread*> &threadList , QMap<QThread*,TaskRunner*> &runningTasks ) { while(_waitTasks.count()>0 && runningTasks.count() <threadList.count()) {
TaskRunner* runner = _waitTasks.takeFirst(); if(runner) { for(int i=0; i<threadList.count(); i++) { QThread* thread =threadList[i]; if(runningTasks.contains(thread)) continue; else { runner->moveToThread(thread); connect(runner, SIGNAL(taskFinish(void*)), this,SLOT(onUpdateTaskList())); runningTasks.insert(thread,runner); QTimer::singleShot(0,runner, &TaskRunner::on_DoRequest); break; } }
} } } ; circleTaskList(_waittingTasks,_threadList,_runningTasks); } void TaskManager::onUpdateTaskList() { TaskRunner * runner =(TaskRunner*)sender();
if(runner) { for(auto itr =_runningTasks.begin(); itr !=_runningTasks.end(); itr++) { if(itr.value() ==runner) { _runningTasks.erase(itr++); break; } } runner->deleteLater(); } } |
【】Receiver
【】TcpServer accept incoming TCP connections
为每一个连接生成一个TcpSocket 和一个线程,然后moveToThread,并加入列表 QHash<int,TcpSocket *> * tcpClient;要在线程里面使用这个socket,必须使用incomingConnection;tcp断开后通知线程管理类。
Note: If you want to handle an incoming connection as a new QTcpSocket object in another thread you have to pass the socketDescriptor to the other thread and create the QTcpSocket object there and use its setSocketDescriptor() method.
#ifndef TCPSERVER_H #define TCPSERVER_H
#include <QTcpServer> #include <QHash> #include "tcpsocket.h"
//继承QTCPSERVER以实现多线程TCPscoket的服务器。 //如果socket的信息处理直接处理的话,很多新建的信号和槽是用不到的 class TcpServer : public QTcpServer { Q_OBJECT public: explicit TcpServer(QObject *parent = 0,int numConnections = 10000); ~TcpServer();
void setMaxPendingConnections(int numConnections);//重写设置最大连接数函数 signals: void connectClient(const int , const QString & ,const quint16 );//发送新用户连接信息 void readData(const int,const QString &, quint16, const QByteArray &);//发送获得用户发过来的数据 void sockDisConnect(int ,QString ,quint16);//断开连接的用户信息 void sentData(const QByteArray &,const int);//向scoket发送消息 void sentDisConnect(int i); //断开特定连接,并释放资源,-1为断开所有。
void UpdateText(QString); void UpdateMaximum(int); void UpdatePercent(int); void createNewTask(TcpSocket*); public slots: void clear(); //断开所有连接,线程计数器请0 protected slots: void sockDisConnectSlot(int handle,const QString & ip, quint16 prot, QThread *th);//断开连接的用户信息
protected: void incomingConnection(qintptr socketDescriptor);//覆盖已获取多线程 private: QHash<int,TcpSocket *> * tcpClient;//管理连接的map int maxConnections;
};
#endif // TCPSERVER_H |
【】TcpSocket 在线程使用的socket,接受文件
#include "tcpsocket.h"
#include <QtConcurrent/QtConcurrent>
#include <QHostAddress>
#include <QDebug>

extern QString _saveDir;

const quint16 PORT = 3333;
const qint64 LOADBYTES = 4 * 1024; // 4 kilo-byte
const int DATA_STREAM_VERSION = QDataStream::Qt_4_8;

/**
 * Wraps an accepted connection identified by socketDescriptor.
 *
 * Original author's note: the constructor runs in the main thread, the
 * lambda in the worker thread (no lambda is visible in this excerpt).
 *
 * All receive-state fields are reset before any signal is connected:
 * readData() decides what to parse from receive_gotBytes, receive_nameSize
 * and receive_file, so none of them may start with an indeterminate value.
 */
TcpSocket::TcpSocket(qintptr socketDescriptor, QObject *parent) :
    QTcpSocket(parent),socketID(socketDescriptor)
{
    receive_fileBytes = receive_gotBytes = receive_nameSize = 0;
    receive_file = Q_NULLPTR;

    this->setSocketDescriptor(socketDescriptor);
    connect(this,&TcpSocket::readyRead,this,&TcpSocket::readData);
    connect(this,SIGNAL(error(QAbstractSocket::SocketError)),
            this, SLOT(receive_error(QAbstractSocket::SocketError)));

    emit UpdateText("开始接收数据!") ;
}
/**
 * Destructor; performs no work of its own.
 */
TcpSocket::~TcpSocket()
{
}
/*--- 出错处理 ---*/ void TcpSocket::receive_error(QAbstractSocket::SocketError) { qDebug() << "*** Socket Error ***" << this->errorString(); this->close(); // 关cocket
receive_fileName.clear(); // 清空文件名 receive_fileBytes = receive_gotBytes = receive_nameSize = 0; emit UpdateText("接收数据失败!") ;
SafeDelete(receive_file); } void TcpSocket::readData() { QDataStream in(this); in.setVersion(DATA_STREAM_VERSION);
/* 首部未接收/未接收完 */ if(receive_gotBytes <= 2 * sizeof(qint64)) { if(!receive_nameSize) // 前两个长度字段未接收 { if(this->bytesAvailable() >= 2 * sizeof(qint64)) { in >> receive_fileBytes >> receive_nameSize; receive_gotBytes += 2 * sizeof(qint64); emit UpdateMaximum(receive_fileBytes); emit UpdatePercent(receive_gotBytes); } else // 数据不足,等下次 { qDebug()<<" 数据不足文件名长度,等下次 Errir 044"; return; } } else if(this->bytesAvailable() > receive_nameSize) { in >> receive_fileName; receive_gotBytes += receive_nameSize; emit UpdatePercent(receive_gotBytes); qDebug()<< "--- File Name: " << receive_fileName; } else // 数据不足文件名长度,等下次 { qDebug()<<" 数据不足文件名长度,等下次 Errir 046"; return; } }
/* 已读文件名、文件未打开 -> 尝试打开文件 */ if(!receive_fileName.isEmpty() && receive_file == Q_NULLPTR) { QString saveFilePath =_saveDir+(receive_fileName); qDebug()<<_saveDir<<"保存路径为 :"<< saveFilePath; receive_file = new QFile(saveFilePath); if(!receive_file->open(QFile::WriteOnly)) // 打开失败 { qDebug() << "*** File Open Failed ***" ; SafeDelete(receive_file); return; } emit UpdateText(QString("文件%1打开成功!").arg(saveFilePath)) ; } if(receive_file == Q_NULLPTR) // 文件未打开,不能进行后续操作 return;
if(receive_gotBytes < receive_fileBytes) // 文件未接收完 { receive_gotBytes += this->bytesAvailable(); emit UpdatePercent(receive_gotBytes); receive_file->write(this->readAll()); } if(receive_gotBytes == receive_fileBytes) // 文件接收完 { this->close(); // 关socket receive_file->close(); // 关文件 SafeDelete(receive_file); emit UpdatePercent(receive_gotBytes); emit UpdateText("文件接收完成!") ; } } |
【】ThreadHandle 为每一个接收文件的socket设置线程
#include "threadhandle.h" #include "eventdispatcher_libev/eventdispatcher_libev.h"
/**
 * Creates the handle in the "not yet initialised" state; the threads are
 * created by the first initThreadType() call.
 */
ThreadHandle::ThreadHandle()
    : initfist(false)
{
}
/**
 * Stops all threads and releases them: each thread is told to exit, waited
 * for up to three seconds and then deleted.
 */
ThreadHandle::~ThreadHandle()
{
    for (auto entry = threadSize.begin(); entry != threadSize.end(); ++entry)
    {
        QThread * const worker = entry.key();
        worker->exit();
        worker->wait(3000);
        delete worker;
    }
}
/**
 * Returns the process-wide ThreadHandle, a function-local static object.
 */
ThreadHandle & ThreadHandle::getClass()
{
    static ThreadHandle instance;
    return instance;
}
/**
 * Returns the thread the next connection should use.
 *
 * Initialises the handle with (THREADSIZE, 10) if initThreadType() has not
 * been called yet, then delegates to the lookup matching the configured type.
 */
QThread *ThreadHandle::getThread()
{
    if (!initfist)
    {
        initThreadType(THREADSIZE,10);
    }
    return (type == THREADSIZE) ? findThreadSize() : findHandleSize();
}
/**
 * Releases one connection slot of the given thread.
 *
 * The thread's counter is decremented. In HANDLESIZE mode a thread whose
 * counter drops to zero is stopped and deleted, unless it is the last one.
 *
 * A counter that is already zero (for example after clear() reset the
 * counts) is left untouched; decrementing it would wrap around or go
 * negative and corrupt the load balancing.
 */
void ThreadHandle::removeThread(QThread * thread)
{
    auto entry = threadSize.find(thread);
    if (entry == threadSize.end())
    {
        return;   // unknown thread
    }
    if (entry.value() == 0)
    {
        return;   // nothing to release
    }

    --entry.value();
    if (type == HANDLESIZE && entry.value() == 0 && threadSize.size() > 1)
    {
        threadSize.remove(thread);
        thread->exit();
        thread->wait(3000);
        delete thread;
    }
}
/**
 * One-time configuration of the handle; later calls have no effect.
 *
 * @param type  THREADSIZE creates all threads up front via
 *              initThreadSize(); any other value creates a single thread.
 * @param max   stored as the limit; 0 selects the default (10 for
 *              THREADSIZE, 1000 otherwise).
 */
void ThreadHandle::initThreadType(ThreadType type, unsigned int max)
{
    if (initfist)
    {
        return;
    }

    this->type = type;
    this->size = (max != 0) ? max
                            : ((type == THREADSIZE) ? 10u : 1000u);

    if (type == THREADSIZE)
    {
        initThreadSize();
    }
    else
    {
        // Start with one thread that has no connections yet.
        QThread * first = new QThread;
#ifndef Q_OS_WIN
        first->setEventDispatcher(new EventDispatcherLibEv());
#endif
        threadSize.insert(first,0);
        first->start();
    }

    initfist = true;
}
void ThreadHandle::initThreadSize() //建立好线程并启动, { QThread * tmp; for (unsigned int i = 0; i < size;++i) { tmp = new QThread; #ifndef Q_OS_WIN tmp->setEventDispatcher(new EventDispatcherLibEv()); #endif threadSize.insert(tmp,0); tmp->start(); } }
/**
 * Returns the first thread whose connection count is below `size` and
 * increments its count; if every thread is full, a new thread is created,
 * registered with a count of one, started and returned.
 */
QThread * ThreadHandle::findHandleSize()
{
    for (auto entry = threadSize.begin(); entry != threadSize.end(); ++entry)
    {
        if (!(entry.value() < size))
        {
            continue;   // this thread is full
        }
        entry.value() ++;
        return entry.key();
    }

    // Every existing thread is full: add another one.
    QThread * extra = new QThread;
#ifndef Q_OS_WIN
    extra->setEventDispatcher(new EventDispatcherLibEv());
#endif
    threadSize.insert(extra,1);
    extra->start();
    return extra;
}
/**
 * Scans all threads for the one with the smallest connection count (the
 * first such entry wins on ties), increments that count and returns the
 * thread. Expects threadSize to contain at least one entry.
 */
QThread *ThreadHandle::findThreadSize()
{
    auto least = threadSize.begin();
    for (auto cursor = threadSize.begin(); cursor != threadSize.end(); ++cursor)
    {
        if (cursor.value() < least.value())
        {
            least = cursor;
        }
    }
    least.value() ++;
    return least.key();
}
void ThreadHandle::clear()//仅仅清空计数,线程不释放 { for (auto it = threadSize.begin();it != threadSize.end() ;++it) { it.value() = 0; } } |