NIO系列之MINA

一。MINA架构

   Apache Mina是一个能够帮助用户开发高性能和高伸缩性网络应用程序的框架。它通过Java nio技术基于TCP/IP和UDP/IP协议提供了抽象的、事件驱动的、异步的API

MINA是应用程序(服务端和客户端)和网络层间(TCP UDP 内存通信)的桥梁


深入mina内部架构


mina分为以下三层:

  •    IOService 用于接受连接 处理io请求
  •    IOFilterChain 用于网络间字节数组和数据结构之间的转换  (读:将字节转换为对象传给应用层 ,写:将应用对象转换字节写入流)
  •   IOHandler 用于自己开发的应用业务逻辑 可以监听各种io事件(缓冲区有数据读,创建session,打开session,session空闲等)

开发mina程序 需要以下步骤:

  •   创建 IoService 选择已经存在的实现 (*Acceptor)  或者自定义实现
  •   创建IOFilterChian选择已经实现的或者自定义实现的
  •   创建IoHandler用于处理消息

二。MINA简单实例

所有的文字翻译和编码 参考官网(http://mina.apache.org/mina-project/userguide/user-guide-toc.html)

服务端:

 使用IO Acceptor接受来自网络层连接 经过Filter过滤后 将转换的对象传给用户自定义Handler


添加maven依赖

<dependency>
	  <groupId>org.apache.mina</groupId>
	  <artifactId>mina-core</artifactId>
	  <version>2.0.6</version>
	</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.6.6</version>
</dependency>
这里注意mina中带有slf4j-api和log4j版本对应 否则日志过滤器没有作用

src/main/resource目录下添加log4j.properties

 ### set log levels ###
log4j.rootLogger = debug ,  stdout

### u8F93u51FAu5230u63A7u5236u53F0 ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern =  %d{ABSOLUTE} %5p %c{1}:%L - %m%n

代码实现:

package cn.et;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.Date;

import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
/**
 * 服务端
 * @author jiaozi
 *
 */
public class MinaServer {
	//端口
	private static final int PORT = 8899;

	public static void main(String[] args) throws IOException {
		//NioSocketAcceptor是IOService实现类
		NioSocketAcceptor server=new NioSocketAcceptor();
		//绑定过滤器 一般如果传送的是文本 就是讲字节数组转换文本的过滤器  必须提供字符集  codec这样的名字可以随便写
		server.getFilterChain().addLast("codec", new ProtocolCodecFilter(
					new TextLineCodecFactory(Charset.forName("UTF-8"))
				));
		//添加日志打印过滤器
		server.getFilterChain().addLast("logger",  new LoggingFilter());
		//每次获取一个连接就会产生一个session session可以设置一些参数 比如数据缓冲区大小 等待时间间隔
		server.getSessionConfig().setReadBufferSize( 2048 );
		server.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10 );
		//设置回调函数 用于处理消息
		server.setHandler(new IoHandler() {
			//session被打开事件 
			public void sessionOpened(IoSession session) throws Exception {
				System.out.println("session打开");
				
			}
			//如果没有消息读写 线程休眠一段时间 处理休眠时触发
			public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
				System.out.println("session空闲");
			}
			
			public void sessionCreated(IoSession session) throws Exception {
				System.out.println("开始创建session");
			}
			
			public void sessionClosed(IoSession session) throws Exception {
				System.out.println("session被关闭");
			}
			
			public void messageSent(IoSession session, Object message) throws Exception {
				System.out.println("有消息被发送出去");
			}
			
			public void messageReceived(IoSession session, Object message) throws Exception {
				System.out.println("接受到一个消息 :"+message);
				session.write(new Date());
			}
			
			public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
				cause.printStackTrace();
				
			}
		});
		server.bind(new InetSocketAddress(PORT));
	}

}

客户端:

 使用IO Connector连接服务端 同理将来自服务端消息 通过转换传给自定义handler

使用telnet模拟客户端  window执行命令
telnet localhost 8899

输入任何消息 发现 服务端控制台获取到了输入的消息  同时session相关的事件都触发 系统日志打印一些事件

telnet上输出了服务器打印的日期  关闭telnet客户端 触发服务端session关闭事件

编码实现:

package cn.et;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.Date;

import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
/**
 * 服务端
 * @author jiaozi
 *
 */
public class MinaClient {
	//端口
	private static final int PORT = 8899;

	public static void main(String[] args) throws IOException {
		//NioSocketAcceptor是IOService实现类
		NioSocketConnector client=new NioSocketConnector();
		//绑定过滤器 一般如果传送的是文本 就是讲字节数组转换文本的过滤器  必须提供字符集  codec这样的名字可以随便写
		client.getFilterChain().addLast("codec", new ProtocolCodecFilter(
					new TextLineCodecFactory(Charset.forName("UTF-8"))
				));
		//添加日志打印过滤器
		client.getFilterChain().addLast("logger",  new LoggingFilter());
		//每次获取一个连接就会产生一个session session可以设置一些参数 比如数据缓冲区大小 等待时间间隔
		client.getSessionConfig().setReadBufferSize( 2048 );
		client.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10 );
		//设置回调函数 用于处理消息
		client.setHandler(new IoHandler() {
			//session被打开事件  就是连接上 发送消息给session
			public void sessionOpened(IoSession session) throws Exception {
				session.write("hello server");
				
			}
			//如果没有消息读写 线程休眠一段时间 处理休眠时触发
			public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
				System.out.println("session空闲");
			}
			
			public void sessionCreated(IoSession session) throws Exception {
				System.out.println("开始创建session");
			}
			
			public void sessionClosed(IoSession session) throws Exception {
				System.out.println("session被关闭");
			}
			
			public void messageSent(IoSession session, Object message) throws Exception {
				System.out.println("有消息被发送出去");
			}
			
			public void messageReceived(IoSession session, Object message) throws Exception {
				System.out.println("接受到一个消息 :"+message);
				
			}
			
			public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
				cause.printStackTrace();
				
			}
		});
		ConnectFuture future = client.connect(new InetSocketAddress("localhost",8899));
		future.awaitUninterruptibly();
        IoSession session = future.getSession();
        
	}

}

三。过滤器简介

过滤器相关内容参考官网 http://mina.apache.org/mina-project/userguide/ch5-filters/ch5-filters.html

过滤器表示在handler进行操作之前过滤拦截 必须实现

IoFilterAdapter 

iofilter是一个非常重要的Mina核心结构。它过滤ioservice和iohandler之间的所有I/o事件和请求。如果您有web应用程序编程的经验,您可以放心地认为它是servlet过滤器的近亲。许多开箱即用过滤器是通过使用Out-the-box过滤器 例如:

  • 所有事件和请求的loggingfilter日志
  • ProtocolCodecFilter 编解码器可以装ByteBuffer和应用层对象消息
  • 压缩过滤器压缩所有数据。
  • sslfilter添加了SSL-TLS-STARTTLS支持。等等! 

1.实现简单的日志过滤器

package cn.et;

import org.apache.mina.core.filterchain.IoFilterAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.write.WriteRequest;

public class MyLogFilter extends IoFilterAdapter {

	@Override
	public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception {
		//过滤器必须调用下一个过滤器的相同方法   super方法中就是 你也可以自己写
		//nextFilter.sessionCreated(session);
		System.out.println("自定义过滤器:session被创建"+session.getId());
		super.sessionCreated(nextFilter, session);
	}

	@Override
	public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
		System.out.println("自定义过滤器:session被关闭"+session.getId());
		super.sessionClosed(nextFilter, session);
	}

	@Override
	public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
		System.out.println("自定义过滤器:接受到消息"+message);
		super.messageReceived(nextFilter, session, message);
	}

	@Override
	public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
		System.out.println("自定义过滤器:发送消息"+ writeRequest.getMessage());
		super.messageSent(nextFilter, session, writeRequest);
	}

 	
}

需要将过滤器的实例添加到IOService 的过滤器链中

server.getFilterChain().addLast("mylogger", new MyLogFilter());

运行 实例检测发现session打开关闭  接发消息都会输出对应消息


2.通过编解码器实现自定义协议

编解码原理参考下图 

数据写入时 自定义handler写入对象类型 编码器将对象转换成IOBuffer对象  

数据读取时 将网络传入的IOBuffer对象解码成对象传入handler 供用户操作

实现编解码器实现类图:


编码实现 需要实现编解码工厂 编码器和解码器 (考虑只读到了部分包的问题  半包【留到下一次直到读完】 粘包【两个不同的包 互相有一半数据】)

这里实现一个发送文件和文件消息的编解码器(解码 文件接收暂时未实现 有时间再补上)

编码器

package cn.et.codec;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
/**
 * 实现编码器
 * @author jiaozi
 *
 */
public class ImMsgEncode extends ProtocolEncoderAdapter {
	//使用utf-8字符集
	public static CharsetEncoder chaset=Charset.forName("UTF-8").newEncoder();
	//文件写入单位
	public static final int BLOCK_SIZE=1024*1024;
	/**
	 * 用于将handler传入的对象转换成字节写入底层网络
	 */
	public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
		ImMsg imMsg=(ImMsg)message;
		//其中 int类型的内容长度和 内容类型占了 5个字节  加上内容
		IoBuffer buffer=IoBuffer.allocate(imMsg.getContentLength()+5).setAutoExpand(true);
		//IOBuffer支持读写各种数据类型的
		buffer.put(imMsg.getMsgType());
		//如果是文本消息 直接全部发送
	    if(1==imMsg.getMsgType()) {
	    	//获取字符串的字节长度
	    	buffer.putInt(imMsg.getContent().getBytes(chaset.charset()).length);
	    	//写入字符串
	    	buffer.putString(imMsg.getContent(), chaset);
	    	buffer.flip();
	    	out.write(buffer);
	    	out.flush();
	    }
		
	}

}
解码器代码:
package cn.et.codec;

import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.AttributeKey;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoderAdapter;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
/**
 * 解码 将底层网路字节数组转换成java对象 
 * decode方法可能被调用多次 可能有半包 第一次发送过来一部分 第二次发送过来一部分 不管几次 都是同一个session
 * 可以将数据缓存成一个对象 供下次使用 处理完
 * 0 5 abcde 0 3 123
 * 
 * 
 * 
 * @author jiaozi
 *
 */
public class ImMsgDecode extends ProtocolDecoderAdapter {
	//使用utf-8字符集
	public static CharsetDecoder chaset=Charset.forName("UTF-8").newDecoder();
	//文件写入单位
	public static final int BLOCK_SIZE=1024*1024;
	private int bufferLength = 128;
	//存储在session中的key
	private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context");
	public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
		Context ctx = getContext(session);
		ctx.append(in);
		ctx.getBuffer().flip();
		//ctx一直处于写模式 remaining表示剩余多少空间可写  容量-剩余 就是使用的也就是长度
		int size=ctx.getBuffer().remaining();
		//表示大于5个字节就可以读取头和体了
		if(size>=5 ) {
			//没有读取过头数据  可以读取head数据
			if(ctx.getImMsg().getContentLength()==-1) {
				//读取类型 
				byte type=ctx.getBuffer().get();
				ctx.getImMsg().setMsgType(type);
				//读取内容
				int contentLength=ctx.getBuffer().getInt();
				ctx.getImMsg().setContentLength(contentLength);
			}
			//判断读取到了头 
			if(ctx.getImMsg().getContentLength()!=-1) {
				//读取完成了 写入到上层应用handler 调用out.write
				if(size>=ctx.getImMsg().getContentLength()+5) {
					String content=new String(ctx.getBuffer().array(),5,ctx.getImMsg().getContentLength(),chaset.charset());
					ctx.getImMsg().setContent(content);
					out.write(ctx.getImMsg());
					ctx.getBuffer().clear();
					//如果存在遗留字节 就是粘包 就是下一次的数据 清空Context留给下一次
					int leaveLength=size-(ctx.getImMsg().getContentLength()+5);
					if(leaveLength>0) {
						session.removeAttribute(CONTEXT);
						byte[] b=new byte[leaveLength];
						ctx.getBuffer().get(b, ctx.getImMsg().getContentLength()+5, leaveLength);
						ctx.getBuffer().clear();
						ctx.getBuffer().put(b);
					}
				}
				
			}
			
		}
		//调用过flip 切换回写模式
		ctx.writeMode();
	}
	public Context getContext(IoSession session) {
		Context ctx = (Context) session.getAttribute(CONTEXT);
		if(ctx==null) {
			 ctx=new Context(bufferLength);
			 session.setAttribute(CONTEXT, ctx);
		}
		return ctx;
	}
	//用于保存当前这次解析的上下文对象
	class Context{		
		private ImMsg imMsg;
		private IoBuffer buffer;
		
		public IoBuffer getBuffer() {
			return buffer;
		}
		//调用了flip后切换回写入模式
		public void writeMode() {
			buffer.position(buffer.limit());
			buffer.limit(buffer.capacity());
		}
		private Context(int bufferLength) {
			buffer = IoBuffer.allocate(bufferLength).setAutoExpand(true);
			imMsg=new ImMsg();
        }
		public void append(IoBuffer in) {
			buffer.put(in);
		}
		public ImMsg getImMsg() {
			return imMsg;
		}
		
	}

}
编解码工厂
package cn.et.codec;

import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
/**
 * 编解码工厂
 * @author jiaozi
 *
 */
public class ImMsgProtocolCodecFactory implements ProtocolCodecFactory {
	private ProtocolDecoderAdapter decoder;
	private ProtocolEncoderAdapter encoder;
	
	

	public ImMsgProtocolCodecFactory(ProtocolDecoderAdapter decoder, ProtocolEncoderAdapter encoder) {
		this.decoder = decoder;
		this.encoder = encoder;
	}

	public ProtocolEncoder getEncoder(IoSession session) throws Exception {
		// TODO Auto-generated method stub
		return this.encoder;
	}

	public ProtocolDecoder getDecoder(IoSession session) throws Exception {
		// TODO Auto-generated method stub
		return this.decoder;
	}

}

ImMsg协议包实体类

package cn.et.codec;

import org.apache.mina.core.buffer.IoBuffer;
/**
 * 发送im消息封装包
 * @author jiaozi
 *
 */
public class ImMsg {
	//内容的长度 可以是普通字符串消息可以是个文件
	private int contentLength=-1;
	//消息类型 1 普通文本消息  2 文件
	private byte msgType;
	//文本内容
	private String content;
	//本地文件路径 或者保存的文件路径
	private String filePath;
	public int getContentLength() {
		return contentLength;
	}
	public void setContentLength(int contentLength) {
		this.contentLength = contentLength;
	}
	public byte getMsgType() {
		return msgType;
	}
	public void setMsgType(byte msgType) {
		this.msgType = msgType;
	}
	public String getContent() {
		return content;
	}
	public void setContent(String content) {
		this.content = content;
	}
	public String getFilePath() {
		return filePath;
	}
	public void setFilePath(String filePath) {
		this.filePath = filePath;
	}
	
}
mina服务端代码
package cn.et.codec;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

public class MyServer {
	private static final int PORT = 8899;

	public static void main(String[] args) throws IOException {
		NioSocketAcceptor server=new NioSocketAcceptor();
		//绑定过滤器 一般如果传送的是文本 就是讲字节数组转换文本的过滤器  必须提供字符集 codec这样的名字可以随便写
//		server.getFilterChain().addLast("codec", new ProtocolCodecFilter(
//					new TextLineCodecFactory(Charset.forName("UTF-8"))
//				));
		server.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ImMsgProtocolCodecFactory(new ImMsgDecode(),
				new ImMsgEncode())));
		//添加日志打印过滤器
		server.getFilterChain().addLast("logger",  new LoggingFilter());
		//每次获取一个连接就会产生一个session session可以设置一些参数 比如数据缓冲区大小 等待时间间隔
		server.getSessionConfig().setReadBufferSize( 2048 );
		server.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10 );
		server.setHandler(new IoHandlerAdapter() {
			@Override
			public void messageReceived(IoSession session, Object message) throws Exception {
				ImMsg imMsg=(ImMsg)message;
				if(imMsg.getMsgType()==1)
					System.out.println("接收到文字内容: "+imMsg.getContent());
				else if(imMsg.getMsgType()==2)
					System.out.println("接受到文件 保存路径:"+imMsg.getFilePath());
				else
					System.out.println("接受错误");
			}
		});
		server.bind(new InetSocketAddress(PORT));
	}
}
mina客户端代码
package cn.et.codec;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

public class MyClient {
	//端口
	private static final int PORT = 8899;
	public static void main(String[] args) {
		//NioSocketAcceptor是IOService实现类
		NioSocketConnector client=new NioSocketConnector();
		//绑定过滤器 一般如果传送的是文本 就是讲字节数组转换文本的过滤器  必须提供字符集
		client.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ImMsgProtocolCodecFactory(new ImMsgDecode(),
				new ImMsgEncode())));
		//添加日志打印过滤器
		client.getFilterChain().addLast("logger",  new LoggingFilter());
		//每次获取一个连接就会产生一个session session可以设置一些参数 比如数据缓冲区大小 等待时间间隔
		client.getSessionConfig().setReadBufferSize( 2048 );
		client.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10 );
		client.setHandler(new IoHandlerAdapter() {});
		ConnectFuture future = client.connect(new InetSocketAddress("localhost",8899));
		future.awaitUninterruptibly();
        IoSession session = future.getSession();
        ImMsg imMsg=new ImMsg();
        imMsg.setContent("hello server");
        imMsg.setMsgType((byte)1);
        session.write(imMsg);
        client.dispose();
	}
}
运行minaserver  运行minaclient 服务端可以接收到客户端消息

四。IOBuffer简介

IoBuffer是MINA内部使用的一个byte buffer,MINA并没有直接使用NIO 的ByteBuffer。不过IoBuffer 是对 ByteBuffer 的一个封装。IoBuffer 中的很多方法都是对 ByteBuffer 的直接继承。只是对 ByteBuffer 添加了一些扩展了更加实用的方法 使用IOBuffer替代ByteBuffer的原因是:
  .它没有提供有用的getter和putter,例如Fill、get/putstring和get/putasciiint()。
  .因为固定容量 无法写入变长的数据

于IoBuffer是对Nio的ByteBuffer 的封装,所以基本概念还是相同的,下面简单介绍一下:
1、capacity:该属性描述这个缓冲区最多能缓冲多少个元素,也是Buffer最大存储元素数,这个值是在创建Buffer的时候指定的,且不能修改。
2、Limit:在从Buffer中向Channel中写数据时,limit变量指示了还剩多少数据可以读取,在从Channel中读取数据到Buffer中时,limit变量指示了还剩多少空间可供存放数据。position正常情况下小于或者等于limit。
3、Position:Buffer实际上也就是个array。当你从Channel中读数据时,你把从Channel中读出来的数据放进底层array,position变量用来跟踪截止目前为止已经写了多少数据。更精确的讲,它指示如果下次写Buffer时数据应该进入array的哪个位置。因此如果已经从Channel中读出了3个字节,Buffer的position会被置为3,指向array中第四个位置。

i、初始状态下:假设此时position为0,limit和capacity都被设为9;


ii、从Channel中读入4个字节数据到Buffer,这时position指向4(第5个):


iii、在做写操作之前,我们必须调用一次flip()方法,这个方法做了两件重要的事情: 
1. 将limit设置到当前的position处。 
2. 设置position为0。


iiii、执行写操作后;


iv、执行clear后,position设为0,limit设为capition,mark则丢弃;






原文地址:https://www.cnblogs.com/liaomin416100569/p/9331141.html