使用 Netty 实现 IM 聊天

简书地址图文更清晰: https://www.jianshu.com/p/f455814f3c40

1、新建maven工程
2、引入maven依赖

<dependencies>
    <!-- Netty: networking framework used by both the chat server and the chat clients. -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.49.Final</version>
    </dependency>

    <!--
      fastjson: converts MsgBody to and from JSON.
      Upgraded from 1.2.68, which is affected by published autoType deserialization
      vulnerabilities; 1.2.83 is the patched 1.x release.
    -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- Compile for Java 8; the handler code uses lambdas. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

3、定义消息体MsgBody

/**
 * A single chat message: who sent it and what they said.
 *
 * <p>Plain mutable bean exposing one getter/setter pair per property.
 */
public class MsgBody {

    /** Text content of the message. */
    private String msg;

    /** Display name of the sender. */
    private String sendUserName;

    /**
     * @return the text content of the message, or {@code null} if none was set
     */
    public String getMsg() {
        return this.msg;
    }

    /**
     * @param msg the text content of the message
     */
    public void setMsg(String msg) {
        this.msg = msg;
    }

    /**
     * @return the sender's display name, or {@code null} if none was set
     */
    public String getSendUserName() {
        return this.sendUserName;
    }

    /**
     * @param sendUserName the sender's display name
     */
    public void setSendUserName(String sendUserName) {
        this.sendUserName = sendUserName;
    }
}

4、新建服务器端的NettyServer和ServerHandler

/**
 * Netty chat server. Accepts TCP connections and installs a fresh {@link ServerHandler}
 * on every accepted channel.
 *
 * <p>Constructing an instance binds the port and then blocks the calling thread until the
 * server channel is closed.
 *
 * @Author: yeyongjian
 * @Date: 2020-05-03 23:34
 */
public class NettyServer {

    /** TCP port the server listens on. */
    private final int port;

    /**
     * Creates the server and starts it immediately; does not return until the server
     * channel has been closed or startup has failed.
     *
     * @param port TCP port to listen on
     */
    public NettyServer(int port) {
        this.port = port;
        bind();
    }

    /**
     * Configures the bootstrap, binds {@link #port} and waits for the server channel to close.
     * Both event loop groups are shut down on the way out, whether startup succeeded or not.
     */
    private void bind() {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();

            bootstrap.group(boss, worker);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.option(ChannelOption.SO_BACKLOG, 1024); // backlog of pending connections
            // TCP_NODELAY and SO_KEEPALIVE are per-connection socket options, so they are set
            // with childOption(); option() only configures the listening channel.
            bootstrap.childOption(ChannelOption.TCP_NODELAY, true); // send small messages without delay
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); // long-lived connections
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) {
                    // One handler instance per connection; it receives and relays chat messages.
                    socketChannel.pipeline().addLast(new ServerHandler());
                }
            });

            ChannelFuture channelFuture = bootstrap.bind(port).sync();
            if (channelFuture.isSuccess()) {
                System.err.println("启动Netty服务成功,端口号:" + this.port);
            }
            // Block here until the server channel is closed.
            channelFuture.channel().closeFuture().sync();

        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so callers can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            System.err.println("启动Netty服务异常,异常信息:" + e.getMessage());
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    /**
     * Starts the chat server on port 10086.
     *
     * @param args ignored
     */
    public static void main(String[] args) throws InterruptedException {
        new NettyServer(10086);
    }
}
import com.alibaba.fastjson.JSONObject;
import com.eujian.im.MsgBody;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.SimpleChannelInboundHandler;

import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;

/**
 * Server-side handler: decodes each inbound JSON chat message and relays it to every other
 * client that has already sent at least one message.
 *
 * <p>NOTE(review): no frame decoder is visible in the server pipeline, so this code assumes
 * every read delivers exactly one complete JSON document. Confirm, or add length-based framing
 * on both server and client.
 *
 * @Author: yeyongjian
 * @Date: 2020-05-03 23:35
 */
public class ServerHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /** Wire encoding of every message. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    // Channel id -> context of every client that has sent a message and is still connected.
    // The map is static and therefore shared by the handler instances of all connections,
    // which may run on different event-loop threads, hence a concurrent map.
    private static final Map<String, ChannelHandlerContext> map =
            new java.util.concurrent.ConcurrentHashMap<>();

    /**
     * Registers the sender, logs the received message and forwards it to all other
     * registered clients.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg raw bytes of one JSON-encoded {@code MsgBody}
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
        String senderId = ctx.channel().id().toString();
        map.put(senderId, ctx);

        String received = getMessage(msg);
        MsgBody msgBody = JSONObject.parseObject(received, MsgBody.class);
        if (msgBody == null) {
            // parseObject can return null (e.g. for empty input); there is nothing to relay.
            return;
        }
        String format = String.format("服务器接收到客户端消息,发送人:%s,发送信息:%s", msgBody.getSendUserName(), msgBody.getMsg());
        System.err.println(format);

        // Rebuild the outgoing body from the two known fields and serialize it once for
        // all recipients.
        MsgBody sendMsgBody = new MsgBody();
        sendMsgBody.setSendUserName(msgBody.getSendUserName());
        sendMsgBody.setMsg(msgBody.getMsg());
        String relayJson = JSONObject.toJSONString(sendMsgBody);

        map.forEach((peerId, peerCtx) -> {
            if (senderId.equals(peerId)) {
                return; // do not echo the message back to its sender
            }
            // Each recipient gets its own buffer.
            peerCtx.writeAndFlush(getSendByteBuf(relayJson));
            System.err.println("服务器回复消息:" + relayJson);
        });
    }

    /**
     * Forgets a client once its connection is gone, so the map does not grow without bound
     * and no further messages are written to a dead channel.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        map.remove(ctx.channel().id().toString());
        super.channelInactive(ctx);
    }

    /**
     * Reads all readable bytes from {@code buf} and decodes them as UTF-8.
     *
     * @param buf buffer to drain
     * @return the decoded text (empty if the buffer had no readable bytes)
     */
    private String getMessage(ByteBuf buf) {
        byte[] con = new byte[buf.readableBytes()];
        buf.readBytes(con);
        return new String(con, UTF_8);
    }

    /**
     * Encodes {@code message} as UTF-8 into a newly allocated buffer.
     *
     * @param message text to send
     * @return a new buffer containing the encoded bytes
     */
    private ByteBuf getSendByteBuf(String message) {
        byte[] req = message.getBytes(UTF_8);
        ByteBuf out = Unpooled.buffer();
        out.writeBytes(req);
        return out;
    }
}

5、新建客户端代码

import com.alibaba.fastjson.JSONObject;
import com.eujian.im.MsgBody;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.io.UnsupportedEncodingException;

/**
 * Client-side handler: announces the user when the connection becomes active, prints every
 * chat message received from the server, and lets other threads send text via
 * {@link #sendMsg(String)}.
 */
public class NettyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /** Wire encoding; must match the UTF-8 decoding used when reading messages. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    // Set in channelActive and read by whichever thread calls sendMsg (NettyClient calls it
    // from its console-reading thread), hence volatile.
    private volatile ChannelHandlerContext ctx;

    private String userName;

    /**
     * @return the name this client announces itself with
     */
    public String getUserName() {
        return userName;
    }

    /**
     * @param userName the name this client announces itself with
     */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /**
     * Sends {@code str} to the server, encoded as UTF-8.
     *
     * @param str the text to send (callers in this file pass a JSON-encoded {@code MsgBody})
     * @throws IllegalStateException if the channel has not become active yet
     */
    public void sendMsg(String str) {
        ChannelHandlerContext context = this.ctx;
        if (context == null) {
            throw new IllegalStateException("cannot send: channel is not active yet");
        }
        // Encode explicitly as UTF-8; the platform default charset may differ from the
        // UTF-8 used when decoding, which would corrupt non-ASCII text.
        byte[] data = str.getBytes(UTF_8);
        ByteBuf out = Unpooled.buffer();
        out.writeBytes(data);
        context.writeAndFlush(out);
    }

    /**
     * Remembers the context for later sends and announces that this user joined the chat room.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        this.ctx = ctx;
        MsgBody msgBody = new MsgBody();
        msgBody.setSendUserName(userName);
        msgBody.setMsg("进入聊天室");
        sendMsg(JSONObject.toJSONString(msgBody));
    }

    /**
     * Decodes one JSON chat message from the server and prints it.
     *
     * @param channelHandlerContext context of the channel the message arrived on
     * @param msg raw bytes of one JSON-encoded {@code MsgBody}
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf msg) {
        String rev = getMessage(msg);
        MsgBody msgBody = JSONObject.parseObject(rev, MsgBody.class);
        if (msgBody == null) {
            // parseObject can return null (e.g. for empty input); nothing to display.
            return;
        }
        String format = String.format("客户端收到服务器消息,发送人:%s,发送信息:%s", msgBody.getSendUserName(), msgBody.getMsg());
        System.err.println(format);
    }

    /**
     * Reads all readable bytes from {@code buf} and decodes them as UTF-8.
     *
     * @param buf buffer to drain
     * @return the decoded text (empty if the buffer had no readable bytes)
     */
    private String getMessage(ByteBuf buf) {
        byte[] con = new byte[buf.readableBytes()];
        buf.readBytes(con);
        return new String(con, UTF_8);
    }
}
import com.alibaba.fastjson.JSONObject;
import com.eujian.im.MsgBody;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.util.Scanner;

/**
 * Console chat client. Connects to the chat server, then forwards every line typed on
 * standard input as a chat message until the connection is closed.
 *
 * <p>Constructing an instance connects and then blocks the calling thread until the
 * channel is closed.
 */
public class NettyClient {
    /** Handler installed on the client channel; assigned when the channel is initialised. */
    public NettyClientHandler nettyClientHandler;

    /** Server port. */
    private final int port;

    /** Name attached to every message this client sends. */
    private final String sendUserName;

    /** Server host name or IP address. */
    private final String host;

    /**
     * Creates the client and connects immediately; does not return until the connection
     * has been closed.
     *
     * @param port server port
     * @param host server host name or IP address
     * @param sendUserName name attached to every message this client sends
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public NettyClient(int port, String host, String sendUserName) throws InterruptedException {
        this.port = port;
        this.host = host;
        this.sendUserName = sendUserName;
        start();
    }

    /**
     * Connects to the server, starts the console reader and waits for the channel to close.
     * The event loop group is shut down on the way out.
     */
    private void start() throws InterruptedException {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.group(eventLoopGroup);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel)
                        throws Exception {
                    nettyClientHandler = new NettyClientHandler();
                    nettyClientHandler.setUserName(sendUserName);
                    socketChannel.pipeline().addLast(nettyClientHandler);
                }
            });

            // sync() rethrows the cause of a failed connect, so reaching the next line
            // means the connection was established.
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            startConsoleReader();
            System.err.println(sendUserName + "连接服务器成功");

            // Block here until the channel is closed.
            channelFuture.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }

    /**
     * Starts the thread that turns each non-blank console line into one chat message.
     * Whole lines are read so that a message containing spaces is sent as a single message.
     */
    private void startConsoleReader() {
        Thread reader = new Thread(() -> {
            Scanner sc = new Scanner(System.in);
            while (sc.hasNextLine()) {
                String text = sc.nextLine().trim();
                if (text.isEmpty()) {
                    continue; // nothing typed; do not send an empty message
                }
                MsgBody msgBody = new MsgBody();
                msgBody.setSendUserName(sendUserName);
                msgBody.setMsg(text);
                nettyClientHandler.sendMsg(JSONObject.toJSONString(msgBody));
            }
        }, "chat-console-reader");
        // Daemon: a thread blocked on stdin must not keep the JVM alive once the
        // connection has closed and start() has returned.
        reader.setDaemon(true);
        reader.start();
    }
}

6、新建2个main函数,模拟两个客户端

    /**
     * Launches a chat client named "tom" against the local server.
     *
     * @param args ignored
     */
    public static void main(String[] args) throws InterruptedException {
        final int serverPort = 10086;
        final String serverHost = "localhost";
        final String userName = "tom";
        new NettyClient(serverPort, serverHost, userName);
    }
}
    /**
     * Launches a chat client named "jack" against the local server.
     *
     * @param args ignored
     */
    public static void main(String[] args) throws InterruptedException {
        final int serverPort = 10086;
        final String serverHost = "localhost";
        final String userName = "jack";
        new NettyClient(serverPort, serverHost, userName);
    }
}

7、依次启动 NettyServer、main1 和 main2
在 main1(tom)的控制台输入:jack,你好
在 main2(jack)的控制台输入:tom,hello,我很好
server显示(运行截图未随本文保留,请参见上方简书链接):

main1显示(截图同上):

main2显示(截图同上):

码云: https://gitee.com/guoeryyj/netty-im.git

欢迎关注我的微信公众号:进阶者euj

原文地址:https://www.cnblogs.com/yeyongjian/p/12824955.html