WebSocket 的使用

Java 控制台程序实现类似广播功能

服务器端代码

添加 maven 依赖

<dependency>
    <groupId>javax.websocket</groupId>
    <artifactId>javax.websocket-api</artifactId>
    <version>1.1</version>
    <scope>provided</scope>
</dependency>

服务器端代码

package com.seliote.web.http;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * Broadcast endpoint: every text message received from one client is forwarded to all
 * currently connected clients.
 *
 * <p>A new instance of this class is created for each WebSocket connection, so the set of
 * connected sessions lives in a static field shared by all instances.
 */
@ServerEndpoint(value = "/broadcast")
public class SocketServer {
    /**
     * Sessions of all connected clients. The set is shared by every endpoint instance, so a
     * concurrent collection is used: sessions can be added or removed while a broadcast is
     * iterating over it without a ConcurrentModificationException.
     */
    private static final Set<Session> onlinePeople = new CopyOnWriteArraySet<>();

    /**
     * Registers the newly opened session and logs the resulting number of connected clients.
     *
     * @param aSession session of the client that just connected
     */
    @OnOpen
    public void onOpen(Session aSession) {
        // A set ignores duplicates, so no contains() check is needed before adding.
        onlinePeople.add(aSession);
        // Log the size after adding; "..." + size + 1 would be string concatenation ("01").
        System.out.println(System.currentTimeMillis() + ": OnOpen:::" + onlinePeople.size());
    }

    /**
     * Receives a text message from one client and forwards it to every open session.
     *
     * <p>A failure to deliver to one client does not stop delivery to the remaining clients;
     * the first failure is rethrown after the loop with any later ones attached as suppressed.
     *
     * @param aSession session the message came from
     * @param aS       text payload to broadcast
     * @throws IOException if sending to at least one client failed
     */
    @OnMessage
    public void onMessage(Session aSession, String aS) throws IOException {
        System.out.println(System.currentTimeMillis() + ": OnMessage:::" + aS);
        IOException firstFailure = null;
        for (Session session : onlinePeople) {
            if (!session.isOpen()) {
                // Closed but not yet removed by onClose; nothing can be sent to it.
                continue;
            }
            try {
                session.getBasicRemote().sendText(aS);
            } catch (IOException exp) {
                if (firstFailure == null) {
                    firstFailure = exp;
                } else {
                    firstFailure.addSuppressed(exp);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Receives a binary message as a stream. Several {@code @OnMessage} methods with different
     * signatures may coexist; this one is currently a placeholder that only logs the event.
     *
     * @param aSession     session the message came from
     * @param aInputStream binary payload
     */
    @OnMessage
    public void onMessage(Session aSession, InputStream aInputStream) {
        System.out.println(System.currentTimeMillis() + ": OnMessage");
        // TODO
    }

    /**
     * Unregisters a session when its connection is closed; this is also invoked when a client
     * closes abnormally.
     *
     * @param aSession     session that was closed
     * @param aCloseReason reason reported for the close
     */
    @OnClose
    public void onClose(Session aSession, CloseReason aCloseReason) {
        System.out.println(System.currentTimeMillis() + ": OnClose:::" + aCloseReason.getReasonPhrase());
        // remove() is a no-op for an unknown session, so no contains() check is needed.
        onlinePeople.remove(aSession);
    }

    /**
     * Logs an error raised for a session.
     *
     * @param aSession   session the error belongs to
     * @param aThrowable the error that occurred
     */
    @OnError
    public void onError(Session aSession, Throwable aThrowable) {
        System.out.println(System.currentTimeMillis() + ": OnError");
        aThrowable.printStackTrace();
    }
}

如果连接时需要携带客户端信息,那么可以在路径中加入参数。例如客户端在路径中加入用户 Token,变为 127.0.0.1/broadcast/123456,服务器端的注解就可改为 @ServerEndpoint(value = "/broadcast/{token}"),之后的 @OnOpen 方法中就可以增加一个 @PathParam("token") String aToken 参数(@PathParam 位于 javax.websocket.server 包),代表客户端传入的 Token。

客户端代码

添加 maven 依赖。注意这里使用的是 tyrus-standalone-client 而非 javax.websocket-client-api:后者只包含 API 而不包含实现,运行时会报错。

<dependency>
    <groupId>org.glassfish.tyrus.bundles</groupId>
    <artifactId>tyrus-standalone-client</artifactId>
    <version>1.3.3</version>
    <scope>compile</scope>
</dependency>

客户端代码

package com.seliote;

import javax.websocket.*;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Scanner;

/**
 * Console client for the broadcast endpoint: each line typed on standard input is sent to the
 * server, and every message received from the server is printed.
 */
@ClientEndpoint
public class Demo {

    /**
     * Connects to the server and forwards standard input line by line until input ends.
     *
     * @param args unused
     * @throws URISyntaxException  if the server URI cannot be built
     * @throws DeploymentException if the connection to the server cannot be established
     * @throws IOException         if sending a message or closing the session fails
     */
    public static void main(String... args) throws URISyntaxException, DeploymentException, IOException {
        // Use the "wss" scheme when the server is reached over https
        URI uri = new URI("ws", "127.0.0.1:8080", "/broadcast", null, null);
        // The WebSocketContainer is obtained through the static ContainerProvider factory.
        // Both the session and the scanner are closed when the block exits.
        try (Session session = ContainerProvider.getWebSocketContainer().connectToServer(Demo.class, uri);
             Scanner scanner = new Scanner(System.in)) {
            // hasNextLine() ends the loop at end of input instead of letting nextLine() throw
            while (scanner.hasNextLine()) {
                // Messages are sent actively through the Session object;
                // for binary data, getBasicRemote().getSendStream() can be written to instead
                session.getBasicRemote().sendText(scanner.nextLine());
            }
        }
    }

    /** Logs that the connection to the server has been opened. */
    @OnOpen
    public void onOpen() {
        System.out.println(System.currentTimeMillis() + ": OnOpen ");
    }

    /**
     * Passively receives a text message pushed by the server and prints it.
     *
     * @param aS text payload received
     */
    @OnMessage
    public void onMessage(String aS) {
        System.out.println(System.currentTimeMillis() + ": OnMessage::: " + aS);
    }
}

在一个客户端输入信息后,服务器会及时收到信息并广播给所有在线的客户端

------------------------------------------2019.01.09 更新

如果需要支持相应的实体类型,WebSocket 服务器端大概长这样,而客户端配置如下

Maven 依赖(这里用了 JSONObject 而不是服务器端的 Jackson)

<dependency>
    <groupId>org.glassfish.tyrus.bundles</groupId>
    <artifactId>tyrus-standalone-client</artifactId>
    <version>1.3.3</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.json</groupId>
    <artifactId>json</artifactId>
    <version>20180813</version>
    <scope>compile</scope>
</dependency>
package com.seliote.demo;

/**
 * Message entity exchanged over the WebSocket connection: the sender's session id, a timestamp
 * and the message text, all kept as strings.
 *
 * @author seliote
 * @date 2019-01-09
 */
@SuppressWarnings({"unused", "WeakerAccess"})
public class BroadcastMsg {
    private String mSessionId;
    private String mTimestamp;
    private String mMsg;

    /** Creates a message whose three fields are all {@code null}. */
    public BroadcastMsg() {
        this(null, null, null);
    }

    /**
     * Creates a fully populated message.
     *
     * @param aSessionId id of the sending session
     * @param aTimestamp timestamp of the message
     * @param aMsg       message text
     */
    public BroadcastMsg(String aSessionId, String aTimestamp, String aMsg) {
        this.mSessionId = aSessionId;
        this.mTimestamp = aTimestamp;
        this.mMsg = aMsg;
    }

    /** @return id of the sending session */
    public String getSessionId() {
        return this.mSessionId;
    }

    /** @param aSessionId id of the sending session */
    public void setSessionId(String aSessionId) {
        this.mSessionId = aSessionId;
    }

    /** @return timestamp of the message */
    public String getTimestamp() {
        return this.mTimestamp;
    }

    /** @param aTimestamp timestamp of the message */
    public void setTimestamp(String aTimestamp) {
        this.mTimestamp = aTimestamp;
    }

    /** @return message text */
    public String getMsg() {
        return this.mMsg;
    }

    /** @param aMsg message text */
    public void setMsg(String aMsg) {
        this.mMsg = aMsg;
    }

    /**
     * Renders the message as {@code sessionId - timestamp - msg}; a {@code null} field is
     * rendered as the text "null".
     */
    @Override
    public String toString() {
        return String.join(" - ", this.mSessionId, this.mTimestamp, this.mMsg);
    }
}
package com.seliote.demo;

import org.json.JSONObject;

import javax.websocket.Decoder;
import javax.websocket.Encoder;
import javax.websocket.EndpointConfig;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

/**
 * WebSocket encoder and decoder for {@link BroadcastMsg}, exchanging the message as UTF-8
 * encoded JSON over a binary stream.
 *
 * @author seliote
 * @date 2019-01-09
 */
public class BroadcastMsgCoder implements Encoder.BinaryStream<BroadcastMsg>, Decoder.BinaryStream<BroadcastMsg> {

    /** No initialisation is needed. */
    @Override
    public void init(EndpointConfig aEndpointConfig) {
        // intentionally empty
    }

    /** No resources are held, so there is nothing to release. */
    @Override
    public void destroy() {
        // intentionally empty
    }

    /**
     * Writes the message to the stream as the UTF-8 bytes of its JSON representation.
     *
     * @param aBroadcastMsg message to encode
     * @param aOutputStream stream the encoded bytes are written to
     * @throws IOException if writing to the stream fails
     */
    @Override
    public void encode(BroadcastMsg aBroadcastMsg, OutputStream aOutputStream) throws IOException {
        String payload = new JSONObject(aBroadcastMsg).toString();
        byte[] encoded = payload.getBytes(StandardCharsets.UTF_8);
        aOutputStream.write(encoded);
    }

    /**
     * Reads the stream to its end, parses the bytes as UTF-8 JSON and builds a message from
     * the "sessionId", "timestamp" and "msg" entries.
     *
     * @param aInputStream stream carrying the encoded message
     * @return the decoded message
     * @throws IOException if reading from the stream fails
     */
    @Override
    public BroadcastMsg decode(InputStream aInputStream) throws IOException {
        ByteArrayOutputStream collected = new ByteArrayOutputStream();
        byte[] chunk = new byte[1024];
        // Drain the stream chunk by chunk until read() signals end of stream with -1
        for (int read = aInputStream.read(chunk); read != -1; read = aInputStream.read(chunk)) {
            collected.write(chunk, 0, read);
        }
        JSONObject parsed = new JSONObject(new String(collected.toByteArray(), StandardCharsets.UTF_8));
        String sessionId = parsed.getString("sessionId");
        String timestamp = parsed.getString("timestamp");
        String msg = parsed.getString("msg");
        return new BroadcastMsg(sessionId, timestamp, msg);
    }
}
package com.seliote.demo;

import javax.websocket.ClientEndpoint;
import javax.websocket.ContainerProvider;
import javax.websocket.DeploymentException;
import javax.websocket.EncodeException;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Scanner;

/**
 * Console client that exchanges {@link BroadcastMsg} entities with the server: each line typed
 * on standard input is wrapped in a BroadcastMsg and sent through {@link BroadcastMsgCoder},
 * which is registered as both encoder and decoder for this endpoint.
 */
@ClientEndpoint(
        encoders = BroadcastMsgCoder.class,
        decoders = BroadcastMsgCoder.class
)
public class Demo {

    /**
     * Connects to the server and sends one BroadcastMsg per input line until input ends.
     *
     * @param args unused
     * @throws URISyntaxException  if the server URI cannot be built
     * @throws DeploymentException if the connection to the server cannot be established
     * @throws IOException         if sending a message or closing the session fails
     */
    public static void main(String... args) throws URISyntaxException, DeploymentException, IOException {
        // Use the "wss" scheme when the server is reached over https
        URI uri = new URI("ws", "127.0.0.1:8080", "/time/1", null, null);
        // The WebSocketContainer is obtained through the static ContainerProvider factory.
        // Both the session and the scanner are closed when the block exits.
        try (Session session = ContainerProvider.getWebSocketContainer().connectToServer(Demo.class, uri);
             Scanner scanner = new Scanner(System.in)) {
            // hasNextLine() ends the loop at end of input instead of letting nextLine() throw
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                BroadcastMsg broadcastMsg = new BroadcastMsg(
                        session.getId(),
                        System.currentTimeMillis() + "",
                        msg
                );
                try {
                    // Messages are sent actively through the Session object
                    session.getBasicRemote().sendObject(broadcastMsg);
                } catch (EncodeException exp) {
                    // An encoding failure affects only this message; keep reading input
                    exp.printStackTrace();
                }
            }
        }
    }

    /** Logs that the connection to the server has been opened. */
    @OnOpen
    public void onOpen() {
        System.out.println(System.currentTimeMillis() + ": OnOpen ");
    }

    /**
     * Receives a text message pushed by the server and prints it.
     *
     * @param aS text payload received
     */
    @OnMessage
    public void onMessage(String aS) {
        System.out.println(System.currentTimeMillis() + ": OnMessage::: " + aS);
    }

    /**
     * Receives a decoded BroadcastMsg pushed by the server and prints it.
     *
     * @param aBroadcastMsg decoded message
     */
    @OnMessage
    public void onMessage(BroadcastMsg aBroadcastMsg) {
        System.out.println(aBroadcastMsg);
    }
}
原文地址:https://www.cnblogs.com/seliote/p/9761571.html