socket服务端和客户端互发和心跳检测实例

基础版

网上百度了一个简单的socket服务端和客户端监听代码 并且已经试验完成。直接上代码

服务端:

package com.whalecloud.uip.server.socket;

import java.io.DataOutputStream;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/10 16:07
 */
public class SocketServer {

    private static final int PORT = 5209;


    public static void test() {
        ServerSocket server = null;
        Socket socket = null;
        DataOutputStream out = null;
        try {
            server = new ServerSocket(PORT);
            socket = server.accept();
            out = new DataOutputStream(socket.getOutputStream());
            while (true) {
                Thread.sleep(1000);
                out.writeUTF(getRandomStr());
                out.flush();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    private static String getRandomStr() {
        String str = "";
        int ID = (int) (Math.random() * 30);
        int x = (int) (Math.random() * 200);
        int y = (int) (Math.random() * 300);
        int z = (int) (Math.random() * 10);
        str = "ID:" + ID + "/x:" + x + "/y:" + y + "/z:" + z;
        return str;
    }


    public static void main(String[] args) {
        test();
    }
}

客户端:

package com.whalecloud.uip.server.socket;

import java.io.DataInputStream;
import java.io.InputStream;
import java.net.Socket;

/**
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/10 16:10
 */
public class SocketClient {
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 5209;

    private static void test() {
        Socket socket = null;
        DataInputStream dis = null;
        InputStream is = null;

        try {
            socket = new Socket(HOST, PORT);
            is = socket.getInputStream();
            dis = new DataInputStream(is);
            while (true) {
                System.out.println("receive_msg:" + dis.readUTF());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        test();
    }
}

启动两个项目就可以在控制台看到接收到的信息了

完整进阶版

客户端:

ClientMain---启动类
package com.whalecloud.uip.client.socket;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/11 16:57
 */
public class ClientMain {

    public static void main(String[] args) {
        SocketClientResponseInterface socketClientResponseInterface = new SocketClientResponseInterface() {
            @Override
            public void onSocketConnect() {
                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String strDate = df.format(new Date());
                System.out.println(strDate + "连接成功");
            }

            @Override
            public void onSocketReceive(Object socketResult, int code) {
                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String strDate = df.format(new Date());
                System.out.println(strDate+"拿到消息:"+socketResult);
            }

            @Override
            public void onSocketDisable(String msg, int code) {
                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String strDate = df.format(new Date());
                System.out.println(strDate+"断开连接");
            }
        };
        try {
            SocketClient socketClient = new SocketClient(socketClientResponseInterface);
            while (true) {
                socketClient.sendData("客户端11发送测试数据");
                Thread.sleep(20000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}
View Code

SocketClientResponseInterface---返回信息接口类
package com.whalecloud.uip.client.socket;


/**
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/10 16:42
 */
public interface SocketClientResponseInterface<T> {
    /**
     * 客户端连接回调
     */
    void onSocketConnect();

    /**
     * 客户端收到服务端消息回调
     *
     * @param socketResult
     * @param code
     */
    void onSocketReceive(T socketResult, int code);

    /**
     * 客户端关闭回调
     *
     * @param msg
     * @param code
     */
    void onSocketDisable(String msg, int code);
}
View Code

SocketCloseInterface---连接关闭信息接口类
package com.whalecloud.uip.client.socket;

/**
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/10 17:24
 */
public interface SocketCloseInterface {

    /**
     * 客户端收到服务端消息回调
     */
    void onSocketShutdownInput();

    /**
     * 客户端关闭回调
     */
    void onSocketDisconnection();
}
View Code

SocketClient---客户端调用类
package com.whalecloud.uip.client.socket;

import org.apache.http.util.TextUtils;

/**
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/10 17:05
 */
public class SocketClient {

    private static final String TAG = SocketClient.class.getSimpleName();

    private SocketClientThread socketClientThread;

    public SocketClient(SocketClientResponseInterface socketClientResponseInterface) {
        socketClientThread = new SocketClientThread("socketClientThread", socketClientResponseInterface);
        socketClientThread.start();
        //ThreadPoolUtil.getInstance().addExecuteTask(socketClientThread);
    }

    public <T> void sendData(T data) {
        //convert to string or serialize object
        String s = (String) data;
        if (TextUtils.isEmpty(s)) {
            System.out.print(TAG+"sendData: 消息不能为空");
            return;
        }
        if (socketClientThread != null) {
            socketClientThread.sendMsg(s);
        }
    }

    public void stopSocket() {
        //一定要在子线程内执行关闭socket等IO操作
        new Thread(() -> {
            socketClientThread.setReConnect(false);
            socketClientThread.stopThread();
        }).start();
    }
}
View Code
SocketClientThread---客户端线程类
package com.whalecloud.uip.client.socket;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;

import javax.net.SocketFactory;

/**
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/10 16:10
 * 写数据采用死循环,没有数据时wait,有新消息时notify
 * 连接线程
 */
public class SocketClientThread extends Thread implements SocketCloseInterface{

    private static final String TAG = SocketClientThread.class.getSimpleName();

    private volatile String name;

    private boolean isLongConnection = true;
    private boolean isReConnect = true;
    private SocketSendThread mSocketSendThread;
    private SocketReceiveThread mSocketReceiveThread;
    private SocketHeartBeatThread mSocketHeartBeatThread;
    private Socket mSocket;

    private boolean isSocketAvailable;

    private SocketClientResponseInterface socketClientResponseInterface;

    public SocketClientThread(String name, SocketClientResponseInterface socketClientResponseInterface) {
        this.name = name;
        this.socketClientResponseInterface = socketClientResponseInterface;
    }

    @Override
    public void run() {
        final Thread currentThread = Thread.currentThread();
        final String oldName = currentThread.getName();
        currentThread.setName("Processing-" + name);
        try {
            initSocket();
            System.out.print(TAG + "run: SocketClientThread end");
        } finally {
            currentThread.setName(oldName);
        }
    }

    /**
     * 初始化socket客户端
     */
    private void initSocket() {
        try {
            mSocket = SocketFactory.getDefault().createSocket();
            SocketAddress socketAddress = new InetSocketAddress(SocketUtil.ADDRESS, SocketUtil.PORT);
            mSocket.connect(socketAddress, 10000);

            isSocketAvailable = true;

            //开启接收线程
            mSocketReceiveThread = new SocketReceiveThread("SocketReceiveThread",
                new BufferedReader(new InputStreamReader(mSocket.getInputStream(), "UTF-8")),
                socketClientResponseInterface, this);
            mSocketReceiveThread.start();

            //开启发送线程
            PrintWriter printWriter = new PrintWriter(mSocket.getOutputStream(), true);
            System.out.print(TAG+"initSocket: " + printWriter);
            mSocketSendThread = new SocketSendThread("SocketSendThread", printWriter);
            mSocketSendThread.setCloseSendTask(false);
            mSocketSendThread.start();

            //开启心跳线程
            if (isLongConnection) {
                mSocketHeartBeatThread = new SocketHeartBeatThread("SocketHeartBeatThread",
                    printWriter, mSocket, this);
                mSocketHeartBeatThread.start();
            }

            if (socketClientResponseInterface != null) {
                socketClientResponseInterface.onSocketConnect();
            }
        } catch (ConnectException e) {
            failedMessage("服务器连接异常,请检查网络", SocketUtil.FAILED);
            e.printStackTrace();
            stopThread();
        } catch (IOException e) {
            failedMessage("网络发生异常,请稍后重试", SocketUtil.FAILED);
            e.printStackTrace();
            stopThread();
        }
    }

    /**
     * 发送消息
     */
    public void sendMsg(String data) {
        if (mSocketSendThread != null) {
            mSocketSendThread.sendMsg(data);
        }
    }

    /**
     * 关闭socket客户端
     */
    public synchronized void stopThread() {
        //关闭接收线程
        closeReceiveTask();
        //唤醒发送线程并关闭
        wakeSendTask();
        //关闭心跳线程
        closeHeartBeatTask();
        //关闭socket
        closeSocket();
        //清除数据
        clearData();
        failedMessage("断开连接", SocketUtil.FAILED);
        if (isReConnect) {
            SocketUtil.toWait(this, 15000);
            initSocket();
            System.out.print(TAG + "stopThread: " + Thread.currentThread().getName());
        }
    }

    /**
     * 唤醒后关闭发送线程
     */
    private void wakeSendTask() {
        if (mSocketSendThread != null) {
            mSocketSendThread.wakeSendTask();
        }
    }

    /**
     * 关闭接收线程
     */
    private void closeReceiveTask() {
        if (mSocketReceiveThread != null) {
            mSocketReceiveThread.close();
            mSocketReceiveThread = null;
        }
    }

    /**
     * 关闭心跳线程
     */
    private void closeHeartBeatTask() {
        if (mSocketHeartBeatThread != null) {
            mSocketHeartBeatThread.close();
        }
    }

    /**
     * 关闭socket
     */
    private void closeSocket() {
        if (mSocket != null) {
            if (!mSocket.isClosed() && mSocket.isConnected()) {
                try {
                    mSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            isSocketAvailable = false;
            mSocket = null;
        }
    }

    /**
     * 清除数据
     */
    private void clearData() {
        if (mSocketSendThread != null) {
            mSocketSendThread.clearData();
        }
    }

    /**
     * 连接失败回调
     */
    private void failedMessage(String msg, int code) {
        if (socketClientResponseInterface != null) {
            socketClientResponseInterface.onSocketDisable(msg, code);
        }
    }

    @Override
    public void onSocketShutdownInput() {
        if (isSocketAvailable) {
            SocketUtil.inputStreamShutdown(mSocket);
        }
    }

    @Override
    public void onSocketDisconnection() {
        isSocketAvailable = false;
        stopThread();
    }

    /**
     * 设置是否断线重连
     */
    public void setReConnect(boolean reConnect) {
        isReConnect = reConnect;
    }

}
View Code
SocketHeartBeatThread---心跳监听类
package com.whalecloud.uip.client.socket;

import java.io.PrintWriter;
import java.net.Socket;

/**
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/10 17:01
 * 心跳实现,频率5秒
 */
public class SocketHeartBeatThread extends Thread {

    private static final String TAG = SocketHeartBeatThread.class.getSimpleName();

    private volatile String name;

    private static final int REPEAT_TIME = 120000;
    private boolean isCancel = false;
    private final PrintWriter printWriter;
    private Socket mSocket;

    private SocketCloseInterface socketCloseInterface;

    public SocketHeartBeatThread(String name, PrintWriter printWriter,
                                 Socket mSocket, SocketCloseInterface socketCloseInterface) {
        this.name = name;
        this.printWriter = printWriter;
        this.mSocket = mSocket;
        this.socketCloseInterface = socketCloseInterface;
    }

    @Override
    public void run() {
        final Thread currentThread = Thread.currentThread();
        final String oldName = currentThread.getName();
        currentThread.setName("Processing-" + name);
        try {
            while (!isCancel) {
                if (!isConnected()) {
                    break;
                }

                if (printWriter != null) {
                    synchronized (printWriter) {
                        SocketUtil.write2Stream("ping", printWriter);
                    }
                }
                try {
                    Thread.sleep(REPEAT_TIME);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            //循环结束则退出输入流
            if (printWriter != null) {
                synchronized (printWriter) {
                    SocketUtil.closePrintWriter(printWriter);
                }
            }
            currentThread.setName(oldName);
            System.out.print(TAG+"SocketHeartBeatThread finish");
        }
    }

    /**
     * 判断本地socket连接状态
     */
    private boolean isConnected() {
        if (mSocket.isClosed() || !mSocket.isConnected() ||
            mSocket.isInputShutdown() || mSocket.isOutputShutdown()) {
            if (socketCloseInterface != null) {
                socketCloseInterface.onSocketDisconnection();
            }
            return false;
        }
        return true;
    }

    public void close() {
        isCancel = true;
        if (printWriter != null) {
            synchronized (printWriter) {
                SocketUtil.closePrintWriter(printWriter);
            }
        }
    }

}
View Code
SocketReceiveThread---消息接收类
package com.whalecloud.uip.client.socket;

import java.io.BufferedReader;

/**
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/10 16:48
 * 数据接收线程
 */
public class SocketReceiveThread extends Thread {
    private static final String TAG = SocketReceiveThread.class.getSimpleName();

    private volatile String name;

    private volatile boolean isCancel = false;

    private BufferedReader bufferedReader;

    private SocketCloseInterface socketCloseInterface;

    private SocketClientResponseInterface socketClientResponseInterface;

    public SocketReceiveThread(String name, BufferedReader bufferedReader,
                               SocketClientResponseInterface socketClientResponseInterface,
                               SocketCloseInterface socketCloseInterface) {
        this.name = name;
        this.bufferedReader = bufferedReader;
        this.socketClientResponseInterface = socketClientResponseInterface;
        this.socketCloseInterface = socketCloseInterface;
    }

    @Override
    public void run() {
        final Thread currentThread = Thread.currentThread();
        final String oldName = currentThread.getName();
        currentThread.setName("Processing-" + name);
        try {
            while (!isCancel) {

                if (bufferedReader != null) {
                    String receiverData = SocketUtil.readFromStream(bufferedReader);
                    if (receiverData != null) {
                        successMessage(receiverData);
                    } else {
                        System.out.print(TAG + "run: receiverData==null");
                        break;
                    }
                }
            }
        } finally {
            //循环结束则退出输入流
            SocketUtil.closeBufferedReader(bufferedReader);
            currentThread.setName(oldName);
            System.out.print(TAG + "SocketReceiveThread finish");
        }
    }

    /**
     * 接收消息回调
     */
    private void successMessage(String data) {
        if (socketClientResponseInterface != null) {
            socketClientResponseInterface.onSocketReceive(data, SocketUtil.SUCCESS);
        }
    }

    public void close() {
        isCancel = true;
        this.interrupt();
        if (bufferedReader != null) {
            if (socketCloseInterface != null) {
                socketCloseInterface.onSocketShutdownInput();
            }
            SocketUtil.closeBufferedReader(bufferedReader);
            bufferedReader = null;
        }
    }
}
View Code
SocketSendThread---发送线程类
package com.whalecloud.uip.client.socket;

import java.io.PrintWriter;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/10 16:59
 * 数据发送线程,当没有发送数据时让线程等待
 */
public class SocketSendThread extends Thread {
    private static final String TAG = SocketSendThread.class.getSimpleName();

    private volatile String name;

    private volatile boolean isCancel = false;
    private boolean closeSendTask;
    private final PrintWriter printWriter;

    protected volatile ConcurrentLinkedQueue<String> dataQueue = new ConcurrentLinkedQueue<>();

    public SocketSendThread(String name, PrintWriter printWriter) {
        this.name = name;
        this.printWriter = printWriter;
    }

    @Override
    public void run() {
        final Thread currentThread = Thread.currentThread();
        final String oldName = currentThread.getName();
        currentThread.setName("Processing-" + name);
        try {
            while (!isCancel) {

                String dataContent = dataQueue.poll();
                if (dataContent == null) {
                    //没有发送数据则等待
                    SocketUtil.toWait(dataQueue, 0);
                    if (closeSendTask) {
                        //notify()调用后,并不是马上就释放对象锁的,所以在此处中断发送线程
                        close();
                    }
                } else if (printWriter != null) {
                    synchronized (printWriter) {
                        SocketUtil.write2Stream(dataContent, printWriter);
                    }
                }
            }
        } finally {
            //循环结束则退出输出流
            if (printWriter != null) {
                synchronized (printWriter) {
                    SocketUtil.closePrintWriter(printWriter);
                }
            }
            currentThread.setName(oldName);
            System.out.print(TAG+"SocketSendThread finish");
        }
    }

    /**
     * 发送消息
     */
    public void sendMsg(String data) {
        dataQueue.add(data);
        //有新增待发送数据,则唤醒发送线程
        SocketUtil.toNotifyAll(dataQueue);
    }

    /**
     * 清除数据
     */
    public void clearData() {
        dataQueue.clear();
    }

    public void close() {
        isCancel = true;
        this.interrupt();
        if (printWriter != null) {
            //防止写数据时停止,写完再停
            synchronized (printWriter) {
                SocketUtil.closePrintWriter(printWriter);
            }
        }
    }

    public void wakeSendTask() {
        closeSendTask = true;
        SocketUtil.toNotifyAll(dataQueue);
    }

    public void setCloseSendTask(boolean closeSendTask) {
        this.closeSendTask = closeSendTask;
    }
}
View Code
SocketUtil---客户端socket工具类
package com.whalecloud.uip.client.socket;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.Socket;
import java.net.SocketException;
import java.util.Enumeration;

/**
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/10 16:39
 */
public class SocketUtil {
    private static final String TAG = SocketUtil.class.getSimpleName();
    public static String ADDRESS = "127.0.0.1";
    public static int PORT = 10086;

    public static final int SUCCESS = 100;
    public static final int FAILED = -1;

    /**
     * 读数据
     *
     * @param bufferedReader
     */
    public static String readFromStream(BufferedReader bufferedReader) {
        try {
            String s;
            if ((s = bufferedReader.readLine()) != null) {
                return s;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 写数据
     *
     * @param data
     * @param printWriter
     */
    public static void write2Stream(String data, PrintWriter printWriter) {
        if (data == null) {
            return;
        }
        if (printWriter != null) {
            printWriter.println(data);
        }
    }


    /**
     * 关闭输入流
     *
     * @param socket
     */
    public static void inputStreamShutdown(Socket socket) {
        try {
            if (!socket.isClosed() && !socket.isInputShutdown()) {
                socket.shutdownInput();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 关闭BufferedReader
     *
     * @param br
     */
    public static void closeBufferedReader(BufferedReader br) {
        try {
            if (br != null) {
                br.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 关闭PrintWriter
     *
     * @param pw
     */
    public static void closePrintWriter(PrintWriter pw) {
        if (pw != null) {
            pw.close();
        }
    }

    /**
     * 阻塞线程,millis为0则永久阻塞,知道调用notify()
     */
    public static void toWait(Object o, long millis) {
        synchronized (o) {
            try {
                o.wait(millis);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * notify()调用后,并不是马上就释放对象锁的,而是在相应的synchronized(){}语句块执行结束,自动释放锁后
     *
     * @param o
     */
    public static void toNotifyAll(Object o) {
        synchronized (o) {
            o.notifyAll();
        }
    }

}
View Code

服务端:

ServerMain--服务端测试类
package com.whalecloud.uip.server.socket;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/10 20:05
 */
public class ServerMain {

    private static boolean isStart = true;
    private static ServerReceiveThread serverReceiveThread;

    public static void main(String[] args) {
        ServerSocket serverSocket = null;
        ExecutorService executorService = Executors.newCachedThreadPool();
        System.out.println("服务端 " + SocketUtil.getIP() + " 运行中...
");
        try {
            serverSocket = new ServerSocket(SocketUtil.PORT);
            while (isStart) {
                Socket socket = serverSocket.accept();

                //设定输入流读取阻塞超时时间(180秒收不到客户端消息判定断线)
                socket.setSoTimeout(180000);
                serverReceiveThread = new ServerReceiveThread(socket,
                    new SocketServerResponseInterface() {
                        @Override
                        public void clientOffline() {// 对方不在线
                            System.out.println("offline");
                        }

                        @Override
                        public void clientOnline(String clientIp) {
                            System.out.println(clientIp + " is online");
                            System.out.println("-----------------------------------------");
                        }
                    });

                if (socket.isConnected()) {
                    executorService.execute(serverReceiveThread);
                }
            }

            serverSocket.close();

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (serverSocket != null) {
                try {
                    isStart = false;
                    serverSocket.close();
                    serverReceiveThread.stop();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
View Code

SocketServerResponseInterface--回调接口类
package com.whalecloud.uip.server.socket;

/**
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/10 19:34
 */
public interface SocketServerResponseInterface {
    /**
     * 客户端断线回调
     */
    void clientOffline();

    /**
     * 客户端上线回调
     *
     * @param clientIp
     */
    void clientOnline(String clientIp);
}
View Code


ServerReceiveThread--消息接送类
package com.whalecloud.uip.server.socket;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.Socket;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/10 19:46
 */
public class ServerReceiveThread implements Runnable {
    private ReceiveThread receiveThread;
    private static ServerSendThread serverSendThread;
    private Socket socket;
    private SocketServerResponseInterface socketServerResponseInterface;

    private volatile ConcurrentLinkedQueue<String> dataQueue = new ConcurrentLinkedQueue<>();
    private static ConcurrentHashMap<String, Socket> onLineClient = new ConcurrentHashMap<>();

    private long lastReceiveTime = System.currentTimeMillis();

    private String userIP;

    public String getUserIP() {
        return userIP;
    }

    public ServerReceiveThread(Socket socket, SocketServerResponseInterface socketServerResponseInterface) {
        this.socket = socket;
        this.socketServerResponseInterface = socketServerResponseInterface;
        this.userIP = socket.getInetAddress().getHostAddress();
        onLineClient.put(userIP, socket);
        System.out.println("用户:" + userIP
            + " 加入了聊天室,当前在线人数:" + onLineClient.size());
    }

    @Override
    public void run() {
        try {
            //开启接收线程
            receiveThread = new ReceiveThread();
            receiveThread.bufferedReader = new BufferedReader(
                new InputStreamReader(socket.getInputStream(), "UTF-8"));
            receiveThread.start();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 断开socket连接
     */
    public void stop() {
        try {
            System.out.println("stop");
            if (receiveThread != null) {
                receiveThread.isCancel = true;
                receiveThread.interrupt();
                if (receiveThread.bufferedReader != null) {
                    SocketUtil.inputStreamShutdown(socket);
                    System.out.println("before closeBufferedReader");
                    SocketUtil.closeBufferedReader(receiveThread.bufferedReader);
                    System.out.println("after closeBufferedReader");
                    receiveThread.bufferedReader = null;
                }
                receiveThread = null;
                System.out.println("stop receiveThread");
                //停止消息发送线程
                serverSendThread.stop();
            }
            onLineClient.remove(userIP);
            System.out.println("用户:" + userIP
                + " 退出,当前在线人数:" + onLineClient.size());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 获取已接连的客户端
     */
    public Socket getConnectdClient(String clientID) {
        return onLineClient.get(clientID);
    }

    /**
     * 打印已经连接的客户端
     */
    public static void printAllClient() {
        if (onLineClient == null) {
            return;
        }
        Iterator<String> inter = onLineClient.keySet().iterator();
        while (inter.hasNext()) {
            System.out.println("client:" + inter.next());
        }
    }

    /**
     * 阻塞线程,millis为0则永久阻塞,知道调用notify()
     */
    public void toWaitAll(Object o) {
        synchronized (o) {
            try {
                o.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * notify()调用后,并不是马上就释放对象锁的,而是在相应的synchronized(){}语句块执行结束,自动释放锁后
     */
    public void toNotifyAll(Object obj) {
        synchronized (obj) {
            obj.notifyAll();
        }
    }

    /**
     * 判断本地socket连接状态
     */
    private boolean isConnected() {
        if (socket.isClosed() || !socket.isConnected()) {
            onLineClient.remove(userIP);
            ServerReceiveThread.this.stop();
            System.out.println("socket closed...");
            return false;
        }
        return true;
    }

    /**
     * 数据接收线程
     */
    public class ReceiveThread extends Thread {

        private BufferedReader bufferedReader;
        private boolean isCancel;

        @Override
        public void run() {
            try {
                while (!isCancel) {
                    if (!isConnected()) {
                        isCancel = true;
                        break;
                    }

                    String msg = SocketUtil.readFromStream(bufferedReader);
                    if (msg != null) {
                        if ("ping".equals(msg)) {
                            System.out.println("收到心跳包");
                            lastReceiveTime = System.currentTimeMillis();
                            socketServerResponseInterface.clientOnline(userIP);
                        } else {
                            msg = "用户" + userIP + " : " + msg;
                            System.out.println("我是服务端,我收到数据:"+msg);
                            //接收到消息 启动发消息的线程
                            this.sendMsg(msg);
                            socketServerResponseInterface.clientOnline(userIP);
                        }
                    } else {
                        System.out.println("client is offline...");
                        ServerReceiveThread.this.stop();
                        socketServerResponseInterface.clientOffline();
                        break;
                    }
                    System.out.println("ReceiveThread");
                }

                SocketUtil.inputStreamShutdown(socket);
                SocketUtil.closeBufferedReader(bufferedReader);
                System.out.println("ReceiveThread is finish");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void sendMsg(String msg) {
            //开启发送线程  这里最好优化成用线程池
            ExecutorService executorService = Executors.newCachedThreadPool();
            serverSendThread = new ServerSendThread(socket);
            serverSendThread.addMessage("服务端发送测试数据");

            if (socket.isConnected()) {
                executorService.execute(serverSendThread);
            }
        }
    }



}
View Code
ServerSendThread--消息发送类
package com.whalecloud.uip.server.socket;

import java.io.PrintWriter;
import java.net.Socket;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/12 15:12
 */
public class ServerSendThread implements Runnable  {
    private SendThread sendThread;
    private Socket socket;

    private volatile ConcurrentLinkedQueue<String> dataQueue = new ConcurrentLinkedQueue<>();
    private static ConcurrentHashMap<String, Socket> onLineClient = new ConcurrentHashMap<>();

    private long lastReceiveTime = System.currentTimeMillis();

    private String userIP;

    public String getUserIP() {
        return userIP;
    }

    public ServerSendThread(Socket socket) {
        this.socket = socket;
        this.userIP = socket.getInetAddress().getHostAddress();
    }

    @Override
    public void run() {
        try {
            //开启发送线程
            sendThread = new SendThread();
            sendThread.printWriter = new PrintWriter(socket.getOutputStream(), true);
            sendThread.start();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 断开socket连接
     */
    public void stop() {
        try {
            System.out.println("stop");
            if (sendThread != null) {
                sendThread.isCancel = true;
                toNotifyAll(sendThread);
                sendThread.interrupt();
                if (sendThread.printWriter != null) {
                    //防止写数据时停止,写完再停
                    synchronized (sendThread.printWriter) {
                        SocketUtil.closePrintWriter(sendThread.printWriter);
                        sendThread.printWriter = null;
                    }
                }
                sendThread = null;
                System.out.println("stop sendThread");
            }
            onLineClient.remove(userIP);
            System.out.println("用户:" + userIP
                + " 退出,当前在线人数:" + onLineClient.size());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 发送消息
     */
    public void addMessage(String data) {
        if (!isConnected()) {
            return;
        }

        dataQueue.offer(data);
        //有新增待发送数据,则唤醒发送线程
        toNotifyAll(dataQueue);
    }

    /**
     * 获取已接连的客户端
     */
    public Socket getConnectdClient(String clientID) {
        return onLineClient.get(clientID);
    }

    /**
     * 打印已经连接的客户端
     */
    public static void printAllClient() {
        if (onLineClient == null) {
            return;
        }
        Iterator<String> inter = onLineClient.keySet().iterator();
        while (inter.hasNext()) {
            System.out.println("client:" + inter.next());
        }
    }

    /**
     * 阻塞线程,millis为0则永久阻塞,知道调用notify()
     */
    public void toWaitAll(Object o) {
        synchronized (o) {
            try {
                o.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * notify()调用后,并不是马上就释放对象锁的,而是在相应的synchronized(){}语句块执行结束,自动释放锁后
     */
    public void toNotifyAll(Object obj) {
        synchronized (obj) {
            obj.notifyAll();
        }
    }

    /**
     * 判断本地socket连接状态
     */
    private boolean isConnected() {
        if (socket.isClosed() || !socket.isConnected()) {
            onLineClient.remove(userIP);
            ServerSendThread.this.stop();
            System.out.println("socket closed...");
            return false;
        }
        return true;
    }

    /**
     * 数据发送线程,当没有发送数据时让线程等待
     */
    public class SendThread extends Thread {

        private PrintWriter printWriter;
        private boolean isCancel;

        @Override
        public void run() {
            try {
                while (!isCancel) {
                    if (!isConnected()) {
                        isCancel = true;
                        break;
                    }

                    String msg = dataQueue.poll();
                    if (msg == null) {
                        toWaitAll(dataQueue);
                    } else if (printWriter != null) {
                        synchronized (printWriter) {
                            SocketUtil.write2Stream(msg, printWriter);
                        }
                    }
                    System.out.println("SendThread");
                }

                SocketUtil.outputStreamShutdown(socket);
                SocketUtil.closePrintWriter(printWriter);
                System.out.println("SendThread is finish");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
View Code

测试后发现服务端可以收到心跳+客户端发送过来的数据

客户端发送的信息:

 我这里尝试了启动两个client。两个client发不同的信息。然后server端根据接收到的信息发送不同的值。

测试后发现发送的数据不会socket互串。只会对应的客户端收到信息

gitLab项目地址:  https://github.com/linHongWenGithub/socketLearn

原文地址:https://www.cnblogs.com/linhongwenBlog/p/12456590.html