基于 Netty 的异步 HTTP 请求

package com.pt.utils;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;

import java.net.URI;
import java.util.Map;

/**
 * Minimal asynchronous HTTP GET client built on Netty.
 *
 * <p>All requests share one single-threaded I/O event loop; response handling is
 * dispatched to a separate two-thread executor group so that user callbacks do not
 * run on the I/O thread. Only plain HTTP is supported: no TLS handler is installed
 * in the pipeline and the port defaults to 80 regardless of the URL scheme.
 *
 * @author panteng
 * @date 17-3-20.
 */
public class NonBlockHttpClient {
    /** I/O event loop (one thread) shared by every connection. */
    public static EventLoopGroup workerGroup = new NioEventLoopGroup(1);
    /** Template bootstrap; it is cloned per request so requests never share a handler. */
    public static Bootstrap b = new Bootstrap();
    /** Executor group (two threads) on which the response handler is run. */
    public static final EventExecutorGroup executor = new DefaultEventExecutorGroup(2);
    /** Guards access to the shared bootstrap {@link #b}. */
    public static Object lock = new Object();

    static {
        b.group(workerGroup);
        b.channel(NioSocketChannel.class);
        b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
    }

    /**
     * Sends an HTTP GET request and delivers the response to {@code handler} asynchronously.
     *
     * <p>The call blocks only until the TCP connection is established (at most the
     * configured connect timeout); the response is processed later on the executor group.
     *
     * @param url     absolute URL to fetch; must contain a host
     * @param head    request headers to send, or {@code null} to use a browser-like default
     *                set; a {@code Host} header is added automatically when missing
     * @param handler callback that receives the response headers and decoded body
     * @return {@code true} if the connection was established and the request was handed to
     *         the channel for writing; {@code false} if anything failed before that point
     *         (the failure is printed to standard error)
     */
    public static Boolean get(String url, Map<String, String> head, final HttpHandler handler) {
        DefaultFullHttpRequest request = null;
        boolean handedOff = false;
        try {
            // Re-parse the ASCII form so the raw path/query used below contain only ASCII.
            URI uri = new URI(new URI(url).toASCIIString());
            String domain = uri.getHost();
            if (domain == null) {
                throw new IllegalArgumentException("URL has no host: " + url);
            }
            int port = uri.getPort() < 0 ? 80 : uri.getPort();
            // The Host header carries the port only when it is not the HTTP default.
            String hostHeader = port == 80 ? domain : domain + ":" + port;

            request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, requestTarget(uri));
            if (head == null) {
                request.headers().add("Host", hostHeader);
                request.headers().add("User-Agent", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:44.0) Gecko/20100101 Firefox/44.0");
                request.headers().add("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8");
                request.headers().add("Accept-Language", "zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3");
                request.headers().add("Connection", "keep-alive");
                request.headers().add("Cache-Control", "max-age=0");
            } else {
                for (Map.Entry<String, String> entry : head.entrySet()) {
                    request.headers().add(entry.getKey(), entry.getValue());
                }
                // HTTP/1.1 requires a Host header; supply it if the caller did not.
                if (!request.headers().contains("Host")) {
                    request.headers().add("Host", hostHeader);
                }
            }

            ChannelInitializer<SocketChannel> channelInitializer = new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    // The client receives HTTP responses, so decode with HttpResponseDecoder.
                    socketChannel.pipeline().addLast(new HttpResponseDecoder());
                    // The client sends HTTP requests, so encode with HttpRequestEncoder.
                    socketChannel.pipeline().addLast(new HttpRequestEncoder());
                    // Run the response handler on the executor group, not on the I/O thread.
                    socketChannel.pipeline().addLast(executor, new GeneralHandler(handler));
                }
            };

            // Clone the template under the lock, then configure and connect the private copy
            // outside it, so a slow connect does not serialise every other request.
            Bootstrap perRequest;
            synchronized (lock) {
                perRequest = b.clone();
            }
            ChannelFuture f = perRequest.handler(channelInitializer).connect(domain, port).sync();

            // From here on the channel owns the request and releases it after writing.
            handedOff = true;
            f.channel().writeAndFlush(request);
            return true;
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (request != null && !handedOff) {
                // The request never reached a channel, so release its buffer here.
                request.release();
            }
        }
        return false;
    }

    /**
     * Builds the origin-form request target (path plus optional query) for the request line.
     */
    private static String requestTarget(URI uri) {
        String path = uri.getRawPath();
        if (path == null || path.isEmpty()) {
            path = "/";
        }
        String query = uri.getRawQuery();
        return query == null ? path : path + "?" + query;
    }

    /**
     * Starts a graceful shutdown of the handler executor group and the I/O event loop.
     * After this call the client can no longer be used.
     */
    public static void close() {
        try {
            executor.shutdownGracefully();
            workerGroup.shutdownGracefully();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
核心类1
package com.pt.utils;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpResponse;

import java.util.HashMap;
import java.util.Map;

/**
 * @author panteng
 * @description
 * @date 17-3-20.
 */
public class GeneralHandler extends ChannelInboundHandlerAdapter {
    com.pt.utils.HttpHandler httpHandler;
    Integer respLength = Integer.MAX_VALUE; // 响应报文长度
    Map<String, String> head = new HashMap<String, String>();
    String respContent = "";

    public GeneralHandler(com.pt.utils.HttpHandler handler) {
        this.httpHandler = handler;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof HttpResponse) {
            HttpResponse response = (HttpResponse) msg;
            for (Map.Entry entry : response.headers().entries()) {
                head.put((String) entry.getKey(), (String) entry.getValue());
            }
            if (response.headers().get("Content-Length") != null) {
                respLength = Integer.parseInt(response.headers().get("Content-Length"));
            }
        }

        if (msg instanceof HttpContent) {
            HttpContent content = (HttpContent) msg;
            ByteBuf buf = content.content();
            respContent += buf.toString(httpHandler.getCharset());
            ((HttpContent) msg).release();
            if (respContent.getBytes().length >= respLength || !buf.isReadable()) {
                ctx.channel().close();
                httpHandler.handler(head, respContent);
            }
        }
    }
}
核心类2
package com.pt.utils;

import java.nio.charset.Charset;
import java.util.Map;

/**
 * Asynchronous callback for an HTTP response.
 *
 * @author panteng
 * @date 17-3-20.
 */
public interface HttpHandler {
    /**
     * Receives a completed response.
     *
     * @param headMap response headers, keyed by header name
     * @param body    response body decoded to text
     */
    void handler(Map<String, String> headMap, String body);

    /**
     * @return the charset to use when decoding the response body into text
     */
    Charset getCharset();
}
用户自定义处理接口

使用用例:

package com.pt.utils.test;

import com.pt.utils.HttpHandler;

import java.nio.charset.Charset;
import java.util.Map;

/**
 * @author panteng
 * @description
 * @date 17-3-20.
 */
public class MyHandler implements HttpHandler {
    boolean isFinish = false;
    String id;

    public MyHandler(String id) {
        this.id = id;
    }

    public void handler(Map<String, String> headMap, String body) {
        try {
            Thread.sleep(3000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(id + "自己处理:" + body);
        this.setIsFinish(true);
    }

    public Charset getCharset() {
        return Charset.forName("UTF-8");
    }

    public boolean isFinish() {
        return isFinish;
    }

    public void setIsFinish(boolean isFinish) {
        this.isFinish = isFinish;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }
}
用户自定义处理接口的实现
package com.pt.utils.test;

import com.pt.utils.NonBlockHttpClient;

/**
 * Demo driver: fires four GET requests, does other work while they are in flight,
 * then polls until every handler reports completion and shuts the client down.
 *
 * @author panteng
 * @date 17-3-22.
 */
public class NonBlockHttpClientTest {
    public static void main(String[] arges) {
        MyHandler[] handlers = {
                new MyHandler("A"),
                new MyHandler("B"),
                new MyHandler("C"),
                new MyHandler("D")
        };
        // NOTE(review): url1..url4 are not declared anywhere in this snippet; they must be
        // supplied (e.g. as String constants) before this class will compile.
        String[] targets = {url1, url2, url3, url4};

        for (int i = 0; i < handlers.length; i++) {
            NonBlockHttpClient.get(targets[i], null, handlers[i]);
        }

        System.out.println("做别的事情");
        pause(2000);

        // Poll every 10 ms until all four callbacks have run.
        while (!allFinished(handlers)) {
            pause(10);
        }

        NonBlockHttpClient.close();
        System.out.println("退出主函数... ...");
    }

    /** Sleeps for the given time; any failure is printed and otherwise ignored. */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Returns true only when every handler has reported completion. */
    private static boolean allFinished(MyHandler[] handlers) {
        for (MyHandler each : handlers) {
            if (!each.isFinish()) {
                return false;
            }
        }
        return true;
    }
}
原文地址:https://www.cnblogs.com/tengpan-cn/p/6881969.html