多线程 消费和生产模式

一。自行编写线程安全的List来作为作为消费者和生产者的数据类型。

从1.1和1.2可以看出明显差距.

首先object的锁把互斥量放在object上。而lock类的单独出现一个lock的变量,更灵活。

更大的区别是object 锁的wait,notifiyall是对象级别。而lock的 await,single 是条件等待。同一把锁可以分情况唤醒不同的线程,readwait.nofityall(),唤醒卡在读上的线程。     而object.nofityall,把这个锁上的全部线程都唤醒了。

1.1object 锁

public static class MySafeList
    {
        private List<Integer> mData=new ArrayList<>();
        private int num=0;
        public Integer insertID()
        {
            try
            {
                Thread.sleep(1000);
            }
            catch (Exception e)
            {
                LSLog.Log_Exception(e);
            }

            synchronized (this)
            {
                try
                {
                    while (mData.size() > 10)
                    {
                        this.wait();
                    }
                    mData.add(num);
                    num++;
                    this.notifyAll();
                    return num-1;
                }
                catch (Exception e)
                {
                    return null;
                }
            }
        }

        public Integer getID()
        {
            try
            {
                Thread.sleep(2000);
            } catch (Exception e)
            {
                LSLog.Log_Exception(e);
            }

            synchronized (this)
            {
                try
                {
                    while (mData.size()==0)
                    {
                        this.wait();
                    }

                    int res=mData.get(0);
                    mData.remove(0);
                    this.notifyAll();
                    return res;
                }
                catch (Exception e)
                {
                    return null;
                }
            }
        }
    }

 测试方法

 private void MultipleThread()
    {
        final MySafeList mySafeList=new MySafeList();
        Thread thread_produce1=new Thread(new Runnable()
        {
            @Override
            public void run()
            {
                while (true)
                {
                    Integer insered= mySafeList.insertID();
                    if(insered!=null)
                    {
                        LSLog.Log_INFO("insert:" + insered);
                    }
                }
            }
        });
        thread_produce1.start();

        Thread thread_produce2=new Thread(new Runnable()
        {
            @Override
            public void run()
            {
                while (true)
                {
                    Integer insered= mySafeList.insertID();
                    if(insered!=null)
                    {
                        LSLog.Log_INFO("insert:" + insered);
                    }
                }
            }
        });
        thread_produce2.start();

        Thread thread_consume1=new Thread(new Runnable()
        {
            @Override
            public void run()
            {
                while (true)
                {
                    Integer id = mySafeList.getID();
                    LSLog.Log_INFO("get:" + id);
                }
            }
        });
        thread_consume1.start();

        Thread thread_consume2=new Thread(new Runnable()
        {
            @Override
            public void run()
            {
                while (true)
                {
                    Integer id = mySafeList.getID();
                    LSLog.Log_INFO("get:" + id);
                }
            }
        });
        thread_consume2.start();
    }

1.2用reentantlock来锁定。用condition ,singel来处理2种极端情况。并把共享的数据放入到类ThreadsPractice2中。由类来管理。就和java自带的blockingqueue一样。自己管理线程安全。只提供对外方法 get 和 add

而外部调用者不需要考虑任何锁。只需要开线程读和写。

package com.linson.android.hiandroid2.JavaPractice;
import com.linson.LSLibrary.AndroidHelper.LSComponentsHelper;
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

//线程安全的list
public class ThreadsPractice2
{
    private LinkedList<Integer> mBookID=new LinkedList<>();
    private ReentrantLock mReentrantLock=new ReentrantLock();
    private Condition full=mReentrantLock.newCondition();
    private Condition empty=mReentrantLock.newCondition();

    public Integer getBookID()
    {
        mReentrantLock.lock();
        try
        {
            while (mBookID.size()<=0)
            {
                empty.await();
            }
            int id = mBookID.removeFirst();
            full.signalAll();
            return id;

        } catch (Exception e)
        {
            LSComponentsHelper.LS_Log.Log_Exception(e);
            return 0;
        }
        finally
        {
            mReentrantLock.unlock();
        }
    }

    public void addBookID(int id)
    {
        mReentrantLock.lock();
        try
        {
            while (mBookID.size()>=100)
            {
                full.await();
            }
            mBookID.add(id);
            empty.signalAll();
        } catch (Exception e)
        {
            LSComponentsHelper.LS_Log.Log_Exception(e);
        }
        finally
        {
            mReentrantLock.unlock();
        }
    }
}

测试使用线程安全的list

private void productConsume()
    {
        ThreadsPractice2 threadsPractice2=new ThreadsPractice2();

        Thread w1=new Thread(new Writer(threadsPractice2, 1, 10000));
        Thread w2=new Thread(new Writer(threadsPractice2, 10001, 20000));
        w1.start();
        w2.start();


        Thread r1=new Thread(new Reader(threadsPractice2));
        Thread r2=new Thread(new Reader(threadsPractice2));
        r1.start();
        r2.start();
    }

    private class Writer implements Runnable
    {
        private ThreadsPractice2 mData;
        private int mstart,mend;
        public Writer(ThreadsPractice2 threadsPractice2,int satrt,int end)
        {
            mData=threadsPractice2;
            mstart=satrt;
            mend=end;
        }
        @Override
        public void run()
        {
            long startTime= System.currentTimeMillis();

            for (int i=mstart;i<=mend;i++)
            {
                mData.addBookID(i);
            }

            long endTime=System.currentTimeMillis();
            LSComponentsHelper.LS_Log.Log_INFO(Thread.currentThread().getName()+"use millseconds:"+(endTime-startTime)+". maxid:");
        }
    }

    private class Reader implements Runnable
    {
        private ThreadsPractice2 mData;
        public Reader(ThreadsPractice2 threadsPractice2)
        {
            mData=threadsPractice2;
        }
        @Override
        public void run()
        {
            while (true)
            {
                int bookid=mData.getBookID();
                if(bookid>0)
                {
                    LSComponentsHelper.LS_Log.Log_INFO("bookid:"+bookid);
                }
            }
        }
    }

消费,生产例子2

   //region Product and consume
    private static Stack<Integer> mData=new Stack<>();
    private static Integer mNums=0;
    private static ReentrantLock mlock=new ReentrantLock();
    private static Condition mEmpty=mlock.newCondition();
    private static Condition mFull=mlock.newCondition();


    private void produceAndConsume()
    {
        Thread p1=new Thread(new Producter(),"p1");
        Thread p2=new Thread(new Producter(),"p2");

        Thread c1=new Thread(new Consume(),"c1");

        p1.start();
        p2.start();
        c1.start();
    }


    private class  Producter implements Runnable
    {
        @Override
        public void run()
        {
            while (true)
            {
                try
                {
                    Thread.sleep(3000);//模拟一个耗时的操作。注意,必须放到锁外部,不然没意义。所以我们才需要多线程。
                } catch (Exception e)
                {
                    LSComponentsHelper.LS_Log.Log_Exception(e);
                }
                mlock.lock();
                try
                {
                    while (mData.size()>=15)
                    {
                        mFull.await();
                    }

                    mData.push(mNums);
                    LSComponentsHelper.LS_Log.Log_INFO(Thread.currentThread().getName()+". add "+mNums.toString()+".         size:"+mData.size());
                    mNums+=1;
                    mEmpty.signal();
                } catch (Exception e)
                {
                    LSComponentsHelper.LS_Log.Log_Exception(e);
                } finally
                {
                    mlock.unlock();
                }
            }
        }
    }

    private class Consume implements Runnable
    {
        @Override
        public void run()
        {
            while (true)
            {
                try
                {
                    Thread.sleep(6000);//模拟处理很慢,必须放入到锁外。<1.5秒。读快,集合数据处于处理完的空状态。 >1.5秒读慢,集合数据处于累加状态,最后保持满状态,处理太慢。 可以用 1000和6000来测试。
                } catch (Exception e)
                {
                    LSComponentsHelper.LS_Log.Log_Exception(e);
                }
                mlock.lock();
                try
                {
                    while (mData.size() <= 0)
                    {
                        mEmpty.await();
                    }
                    Integer getn=mData.pop();
                    LSComponentsHelper.LS_Log.Log_INFO(Thread.currentThread().getName()+"reader:"+getn.toString()+".size:"+mData.size());
                    mFull.signal();

                } catch (Exception e)
                {
                    LSComponentsHelper.LS_Log.Log_Exception(e);
                } finally
                {
                    mlock.unlock();
                }
            }
        }
    }
    //endregion

 2.使用系统提供的blockingQueue

queue的size是固定的,建立了后就不能修改。故意设置小一点。看了下实现是一个数组【】,所以会作为一个环形缓冲?

为了测试queue,设计了一个小方法,2个写线程,分别写10000个数据。 3个读线程吧数据读到自己的set中。那么只要3个set的总数等于写的数量。那么就可以确定,写线程没有重复和丢失任何数据。同时也可以证明,读线程,也没有丢失和重复。

呵呵,自带的当然不会错。结果: 7063+6633+6304=20000.应付20000个数字。基本是毫秒级。所以非常好。

不知道c#有没有线程安全的队列,反正c++是不自带。有blockingQueue和readWriterLock. 那基本上java碰到多线程,可以用这2个方案,可以解决绝大部分问题啊。

package com.linson.android.hiandroid2.JavaPractice;

import com.linson.LSLibrary.AndroidHelper.LSComponentsHelper;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockQueueTest
{
    private BlockingQueue<Integer> mBlockingQueue=new ArrayBlockingQueue<>(100);
    private Set<Integer> thread1Set=new HashSet<>();
    private Set<Integer> thread2Set=new HashSet<>();
    private Set<Integer> thread3Set=new HashSet<>();
    public void Start()
    {
        Thread w1=new Thread(new Writer(mBlockingQueue,1,10000));
        Thread w2=new Thread(new Writer(mBlockingQueue,10001,20000));

        Thread r1=new Thread(new Reader(mBlockingQueue),"thread1");
        Thread r2=new Thread(new Reader(mBlockingQueue),"thread2");
        Thread r3=new Thread(new Reader(mBlockingQueue),"thread3");

        w1.start();
        w2.start();
        r1.start();
        r2.start();
        r3.start();

        try
        {
            w1.join();
            w2.join();
            LSComponentsHelper.LS_Log.Log_INFO(thread1Set.size()+"."+thread2Set.size()+"."+thread3Set.size()+".");
            Thread.sleep(5000);
        } catch (Exception e)
        {
            LSComponentsHelper.LS_Log.Log_Exception(e);
        }

        LSComponentsHelper.LS_Log.Log_INFO(thread1Set.size()+"."+thread2Set.size()+"."+thread3Set.size()+".");
    }

    private class Writer implements Runnable
    {
        private BlockingQueue<Integer> mdata;
        private int mStart=0;
        private int mEnd=0;
        public Writer(BlockingQueue<Integer> blockingQueue,int start,int end)
        {
            mdata = blockingQueue;
            mStart=start;
            mEnd=end;
        }
        @Override
        public void run()
        {
            for(int i=mStart;i<=mEnd;i++)
            {
                try
                {
                    mdata.put(i);
                } catch (Exception e)
                {
                    LSComponentsHelper.LS_Log.Log_Exception(e);
                }
            }
        }
    }

    private class Reader implements Runnable
    {
        private BlockingQueue<Integer> mdata;
        public Reader(BlockingQueue<Integer> blockingQueue)
        {
            mdata = blockingQueue;
        }
        @Override
        public void run()
        {
            while (true)
            {
                try
                {
                    int res = mdata.take();
                    if(Thread.currentThread().getName()=="thread1")
                    {
                        thread1Set.add(res);
                    }
                    if(Thread.currentThread().getName()=="thread2")
                    {
                        thread2Set.add(res);
                    }
                    if(Thread.currentThread().getName()=="thread3")
                    {
                        thread3Set.add(res);
                    }
                } catch (Exception e)
                {
                    LSComponentsHelper.LS_Log.Log_Exception(e);
                }
            }
        }
    }
}

3.利用系统自带读写锁。来做一个线程安全的读写缓冲区。

所以可以看到。我们自己做的也是做出一个线程安全的list。系统自带的blockqueue,也是一个线程安全的队列。

而读写锁。我们一般也会根据具体情况。先做一个自己的线程安全的缓冲区。

所以多线程的最佳实践就是用类来 处理共享数据。保证线程安全。

package com.linson.android.hiandroid2.JavaPractice;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class SafeReadWriterCache<key,value>
{
    private Map<key,value> mMap=new HashMap<>();
    private ReentrantReadWriteLock mReentrantReadWriteLock=new ReentrantReadWriteLock();

    public value get(key k)
    {
        value res=null;
        mReentrantReadWriteLock.readLock().lock();
        try
        {
            res=mMap.get(k);
        }
        finally
        {
            mReentrantReadWriteLock.readLock().unlock();
        }
        return res;
    }

    public void set(key kk,value vv)
    {
        mReentrantReadWriteLock.writeLock().lock();
        try
        {
            mMap.put(kk, vv);
        }
        finally
        {
            mReentrantReadWriteLock.writeLock().unlock();
        }
    }

    public void clear()
    {
        mReentrantReadWriteLock.writeLock().lock();
        try
        {
            mMap.clear();
        }
        finally
        {
            mReentrantReadWriteLock.writeLock().unlock();
        }
    }
}

以下例子错误,有空再修正吧。先放到这里。

1.非常简单的,一线程写,一线程读。 读线程阻塞,直到写线程通知读线程。简单,重要是基本不会用错。

#include <stdio.h>
#include <string>
#include <iostream>
#include <memory>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <thread>
#include <unistd.h>

using namespace std;

class book
{
public:
    book():canread(false){}
    
    void Write(const vector<string>& _v)
    {
        unique_lock<mutex> w_mtx(book_mtx,defer_lock);//控制代码1
        w_mtx.lock();//控制代码2
        
        
        
        cout<<this_thread::get_id()<<"  :writer get lock, and start write"<<endl;
        
        cout<<"before write.vector size:"<<books.size()<<endl;
        
        
        for(size_t i=0;i<_v.size();i++)
        {
            books.push_back(_v[i]);
        }
        
        cout<<"after write.vector size:"<<books.size()<<endl;
    
        cout<<this_thread::get_id()<<"  :write end and ready set canread=true, and notify all"<<endl;
        
        canread=true;////控制代码3
        w_mtx.unlock();//控制代码4
        book_cv.notify_all();//控制代码5
    }
    
    void Read()
    {
        unique_lock<mutex> r_mtx(book_mtx,defer_lock);//控制代码1
        r_mtx.lock();//控制代码2
        while(!canread)//控制代码3
        {
            book_cv.wait(r_mtx);//控制代码4
        }
        cout<<this_thread::get_id()<<"  :reader get lock.and set value='' , canread=false "<<endl;
        
        cout<<"before read.vector size:"<<books.size()<<endl;
        vector<string> empty;
        empty.swap(books);
        cout<<"after read.vector size:"<<books.size()<<endl;
    
        
        canread=false;//控制代码5
        r_mtx.unlock();//控制代码6
    }
    
    
private:
    vector<string> books;
    mutex book_mtx;
    condition_variable book_cv;
    bool canread;
};


//写,设置为15秒一次.
void wrtie2book(book& _v)
{
    for(;;)
    {
        vector<string> newbooks;
        newbooks.push_back("aaa");
        newbooks.push_back("bbb");
        newbooks.push_back("ccc");
        _v.Write(newbooks);
        sleep(15);
    }
}

//读是只要可以读就一直读.不能读就阻塞在县城.等待生产者放入数据.
void read2book(book& _v)
{
    for(;;)
    {
        _v.Read();
    }
}


int main()
{
    book mybooks;
    

    thread read1(read2book,std::ref(mybooks));
    read1.detach();

    thread wrtie2(wrtie2book,std::ref(mybooks));
    wrtie2.detach();

    string cmd;
    cin>>cmd;
    return 0;
}

2)如果有多线程读,很多场合下是,每个线程处理一条。

代码也是基本一魔一样。只要读线程也设置一下canread 。同样简单,不出错。

#include <stdio.h>
#include <string>
#include <iostream>
#include <memory>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <thread>
#include <unistd.h>

using namespace std;


class book
{
public:
    book():canread(false){}
    
    void Write(const vector<string>& _v)
    {
        unique_lock<mutex> w_mtx(book_mtx,defer_lock);//控制代码1
        w_mtx.lock();//控制代码2
        
        
        
        cout<<this_thread::get_id()<<"  :writer get lock, and start write"<<endl;
        
        cout<<"before write.vector size:"<<books.size()<<endl;
        
        
        for(size_t i=0;i<_v.size();i++)
        {
            books.push_back(_v[i]);
        }
        
        cout<<"after write.vector size:"<<books.size()<<endl;
    
        cout<<this_thread::get_id()<<"  :write end and ready set canread=true, and notify all"<<endl;
        
        
        if(books.size()>0)//控制代码5
        {
            canread=true;//控制代码6
        }
        else
        {
            canread=false;//控制代码7
        }
        w_mtx.unlock();//控制代码4
        book_cv.notify_all();//控制代码5
    }
    
    void Read()
    {
        unique_lock<mutex> r_mtx(book_mtx,defer_lock);//控制代码1
        r_mtx.lock();//控制代码2
        while(!canread)//控制代码3
        {
            book_cv.wait(r_mtx);//控制代码4
        }
        
        
        cout<<this_thread::get_id()<<"  :reader get lock.and get one item  "<<endl;
        
        cout<<"before read.vector size:"<<books.size()<<endl;
        if(books.size()>0)
        {
            books.erase(books.begin());
        }
        
        
        cout<<"after read.vector size:"<<books.size()<<endl;
        cout<<this_thread::get_id()<<"  :reader check size ,and set canread true or false"<<endl;
        
        
        if(books.size()>0)//控制代码5
        {
            canread=true;//控制代码6
        }
        else
        {
            canread=false;//控制代码7
        }
        r_mtx.unlock();//控制代码8
    }
    
    
private:
    vector<string> books;
    mutex book_mtx;
    condition_variable book_cv;
    bool canread;
};


//写,设置为15秒一次.
void wrtie2book(book& _v)
{
    for(;;)
    {
        vector<string> newbooks;
        newbooks.push_back("aaa");
        newbooks.push_back("bbb");
        newbooks.push_back("ccc");
        _v.Write(newbooks);
        sleep(15);
    }
}

//读是只要可以读就一直读.不能读就阻塞在县城.等待生产者放入数据.
void read2book(book& _v)
{
    for(;;)
    {
        _v.Read();
    }
}


int main()
{
    book mybooks;
    

    thread read1(read2book,std::ref(mybooks));
    read1.detach();
    
    thread read2(read2book,std::ref(mybooks));
    read2.detach();
    
    thread read3(read2book,std::ref(mybooks));
    read3.detach();


    thread wrtie2(wrtie2book,std::ref(mybooks));
    wrtie2.detach();

    string cmd;
    cin>>cmd;
    return 0;
}

 

 

3) 读线程如果不可读会一直阻塞在锁上。

有时候需要读线程过段时间,跳出去,执行读数据函数外面的代码。

比如 执行关闭服务器方法。读线程就没办法关闭了。可以设置一个超时,让读线程超时后,就执行下。当然必须判断是超时引发的,还是有可读数据。

 

unique_lock<mutex> reclck(recmtx, defer_lock);
  reclck.lock();
  while(recvBuffEnable==false)
  {
      recv_cv.wait_for(reclck,std::chrono::seconds(6));
      if(recvBuffEnable==false)
      {
        return std::vector<FDMsg>();
      }
  }

这样6秒钟。醒一次。如果是没有数据可读。那么返回去,检查外部的关闭服务变量是否变化。

 

 

 

c# 的话,有太多的类和语法糖。

所以还是使用最基本的monitor和mutex 来实现锁定和同步。

掌握基本的关键字的用法,可以组合成很多模式,避免学习各种语法糖,分散精力和浪费时间。

需要注意的就是c#和c++的不同。 monitor.wait ,必须在 enter 和exit之间。这个和c++是不同的。开始不小心,按照C++的方式写,弹出了异常。

 

如:c++ ,是先释放锁,后通知wait的线程。

w_mtx.unlock();//控制代码4
book_cv.notify_all();//控制代码5

而c#,是先通知wait 的线程。后释放锁,
如:
Monitor.Pulse(mtx);
Monitor.Exit(mtx);
 

 

例如一个生产和消费者模式的例子。

class Program
    {
        private static Queue<int> data_int = new Queue<int>();

        private static Mutex mtx = new Mutex();

        private static int i = 0;

        private static void write()
        {
            while (true)
            {
                try
                {
                    Monitor.Enter(mtx);
                    data_int.Enqueue(i);
                    Console.WriteLine("write:" + i);
                    i++;
                }
                finally
                {
                    Monitor.Pulse(mtx);
                    Monitor.Exit(mtx);
                }
                Thread.Sleep(3000);//模拟外部数据,1秒获得一次
            }
        }

        private static void read()
        {
            while (true)
            {
                string thread_name = Thread.CurrentThread.Name;
                try
                {
                    Monitor.Enter(mtx);
                    bool hasData = data_int.Count > 0;
                    if (!hasData)
                    {
                        Monitor.Wait(mtx, -1, false);
                    }//wait之后.并未返回,而是继续当前下面语句
                    if (data_int.Count > 0)
                    {
                        int i = data_int.Dequeue();
                        Console.WriteLine("read" + i);
                    }
                }
                finally
                {
                    Monitor.Exit(mtx);
                }
            }
        }

        static void Main(string[] args)
        {
            Thread read1 = new Thread(read);
            read1.Name = "read1";
            read1.Start();

            Thread read2 = new Thread(read);
            read2.Name = "read2";
            read2.Start();

            Thread write1 = new Thread(write);
            write1.Name = "write1";
            write1.Start();

            //Thread write2 = new Thread(write);
            //write2.Start();

        }
    }

 

这个例子中,monitor和mutex 就类似于c++的  unique_lock 和mutex.

Assembly code
1
2
3
         |- 拥有锁的线程
lockObj->      |- 就绪队列(ready queue)
         |- 等待队列(wait queue)
原文地址:https://www.cnblogs.com/lsfv/p/6658672.html