【并发】6、借助FQueue 实现多线程生产消费队列

1. 先说一下为什么会想到 FQueue:它是一个轻量级的消息队列框架,速度很快,用起来也很方便。

当然,后期会考虑使用 Redis。这里先给出一个 FQueue 的版本,后面有时间我再把它改成 Redis 版本,感觉 Redis 版本可能更适合。

package queue.fqueue.vo;

/**
 * @ProjectName: cutter-point
 * @Package: queue.fqueue.vo
 * @ClassName: EventVo
 * @Author: xiaof
 * @Description: ${description}
 * @Date: 2019/6/11 10:30
 * @Version: 1.0
 */
public interface EventVo {

    public void doOperater();

}
package queue.fqueue.vo;

import java.io.Serializable;

/**
 * Simple serializable event that carries a name and greets with it when executed.
 *
 * <p>No explicit {@code serialVersionUID} is declared, so the JVM-computed default applies.
 *
 * @ProjectName: cutter-point
 * @Package: queue.fqueue.vo
 * @ClassName: TempVo
 * @Author: xiaof
 * @Date: 2019/6/11 10:18
 * @Version: 1.0
 */
public class TempVo implements Serializable, EventVo {

    /** Label printed by {@link #doOperater()} and included in {@link #toString()}. */
    private String name;

    /**
     * @return the current name, possibly {@code null} if never set
     */
    public String getName() {
        return this.name;
    }

    /**
     * @param name the name to store
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * @return a text form of this object that embeds the name in single quotes
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("TempVo{name='");
        text.append(name);
        text.append("'}");
        return text.toString();
    }

    /**
     * Prints the name followed by a fixed greeting to standard output.
     */
    @Override
    public void doOperater() {
        String greeting = name + " : say hello fqueue!";
        System.out.println(greeting);
    }
}
package queue.fqueue;

import net.apexes.fqueue.FQueue;
import queue.fqueue.vo.TempVo;

import java.io.*;

/**
 * Producer task that periodically serializes a {@link TempVo} and adds the bytes to an
 * {@link FQueue}.
 *
 * <p>Each cycle sleeps for {@link #PRODUCE_INTERVAL_MILLIS}, builds a message containing the
 * current thread name and timestamp, serializes it with Java serialization and adds the resulting
 * byte array to the queue. The loop ends once the running thread is interrupted.
 *
 * @ProjectName: cutter-point
 * @Package: queue.fqueue
 * @ClassName: FqueueProducter
 * @Author: xiaof
 * @Date: 2019/6/11 10:36
 * @Version: 1.0
 */
public class FqueueProducter implements Runnable {

    /** Pause between two produced messages, in milliseconds. */
    private static final long PRODUCE_INTERVAL_MILLIS = 2000L;

    /** Queue that receives the serialized messages. */
    private final FQueue fQueue;

    /**
     * @param fQueue the queue this producer writes to
     */
    public FqueueProducter(FQueue fQueue) {
        this.fQueue = fQueue;
    }

    /**
     * Produces one message per interval until the thread is interrupted.
     *
     * <p>An {@link IOException} during serialization is reported and the loop moves on to the
     * next cycle; an interruption restores the interrupt flag and ends the task.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(PRODUCE_INTERVAL_MILLIS);

                TempVo tempVo = new TempVo();
                tempVo.setName(Thread.currentThread().getName() + ",time is:" + System.currentTimeMillis());

                fQueue.add(serialize(tempVo));
            } catch (InterruptedException e) {
                // Preserve the interrupt status for whoever owns this thread and stop producing.
                Thread.currentThread().interrupt();
                return;
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Serializes the given message into a byte array.
     *
     * @param tempVo the message to serialize
     * @return the serialized bytes
     * @throws IOException if writing the object fails
     */
    private static byte[] serialize(TempVo tempVo) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // Closing the ObjectOutputStream flushes its internal buffer into `buffer`
        // before the bytes are read back out.
        try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(buffer)) {
            objectOutputStream.writeObject(tempVo);
        }
        return buffer.toByteArray();
    }
}
package queue.fqueue;

import net.apexes.fqueue.FQueue;
import queue.fqueue.vo.EventVo;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;

/**
 * Consumer task that polls serialized events from an {@link FQueue}, deserializes them and
 * executes them.
 *
 * <p>When the queue yields nothing the task sleeps for {@link #IDLE_SLEEP_MILLIS} before polling
 * again instead of spinning. The loop ends once the running thread is interrupted.
 *
 * <p>NOTE(review): entries are read with Java native deserialization; this is only appropriate
 * while everything written to the queue comes from trusted producers.
 *
 * @ProjectName: cutter-point
 * @Package: queue.fqueue
 * @ClassName: FqueueConsume
 * @Author: xiaof
 * @Date: 2019/6/11 9:40
 * @Version: 1.0
 */
public class FqueueConsume implements Runnable {

    /** Pause before polling again after an empty poll, in milliseconds. */
    private static final long IDLE_SLEEP_MILLIS = 10L;

    /** Queue the serialized events are read from. */
    private final FQueue fQueue;

    /**
     * @param fQueue the queue this consumer reads from
     */
    public FqueueConsume(FQueue fQueue) {
        this.fQueue = fQueue;
    }

    /**
     * Polls and executes events until the thread is interrupted.
     *
     * <p>Entries that fail to deserialize are reported and skipped; entries that deserialize to
     * something other than an {@link EventVo} are reported and skipped as well, so a single bad
     * entry does not terminate the consumer.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            byte[] bytes = fQueue.poll();

            if (bytes == null || bytes.length == 0) {
                // Nothing to consume: back off briefly rather than busy-spinning.
                try {
                    Thread.sleep(IDLE_SLEEP_MILLIS);
                } catch (InterruptedException e) {
                    // Preserve the interrupt status and stop consuming.
                    Thread.currentThread().interrupt();
                    return;
                }
                continue;
            }

            // Deserialize the entry and run it.
            try (ObjectInputStream objectInputStream =
                         new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                Object payload = objectInputStream.readObject();
                if (payload instanceof EventVo) {
                    ((EventVo) payload).doOperater();
                } else {
                    System.err.println("Skipping queue entry that is not an EventVo: "
                            + (payload == null ? "null" : payload.getClass().getName()));
                }
            } catch (IOException | ClassNotFoundException e) {
                e.printStackTrace();
            }
        }
    }
}

测试代码:

/**
 * Demo driver: starts five producer threads and two daemon consumer threads that share one
 * FQueue opened on "db1", then keeps the calling thread alive indefinitely.
 */
@Test
    public void test3() throws IOException, FileFormatException, InterruptedException {
        FQueue queue1 = new FQueue("db1");

        // Start the producer threads.
        int producerIndex = 0;
        while (producerIndex < 5) {
            System.out.println("输出测试" + producerIndex);
            Runnable producerTask = new FqueueProducter(queue1);
            new Thread(producerTask).start();
            ++producerIndex;
        }

        // Start the consumer threads as daemons.
        int consumerIndex = 0;
        while (consumerIndex < 2) {
            System.out.println("输出测试" + consumerIndex);
            Runnable consumerTask = new FqueueConsume(queue1);
            Thread consumerThread = new Thread(consumerTask);
            consumerThread.setDaemon(true);
            consumerThread.start();
            ++consumerIndex;
        }

        // Keep the test thread alive so the worker threads can keep running.
        for (;;) {
            Thread.sleep(1000);
        }

    }

效果展示:

原文地址:https://www.cnblogs.com/cutter-point/p/11002395.html