多线程网络文件传输2

多线程网络文件传输

效果

【】sender 文件发送

可以选择多个文件,每个文件路径动态生成一个进度条和一个runner,并且将runner加入manager。

 

void AndySender::on_pbt_selectFile_clicked()

{

QFileDialog fileDialog(this);

 

//设置窗口的标题

fileDialog.setWindowTitle("选择要发送的文件");

fileDialog.setNameFilter("所有文件(*.*)"); //设置一个过滤器

//这个标志用来设置选择的类型,比如默认是单个文件。QFileDialog::ExistingFiles 多个文件,还可以用来选择文件夹QFileDialog::Directory。QFileDialog::ExistingFile 单个文件。注意这个ExistingFile,单词后面多了一个s 表示选择多个文件。要看清楚了。

fileDialog.setFileMode(QFileDialog::ExistingFiles);

//弹出对话框

if (fileDialog.exec() == QDialog::Accepted)

{

 

TaskManager *manager =TaskManager::getInstance();

//strPathList 返回值是一个list,如果是单个文件选择的话,只要取出第一个来就行了。

QStringList strPathList = fileDialog.selectedFiles();

foreach(QString path , strPathList)

{

qDebug()<<"添加文件上传任务:"<<path;

AndyProgressBar * progressBar =new AndyProgressBar();

TaskRunner * runner =new TaskRunner;

QString hostIp =ui->lineEdit->text();

if(hostIp=="")

{

hostIp ="127.0.0.1";

}

runner->setHostIp(hostIp);

runner->setTask(path,TaskRunner::uploadFile);

connect(runner,SIGNAL(taskFinish(void*)),runner,SLOT(deleteLater()));

connect(runner,SIGNAL(UpdatePercent(int)), progressBar,SLOT (on_UpdatePercent(int)));

connect(runner,SIGNAL(UpdateMaximum(int)), progressBar,SLOT (on_UpdateMaximum(int)));

connect(runner,SIGNAL(UpdateText(QString)), progressBar,SLOT(on_UpdateText(QString)));

 

progressBar->SetMaxRange(100);

progressBar->SetFileName(path.split("/").last());

ui->vl_content->addWidget(progressBar);

manager->addTask(runner);

}

}

}

 

【】TaskRunner 建立tcpsocket

建立tcpsocket

 

/**
 * Creates the runner together with its TCP socket and wires the socket's
 * signals to the transfer state machine of this object.
 */
TaskRunner::TaskRunner()
{
    // Transfer counters start at zero; loadBytes is the per-write chunk size.
    fileBytes = 0;
    sentBytes = 0;
    restBytes = 0;
    loadBytes = LOADBYTES;

    // The socket is a child of this runner.
    send = new QTcpSocket(this);

    // Connection established -> send the header.
    connect(send, SIGNAL(connected()), this, SLOT(start_transfer()));
    // A chunk has been written -> send the next one.
    connect(send, SIGNAL(bytesWritten(qint64)), this, SLOT(continue_transfer(qint64)));
    // Socket failure -> error handling.
    connect(send, SIGNAL(error(QAbstractSocket::SocketError)),
            this, SLOT(show_error(QAbstractSocket::SocketError)));
}

 

发送文件首部

/* 首部 = 总大小 + 文件名长度 + 文件名 */
																			

out << qint64(0) << qint64(0) << sfName;

/* 总大小加上首部的大小 */

fileBytes += buf.size();

 

/* Destructor: releases the TCP socket created in the constructor. */
TaskRunner::~TaskRunner()

{

// NOTE(review): SafeDelete is a project macro not shown here; presumably it
// deletes the pointer and resets it to null - confirm against its definition.
SafeDelete(send);

}

 

/**
 * Stores the address of the server this runner will connect to.
 *
 * @param ip Host address as text; used by on_DoRequest() when connecting.
 */
void TaskRunner::setHostIp(QString ip)
{
    this->_ip = ip;
}

void TaskRunner::on_DoRequest()

{

if(type ==uploadFile)

{

// 建立连接 tcp

send->connectToHost(QHostAddress(_ip), PORT);

if(send->waitForConnected(1000))

{

qDebug()<<"连接服务器成功!";

 

sentBytes = 0;

}

 

}

}

/**
 * Configures what this runner has to do.
 *
 * @param path  Path of the file the task operates on.
 * @param _type Kind of task (e.g. uploadFile).
 *
 * Emits UpdateText() with the "waiting" status text.
 */
void TaskRunner::setTask(QString path,TaskType _type)
{
    this->fileName = path;
    this->type = _type;

    emit UpdateText("任务等待中");
}

/*--- 开始传送 ---*/

void TaskRunner::start_transfer()

{

file = new QFile(fileName);

if(!file->open(QFile::ReadOnly))

{

emit UpdatePercent(0);

emit UpdateText("文件打开失败");

qDebug() << "*** start_transfer(): File-Open-Error";

return;

}

 

fileBytes = file->size();

emit UpdatePercent(0);

emit UpdateText("连接已经建立");

 

QByteArray buf;

QDataStream out(&buf, QIODevice::WriteOnly);

out.setVersion(DATA_STREAM_VERSION);

 

/* 无路径文件名 */

QString sfName = fileName.right(fileName.size() -

fileName.lastIndexOf('/') - 1);

/* 首部 = 总大小 + 文件名长度 + 文件名 */

out << qint64(0) << qint64(0) << sfName;

/* 总大小加上首部的大小 */

fileBytes += buf.size();

emit UpdateMaximum(fileBytes);

/* 重写首部的前两个长度字段 */

out.device()->seek(0);

out << fileBytes << (qint64(buf.size()) - 2 * sizeof(qint64));

/* 发送首部,计算剩余大小 */

restBytes = fileBytes - send->write(buf);

}

 

/*--- 继续传输 ---*/

/**
 * Slot invoked whenever the socket has written data: updates the progress
 * and queues the next chunk of the file.
 *
 * @param sentSize Number of bytes the socket reports as written.
 *
 * A failed file read or socket write aborts the task through show_error().
 * Without that check a -1 from write() would be subtracted from restBytes
 * and an empty read would leave the transfer waiting forever for a
 * bytesWritten() that never comes.
 */
void TaskRunner::continue_transfer(qint64 sentSize)
{
    sentBytes += sentSize;
    emit UpdatePercent(sentBytes);

    if (restBytes > 0)
    {
        // Read the next chunk (at most loadBytes) and hand it to the socket.
        const QByteArray chunk = file->read(qMin(loadBytes, restBytes));
        const qint64 written = chunk.isEmpty() ? qint64(-1) : send->write(chunk);

        if (written < 0)
        {
            // Data is still owed but could not be read or written.
            file->close();
            show_error(send->error());
            return;
        }
        restBytes -= written;
    }
    else
    {
        // Everything has been queued; the file is no longer needed.
        file->close();
    }

    // All bytes (header + content) have been written to the socket.
    if (sentBytes == fileBytes)
    {
        send->close();
        fileName.clear();
        emit UpdateText("发送完成");
        emit taskFinish(nullptr);
    }
}

 

/*--- 出错处理 ---*/

/**
 * Slot for socket errors: closes the connection, resets the progress shown
 * to the user and reports the task as finished.
 */
void TaskRunner::show_error(QAbstractSocket::SocketError)
{
    qDebug() << "*** Socket Error";

    send->close();
    fileName.clear();

    emit UpdateText("任务失败,稍后重试!");
    emit UpdatePercent(0);
    emit taskFinish(nullptr);
}

 

 

 

【】TaskManager线程管理(单例)

生成线程列表

#pragma once

#include "preheader.h"

 

class TaskRunner;

/*
 * Manages the worker threads and the queue of TaskRunner objects.
 * Accessed through getInstance(); the constructor and destructor are private.
 */
class TaskManager : public QObject

{

Q_OBJECT



public:

// Returns the single shared instance.
static TaskManager* getInstance();





// Queues a runner; it is started later by on_CircleTaskList().
void addTask(TaskRunner*);

private:

TaskManager();

~TaskManager();

// The shared instance returned by getInstance().
static TaskManager *m_instance;



// Worker threads owned by the manager.
QList<QThread*> _threadList;

QList<TaskRunner*> _waittingTasks; // tasks waiting to be started

QMap<QThread*,TaskRunner*> _runningTasks; // tasks in progress, keyed by the thread they run on





private slots:

// Connected to a runner's taskFinish signal.
void onUpdateTaskList();

// Connected to the polling timer's timeout signal.
void on_CircleTaskList();

};

 

    生成三十个线程(使用匿名函数),每隔200毫秒检查waitlist中有没有新加入的task;新的task会被分配给线程列表中未使用的线程,task结束时更新runningTasks。

#include "TaskManager.h"

#include "TaskRunner.h"

 

// Number of worker threads created by the TaskManager constructor.
const int MaxThreadCount = 30;

// Singleton storage; created on first use by TaskManager::getInstance().
TaskManager* TaskManager::m_instance = nullptr;

/**
 * Starts MaxThreadCount worker threads and a 200 ms timer that polls the
 * waiting list (see on_CircleTaskList()).
 */
TaskManager::TaskManager()
{
    // Create and start the worker threads up front; each one is a child of
    // the manager.
    for (int i = 0; i < MaxThreadCount; ++i)
    {
        QThread *thread = new QThread(this);
        thread->start();
        _threadList.push_back(thread);
    }

    // Parent the timer to the manager so it is owned and released with it
    // (it was previously created without a parent and never deleted).
    QTimer *timer = new QTimer(this);
    connect(timer, SIGNAL(timeout()), this, SLOT(on_CircleTaskList()));
    timer->start(200);
}

 

/**
 * Stops every worker thread before the manager goes away.
 *
 * The threads are children of the manager and would otherwise be destroyed
 * while still running their event loops.
 */
TaskManager::~TaskManager()
{
    // Ask all threads to leave their event loop first, then wait for each,
    // so that they shut down in parallel.
    for (int i = 0; i < _threadList.count(); ++i)
        _threadList.at(i)->quit();

    for (int i = 0; i < _threadList.count(); ++i)
        _threadList.at(i)->wait();
}

 

/**
 * Returns the shared TaskManager, creating it on the first call.
 */
TaskManager* TaskManager::getInstance()
{
    if (m_instance != nullptr)
    {
        return m_instance;
    }

    m_instance = new TaskManager();
    return m_instance;
}

/**
 * Queues a runner for execution.
 *
 * @param runner Task to run; null pointers are ignored.
 *
 * Tasks are appended to the back of the waiting list. on_CircleTaskList()
 * takes from the front, so tasks start in the order they were added.
 * Inserting at the front, as before, made the newest task start first and
 * could starve older ones once more tasks were queued than threads exist.
 */
void TaskManager::addTask(TaskRunner* runner)
{
    if (runner == nullptr)
        return;

    _waittingTasks.push_back(runner);
}

void TaskManager::on_CircleTaskList()

{

auto circleTaskList =[&](QList<TaskRunner*>& _waitTasks, QList<QThread*> &threadList , QMap<QThread*,TaskRunner*> &runningTasks )

{

while(_waitTasks.count()>0 && runningTasks.count() <threadList.count())

{

 

TaskRunner* runner = _waitTasks.takeFirst();

if(runner)

{

for(int i=0; i<threadList.count(); i++)

{

QThread* thread =threadList[i];

if(runningTasks.contains(thread))

continue;

else

{

runner->moveToThread(thread);

connect(runner, SIGNAL(taskFinish(void*)), this,SLOT(onUpdateTaskList()));

runningTasks.insert(thread,runner);

QTimer::singleShot(0,runner, &TaskRunner::on_DoRequest);

break;

}

}

 

}

}

} ;

circleTaskList(_waittingTasks,_threadList,_runningTasks);

}

void TaskManager::onUpdateTaskList()

{

TaskRunner * runner =(TaskRunner*)sender();

 

if(runner)

{

for(auto itr =_runningTasks.begin(); itr !=_runningTasks.end(); itr++)

{

if(itr.value() ==runner)

{

_runningTasks.erase(itr++);

break;

}

}

runner->deleteLater();

}

}

 

 

【】Receiver

 

【】TcpServer accept incoming TCP connections

为每一个连接生成一个TcpSocket和一个线程,然后moveToThread,并加入列表 QHash<int,TcpSocket *> * tcpClient。要在线程里面使用这个socket,必须重写incomingConnection;tcp断开后通知线程管理类。

 

Note: If you want to handle an incoming connection as a new QTcpSocket object in another thread you have to pass the socketDescriptor to the other thread and create the QTcpSocket object there and use its setSocketDescriptor() method.

#ifndef TCPSERVER_H

#define TCPSERVER_H

 

#include <QTcpServer>

#include <QHash>

#include "tcpsocket.h"

 

 

// Subclass of QTcpServer used to implement a multi-threaded TCP socket server.

// If socket data is processed directly inside the socket, many of the
// signals and slots declared below are not needed.

class TcpServer : public QTcpServer

{

Q_OBJECT

public:

explicit TcpServer(QObject *parent = 0,int numConnections = 10000);

~TcpServer();



// NOTE(review): has the same name as QTcpServer::setMaxPendingConnections;
// whether it overrides or merely hides the base function cannot be told from
// this declaration - confirm.
void setMaxPendingConnections(int numConnections);// custom version of the "set maximum connections" function

signals:

void connectClient(const int , const QString & ,const quint16 );// announces a newly connected client

void readData(const int,const QString &, quint16, const QByteArray &);// forwards data received from a client

void sockDisConnect(int ,QString ,quint16);// information about a client that disconnected

void sentData(const QByteArray &,const int);// sends a message to a socket

void sentDisConnect(int i); // disconnects one specific connection and releases its resources; -1 disconnects all



void UpdateText(QString);

void UpdateMaximum(int);

void UpdatePercent(int);

void createNewTask(TcpSocket*);

public slots:

void clear(); // disconnects all connections and resets the thread counters to 0

protected slots:

void sockDisConnectSlot(int handle,const QString & ip, quint16 prot, QThread *th);// handles a client that disconnected



protected:

void incomingConnection(qintptr socketDescriptor);// reimplemented so connections can be handled on multiple threads

private:

QHash<int,TcpSocket *> * tcpClient;// map used to manage the connections

int maxConnections;



};

 

#endif // TCPSERVER_H

 

 

【】TcpSocket 在线程使用的socket,接受文件

#include "tcpsocket.h"

#include <QtConcurrent/QtConcurrent>

#include <QHostAddress>

#include <QDebug>

// Directory prefix for received files; defined in another translation unit.
extern QString _saveDir;

// TCP port number.
const quint16 PORT = 3333;

// Chunk size in bytes (4 KiB).
const qint64 LOADBYTES = 4 * 1024; // 4 kilo-byte

// QDataStream serialization version applied to the stream in readData().
const int DATA_STREAM_VERSION = QDataStream::Qt_4_8;

/**
 * Wraps an accepted connection.
 *
 * @param socketDescriptor Native descriptor of the accepted connection.
 * @param parent           Optional QObject parent.
 *
 * All receive state is initialised here because readData() inspects
 * receive_nameSize and receive_file before anything else has assigned them.
 */
TcpSocket::TcpSocket(qintptr socketDescriptor, QObject *parent) :
    QTcpSocket(parent), socketID(socketDescriptor)
{
    // Start from a well-defined "nothing received yet" state.
    receive_file = Q_NULLPTR;
    receive_fileName.clear();
    receive_fileBytes = receive_gotBytes = receive_nameSize = 0;

    if (!this->setSocketDescriptor(socketDescriptor))
    {
        // Previously ignored; at least make the failure visible.
        qDebug() << "*** setSocketDescriptor failed ***" << this->errorString();
    }

    connect(this, &TcpSocket::readyRead, this, &TcpSocket::readData);
    connect(this, SIGNAL(error(QAbstractSocket::SocketError)),
            this, SLOT(receive_error(QAbstractSocket::SocketError)));

    emit UpdateText("开始接收数据!") ;
}

 

/* Destructor: intentionally empty, performs no cleanup of its own. */
TcpSocket::~TcpSocket()

{

}

 

/*--- 出错处理 ---*/

/**
 * Slot for socket errors: closes the connection, resets the receive state
 * and releases the partially written file object.
 */
void TcpSocket::receive_error(QAbstractSocket::SocketError)
{
    qDebug() << "*** Socket Error ***" << this->errorString();

    // Drop the connection.
    this->close();

    // Forget everything known about the transfer in progress.
    receive_fileName.clear();
    receive_nameSize = 0;
    receive_gotBytes = 0;
    receive_fileBytes = 0;

    emit UpdateText("接收数据失败!") ;

    SafeDelete(receive_file);
}

void TcpSocket::readData()

{

QDataStream in(this);

in.setVersion(DATA_STREAM_VERSION);

 

 

/* 首部未接收/未接收完 */

if(receive_gotBytes <= 2 * sizeof(qint64))

{

if(!receive_nameSize) // 前两个长度字段未接收

{

if(this->bytesAvailable() >= 2 * sizeof(qint64))

{

in >> receive_fileBytes >> receive_nameSize;

receive_gotBytes += 2 * sizeof(qint64);

emit UpdateMaximum(receive_fileBytes);

emit UpdatePercent(receive_gotBytes);

}

else // 数据不足,等下次

{

qDebug()<<" 数据不足文件名长度,等下次 Errir 044";

return;

}

}

else if(this->bytesAvailable() > receive_nameSize)

{

in >> receive_fileName;

receive_gotBytes += receive_nameSize;

emit UpdatePercent(receive_gotBytes);

qDebug()<< "--- File Name: "

<< receive_fileName;

}

else // 数据不足文件名长度,等下次

{

qDebug()<<" 数据不足文件名长度,等下次 Errir 046";

return;

}

}

 

/* 已读文件名、文件未打开 -> 尝试打开文件 */

if(!receive_fileName.isEmpty() && receive_file == Q_NULLPTR)

{

QString saveFilePath =_saveDir+(receive_fileName);

qDebug()<<_saveDir<<"保存路径为 :"<< saveFilePath;

receive_file = new QFile(saveFilePath);

if(!receive_file->open(QFile::WriteOnly)) // 打开失败

{

qDebug() << "*** File Open Failed ***" ;

SafeDelete(receive_file);

return;

}

emit UpdateText(QString("文件%1打开成功!").arg(saveFilePath)) ;

}

if(receive_file == Q_NULLPTR) // 文件未打开,不能进行后续操作

return;

 

if(receive_gotBytes < receive_fileBytes) // 文件未接收完

{

receive_gotBytes += this->bytesAvailable();

emit UpdatePercent(receive_gotBytes);

receive_file->write(this->readAll());

}

if(receive_gotBytes == receive_fileBytes) // 文件接收完

{

this->close(); // 关socket

receive_file->close(); // 关文件

SafeDelete(receive_file);

emit UpdatePercent(receive_gotBytes);

emit UpdateText("文件接收完成!") ;

}

}

 

【】ThreadHandle 为每一个接收文件的socket设置线程

 

#include "threadhandle.h"

#include "eventdispatcher_libev/eventdispatcher_libev.h"

 

/* Constructor: marks the handler as not yet initialised; threads are created
 * later by initThreadType(). */
ThreadHandle::ThreadHandle()
{
    this->initfist = false;
}

 

/* Destructor: stops every managed thread and releases it. */
ThreadHandle::~ThreadHandle()
{
    for (auto entry = threadSize.begin(); entry != threadSize.end(); ++entry)
    {
        QThread * const worker = entry.key();

        // Ask the event loop to exit, give it up to 3 s, then free the object.
        worker->exit();
        worker->wait(3000);
        delete worker;
    }
}

 

/* Returns the process-wide ThreadHandle, constructed on first use. */
ThreadHandle & ThreadHandle::getClass()
{
    static ThreadHandle instance;
    return instance;
}

 

/**
 * Returns the thread a new connection should be moved to.
 *
 * Initialises the handler with THREADSIZE mode and 10 threads if
 * initThreadType() has not been called yet, then delegates to the lookup
 * matching the configured mode.
 */
QThread *ThreadHandle::getThread()
{
    if (!initfist)
        initThreadType(THREADSIZE, 10);

    return (type == THREADSIZE) ? findThreadSize() : findHandleSize();
}

 

/**
 * Unregisters one connection from the given thread.
 *
 * @param thread Thread the connection was running on; unknown threads are
 *               ignored.
 *
 * In HANDLESIZE mode a thread whose counter drops to zero is stopped and
 * deleted, unless it is the last one left.
 */
void ThreadHandle::removeThread(QThread * thread)
{
    auto t = threadSize.find(thread);
    if (t == threadSize.end())
        return;

    // clear() resets all counters to zero while connections may still report
    // their disconnect afterwards; never decrement below zero, otherwise the
    // counter wraps/goes negative and distorts the thread selection.
    if (t.value() == 0)
        return;

    t.value() --;

    if (type == HANDLESIZE && t.value() == 0 && threadSize.size() > 1)
    {
        threadSize.remove(thread);
        thread->exit();
        thread->wait(3000);
        delete thread;
    }
}

 

/**
 * One-time configuration of the handler; later calls have no effect.
 *
 * @param type THREADSIZE: create a fixed set of threads up front.
 *             Otherwise: start with one thread and cap the handles per thread.
 * @param max  The limit for the chosen mode; 0 selects the default
 *             (10 for THREADSIZE, 1000 otherwise).
 */
void ThreadHandle::initThreadType(ThreadType type, unsigned int max)
{
    if (initfist)
        return;

    this->type = type;
    this->size = max;
    if (this->size == 0)
        this->size = (type == THREADSIZE) ? 10 : 1000;

    if (type == THREADSIZE)
    {
        initThreadSize();
    }
    else
    {
        // Start with a single thread; more are created on demand.
        QThread * const first = new QThread;
#ifndef Q_OS_WIN
        first->setEventDispatcher(new EventDispatcherLibEv());
#endif
        threadSize.insert(first, 0);
        first->start();
    }

    initfist = true;
}

 

void ThreadHandle::initThreadSize() //建立好线程并启动,

{

QThread * tmp;

for (unsigned int i = 0; i < size;++i)

{

tmp = new QThread;

#ifndef Q_OS_WIN

tmp->setEventDispatcher(new EventDispatcherLibEv());

#endif

threadSize.insert(tmp,0);

tmp->start();

}

}

 

/**
 * Returns the first thread whose connection count is below `size` and
 * increments that count; when every thread is full a new thread is created,
 * registered with a count of one, started and returned.
 */
QThread * ThreadHandle::findHandleSize()
{
    for (auto entry = threadSize.begin(); entry != threadSize.end(); ++entry)
    {
        if (entry.value() >= size)
            continue;

        entry.value() ++;
        return entry.key();
    }

    // Every existing thread is at capacity.
    QThread * const extra = new QThread;
#ifndef Q_OS_WIN
    extra->setEventDispatcher(new EventDispatcherLibEv());
#endif
    threadSize.insert(extra, 1);
    extra->start();
    return extra;
}

 

/**
 * Returns the thread with the fewest registered connections and increments
 * its count. On ties the entry encountered first wins.
 */
QThread *ThreadHandle::findThreadSize()
{
    auto least = threadSize.begin();

    auto cursor = least;
    ++cursor;
    while (cursor != threadSize.end())
    {
        if (cursor.value() < least.value())
            least = cursor;
        ++cursor;
    }

    least.value() ++;
    return least.key();
}

 

/* Resets every connection count to zero; the threads themselves are kept. */
void ThreadHandle::clear()
{
    auto entry = threadSize.begin();
    while (entry != threadSize.end())
    {
        entry.value() = 0;
        ++entry;
    }
}

 

原文地址:https://www.cnblogs.com/tangyuanjie/p/13992149.html