Netty实现Http服务端

1、HttpServer   创建http 服务端

package com.bokeyuan.http.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;

/**
 * netty server
 * 2021/9/9
 */
@Slf4j
public class HttpServer {

    int port ;
    /**
     * 是否有配置SSL
     */
    private static final boolean SSL = System.getProperty("ssl") != null;


    public HttpServer(int port){
        this.port = port;
    }

    public void start() throws Exception{

        ServerBootstrap bootstrap = new ServerBootstrap();
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup work = new NioEventLoopGroup();

        bootstrap.group(boss,work)
                .localAddress(new InetSocketAddress(port))//暴露本地的端口供请求
                .option(ChannelOption.SO_BACKLOG, 1024) //存放已完成三次握手的请求的等待队列的最大长度,一定要设置option非childOption
                .childOption(ChannelOption.SO_REUSEADDR, true)//复用地址,快速复用端口
                .childOption(ChannelOption.TCP_NODELAY, true) //数据马上发送不需要延迟,一定要设置childOption非option
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) //使用池化的字节分配器
                .handler(new LoggingHandler(LogLevel.INFO))
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer< SocketChannel >(){
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        if (SSL) { //配置Https通信
                            SelfSignedCertificate ssc = new SelfSignedCertificate();
                            SslContext sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
                            socketChannel.pipeline().addLast(sslCtx.newHandler(socketChannel.alloc()));
                        }
                        //通道设置,顺序别乱写
                        socketChannel.pipeline().addFirst(new IdleStateHandler(5, 5, 10));//心跳模式配置,当一个通道多长时间未进行读写时断开,读超时时长为5s
//                        socketChannel.pipeline().addLast(new HttpRequestDecoder());//in,因为基于Http协议,使用http的解码器
//                        socketChannel.pipeline().addLast(new HttpResponseEncoder()); //out,因为基于Http协议,使用http的编码器
                        socketChannel.pipeline().addLast(new HttpServerCodec());// http 编解码
                        socketChannel.pipeline().addLast("httpAggregator",new HttpObjectAggregator(512*1024)); // http消息聚合器,设置512*1024为接收的最大contentlength
                        socketChannel.pipeline().addLast(new ChunkedWriteHandler());//in+out,Netty提供了ChunkedWriteHandler来解决大文件或者码流传输过程中可能发生的内存溢出问题

                        //socketChannel.pipeline().addLast(new HttpRequestHandler());
                        socketChannel.pipeline().addLast(new HttpReqHandler());
                    }
                });

        ChannelFuture f = bootstrap.bind().sync();
        //ChannelFuture f = bootstrap.bind(new InetSocketAddress(port)).sync();
        System.out.println(" server start up on port : " + port);
        f.channel().closeFuture().sync();

    }

    public static void main(String[] args) {
        HttpServer httpServer = new HttpServer(8899);
        try {
            httpServer.start();
            System.out.println("启动netty-http服务器成功.....");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

2、HttpReqHandler   处理http请求

package com.bokeyuan.http.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.*;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

/**
 * HTTP request handler: logs each aggregated request and answers with a small fixed body.
 *
 * <p>Expects to receive {@link FullHttpRequest} messages (i.e. to sit after an HTTP codec and
 * aggregator in the pipeline). Anything else, or a request that failed decoding, is answered
 * with {@code 400 Bad Request} and the connection is closed.
 *
 * @author: void
 * @date: 2021-09-09 16:41
 * @version: 1.0
 */
public class HttpReqHandler extends ChannelInboundHandlerAdapter {

    /** Server-side switch: when false, every connection is closed after its response. */
    private final boolean isKeepAlive = true;

    /**
     * When true, a listener is attached to the write so post-flush work can run once the
     * response has been flushed; when false the response is only written and flushed.
     */
    private final boolean enableListenerFlush = true;

    /**
     * Handles one inbound message: logs the request, writes the response, and releases the
     * inbound message.
     *
     * @param ctx channel context used to write the response
     * @param msg inbound message; released by this method in all cases
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        FullHttpResponse response;
        boolean keepConnection;
        try {
            boolean validRequest = msg instanceof FullHttpRequest
                    && ((FullHttpRequest) msg).decoderResult().isSuccess();
            if (validRequest) {
                FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
                String uri = fullHttpRequest.uri();
                String method = fullHttpRequest.method().name();
                String bodyParam = getBodyParam(fullHttpRequest);
                String contentType = fullHttpRequest.headers().get(HttpHeaderNames.CONTENT_TYPE);
                System.out.println("请求uri:"+uri+",请求方式:"+method+",请求参数:"+bodyParam+",contentType:"+contentType);
                // Business processing would produce the response content here.
                response = buildResponse(HttpResponseStatus.OK, "响应内容");
                // Honour the client's Connection header / HTTP version, not just the server switch.
                keepConnection = isKeepAlive && HttpUtil.isKeepAlive(fullHttpRequest);
            } else {
                response = buildResponse(HttpResponseStatus.BAD_REQUEST, "bad request");
                // The stream may be out of sync after a malformed request; do not reuse it.
                keepConnection = false;
            }
        } finally {
            // This adapter does not auto-release inbound messages; the aggregated request is
            // reference-counted and would leak if not released here.
            ReferenceCountUtil.release(msg);
        }

        final boolean keepAlive = keepConnection;
        // Tell the client explicitly whether the connection stays open.
        response.headers().set(HttpHeaderNames.CONNECTION,
                keepAlive ? HttpHeaderValues.KEEP_ALIVE : HttpHeaderValues.CLOSE);

        ChannelFuture writeFuture = ctx.channel().writeAndFlush(response);
        if (enableListenerFlush) {
            writeFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // Post-flush processing.
                    if (future.isSuccess()) {
                        System.out.println("数据刷出成功");
                    }
                    // Short-lived connections are closed by the server whether or not the flush
                    // succeeded; a failed write also leaves the connection unusable.
                    if (!keepAlive || !future.isSuccess()) {
                        future.channel().close();
                    }
                }
            });
        } else if (!keepAlive) {
            writeFuture.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * Closes the connection when an idle-state event is fired on this channel; other user
     * events are passed along unchanged.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            ctx.close();
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    /**
     * Closes the channel on any unhandled pipeline error so a broken connection is not kept open.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.err.println("http handler error, closing channel: " + cause);
        ctx.close();
    }


    /**
     * Returns the request body decoded as UTF-8 text.
     *
     * @param fullHttpRequest the aggregated request
     * @return the body as a string, or {@code null} if the request has no content buffer
     */
    private String getBodyParam(FullHttpRequest fullHttpRequest) {
        ByteBuf byteBuf = fullHttpRequest.content();
        return byteBuf == null ? null : byteBuf.toString(CharsetUtil.UTF_8);
    }


    /**
     * Builds an HTTP/1.1 response with the given status and body, plus content-type,
     * permissive CORS and content-length headers.
     *
     * @param status response status
     * @param msg    response body; encoded as UTF-8 to match the declared charset
     * @return the complete response, ready to be written
     */
    private FullHttpResponse buildResponse(HttpResponseStatus status, String msg) {
        FullHttpResponse response = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1,
                status,
                // Encode explicitly as UTF-8; the platform default charset may differ from the
                // charset advertised in the Content-Type header below.
                Unpooled.wrappedBuffer(msg.getBytes(CharsetUtil.UTF_8)));
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON + ";" + HttpHeaderValues.CHARSET + "=UTF-8");
        response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
        response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, "Origin, X-Requested-With, Content-Type, Accept,Accept-Charset");
        response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS, "GET,POST,PUT,DELETE");
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        return response;
    }
}
作者:小念
本文版权归作者和博客园共有,欢迎转载,但必须给出原文链接,并保留此段声明,否则保留追究法律责任的权利。
原文地址:https://www.cnblogs.com/kiko2014551511/p/15250757.html