Netty5-应答服务器

需求:

  服务端:接收客户端请求,返回当前系统时间

  客户端:发起时间请求

服务端

package org.zln.netty.five.timer;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Server side of the time service.
 *
 * <p>Binds a Netty NIO server socket to the configured port and blocks the
 * calling thread until the listening channel is closed.
 *
 * Created by sherry on 16/11/5.
 */
public class TimerServer {
    /**
     * Logger.
     */
    private static final Logger logger = LoggerFactory.getLogger(TimerServer.class);

    /**
     * Port the server binds to.
     */
    private final int port;

    /**
     * @param port port the server socket will be bound to
     */
    public TimerServer(int port) {
        this.port = port;
    }

    /**
     * Starts the server and blocks until the listening channel is closed or the
     * calling thread is interrupted. Both event loop groups are shut down before
     * this method returns.
     */
    public void bind() {
        /*
        A NioEventLoopGroup is a group of NIO threads dedicated to handling network events.
        bossGroup: accepts incoming client connections.
        workGroup: performs the reads and writes on the accepted SocketChannels.
         */
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            /*
            ServerBootstrap: helper class that assembles and starts the NIO server.
             */
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    // TCP option; only the accept backlog (1024) is configured here.
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // Installs the I/O handlers on every accepted channel.
                    .childHandler(new TimerServerInitializer());
            logger.debug("绑定端口号:{},等待同步成功", port);
            /*
            bind: binds the port.
            sync: blocks until the bind has completed and yields the ChannelFuture.
             */
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            logger.debug("等待服务端监听窗口关闭");
            /*
            closeFuture().sync(): blocks until the server channel has been closed.
             */
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
        } finally {
            logger.debug("优雅退出,释放线程池资源");
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}
package org.zln.netty.five.timer;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;

/**
 * Channel initializer for the time server: installs a fresh
 * {@link TimerServerHandler} at the end of each new channel's pipeline.
 *
 * Created by sherry on 16/11/5.
 */
public class TimerServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        // One handler instance per channel.
        final TimerServerHandler handler = new TimerServerHandler();
        ch.pipeline().addLast(handler);
    }
}
package org.zln.netty.five.timer;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Handler主要用于对网络事件进行读写操作,是真正的业务类
 * 通常只需要关注 channelRead 和 exceptionCaught 方法
 * Created by sherry on 16/11/5.
 */
public class TimerServerHandler extends ChannelHandlerAdapter {

    /**
     * 日志
     */
    private Logger logger = LoggerFactory.getLogger(TimerServerHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //ByteBuf,类似于NIO中的ByteBuffer,但是更强大
        ByteBuf reqBuf = (ByteBuf) msg;
        //获取请求字符串
        String req = getReq(reqBuf);
        logger.debug("From:"+ctx.channel().remoteAddress());
        logger.debug("服务端收到:" + req);

        if ("GET TIME".equals(req)){
            String timeNow = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS").format(new Date());
            String resStr = "当前时间:" + timeNow;

            //获取发送给客户端的数据
            ByteBuf resBuf = getRes(resStr);

            logger.debug("服务端应答数据:
" + resStr);
            ctx.write(resBuf);
        }else {
            //丢弃
            logger.debug("丢弃");
            ReferenceCountUtil.release(msg);
        }


    }

    /**
     * 获取发送给客户端的数据
     *
     * @param resStr
     * @return
     */
    private ByteBuf getRes(String resStr) throws UnsupportedEncodingException {
        byte[] req = resStr.getBytes("UTF-8");
        ByteBuf pingMessage = Unpooled.buffer(req.length);
        //将字节数组信息写入到ByteBuf
        pingMessage.writeBytes(req);

        return pingMessage;
    }

    /**
     * 获取请求字符串
     *
     * @param buf
     * @return
     */
    private String getReq(ByteBuf buf) {
        byte[] con = new byte[buf.readableBytes()];
        //将ByteByf信息写出到字节数组
        buf.readBytes(con);
        try {
            return new String(con, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            return null;
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //将消息发送队列中的消息写入到SocketChannel中发送给对方
        logger.debug("channelReadComplete");
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //发生异常时,关闭 ChannelHandlerContext,释放ChannelHandlerContext 相关的句柄等资源
        logger.error("exceptionCaught");
        ctx.close();
    }
}

客户端

package org.zln.netty.five.timer;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client side of the time service.
 *
 * <p>Connects to the configured host and port and blocks the calling thread
 * until the connection is closed.
 *
 * Created by sherry on 16/11/5.
 */
public class TimerClient {
    /**
     * Logger, bound to this class so client output is not attributed to TimerServer.
     */
    private static final Logger logger = LoggerFactory.getLogger(TimerClient.class);

    /** Host to connect to. */
    private final String host;
    /** Port to connect to. */
    private final int port;

    /**
     * @param host server host name or address
     * @param port server port
     */
    public TimerClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Connects to the server and blocks until the client channel is closed or
     * the calling thread is interrupted. The event loop group is shut down
     * before this method returns.
     */
    public void connect() {
        // NIO thread group used by the client.
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new TimerClientInitializer());
            // Start the connect and wait for it to complete.
            logger.debug("发起异步连接操作 - start");
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            logger.debug("发起异步连接操作 - end");
            // Block until the client channel has been closed.
            logger.debug("等待客户端链路关闭 - start");
            channelFuture.channel().closeFuture().sync();
            logger.debug("等待客户端链路关闭 - end");
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
        } finally {
            // Graceful shutdown of the event loop threads.
            eventLoopGroup.shutdownGracefully();
        }
    }
}
package org.zln.netty.five.timer;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;

/**
 * Channel initializer for the time client: installs a fresh
 * {@link TimerClientHandler} at the end of the new channel's pipeline.
 *
 * Created by sherry on 16/11/5.
 */
public class TimerClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // One handler instance per channel.
        final TimerClientHandler handler = new TimerClientHandler();
        socketChannel.pipeline().addLast(handler);
    }
}
package org.zln.netty.five.timer;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;

/**
 * Client-side handler of the time service: sends {@code "GET TIME"} as soon as
 * the connection is active and logs whatever the server sends back.
 *
 * Created by sherry on 16/11/5.
 */
public class TimerClientHandler extends ChannelHandlerAdapter {

    /**
     * Logger.
     */
    private static final Logger logger = LoggerFactory.getLogger(TimerClientHandler.class);

    /**
     * Sends the time request once the connection to the server is established.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.debug("客户端连接上了服务端");

        // Send the request.
        ByteBuf reqBuf = getReq("GET TIME");

        ctx.writeAndFlush(reqBuf);
    }

    /**
     * Wraps a string in a {@link ByteBuf}.
     *
     * @param s text to send
     * @return buffer holding the UTF-8 bytes of {@code s}
     */
    private ByteBuf getReq(String s) throws UnsupportedEncodingException {
        byte[] data = s.getBytes("UTF-8");
        ByteBuf reqBuf = Unpooled.buffer(data.length);
        reqBuf.writeBytes(data);
        return reqBuf;
    }

    /**
     * Decodes the server response as UTF-8 and logs it.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg inbound message, expected to be a {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            String resStr = getRes(byteBuf);
            logger.debug("客户端收到:{}", resStr);
        } finally {
            // The message is consumed here and not passed on, so release it to avoid a buffer leak.
            byteBuf.release();
        }
    }

    /**
     * Reads all readable bytes of the buffer and decodes them as UTF-8.
     *
     * @param buf inbound buffer; its reader index is advanced to the end
     * @return decoded response string
     * @throws IllegalStateException if the JVM does not provide the UTF-8 charset
     */
    private String getRes(ByteBuf buf) {
        byte[] con = new byte[buf.readableBytes()];
        buf.readBytes(con);
        try {
            return new String(con, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // Fail loudly with the cause instead of printing a stack trace and returning null.
            throw new IllegalStateException("UTF-8 charset is not available", e);
        }
    }

    /**
     * Logs the failure and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Record the cause before closing; otherwise the failure is invisible.
        logger.error("exceptionCaught", cause);
        ctx.close();
    }
}

关于 ByteBuf 的读写:方法名都是以 ByteBuf 自身为视角的。read(如 readBytes(byte[]))表示从 ByteBuf 中读出数据,填充到字节数组;write(如 writeBytes(byte[]))表示把字节数组中的数据写入到 ByteBuf 中。

将字符串转化为ByteBuf,有更简单的方法

ByteBuf byteBuf = Unpooled.copiedBuffer(resStr.getBytes("UTF-8"));
原文地址:https://www.cnblogs.com/sherrykid/p/6034337.html