Java Thread part 3

wait()和notify()的使用

package threadtest3;

public class Car {
    private boolean waxOn = false;
    
    /**
     * wait()和notify()方法必须在锁内,使用wait()时会释放锁
     * notify()和notifyAll()只会唤醒释放他获得的锁的wait()方法
     * notify()随机唤醒同一个锁的其中一个wait()方法
     * notifyAll()唤醒同一个锁的所有wait()方法
     */
    public synchronized void waxed() {
        waxOn = true;
        notifyAll();
    }
    
    public synchronized void buffed() {
        waxOn = false;
        notifyAll();
    }
    
    public synchronized void waitForWaxing()
            throws InterruptedException {
        while (waxOn == false)
            wait();
    }
    
    public synchronized void waitForBuffing()
            throws InterruptedException {
        while (waxOn == true)
            wait();
    }
}

package threadtest3;

import java.util.concurrent.TimeUnit;

public class WaxOn extends Thread {
    private Car car;
    
    public WaxOn(Car car) {
        this.car = car;
    }
    
    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                System.out.println("Wax on!");
                TimeUnit.MILLISECONDS.sleep(200);
                car.waxed();
                car.waitForBuffing();
            }
        } catch (InterruptedException e) {
            System.out.println("Exitng via interrupt");
        }
        System.out.println("Ending wax on task");
    }
}

package threadtest3;

import java.util.concurrent.TimeUnit;

public class WaxOff extends Thread {
    private Car car;
    
    public WaxOff(Car car) {
        this.car = car;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                car.waitForWaxing();
                TimeUnit.MILLISECONDS.sleep(200);
                System.out.println("Wax off!");
                car.buffed();
            }
        } catch (InterruptedException e) {
            System.out.println("Exiting via interrupt");
        }
        System.out.println("Ending wax off task");
    }
}

package threadtest3;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class WaxOMatic {
    public static void main(String[] args)
            throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        Car c = new Car();
        exec.execute(new WaxOn(c));
        exec.execute(new WaxOff(c));
        TimeUnit.SECONDS.sleep(5);
        exec.shutdownNow();
    }
}

Wax on!
Wax off!
Wax on!
Wax off!
Wax on!
Wax off!
Wax on!
Wax off!
Wax on!
Wax off!
Wax on!
Wax off!
Wax on!
Wax off!
Wax on!
Wax off!
Wax on!
Wax off!
Wax on!
Wax off!
Wax on!
Wax off!
Wax on!
Wax off!
Wax on!
Exiting via interrupt
Exitng via interrupt
Ending wax on task
Ending wax off task

生产者与消费者模型

package threadtest3;

public class Meal {
    private final int orderNum;
    public Meal(int num) {
        orderNum = num;
    }
    
    @Override
    public String toString() {
        return "Meal " + orderNum;
    }
}

package threadtest3;

/**
 * 服务员,负责端餐
 * @author alexis
 *
 */
public class WaitPerson extends Thread {
    private Restaurant res;
    
    public WaitPerson(Restaurant res) {
        this.res = res;
    }
    
    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                synchronized(this) {
                    while (res.meal == null) {
                        wait();
                    }
                }
                System.out.println("Waitperson got " + res.meal);
                synchronized(res.chef) {
                    res.meal = null;
                    res.chef.notifyAll();
                }
            }
        } catch (InterruptedException e) {
            System.out.println("Exiting via interrupt");
        }
    }
}

package threadtest3;

import java.util.concurrent.TimeUnit;

/**
 * 厨师,负责做餐
 * @author alexis
 *
 */
public class Chef extends Thread {
    private Restaurant res;
    private int count = 0;
    public Chef(Restaurant res) {
        this.res = res;
    }
    
    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                synchronized (this) {
                    while (res.meal != null) {
                        wait();
                    }
                }
                if (++count == 10) {
                    System.out.println("Out of food");
                    res.exec.shutdownNow();
                }
                System.out.println("Chef make a meal");
                synchronized (res.waitPerson) {
                    res.meal = new Meal(count);
                    res.waitPerson.notifyAll();
                }
                // 注意当执行 shutdownNow() 方法的时候
                // run()方法并没有直接立即退出
                // 而是尝试回到 isInterrupted() 方法判断后退出
                // 此时执行到了 sleep() 方法,在这里抛出了
                // InterruptedException() 后退出了
                // 所以我们可以看到shutdownNow()以后还输出了
                // 一句 Chef make a meal
                TimeUnit.MILLISECONDS.sleep(200);
            }
        } catch (InterruptedException e) {
            System.out.println("Exiting via interrupt");
        }
    }
}

package threadtest3;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Restaurant {
    Meal meal;
    ExecutorService exec = Executors.newCachedThreadPool();
    WaitPerson waitPerson = new WaitPerson(this);
    Chef chef = new Chef(this);
    
    public Restaurant() {
        exec.execute(waitPerson);
        exec.execute(chef);
    }
    
    public static void main(String[] args) {
        new Restaurant();
    }
}

Chef make a meal
Waitperson got Meal 1
Chef make a meal
Waitperson got Meal 2
Chef make a meal
Waitperson got Meal 3
Chef make a meal
Waitperson got Meal 4
Chef make a meal
Waitperson got Meal 5
Chef make a meal
Waitperson got Meal 6
Chef make a meal
Waitperson got Meal 7
Chef make a meal
Waitperson got Meal 8
Chef make a meal
Waitperson got Meal 9
Out of food
Chef make a meal
Exiting via interrupt
Exiting via interrupt

使用重入锁替代synchronized和wait() notify()

package threadtest4;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 使用显式重入锁ReentrantLock代替synchronized
 * 使用Condition代替Object的wait()和notify()
 * @author alexis
 *
 */
public class Car {
    private boolean waxOn = false;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    
    public void waxed() {
        lock.lock();
        try {
            waxOn = true;
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
    
    public void buffed() {
        lock.lock();
        try {
            waxOn = false;
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
    
    public void waitForWaxing() throws InterruptedException {
        lock.lock();
        try {
            while (waxOn == false) {
                condition.await();
            }
        } finally {
            lock.unlock();
        }
    }
    
    public void waitForBuffing() throws InterruptedException {
        lock.lock();
        try {
            while (waxOn == true) {
                condition.await();
            }
        } finally {
            lock.unlock();
        }
    }
}

package threadtest4;

import java.util.concurrent.TimeUnit;

public class Waxing extends Thread {
    Car car;

    public Waxing(Car car) {
        this.car = car;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                System.out.println("Wax on!");
                TimeUnit.MILLISECONDS.sleep(100);
                car.waxed();
                car.waitForBuffing();
            }
        } catch (InterruptedException e) {
            System.out.println("Exiting via interrupt");
        }
        System.out.println("Ending wax on task");
    }
}

package threadtest4;

import java.util.concurrent.TimeUnit;

public class Buffing extends Thread {
    Car car;
    public Buffing(Car car) {
        this.car = car;
    }
    
    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                car.waitForWaxing();
                System.out.println("Wax off!");
                TimeUnit.MILLISECONDS.sleep(200);    
                car.buffed();
            }
        } catch(InterruptedException e) {
            System.out.println("Exiting via interrupt");
        }
        System.out.println("Ending wax off task");
    }
}

package threadtest4;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class WaxTask {
    public static void main(String[] args)
            throws InterruptedException {
        Car c = new Car();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new Waxing(c));
        exec.execute(new Buffing(c));
        TimeUnit.SECONDS.sleep(3);
        exec.shutdownNow();
    }
}

Wax on!
Wax off!
Wax on!
Wax off!
Wax on!
Wax off!
Wax on!
Wax off!
Wax on!
Wax off!
Wax on!
Wax off!
Wax on!
Wax off!
Wax on!
Wax off!
Wax on!
Wax off!
Wax on!
Wax off!
Exiting via interrupt
Ending wax off task
Exiting via interrupt
Ending wax on task

使用同步数据结构替代锁和wait() notify()等

package threadtest5;

public class Toast {
    private static int id = 0;
    public enum Status { DRY, BUTTER };
    private Status status = Status.DRY;
    
    public Toast() {
        ++id;
    }
    
    @Override
    public String toString() {
        return "Toast " + id;
    }

    public void butter() {
        status = Status.BUTTER;
    }
    
    public Status getStatus() {
        return status;
    }
}

package threadtest5;

import java.util.concurrent.TimeUnit;

public class Drying extends Thread {
    // 使用同步队列替代同步方法
    // 并且不再需要wait()和notify()
    // 一切都由同步队列实现
    ToastQueue dryQueue;

    public Drying(ToastQueue dryQueue) {
        this.dryQueue = dryQueue;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Toast t = new Toast();
                System.out.println("Drying " + t);
                dryQueue.put(t);
                TimeUnit.MILLISECONDS.sleep(200);
            }
        } catch (InterruptedException e) {
            System.out
                    .println(Thread.currentThread() + "Exiting via interrupt");
        }
    }
}

package threadtest5;

import java.util.concurrent.TimeUnit;

public class Buttering extends Thread {
    private ToastQueue dryQueue;
    private ToastQueue butterQueue;

    public Buttering(ToastQueue dryQueue, ToastQueue butterQueue) {
        this.dryQueue = dryQueue;
        this.butterQueue = butterQueue;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Toast t = dryQueue.take();
                t.butter();
                System.out.println("Buttering " + t);
                butterQueue.put(t);
                TimeUnit.MILLISECONDS.sleep(200);
            }
        } catch (InterruptedException e) {
            System.out
                    .println(Thread.currentThread() + "Exiting via interrupt");
        }
    }
}

package threadtest5;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Eating extends Thread {
    private ToastQueue butterQueue;

    public Eating(ToastQueue butterQueue) {
        this.butterQueue = butterQueue;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Toast t = butterQueue.take();
                if (t.getStatus() != Toast.Status.BUTTER) {
                    System.out.println("Error " + t);
                    System.exit(1);
                }
                System.out.println("Eating " + t);
                TimeUnit.MILLISECONDS.sleep(200);
            }
        } catch (InterruptedException e) {
            System.out
                    .println(Thread.currentThread() + "Exiting via interrupt");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ToastQueue dryQueue = new ToastQueue();
        ToastQueue butterQueue = new ToastQueue();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new Drying(dryQueue));
        exec.execute(new Buttering(dryQueue, butterQueue));
        exec.execute(new Eating(butterQueue));
        TimeUnit.SECONDS.sleep(3);
        exec.shutdownNow();
    }
}

Drying Toast 1
Buttering Toast 1
Eating Toast 1
Drying Toast 2
Buttering Toast 2
Eating Toast 2
Drying Toast 3
Buttering Toast 3
Eating Toast 3
Drying Toast 4
Buttering Toast 4
Eating Toast 4
Drying Toast 5
Buttering Toast 5
Eating Toast 5
Drying Toast 6
Buttering Toast 6
Eating Toast 6
Drying Toast 7
Buttering Toast 7
Eating Toast 7
Drying Toast 8
Buttering Toast 8
Eating Toast 8
Drying Toast 9
Buttering Toast 9
Eating Toast 9
Drying Toast 10
Buttering Toast 10
Eating Toast 10
Drying Toast 11
Buttering Toast 11
Eating Toast 11
Drying Toast 12
Buttering Toast 12
Eating Toast 12
Drying Toast 13
Buttering Toast 13
Eating Toast 13
Drying Toast 14
Buttering Toast 14
Eating Toast 14
Drying Toast 15
Buttering Toast 15
Eating Toast 15
Thread[pool-1-thread-3,5,main]Exiting via interrupt
Thread[pool-1-thread-2,5,main]Exiting via interrupt
Thread[pool-1-thread-1,5,main]Exiting via interrupt

管道的使用

package threadtest6;

import java.io.IOException;
import java.io.PipedWriter;
import java.util.concurrent.TimeUnit;

public class Sender extends Thread {
    private PipedWriter out = new PipedWriter();
    
    public PipedWriter getOut() {
        return out;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                for (char c = 'A'; c < 'z'; c++) {
                    out.write(c);
                    TimeUnit.MILLISECONDS.sleep(100);
                }
            }
        } catch (IOException e) {
            System.out.println("Exiting via IOException");
        } catch (InterruptedException e) {
            System.out.println("Exiting via interrupt");
        }
    }
}

package threadtest6;

import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.concurrent.TimeUnit;

public class Reciever extends Thread {
    private PipedReader in;
    
    /**
     * PipedReader通过PipedWriter构造
     */
    public Reciever(PipedWriter out)
            throws IOException {
        in = new PipedReader(out);
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                System.out.println((char) in.read());
                TimeUnit.MILLISECONDS.sleep(100);
            }
        } catch (IOException e) {
            System.out.println("Exiting via IOException");
        } catch (InterruptedException e) {
            System.out.println("Exiting via interrupt");
        }
    }
}

package threadtest6;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args)
            throws IOException, InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        Sender sender = new Sender();
        exec.execute(sender);
        exec.execute(new Reciever(sender.getOut()));
        TimeUnit.SECONDS.sleep(5);
        exec.shutdownNow();
    }
}

A
B
C
D
E
F
G
H
I
J
K
L
M
N
O
P
Q
R
S
T
U
V
W
X
Y
Z
[
\
]
^
_
`
a
b
c
d
e
f
g
h
i
j
k
l
m
n
o
p
q
r
Exiting via interrupt
Exiting via interrupt

原文地址:https://www.cnblogs.com/zemliu/p/2950916.html