Socket接受多包的问题

我采用的是Java socket客户端
协议时这样的
报文结构(3,4,5,6,17,21,25,29,33,38,43,47,51)
通用报文结构分为两部分,包头和包体。
下面是一个完整PDU(协议数据单元)的布局:
PDU 域 长度 类型 说明
包头 Length 4 Integer 此域表示消息包的总长度 = 包头长度 + 包体长度,采用网络字节序。单个PDU的最大长度为30K字节,超过时需要进行分包。
Factorycode 1 Integer 厂商编码,固定,填0xa8。
Progid 1 Integer 进程号,固定,填为0x0c。
Morepkt 1 Integer 是否有后续包,0x01:有,0x00:无。
Cmd_id 10 String 命令字,以NULL结尾。
Start_num 4 Integer 起始记录号,固定,填0x00000000。
End_num 4 Integer 终止记录号,固定,填0x00000000。
Request_id 4 Integer 请求ID,采用网络字节序。由客服系统生成,第三方系统应答时返回原值,以此唯一标识业务调用。
Answer_id 4 Integer 应答ID,固定,填0x00000000。
Sequence 4 Integer 分包序号,客服发包时填0x00;第三方系统返回数据小于30k时只回一个包,填0x01;第三方系返回数据大于30k时进行分包,依次填0x01、0x02、...0x**。
Rec_seperator 5 String 记录分隔符,固定,填";",即0x3b00000000。
Field_seperator 5 String 字段分隔符,固定,填"~",即0x7e00000000。
Reserved1 4 Integer 保留,固定,填0x00000000。
Reserved2 4 Integer 保留,固定,填0x00000000。
Errorcode 2 Integer 保留,固定,填0x0000。
包体 Datatrans n String 请求消息或应答消息,长度不定,以NULL结尾。具体格式和长度参见业务定义。

1.2 协议说明
1. 包为变长,总长度由Length域指定。
2. 发送包和接受包都使用此结构。
3. 进程号由客服系统产生,第三方系统不可更改。第三方系统前置机根据连接的IP地址和进程号唯一识别一个客户端,判断将哪些数据包回送给哪个客户端。
4. 请求ID为客服系统生成,第三方系统回送数据时将相应的请求ID传回来。
5. 是否有后续包Morepkt:第三方系统返回数据小于30k时只回一个包,填0x00表示无后续包(此为最后一包);第三方系返回数据大于30k时进行分包,最后一包填0x00表示无后续包,其他包填0x01表示还有后续包。
6.记录分隔符和字段分隔符由客服系统指定,用于控制传送内容。第三方系统回送报文时,Rec_seperator 和Field_seperator原样传回,并按照Rec_seperator 和Field_seperator 组织回送内容,详见Datatrans论述部分。
7. Datatrans 为实际发送或接收的数据内容,实际上为一个变长数组,数据内容依次向后排列。
8.  Errorcode 为返回码,表示请求是否被成功地处理,保留未用。
9. 客服系统发送数据包Datatrans格式 :
参数1~参数2~参数3~...参数n;
10. 第三方系统回送数据包Datatrans格式:
返回值~返回说明;记录1;...记录n;
记录格式:字段1~字段2~...字段m。
注意事项1:每条记录的字段数必须相等。
注意事项2:每一个字段的实际传送都以字符串方式传送。
注意事项3:在多包情况下,不可出现一条记录被截断而出现在两个包中的情况。
1.2 业务处理方式
一次查询全部结果,发回给客服系统。
返回数据小于30k时,返回一包,不可以分包。//因此按记录判断是否超长datatrans部分是否超长。
返回数据大于等于30k时,返回多包,必须分包。
在多包情况下,不可出现一条记录被截断而出现在两个包中的情况。
我的程序老是不在接受服务端给我发包时,有时候能接全,有时候失败,而且失败的机会很大希望各位大侠帮助!
我把我的程序发给大家

package com.zte.usmip.adapter.HeNanTelWK;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

import com.zte.socket.ContainerClient;
import com.zte.utility.AutoResetEvent;
import com.zte.utility.ConfigHelper;
import com.zte.utility.DataUtil;
import com.zte.utility.LogHelper;
import com.zte.utility.StringHelper;

public class SocketClient implements Runnable
{
// 构造函数
public SocketClient(SocketContainer cc ){
this.setContainerClient(cc);
}
// 容器的引用
private SocketContainer containerClient = null;
public SocketContainer getContainerClient() {
return this.containerClient;
}
public void setContainerClient(SocketContainer cc) {
this.containerClient = cc;
}
// 接收缓冲区大小
private int bufferSize = 30720;
public int getBufferSize() {
return this.bufferSize;
}
public void setBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
}
// Socket 对应的终端号
private String termid = "";
public String getTermid() {
return this.termid;
}
public void setTermid(String termid) {
this.termid = termid;
}


private boolean ConnectOK = false;
/**
* @return the loginOK
*/
public boolean getConnectOK() {
return this.ConnectOK;
}
/**
* @param loginOK the loginOK to set
*/
public void setConnectOK(boolean ConnectOK) {
this.ConnectOK = ConnectOK;
}

// 通讯 Socket Channel
protected SocketChannel m_channel = null;

/**
* 连接到服务器
* @param strIP 服务器IP地址(支持DNS域名)
* @param nPort 服务器端口号
* @return
*/
public boolean Connect(
String strIP, int nPort){
if(this.m_channel!=null){
return false;
}
// 构造endpoint
InetSocketAddress address =
    new InetSocketAddress(strIP,nPort);
// 连接到服务器
try{
this.m_channel = SocketChannel.open(address);
this.setConnectOK(true);
}catch(Exception ex){
ex.printStackTrace();
this.m_channel = null;
return false;
}
// 启动接收线程
this.StartReciveThread();
// 返回成功
return true;
}

// 关闭Socekt
public void Close(){
if(this.m_channel==null)return;
try {
this.m_channel.socket().shutdownInput();
} catch (IOException e) {
System.out.println("无法关闭Socket输入流");
e.printStackTrace();
}
try {
this.m_channel.socket().shutdownOutput();
} catch (IOException e) {
System.out.println("无法关闭Socket输出流");
e.printStackTrace();
}
try {
this.m_channel.close();
} catch (IOException e) {
System.out.println("无法关闭Socket");
e.printStackTrace();
}
this.m_channel = null;
}

// 最后收到的数据
protected byte[] m_receive_data = null;
// 分包数据
protected byte[] m_temp_data = null;
// 分包数据
protected byte[] m_temp_data1 = null;
// 最后的数据同步事件
protected AutoResetEvent m_event_received = new AutoResetEvent();

// 接收线程对象
private Thread m_thread = null;
protected void StartReciveThread(){
// 启动接收线程
this.m_thread = new Thread(this);
this.m_thread.start();
}

/**
* 线程函数
*/
public void run() {
SocketContainer cc = this.getContainerClient();
if(this.m_channel!=null)
{
int nSize = 0;
// 创建并分配缓冲区
ByteBuffer bb = ByteBuffer.allocate(this.getBufferSize());
while(true)
{
// 无连接则退出线程
if(this.m_channel==null)break;
try {
bb.clear();
nSize = this.m_channel.read(bb);
//整理缓冲区使之可用
bb.flip();
if(nSize>0)
{
    // 收到数据
// 复制输出数据块
this.m_receive_data = new byte[nSize];
bb.get(this.m_receive_data, 0, nSize);
bb.clear();
  // 设置时间标记,通知等待接受的线程
m_event_received.Set();
    if(m_receive_data[6]==0x00 && m_receive_data[4]==-88 && m_receive_data[5]==0x0c)
    {
    if(cc!=null)
    cc.freeConnect(this);
System.out.println("soket关闭");
    }
if(cc!=null)
cc.OnReceive(this,this.m_receive_data);

}

} catch (Exception e)
{
// 发生异常,结束接收线程
cc.freeConnect(this);
System.out.println("发生异常,结束接收线程"+e.getMessage());
e.printStackTrace();
break;
}
}
}

if(cc!=null){
cc.freeConnect(this);
System.out.println("soket退出关闭");
}
this.m_thread = null;
System.out.println("Socekt 接收线程退出");

}

public int Send(ByteBuffer bb){
int nSize = 0;
if((this.m_channel!=null) &&
this.m_channel.isConnected() &&
(bb!=null)){
try {
nSize = this.m_channel.write(bb);
} catch (IOException e) {
e.printStackTrace();
nSize = 0;
}
}
return nSize;
}
public int Send(byte[] array){
int nSize = 0;
if(array!=null){
ByteBuffer bb = ByteBuffer.wrap(array);
nSize = this.Send(bb);
}
return nSize;
}
public byte[] getReceiveBuff(){
byte[] data = null;
if((this.m_channel!=null) &&
this.m_channel.isConnected()){
if(this.m_event_received.WaitEvent(15000)){
// 等到数据
data = this.m_receive_data;
}
}
return data;
}

/**
* Socket断开事件
*/
public void OnDisconnect(){
// ContainerClient cc = this.getContainerClient();
//if(cc!=null){
// cc.OnDisconnect(this);
//}
}
}
原文地址:https://www.cnblogs.com/liufei88866/p/1769753.html