基于JSR-356实现的Tyrus WebSocket框架的消息传递机制初步了解

对阻塞、非阻塞,同步、异步初步了解了,不是太明白,其中大多数将的是对于CPU的使用率及对一个事件的处理效率。 阻塞与非阻塞各有优缺点,在知乎上也看到了下面的结论:

在处理 IO 的时候,阻塞和非阻塞都是同步 IO。 只有使用了特殊的 API 才是异步 IO。

一、Tyrus发送消息的两种方式:

The interface javax.websocket.RemoteEndpoint, part of Java API for
WebSocket, is designed to represent the other end of the communication
(related to the endpoint), so the developer uses it to send the

  1. There are two basic interfaces the user may use -
    javax.websocket.RemoteEndpoint$Basic

andjavax.websocket.RemoteEndpoint$Async.

javax.websocket.RemoteEndpoint接口,是JAVA API中的WebSocket的一部分,它被设计为与连接的另一端进行通信,开发人员可以使用它发送消息。有两种基本的接口可以使用:接口javax.websocket.RemoteEndpoint$Basic(同步消息)和javax.websocket.RemoteEndpoint$Async(异步消息)

1:同步消息传递(javax.websocket.RemoteEndpoint$Basic)

is used to send synchronous messages The point of completion of the
send is defined when all the supplied data has been written to the
underlying connection. The methods for sending messages on the
javax.websocket.RemoteEndpoint$Basic block until this point of
completion is reached, except
forjavax.websocket.RemoteEndpoint$Basic#getSendStream() and
javax.websocket.RemoteEndpoint$Basic#getSendWriter() which present
traditional blocking I/O streams to write messages. See the example
"Sending message via RemoteEndpoint.Basic instance" to see how the
whole text message is send. The following example demonstrates a
method which sends the partial text method to the peer:

javax.websocket.RemoteEndpoint$Basic接口被用来发送同步消息,当所有提供的数据都被写入到底层连接时,完成发送的标识被确定。通过javax.websocket.RemoteEndpoint$Basic接口发送消息的方法会被阻塞,直到完成发送的标识到达。javax.websocket.RemoteEndpoint$Basic#getSendStream()方法和javax.websocket.RemoteEndpoint$Basic#getSendWriter()方法 通过传统的传统的阻塞I / O流写入消息。

这里的完成发送的标识应该是方法中的isLast参数(Boolean),这里的了解的还不深入,之前一直在找WebSocket客户端处理消息的方式是什么,通过自己的几个小测试应该是多线程的。

(1)在发送同步消息时,可能一条消息被分为几次发送,相应的方法为:

例1:发送一条消息的部分消息

public void sendPartialTextMessage(String message, Boolean isLast, Session session){  
    try {  
        session.getBasicRemote().sendText(message, isLast);  
    } catch (IOException e) {  
        e.printStackTrace();  
    }  
}  

(2)发送同步消息,发送整条消息

例2:发送整条消息

@OnMessage  
public void echo(String message, Session session) {  
    session.getBasicRemote().sendText(message);  
}  

这里没有说明一条完整的消息发送完是阻塞的方式还是非阻塞的方式。 如果是阻塞的方式在server-client这类型的系统中,client对于单个server在client处理相关事务时,server发送的消息就没有使用消息队列等相关的将并行消息转为串行消息的处理。

在sever-client系统中 假设一条完整的消息发送完是阻塞的,但是对于client接收到了相关消息并作出了处理,处理后又给server发送的结果消息,在消息没有标识的情况下,server是怎么知道client发送的结果消息是对应之前哪条消息的呢?

2.异步消息传递(javax.websocket.RemoteEndpoint$Async) This representation of

the peer of a web socket conversation has the ability to send messages

  1. The point of completion of the send is defined when

  2. the supplied data has been written to the underlying connection.

The completion handlers for the asynchronous methods are always called
with a different thread from that which initiated the send.

每一个WebSocket的端点都有异步发送消息的能力。当所有提供的数据都被写入到底层连接时,完成发送的标识被确定。异步消息处理通常是调用不同的线程来开始发送消息。

例3:使用Future异步发送消息

public void sendWholeAsyncMessage(String message, Session session){  
    Future<Void> future = session.getAsyncRemote().sendText(message);  
}  

这里讲的都是发送消息的方式,在另一端处理接收的消息并没有进行介绍,不过WebSocket是全双工通信机制,两端的接收、发送消息的方式正好相反的方式。

二、Tyrus接收消息的方式

MessagHandler(消息处理器)

Implementing the javax.websocket.MessageHandler interface is one of
the ways how to receive messages on endpoints (both server and
client). It is aimed primarily on programmatic endpoints, as the
annotated ones use the method level annotation
javax.websocket.OnMessage to denote the method which receives
messages.

实现javax.websocket.MessageHandler接口是接收消息的一种方式(服务端和客户端)。。它主要针对的编程性端点,作为注解的人使用的方法级别的注释javax.websocket.OnMessage表示接收消息的方法。

The MessageHandlers get registered on the Session instance

MessageHandlers 通过Session实例注册

There are two orthogonal criterions which classify MessageHandlers.
According the WebSocket Protocol (RFC 6455) the message may be sent
either complete, or in chunks. In Java API for WebSocket this fact is
reflected by the interface which the handler implements. Whole
messages are processed by handler which
implementsjavax.websocket.MessageHandler.Whole interface. Partial
messages are processed by handlers that implement
javax.websocket.MessageHandler.Partial interface. However, if user
registers just the whole message handler, it doesn't mean that the
handler will process solely whole messages. If partial message is
received, the parts are cached by Tyrus until the final part is

  1. Then the whole message is passed to the handler. Similarly,

  2. the user registers just the partial message handler and whole

message is received, it is passed directly to the handler.

MessageHandlers有两个规则。根据WebSocket协议(RFC6455)的消息可能会被发送要么完成,或发送一部分。在Java API中的WebSocket,这个规则是处理器实现的接口反射实现的。整个消息由它实现javax.websocket.MessageHandler.Whole接口处理程序进行处理。部分消息由实现javax.websocket.MessageHandler.Partial接口处理程序进行处理。但是,如果用户注册的只是整个消息的处理程序,但这并不意味着该处理器将只处理整个消息。如果接收部分消息中,部分消息由Tyrus缓存接收直到接收到整体的所有消息。然后整个消息传递给处理程序。同样地,如果用户注册的只是部分消息的处理器,这个处理器可以处理部分消息也可以处理整个消息,如果接收到的是整个的消息会直接发送给处理器。

The second criterion is the data type of the message. WebSocket
Protocol (RFC 6455) defines four message data type - text message,
According to Java API for

WebSocket the text messages will be processed by MessageHandlers with
the following types

java.lang.String

java.io.Reader

any developer object for which there is a corresponding
javax.websocket.Decoder.Text or javax.websocket.Decoder.TextStream.

The binary messages will be processed by MessageHandlers with the
following types:

java.nio.ByteBuffer

java.io.InputStream

any developer object for which there is a corresponding
javax.websocket.Decoder.Binary or
javax.websocket.Decoder.BinaryStream.

第二个规则是关于消息类型的。WebSocket协议(RFC-6455)规定了四种消息类型-文本消息,根据Java API中的WebSocket定义,文本消息能被消息处理器处理的类型如下:

java.lang.String

java.io.Reader

任意开发者开发的对象类型,可以通过javax.websocket.Decoder.Text 或 javax.websocket.Decoder.TextStream进行编码解码处理后进行传输、解析

能被消息处理器处理的二进制类型如下:

java.nio.ByteBuffer

java.io.InputStream

任意开发者开发的对象类型,可以通过 javax.websocket.Decoder.Binary 或 javax.websocket.Decoder.BinaryStream进行编码解码处理后进行传输、解析

注意:只能为每个类型的消息注册一个消息处理器消息类型分别为: text messages, binary messages, pong messages

到这里根据官方文档的解释及个人的理解为以上内容,介绍了发送消息的两种方式,一种为同步方式、另一种为异步方式,其中同步方式又分为将消息分为几个部分进行发送,每发送一部分消息就进行阻塞,另一种同步方式是发送一个完整的消息。

接收消息这里只介绍了每个sever端或client端中注册的消息处理器处理的消息的类型的规则。

还没有找到介绍接收消息时,对接收的消息的处理方式,多线程还是其他处理方式是否Tyrus可以进行相关的设置?

在官方文档中发现了shared client container 这一小节,这小节介绍了client端的线程池的默认设置,默认设置为一个选择线程,两个工作线程

By default, WebSocket client implementation in Tyrus re-creates client
runtime whenever WebSocketContainer#connectToServer is invoked. This
approach gives us some perks like out-of-the-box isolation and
relatively low thread count (currently we have 1 selector thread and 2
worker threads). Also it gives you the ability to stop the client
runtime – one Session instance is tied to exactly one client runtime,
so we can stop it when Session is closed. This seems as a good
solution for most of WebSocket client use cases – you usually use java
client from application which uses it for communicating with server
side and you typically don’t need more than 10 instances (my personal
estimate is that more than 90% applications won’t use more than 1
connection). There are several reasons for it – of it is just a
client, it needs to preserve server resources – one WebSocket
connection means one TCP connection and we don’t really want clients
to consume more than needed. Previous statement may be invalidated by
WebSocket multiplexing extension, but for now, it is still valid.

一个WebSocket连接就是一个TCP连接,在没有特别繁忙的消息接收的场景下不需要设置过大的线程数,只有当你确认你的程序真的需要更多的线程,推荐你设置最大线程数 设置最大线程池数的方法

client.getProperties().put(GrizzlyClientProperties.SELECTOR_THREAD_POOL_CONFIG,
ThreadPoolConfig.defaultConfig().setMaxPoolSize(3));
client.getProperties().put(GrizzlyClientProperties.WORKER_THREAD_POOL_CONFIG,
ThreadPoolConfig.defaultConfig().setMaxPoolSize(10));

(此段有待查证) 通过实验,server端单线程下按顺序发送数条消息后,client端的处理应该是 先将所有消息进行在缓冲区存储,然后逐条消息处理; server端默认不支持多线程,如果在没有修改线程池的最大并发数时,默认为1。

原文地址:https://www.cnblogs.com/twodog/p/12140798.html