Two-Phase Termination Design Pattern

分两个阶段结束线程

第一阶段:发出终止请求,让线程停止正常处理(设置终止标志并中断线程)
第二阶段:清理释放资源

简单版

package com.dwz.concurrency2.chapter17;

import java.util.Random;

/**
 * Worker thread that demonstrates two-phase termination.
 *
 * <p>While running it prints an incrementing counter and sleeps for a random
 * interval of less than one second. {@link #close()} requests termination
 * (phase one); the thread then leaves its loop and runs its cleanup step
 * (phase two) before exiting.
 */
public class CounterIncrement extends Thread {

    /** Termination request flag; volatile so the write in close() is visible to the worker. */
    private volatile boolean terminated = false;

    /** Only touched by the worker thread itself. */
    private int counter = 0;

    private final Random random = new Random(System.currentTimeMillis());

    /**
     * Counts until termination is requested, then always performs the cleanup step.
     */
    @Override
    public void run() {
        try {
            while (!terminated) {
                System.out.println(Thread.currentThread().getName() + " " + counter++);
                Thread.sleep(random.nextInt(1000));
            }
        } catch (InterruptedException e) {
            // An interrupt is how close() wakes the worker out of sleep(), so it is the
            // normal termination path rather than an error: keep the interrupt status
            // instead of dumping a stack trace, and fall through to the cleanup phase.
            Thread.currentThread().interrupt();
        } finally {
            this.clean();
        }
    }

    /** Second phase: cleanup work performed once the counting loop has ended. */
    private void clean() {
        System.out.println("do some clean work for the second phase, current counter is " + counter);
    }

    /**
     * First phase: requests termination by setting the flag and interrupting the
     * worker so that a pending sleep ends immediately.
     */
    public void close() {
        this.terminated = true;
        this.interrupt();
    }
}

测试

package com.dwz.concurrency2.chapter17;

/**
 * Demo driver: runs a {@code CounterIncrement} worker for a fixed time and then
 * asks it to terminate.
 */
public class CounterTest {

    /** How long the worker is left running before termination is requested. */
    private static final long RUN_MILLIS = 10_000L;

    /**
     * Starts the worker, waits, then requests its termination.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        final CounterIncrement worker = new CounterIncrement();
        worker.start();

        // Let the worker count for a while before closing it.
        Thread.sleep(RUN_MILLIS);
        worker.close();
    }
}

改进版

AppServer服务器

package com.dwz.concurrency2.chapter17;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Simple socket server thread that demonstrates two-phase termination.
 *
 * <p>The thread accepts client connections and hands each one to a
 * {@link ClientHandler} running on a fixed pool of 10 threads. Calling
 * {@link #shutdown()} requests termination (phase one); the server thread then
 * leaves its accept loop and releases its resources in {@code dispose()}
 * (phase two).
 */
public class AppServer extends Thread {
    private static final int DEFAULT_PORT = 12722;

    private final int port;

    /** Cleared by shutdown(); volatile so the server thread sees the request. */
    private volatile boolean start = true;

    /** Only accessed from the server thread (accept loop and dispose()). */
    private final List<ClientHandler> clientHandlers = new ArrayList<>();

    private final ExecutorService executor = Executors.newFixedThreadPool(10);

    /** Written by the server thread, read by shutdown() from another thread, hence volatile. */
    private volatile ServerSocket server;

    /** Creates a server that listens on the default port. */
    public AppServer() {
        this(DEFAULT_PORT);
    }

    /**
     * Creates a server that listens on the given port.
     *
     * @param port TCP port to bind when the thread is started
     */
    public AppServer(int port) {
        this.port = port;
    }

    /**
     * Binds the server socket and accepts clients until shutdown is requested.
     *
     * @throws RuntimeException wrapping the {@link IOException} if binding or accepting
     *         fails for any reason other than a requested shutdown
     */
    @Override
    public void run() {
        try {
            ServerSocket serverSocket = new ServerSocket(port);
            this.server = serverSocket;
            while (start) {
                Socket client = serverSocket.accept();
                ClientHandler clientHandler = new ClientHandler(client);
                executor.submit(clientHandler);
                this.clientHandlers.add(clientHandler);
            }
        } catch (IOException e) {
            // shutdown() closes the server socket to unblock accept(), which surfaces
            // here as an IOException. That is the normal termination path; only a
            // failure while the server is still meant to be running is an error.
            if (start) {
                throw new RuntimeException(e);
            }
        } finally {
            this.dispose();
        }
    }

    /** Second phase: stops every client handler, shuts the pool down and closes the server socket. */
    private void dispose() {
        this.clientHandlers.forEach(ClientHandler::stop);
        this.executor.shutdown();
        this.closeServerSocket();
    }

    /** Closes the server socket if it was created and is still open. */
    private void closeServerSocket() {
        ServerSocket serverSocket = this.server;
        if (serverSocket == null || serverSocket.isClosed()) {
            return;
        }
        try {
            serverSocket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * First phase: requests termination of the server thread.
     *
     * <p>Clears the running flag, interrupts the thread and closes the server
     * socket so that a blocked {@code accept()} returns. Safe to call before the
     * server socket has been created; in that case the accept loop is never entered.
     *
     * @throws IOException if closing the server socket fails
     */
    public void shutdown() throws IOException {
        this.start = false;
        this.interrupt();
        ServerSocket serverSocket = this.server;
        if (serverSocket != null) {
            serverSocket.close();
        }
    }

    /**
     * Misspelled alias of {@link #shutdown()}, kept so existing callers keep working.
     *
     * @throws IOException if closing the server socket fails
     * @deprecated use {@link #shutdown()}
     */
    @Deprecated
    public void shotdown() throws IOException {
        this.shutdown();
    }
}

ClientHandler控制器

package com.dwz.concurrency2.chapter17;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;

/**
 * Serves a single client connection: reads text lines from the socket and
 * echoes each one back prefixed with {@code "echo dang->"}.
 *
 * <p>The session ends when the client closes the connection, an I/O error
 * occurs, or {@link #stop()} is called from another thread.
 */
public class ClientHandler implements Runnable {
    private final Socket socket;

    /** Cleared by stop(); volatile so the handler thread sees the request. */
    private volatile boolean running = true;

    /**
     * @param socket the accepted client connection this handler owns and closes
     */
    public ClientHandler(Socket socket) {
        this.socket = socket;
    }

    /**
     * Echo loop. Always closes the socket when the loop ends, whatever the reason.
     */
    @Override
    public void run() {
        // Resources declared in the try(...) header are released automatically (since JDK 1.7).
        try (InputStream inputstream = socket.getInputStream();
             OutputStream outputstream = socket.getOutputStream();
             BufferedReader br = new BufferedReader(new InputStreamReader(inputstream));
             PrintWriter printWriter = new PrintWriter(outputstream)) {
            while (running) {
                String message = br.readLine();
                if (null == message) {
                    // End of stream: the client closed the connection.
                    break;
                }
                System.out.println("Come from client ->" + message);
                printWriter.write("echo dang->" + message + "\n");
                printWriter.flush();
            }
        } catch (IOException ignored) {
            // Either the connection failed or stop() closed the socket under a blocked
            // read. Both simply end the session; the finally block does the cleanup.
        } finally {
            this.stop();
        }
    }

    /**
     * Requests the echo loop to end and closes the socket, which also unblocks a
     * pending read. Safe to call more than once and from any thread.
     */
    public void stop() {
        this.running = false;
        if (this.socket.isClosed()) {
            return;
        }
        try {
            this.socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

测试步骤:

1.启动AppServer 

2.在cmd中输入指令 telnet localhost <端口>(端口为AppServer监听的端口,本例为13345)连上服务器并发送消息

package com.dwz.concurrency2.chapter17;

import java.io.IOException;

/**
 * Demo driver: starts an {@code AppServer} on a fixed port, keeps it up for a
 * fixed time and then requests its termination.
 */
public class AppServerClient {

    /** Port the demo server listens on. */
    private static final int PORT = 13345;

    /** How long the server is left running before termination is requested. */
    private static final long RUN_MILLIS = 20_000L;

    /**
     * Runs the demo.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     * @throws IOException if shutting the server down fails
     */
    public static void main(String[] args) throws InterruptedException, IOException {
        final AppServer server = new AppServer(PORT);
        server.start();

        // Leave time to connect with a client before stopping the server.
        Thread.sleep(RUN_MILLIS);
        server.shotdown();
    }
}
原文地址:https://www.cnblogs.com/zheaven/p/12167191.html