ConcurrentWebSocketSessionDecorator(并发安全的websocket) The remote endpoint was in state [TEXT_PARTIAL_WRITING]

1,解决的问题:ConcurrentWebSocketSessionDecorator The remote endpoint was in state [TEXT_PARTIAL_WRITING],

 注:websocket sendMessage 发送基本逻辑: 在发送消息前,会校验一下state,如果是State.OPEN或 State.TEXT_PARTIAL_READY,则可以发送,并把stat设置为TEXT_PARTIAL_WRITING,否则报错。等发送完成,则又设置state为open,如此往复

2,现象

2020-10-14 11:16:08 880 [,] [WsClientSendThreadPool-9] WebsocketUtil.java 119 webSocketSendMessage ERROR
c.e.wsgate.util.WebsocketUtil - userSessionInfo:sessionId=42aa19eb-65d3-e472-b244-6c59ac9479ad,room=3,uid=2,localAddr=[10.3.246.135:8081],remoteAddr=10.2.45.220:65057, WebSocketSession sendMessage err={}
java.lang.IllegalStateException: The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is an invalid state for called method
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.checkState(WsRemoteEndpointImplBase.java:1229)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.textPartialStart(WsRemoteEndpointImplBase.java:1186)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendPartialString(WsRemoteEndpointImplBase.java:222)
at org.apache.tomcat.websocket.WsRemoteEndpointBasic.sendText(WsRemoteEndpointBasic.java:49)
at org.springframework.web.socket.adapter.standard.StandardWebSocketSession.sendTextMessage(StandardWebSocketSession.java:215)
at org.springframework.web.socket.adapter.AbstractWebSocketSession.sendMessage(AbstractWebSocketSession.java:106)
at org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator.tryFlushMessageBuffer(ConcurrentWebSocketSessionDecorator.java:171)
at org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator.sendMessage(ConcurrentWebSocketSessionDecorator.java:144)
at com.example.wsgate.util.WebsocketUtil.webSocketSendMessage(WebsocketUtil.java:110)
at com.example.wsgate.util.RoomStockUser.sendToAllUserClient(RoomStockUser.java:310)
at com.example.wsgate.util.RoomStockUser.sendToClient(RoomStockUser.java:358)
at com.example.wsgate.config.WsClientSendThreadPool.lambda$sendWorker$0(WsClientSendThreadPool.java:36)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

3,问题描述:在100QPS压测下,在浏览器中创建一个websocket用户用以观察数据情况,最后在日志中发现频繁报错:java.lang.IllegalStateException: The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is an invalid state for called method, 但是用java代码创建的200个websocket链接正常

4,发送逻辑

if (webSocketSession.isOpen()) {
webSocketSession.sendMessage(textMessage);
}

5,疑问

5.1 疑问1:webSocketSession.isOpen()是判断了stat状态了,为什么还会有这个state的异常?
释疑:看代码 org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.checkState(WsRemoteEndpointImplBase.java:1229),发现这个state非彼state,isOpen判断的是websocket下的是state,感觉更多是指服务器端的websocket链接状态
而报错的state 是WsRemoteEndpointImplBase StateMachine 下的state,是服务器端用来标注和客户端交互的一个状态,跟多的是表示当前客户端链接的状态

5.2 疑问2:发送线程是单线程,且是支持并发的ConcurrentWebSocketSessionDecorator,为什么还会出现这样的异常呢?
释疑:如下先展示两段核心代码
ConcurrentWebSocketSessionDecorator:这是一个线程安全的

private boolean tryFlushMessageBuffer() throws IOException {
if (this.flushLock.tryLock()) {//获取锁,获取即执行发送,没获取,就算了,等下次再问一下,反正数据已经存入到消息队列中了,和lock有区别
try {
while (true) {
WebSocketMessage<?> message = this.buffer.poll();
if (message == null || shouldNotSend()) {
break;
}
this.bufferSize.addAndGet(-message.getPayloadLength());
this.sendStartTime = System.currentTimeMillis();
getDelegate().sendMessage(message);//用来发送消息的
this.sendStartTime = 0;
}
}
finally {
this.sendStartTime = 0;
this.flushLock.unlock();
}
return true;
}
return false;
}


以及根据getDelegate().sendMessage(message)跟到的
WsRemoteEndpointImplBase:这个是在发送数据之前,检查state,并设置state为TEXT_PARTIAL_WRITING

public synchronized void textPartialStart() {
checkState(State.OPEN, State.TEXT_PARTIAL_READY);
state = State.TEXT_PARTIAL_WRITING;
}

WsRemoteEndpointImplBase:这一段应该是一个阻塞式

void sendMessageBlock(CharBuffer part, boolean last) throws IOException {
long timeoutExpiry = getTimeoutExpiry();
boolean isDone = false;
while (!isDone) {
encoderBuffer.clear();
CoderResult cr = encoder.encode(part, encoderBuffer, true);
if (cr.isError()) {
throw new IllegalArgumentException(cr.toString());
}
isDone = !cr.isOverflow();
encoderBuffer.flip();
sendMessageBlock(Constants.OPCODE_TEXT, encoderBuffer, last && isDone, timeoutExpiry);//发送消息
}
stateMachine.complete(last);//发送完毕,将state设置为open
}

从代码上思考,想不出来有什么问题。
最后发现一个淹没在 java.lang.IllegalStateException中的一个java.lang.NullPointerException: null异常,如下

2020-10-14 14:06:18 495 [,] [WsClientSendThreadPool-11] WebsocketUtil.java 121 webSocketSendMessage ERROR
c.e.wsgate.util.WebsocketUtil - userSessionInfo:sessionId=cde27712-c591-f3e9-104b-1b4aa962f24e,room=3,uid=1,localAddr=[10.3.246.134:8081],remoteAddr=10.2.45.220:58031,type=true,threadId=199,threadName=WsClientSendThreadPool-11, WebSocketSession sendMessage err={}
java.lang.NullPointerException: null
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendMessageBlock(WsRemoteEndpointImplBase.java:310)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendMessageBlock(WsRemoteEndpointImplBase.java:250) //就是上一个方法中的发送消息的方法
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendPartialString(WsRemoteEndpointImplBase.java:223)
at org.apache.tomcat.websocket.WsRemoteEndpointBasic.sendText(WsRemoteEndpointBasic.java:49)
at org.springframework.web.socket.adapter.standard.StandardWebSocketSession.sendTextMessage(StandardWebSocketSession.java:215)
at org.springframework.web.socket.adapter.AbstractWebSocketSession.sendMessage(AbstractWebSocketSession.java:106)
at org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator.tryFlushMessageBuffer(ConcurrentWebSocketSessionDecorator.java:171)
at org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator.sendMessage(ConcurrentWebSocketSessionDecorator.java:144)
at com.example.wsgate.util.WebsocketUtil.webSocketSendMessage(WebsocketUtil.java:110)
at com.example.wsgate.util.RoomStockUser.sendToAllUserClient(RoomStockUser.java:314)
at com.example.wsgate.util.RoomStockUser.sendToClient(RoomStockUser.java:362)
at com.example.wsgate.config.WsClientSendThreadPool.lambda$sendWorker$0(WsClientSendThreadPool.java:36)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

根据错误栈,找到:!bsh.getSendResult().isOK() 这一句报空指针异常
//SendResult:The result of asynchronously sending a web socket message. A SendResult is either ok indicating there was no problem, or is not OK in which case there was a problem and it carries an exception to indicate what the problem was

private void sendMessageBlock(byte opCode, ByteBuffer payload, boolean last,
long timeoutExpiry) throws IOException {
wsSession.updateLastActive();

...

for (MessagePart mp : messageParts) {
writeMessagePart(mp);
if (!bsh.getSendResult().isOK()) { //
messagePartInProgress.release();
Throwable t = bsh.getSendResult().getException();
wsSession.doClose(new CloseReason(CloseCodes.GOING_AWAY, t.getMessage()),
new CloseReason(CloseCodes.CLOSED_ABNORMALLY, t.getMessage()));
throw new IOException (t);
}
// The BlockingSendHandler doesn't call end message so update the
// flags.
fragmented = nextFragmented;
text = nextText;
}

if (payload != null) {
payload.clear();
}

endMessage(null, null);
}

6,异常现象的结论就是:
1,服务器端给客户端发送数据,出现了某种问题(可能是底层的一个实现逻辑,根据api的解释,主要标注发送消息是否成功或失败,参照:SendResult),报错。导致WsRemoteEndpointImplBase 下sendMessageBlock函数中stateMachine.complete(last)(将state设置为open)没法执行,使state一直为TEXT_PARTIAL_WRITING,
后来的sender线程在调用这个websocket时,就一直报错,

7,总的结论:

1,是tomcat的一个bug,Bug 64061 ,参照https://bz.apache.org/bugzilla/show_bug.cgi?id=64061

8,解决方案

1,在pom中指定新的tomcat版本

<properties>
<tomcat.version>9.0.30</tomcat.version>
</properties>
<dependency>
<groupId>org.apache.tomcat</groupId>
<artifactId>tomcat-juli</artifactId>
<version>${tomcat.version}</version>
</dependency>

2,在sendMessage catch IllegalStateException 并close条这个链接,使客户端重连


9,带来的问题:
tomcat升级之后,同样压测,会有java.io.IOException: java.net.SocketTimeoutException 导致的 websocket handleTransportError,触发退群逻辑,关闭掉链接,客户端会再重连

2020-10-14 17:37:04 763 [,] [WsClientSendThreadPool-16] LiveRoomWebSocketHandler.java 166 handleTransportError ERROR
c.e.w.h.LiveRoomWebSocketHandler - handleTransportError. sessionId=eb92bb44-0d6d-3882-2f95-63030d49b10a,localAddr=[10.3.246.134:8081],remoteAddr=10.2.45.220:52145,group=3,uid=1,joinedRoom=true,exception={}
java.io.IOException: java.net.SocketTimeoutException
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendMessageBlock(WsRemoteEndpointImplBase.java:315)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendMessageBlock(WsRemoteEndpointImplBase.java:258)
at org.apache.tomcat.websocket.WsSession.sendCloseMessage(WsSession.java:612)
at org.apache.tomcat.websocket.WsSession.doClose(WsSession.java:497)
at org.apache.tomcat.websocket.WsSession.doClose(WsSession.java:459)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendMessageBlock(WsRemoteEndpointImplBase.java:313)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendMessageBlock(WsRemoteEndpointImplBase.java:250)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendPartialString(WsRemoteEndpointImplBase.java:223)
at org.apache.tomcat.websocket.WsRemoteEndpointBasic.sendText(WsRemoteEndpointBasic.java:49)
at org.springframework.web.socket.adapter.standard.StandardWebSocketSession.sendTextMessage(StandardWebSocketSession.java:215)
at org.springframework.web.socket.adapter.AbstractWebSocketSession.sendMessage(AbstractWebSocketSession.java:106)
at org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator.tryFlushMessageBuffer(ConcurrentWebSocketSessionDecorator.java:171)
at org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator.sendMessage(ConcurrentWebSocketSessionDecorator.java:144)
at com.example.wsgate.util.WebsocketUtil.webSocketSendMessage(WebsocketUtil.java:110)
at com.example.wsgate.util.RoomStockUser.sendToAllUserClient(RoomStockUser.java:314)
at com.example.wsgate.util.RoomStockUser.sendToClient(RoomStockUser.java:362)
at com.example.wsgate.config.WsClientSendThreadPool.lambda$sendWorker$0(WsClientSendThreadPool.java:36)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketTimeoutException: null
at org.apache.tomcat.util.net.NioEndpoint$Poller.timeout(NioEndpoint.java:977)
at org.apache.tomcat.util.net.NioEndpoint$Poller.run(NioEndpoint.java:753)
... 1 common frames omitted
2020-10-14 17:37:04 772 [,] [WsClientSendThreadPool-16] WebsocketUtil.java 123 webSocketSendMessage ERROR
c.e.wsgate.util.WebsocketUtil - userSessionInfo:sessionId=eb92bb44-0d6d-3882-2f95-63030d49b10a,room=3,uid=1,localAddr=[10.3.246.134:8081],remoteAddr=10.2.45.220:52145,threadId=154,threadName=WsClientSendThreadPool-16, WebSocketSession sendMessage err={}
java.io.IOException: java.net.SocketTimeoutException
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendMessageBlock(WsRemoteEndpointImplBase.java:315)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendMessageBlock(WsRemoteEndpointImplBase.java:250)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendPartialString(WsRemoteEndpointImplBase.java:223)
at org.apache.tomcat.websocket.WsRemoteEndpointBasic.sendText(WsRemoteEndpointBasic.java:49)
at org.springframework.web.socket.adapter.standard.StandardWebSocketSession.sendTextMessage(StandardWebSocketSession.java:215)
at org.springframework.web.socket.adapter.AbstractWebSocketSession.sendMessage(AbstractWebSocketSession.java:106)
at org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator.tryFlushMessageBuffer(ConcurrentWebSocketSessionDecorator.java:171)
at org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator.sendMessage(ConcurrentWebSocketSessionDecorator.java:144)
at com.example.wsgate.util.WebsocketUtil.webSocketSendMessage(WebsocketUtil.java:110)
at com.example.wsgate.util.RoomStockUser.sendToAllUserClient(RoomStockUser.java:314)
at com.example.wsgate.util.RoomStockUser.sendToClient(RoomStockUser.java:362)
at com.example.wsgate.config.WsClientSendThreadPool.lambda$sendWorker$0(WsClientSendThreadPool.java:36)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketTimeoutException: null
at org.apache.tomcat.util.net.SocketWrapperBase.vectoredOperation(SocketWrapperBase.java:1435)
at org.apache.tomcat.util.net.SocketWrapperBase.write(SocketWrapperBase.java:1353)
at org.apache.tomcat.util.net.SocketWrapperBase.write(SocketWrapperBase.java:1324)
at org.apache.tomcat.websocket.server.WsRemoteEndpointImplServer.doWrite(WsRemoteEndpointImplServer.java:90)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.writeMessagePart(WsRemoteEndpointImplBase.java:499)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendMessageBlock(WsRemoteEndpointImplBase.java


10,思考:
1,用java代码创建了200个链接,一个浏览器链接,为什么唯独浏览器这个链接会有问题呢?浏览器这个在交互中有什么不同,会触发这个问题。

原文地址:https://www.cnblogs.com/woshare/p/13816316.html