三个线程同步中一个纠结的问题

前几天一直没想明白,搜索我的记忆海,愣是没找到这种三个线程相交互的!

 

1、问题描述

目前有三个class

WriteDB他主要负责把网页的信息写到数据中(间隔5分钟),每次更新之后都会他会相应的把DataOperation中的变量isDataUpdate_设置为true;

Search他负责处理对用户执行查询信息的操作,当有用户执行Search,他会相应的把DataOperation中的变量isUserSearch_设置为true;

DataOperation这是一个中间的类,负责做类WriteDB和类Search的同步操作,会一直检查这两个变量isDataUpdate_和isUserSearch_,当他们都为false的时候,把最新在内存中的数据库写入到对应的文件中,以便能够查询最新的信息,如果其中一个为true,他都等待。

 

2、迷糊的那段时间

也就是前几天,一直在想,怎么能实现上面的东西!一直被WriteDB这个class和isUserSearch_(bool类型)所困扰!

还写出来类似第一版的DataOperation代码:

class DataOperation
{
public:

	DataOperation()
	{
		running_flag = true;
		isDataUpdate_ = false;
		isUserSearch_ = false;
	}

	void Stop()
	{
		running_flag = false;
		{
			boost::recursive_mutex::scoped_lock lock(mtx);
			cond.notify_one();
		}
	}

	void EmitSignalThread()
	{
		while (running_flag)
		{
			if (isSearchEnd_)
			{
				break;
			}

			{
				boost::recursive_mutex::scoped_lock lock(mtx);

				if (IsReplaceDB())
				{
					// 需要替换数据库
					for (int i = 0; i <= 999; i++)
					{
						cout << "正在替换数据库" << i << endl;
					}
				}

				cond.notify_all();
			}
			boost::this_thread::sleep(boost::posix_time::millisec(1000));
		}
	}

	// 当后台有数据要更新,并且目前没有用户进行search的时候replace
	// 返回true表明正在进行,false表示还需要等待
	bool IsReplaceDB()
	{	
		return (isDataUpdate_ && isUserSearch_);
	}

	// 这个会阻塞
	void WaitReplaceDB()
	{
		boost::recursive_mutex::scoped_lock lock(mtx);
		cout << "如果程序没有动静,那么表明此刻正在更新数据库...等待几秒即可!" << endl;
		cond.wait(mtx);
		boost::this_thread::sleep(boost::posix_time::millisec(100));
	}

	void SetIsDataUpdate(bool isDataUpdate = true)
	{
		isDataUpdate_ = isDataUpdate;
	}

	void SetIsUserSearch(bool isUserSearch = true)
	{
		isUserSearch_ = isUserSearch;
	}

	void SetIsSearchEnd(bool isSearchEnd = false)
	{
		isSearchEnd_ = isSearchEnd;
	}

	bool GetIsSearchEnd()
	{
		return isSearchEnd_;
	}

private:
	bool running_flag;		// 数据助手是否需要存活
	bool isDataUpdate_;		// 数据是否有更新
	bool isUserSearch_;		// 当前是否有用户正在查询
	bool isSearchEnd_;		// 程序是否已经退出
	boost::recursive_mutex mtx;
	boost::condition_variable_any cond;
};

3、困扰的原因

我一直在想,以前的生产者和消费者是怎么做的?有没有想过还带有条件变量的?

 

4、能不能更简单

比如我就想过,只在DataOperation类中检测这两个bool变量,如果满足则更新DB,不满足Sleep;最终发现也有漏洞,毕竟这么做了Search中也要对应检测DB当前是否在更新,更新就Sleep----举一个极端的例子,但是是很可能发生:当前DataOperation发现没有用户在执行查询,那么DataOperation去设置标记(表面将要去更新数据库),当在设置标记的时候,用户更好输入了查询,发现数据库没在更新,那么就会造成同时出现DataOperation正在更新数据库、用户正在查询的情况!那么此时系统就会崩溃了。

 

5、所以说

条件变量、互斥量是确实有存在的意义的!

 

6、再来看一下整体的设计

从作用来看WriteDB只是自己做自己的,并不和多线程的同步有关,他仅仅是每次都设置isDataUpdate_为true就行;剩下的同步仅在于Search和Dataoperation中,bool isUserSearch_;bool isSearchEnd_;都没有实际的存在价值;只需要把isUserSearch_设置为条件变量就行。这样前面的两个bool变量的功能就被取代了(同时提一下,原来的条件变量,在这里才真正的用起来)

 

7、最终的测试代码

#include "iostream"
#include "boost/thread.hpp"
#include "boost/bind.hpp"
using namespace std;
#include "windows.h"
#include "fstream"


namespace MyNamespace
{
	ofstream cout("log.txt");
}

//#define WRITEFILE_NAMESPACE

#ifdef WRITEFILE_NAMESPACE
ofstream out("log.txt");
#endif // 0


class DataOperation
{
public:

	DataOperation()
	{
		running_flag_  = true;
		isDataUpdate_  = false;
	}

	void Stop()
	{
		running_flag_ = false;
	}

	void EmitSignalThread()
	{
		while (running_flag_)
		{
			boost::recursive_mutex::scoped_lock lock(mtx);
			bool flag = GetDataUpdate();

			// 如果有更新 那些就替换数据库
			if (GetDataUpdate())
			{
				// 需要替换数据库
				for (int i = 0; i <= 999; i++)
				{
					if (i == 1)
					{
#ifdef WRITEFILE_NAMESPACE
						out << "正在替换数据库" << i << endl;
#else
						cout << "正在替换数据库" << i << endl;
#endif // WRITEFILE_NAMESPACE
						
					}
					if (i == 999)
					{
#ifdef WRITEFILE_NAMESPACE
						out << "替换完毕" << i << endl;
#else
						cout << "替换完毕" << i << endl;
#endif // WRITEFILE_NAMESPACE
					}
				}
				isDataUpdate_ = false;
			}
			isUserSearch_.notify_all();
			// 等待10秒钟,所以结束的时候可能会在这里阻塞
	//		boost::this_thread::sleep(boost::posix_time::millisec(10000));
		}
	}

	// 这个会阻塞
	void WaitReplaceDB()
	{
		boost::recursive_mutex::scoped_lock lock(mtx);
#ifdef WRITEFILE_NAMESPACE
		out << "如果程序没有动静,那么表明此刻正在更新数据库...等待几秒即可!" << endl;
#else
		cout << "如果程序没有动静,那么表明此刻正在更新数据库...等待几秒即可!" << endl;
#endif // WRITEFILE_NAMESPACE
		isUserSearch_.wait(mtx);
		boost::this_thread::sleep(boost::posix_time::millisec(100));
	}

	void SetDataUpdate(bool isDataUpdate = true)
	{
		isDataUpdate_ = isDataUpdate;
	}

	bool GetDataUpdate()
	{
		return isDataUpdate_;
	}

	bool GetRunningFlag()
	{
		return running_flag_;
	}

private:
	bool running_flag_;							 // 数据助手是否需要存活
	bool isDataUpdate_;		
	boost::recursive_mutex mtx;
	boost::condition_variable_any isUserSearch_; // 当前是否有用户正在查询
};


class WriteDB
{
public:
	void Start(DataOperation& dataoperation)
	{
		while (true)
		{
			// 在这里 这个线程进行数据库的更新可能需要大概好几分钟
			for (int i = 0; i <= 9999; i++)
			{
				if (i == 1)
				{
#ifdef WRITEFILE_NAMESPACE
					out << "WriteDB wait..." << i << endl;
#else
					cout << "WriteDB wait..." << i << endl;
#endif // WRITEFILE_NAMESPACE
				}
				if (i == 9999)
				{
#ifdef WRITEFILE_NAMESPACE
					out << "WriteDB wait..." << i << endl;
#else
					cout << "WriteDB wait..." << i << endl;
#endif // WRITEFILE_NAMESPACE
				}
			}

			dataoperation.SetDataUpdate();

			// 查询一下,如果程序退出了,那么线程也退出
			if (!dataoperation.GetRunningFlag())
			{
#ifdef WRITEFILE_NAMESPACE
				out << "bye-bye" << endl;
#else
				cout << "bye-bye" << endl;
#endif // WRITEFILE_NAMESPACE
				break;
			}
			Sleep(10000);
		}
	}
};


class Search
{
public:
	void Start(DataOperation& dataoperation)
	{
		string user_enter;
		while (true)
		{
			getline(cin, user_enter);
			if (user_enter == "q")
			{
				dataoperation.Stop();
				break;
			}

#ifdef WRITEFILE_NAMESPACE
			out << "当前用户输入的是:" << user_enter << endl;
#else
			cout << "当前用户输入的是:" << user_enter << endl;
#endif // WRITEFILE_NAMESPACE

			// 如果正在更新数据库,那么wait
			dataoperation.WaitReplaceDB();
		//	out << "如果程序没有动静,那么表明此刻正在更新数据库...等待几秒即可!" << endl;

			// 如果没在更新数据库,直接进行查询
			for (int i = 0; i <= 99999; i++)
			{
			//	cout << "正在为您查询数据..." << i << endl;
				//////////////////////////////////////////////////////////////////////////
				if (i == 1)
				{
#ifdef WRITEFILE_NAMESPACE
					out << "正在为您查询数据..." << i << endl;
#else
					cout << "正在为您查询数据..." << i << endl;
#endif // WRITEFILE_NAMESPACE
				}
				if (i == 99999)
				{
#ifdef WRITEFILE_NAMESPACE
					out << "查询完毕..." << i << endl;
#else
					cout << "查询完毕..." << i << endl;
#endif // WRITEFILE_NAMESPACE
				}
				//////////////////////////////////////////////////////////////////////////
			}

			// 向等待的线程发出信号,让他去更新数据库
			Sleep(100);
		}
	}
};


int main()
{
	boost::thread_group grp;
	WriteDB write_db;
	Search search;
	DataOperation data_operation;

 	grp.create_thread(boost::bind(&WriteDB::Start, &write_db, boost::ref(data_operation)));
	grp.create_thread(boost::bind(&Search::Start, &search, boost::ref(data_operation)));
 	grp.create_thread(boost::bind(&DataOperation::EmitSignalThread, boost::ref(data_operation)));

	grp.join_all();

	return 0;
}



原文地址:https://www.cnblogs.com/jiangu66/p/3239023.html