Netty客户端连接池ChannelPool应用 【支持https请求】

Netty从4.0版本就提供了连接池ChannelPool,可以解决与多个服务端交互以及与单个服务端建立连接池的问题

1、实现ChannelPoolHandler

首先要写一个类实现ChannelPoolHandler,主要是channelCreated,在channelCreated中添加channelhandler等

package com.bokeyuan.http.pool_client;

import io.netty.channel.Channel;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.timeout.IdleStateHandler;


/**
 * @author: void
 * @date: 2021-09-10 17:52
 * @description: 实现ChannelPoolHandler
 * @version: 1.0
 */
public class NettyChannelPoolHandler implements ChannelPoolHandler {

    private boolean isSSL;
    public NettyChannelPoolHandler(boolean isSSL) {
        this.isSSL = isSSL;
    }
    @Override
    public void channelReleased(Channel channel) throws Exception {
        System.out.println("channelReleased. Channel ID:"+channel.id());
    }
    @Override
    public void channelAcquired(Channel channel) throws Exception {
        System.out.println("channelAcquired. Channel ID:"+channel.id());
    }
    @Override
    public void channelCreated(Channel channel) throws Exception {
        System.out.println("channelCreated. Channel ID:"+channel.id());
        SocketChannel socketChannel = (SocketChannel) channel;
        socketChannel.config().setKeepAlive(true);
        socketChannel.config().setTcpNoDelay(true);
        if (isSSL) { //配置Https通信
            SslContext context = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
            channel.pipeline().addLast(context.newHandler(channel.alloc()));
        }
        socketChannel.pipeline()
        //包含编码器和解码器
       .addLast(new HttpClientCodec())
        //聚合
        .addLast(new HttpObjectAggregator(1024 * 10 * 1024))
        //解压
       .addLast(new HttpContentDecompressor())
        //添加ChannelHandler
       .addLast(new NettyClientHandler());
        channel.pipeline().addFirst(new IdleStateHandler(5, 5, 10));


    }
}

2、客户端Handler

实现ChannelInboundHandlerAdapter类,覆写channelRead()方法打印服务端响应的内容

package com.bokeyuan.http.pool_client;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.util.CharsetUtil;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author: void
 * @date: 2021-09-13 14:07
 * @description:
 * @version: 1.0
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    static AtomicInteger count = new AtomicInteger(1);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        FullHttpResponse response = (FullHttpResponse) msg;
        ByteBuf content = response.content();
        HttpHeaders headers = response.headers();
        System.out.println(count.getAndIncrement()+": content:"+content.toString(CharsetUtil.UTF_8));
        //System.out.println("headers:"+headers.get("content-type").toString());

    }
}

3、客户端从连接池获取连接发起请求

客户端实现连接池其中ChannelPoolMap可用于与多个服务端建立链接,本例中采用FixedChannelPool建立与单个服务端最大连接数为2的连接池。在main函数里通过向连接池获取channel发送了十条消息。

package com.bokeyuan.http.pool_client;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.AbstractChannelPoolMap;
import io.netty.channel.pool.ChannelPoolMap;
import io.netty.channel.pool.FixedChannelPool;
import io.netty.channel.pool.SimpleChannelPool;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import lombok.extern.slf4j.Slf4j;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * @author: void
 * @date: 2021-09-13 14:17
 * @description:
 * @version: 1.0
 */
@Slf4j
public class NettyPoolClient {

    final EventLoopGroup group = new NioEventLoopGroup();
    final Bootstrap bootstrap = new Bootstrap();
    ChannelPoolMap<String, SimpleChannelPool> poolMap;

    public void build() throws Exception{
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                //连接超时时长
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
                .option(ChannelOption.TCP_NODELAY,true)
                .option(ChannelOption.SO_KEEPALIVE,true)
                .handler(new LoggingHandler(LogLevel.ERROR));


        poolMap = new AbstractChannelPoolMap<String, SimpleChannelPool>() {
            @Override
            protected SimpleChannelPool newPool(String key) {
                InetSocketAddress inetAddress = null;
                boolean isSSL = key.contains("https");
                try {
                    URI uri = new URI(key);
                    if (Objects.isNull(uri)) {
                        return null;
                    }
                    URL url = new URL(key);
                    String host = url.getHost();
                    InetAddress address = InetAddress.getByName(host);
                    if (!host.equalsIgnoreCase(address.getHostAddress())) {
                        log.warn("域名连接");
                        inetAddress = new InetSocketAddress(address, isSSL ? 443 : 80);
                    } else {
                        log.warn("ip+port连接");
                        int port = url.getPort();
                        inetAddress = InetSocketAddress.createUnresolved(host, port);
                    }
                } catch (Throwable e) {
                    log.error("请求地址不合法:" + e);
                }
                if (Objects.nonNull(inetAddress)){
                    // return new FixedChannelPool(bootstrap.remoteAddress(inetAddress), handler, config.getMaxConnections());
                    return new FixedChannelPool(bootstrap.remoteAddress(inetAddress),new NettyChannelPoolHandler(isSSL),2);

                }
                return null;
            }
        };
    }

    public static void main(String[] args) {
        testJpushHttp();
    }

   public static void testJpushHttp(){ try { NettyPoolClient client = new NettyPoolClient(); client.build(); /**请求报文*/ String msg = client.getMessage(); //报文头 Map<String,String> header = new HashMap<>(); header.put("Authorization","Basic NIU3YzE2ZTgxOWU0YjY0MmVjNjg3NWI3OjllOTU2YjdkZmZhNDBhYWU1ZTg4YzVmOQ=="); //请求接口url地址 String url = "https://bjapi.push.jiguang.cn/v3/push"; for(int i=0; i<10; i++) { HttpRequest fullHttpRequest = buildRequest(msg, url, true, header); SimpleChannelPool pool = client.poolMap.get(url); Future<Channel> f = pool.acquire(); f.addListener((FutureListener<Channel>) f1 -> { if (f1.isSuccess()) { Channel ch = f1.getNow(); ChannelFuture channelFuture = ch.writeAndFlush(fullHttpRequest); //刷出数据的同时进行监听刷出的结果 channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { //这里中刷出成功,并不代表客户接收成功,刷出数据成功默认代表已完成发送 //System.out.println("http netty client刷出数据结果为:" + future.isSuccess()); } }); pool.release(ch); } }); } } catch (Exception e) { e.printStackTrace(); } } /** * 获取请求报文 * @return */ private String getMessage(){ String msg = "{ " + ""notification": " + " { " + " "android":{ " + " "alert":"alert-test", " + " "title":"title-test", " + " "style":1, " + " "alert_type":1, " + " "big_text":"big text content" " + " " + " }, " + " "winphone":{"alert":"alert-test","title":"title-test"}, " + " "ios":{"alert":"通知"} " + " " + " }, " + ""audience":{"registration_id":["180fe1da9e6b5af51a0"]}, " + ""options":{"apns_production":false,"time_to_live":86400}, " + ""platform":"all" " + "} "; return msg; } public static HttpRequest buildRequest(String msg, String url, boolean isKeepAlive) throws Exception{ return buildRequest(msg,url,isKeepAlive,null); } public static HttpRequest buildRequest(String msg, String url, boolean isKeepAlive, Map<String,String> headers) throws Exception { URL netUrl = new URL(url); URI uri = new URI(netUrl.getPath()); //构建http请求 DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, uri.toASCIIString(), Unpooled.wrappedBuffer(msg.getBytes(StandardCharsets.UTF_8))); //设置请求的host(这里可以是ip,也可以是域名) request.headers().set(HttpHeaderNames.HOST, netUrl.getHost()); //其他头部信息 if (headers != null && !headers.isEmpty()) { for (Map.Entry<String, String> entry : headers.entrySet()) { request.headers().set(entry.getKey(), entry.getValue()); } } //设置返回Json request.headers().set(HttpHeaderNames.CONTENT_TYPE ,"text/json;charset=UTF-8"); //发送的长度 request.headers().set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes()); //是否是长连接 if (isKeepAlive){ request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); } //添加其他头部信息 // request.headers().set(HttpHeaderNames.ACCEPT, "*/*"); // request.headers().set(HttpHeaderNames.CACHE_CONTROL, "no-cache"); // request.headers().set(HttpHeaderNames.ACCEPT_ENCODING, "gzip, deflate, br"); // request.headers().set(HttpHeaderNames.USER_AGENT, "PostmanRuntime/7.28.4"); return request; } }

启动服务端,for循环里向服务端发送了10条消息

服务端输出如下,可以看到一共与服务端建立了2个channel,剩下的都是从连接池里请求连接和释放连接。

channelCreated. Channel ID:a648b534
channelCreated. Channel ID:ba8b236d
channelReleased. Channel ID:a648b534
channelReleased. Channel ID:ba8b236d
channelAcquired. Channel ID:ba8b236d
channelAcquired. Channel ID:a648b534
channelReleased. Channel ID:a648b534
channelReleased. Channel ID:ba8b236d
channelAcquired. Channel ID:ba8b236d
channelAcquired. Channel ID:a648b534
channelReleased. Channel ID:ba8b236d
channelAcquired. Channel ID:ba8b236d
channelReleased. Channel ID:ba8b236d
channelAcquired. Channel ID:ba8b236d
channelReleased. Channel ID:a648b534
channelAcquired. Channel ID:a648b534
channelReleased. Channel ID:ba8b236d
channelReleased. Channel ID:a648b534
channelAcquired. Channel ID:ba8b236d
channelReleased. Channel ID:ba8b236d
1: content:{"sendno":"0","msg_id":"67554259084023244"}
2: content:{"sendno":"0","msg_id":"29273659053621966"}
3: content:{"sendno":"0","msg_id":"2252059060236999"}
4: content:{"sendno":"0","msg_id":"47288059060501707"}
5: content:{"sendno":"0","msg_id":"67554259083292593"}
6: content:{"sendno":"0","msg_id":"54043459060484452"}
7: content:{"sendno":"0","msg_id":"2252059060228806"}
8: content:{"sendno":"0","msg_id":"2252059059572541"}
9: content:{"sendno":"0","msg_id":"47288059059705497"}
10: content:{"sendno":"0","msg_id":"47288059060428558"}
作者:小念
本文版权归作者和博客园共有,欢迎转载,但必须给出原文链接,并保留此段声明,否则保留追究法律责任的权利。
原文地址:https://www.cnblogs.com/kiko2014551511/p/15268804.html