NIO之socket交互。

参考: https://www.cnblogs.com/yanghuahui/p/3686054.html

https://www.jianshu.com/p/c26a25feb77e

参考上面的代码,写了份demo。雷同率高达98.888888888888~%

本来想加个服务端断掉,客户端尝试重连的功能。发现难度有点大。如有解决方案,希望可以告知,感激不尽。

测试代码

IChat.java

package com.boot.demo.test.io.socket.chat;

/**
 * Common contract for the chat endpoints in this package; both the server
 * and the client are started through {@link #run()}.
 *
 * @author braska
 * @date 2020/3/23
 **/
public interface IChat {
    /**
     * Runs the chat endpoint.
     *
     * @throws Exception if the endpoint fails while running
     */
    void run() throws Exception;
}

 

AbstractChat.java

package com.boot.demo.test.io.socket.chat;

import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

/**
 * Shared plumbing for the chat server and the chat client: the selector
 * event loop, the wire-protocol constants and a helper that drains the
 * currently selected channel into a string.
 *
 * @author braska
 * @date 2020/3/23
 **/
public abstract class AbstractChat implements IChat {
    /** Separator placed between the user name and the message body. */
    protected static final String USER_CONTENT_SPLIT = "#@#";
    /** Message sent to a client whose requested name is already taken. */
    protected static final String USER_EXISTS_ERROR = "system message: user exist, please change a name";
    /** Format of the broadcast sent when a user joins (name, online count). */
    protected static final String WELCOME = "welcome %s to chat room! Online numbers: %s";

    protected Charset charset = Charset.forName("UTF-8");

    protected int port;
    protected Selector selector;
    /**
     * Key currently being dispatched by {@link #run()}. Volatile because
     * {@link #getChannel()} may be called from a thread other than the one
     * running the selector loop.
     */
    protected volatile SelectionKey currentKey;

    /**
     * Runs the selector loop, handing every selected key to
     * {@link #keyHandler(SelectionKey)}.
     *
     * <p>The loop ends when the selector is closed or the running thread is
     * interrupted. A {@code select()} that returns 0 (wakeup or spurious
     * return) is simply retried instead of terminating the loop.
     *
     * @throws Exception if selecting fails or a handler propagates an error
     */
    @Override
    public void run() throws Exception {
        while (this.selector.isOpen() && !Thread.currentThread().isInterrupted()) {
            if (this.selector.select() == 0) {
                continue;
            }
            Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                currentKey = iter.next();
                // The selected-key set is never cleared by the selector itself.
                iter.remove();
                keyHandler(currentKey);
            }
        }
    }

    /**
     * Handles one ready key.
     *
     * @param key the key reported ready by the selector
     * @throws Exception if handling fails
     */
    protected abstract void keyHandler(SelectionKey key) throws Exception;

    /**
     * @return the channel of the key currently being dispatched
     */
    protected SocketChannel getChannel() {
        return (SocketChannel) currentKey.channel();
    }

    /**
     * Reads everything currently available on {@link #getChannel()} and
     * decodes it with {@link #charset}.
     *
     * <p>All bytes are collected first and decoded once, so a multi-byte
     * character that straddles an internal buffer boundary is not corrupted.
     *
     * @return the decoded text, possibly empty when no bytes were available
     * @throws java.io.EOFException if the peer closed the connection and no
     *                              data was read; callers should cancel the
     *                              key and close the channel
     * @throws Exception            if reading from the channel fails
     */
    protected String content() throws Exception {
        SocketChannel channel = getChannel();
        ByteBuffer received = ByteBuffer.allocate(1024);
        int read;
        while ((read = channel.read(received)) > 0) {
            if (!received.hasRemaining()) {
                // A full buffer would make the next read return 0; grow it.
                ByteBuffer larger = ByteBuffer.allocate(received.capacity() * 2);
                received.flip();
                larger.put(received);
                received = larger;
            }
        }
        if (read < 0 && received.position() == 0) {
            // Without this, a closed peer keeps the key readable forever and
            // the selector loop spins on empty reads.
            throw new java.io.EOFException("peer closed the connection");
        }
        received.flip();
        return charset.decode(received).toString();
    }
}

  

ChatServer.java

package com.boot.demo.test.io.socket.chat;

import com.google.common.base.Strings;
import com.google.common.collect.Maps;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.util.Map;

/**
 * Non-blocking chat server: accepts clients, registers their user names and
 * relays every message to all other connected clients.
 *
 * <p>Wire format: {@code name#@#} registers a name, {@code name#@#text}
 * sends {@code text} on behalf of {@code name}.
 *
 * @author braska
 * @date 2020/3/23
 **/
public class ChatServer extends AbstractChat {
    /** Registered user name to the channel that registered it. */
    private final Map<String, SocketChannel> users;

    /**
     * Binds the listening socket and registers it for accept events.
     *
     * @param port TCP port to listen on
     * @throws Exception if the socket cannot be opened, bound or registered
     */
    public ChatServer(int port) throws Exception {
        this.users = Maps.newConcurrentMap();
        this.port = port;
        ServerSocketChannel channel = ServerSocketChannel.open();
        try {
            channel.socket().bind(new InetSocketAddress(this.port));
            channel.configureBlocking(false);
            this.selector = Selector.open();
            channel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            // Do not leak the listening socket when setup fails (e.g. port in use).
            channel.close();
            throw e;
        }
    }

    /**
     * Dispatches one ready key: accepts a new client or processes data sent
     * by an existing one. Any failure closes the affected channel.
     *
     * @param key the key reported ready by the selector
     */
    @Override
    public void keyHandler(SelectionKey key) {
        if (key == null) {
            return;
        }
        try {
            if (key.isAcceptable()) {
                accept((ServerSocketChannel) key.channel());
            } else if (key.isReadable()) {
                receive(getChannel());
            }
        } catch (Exception e) {
            disconnect(key, e);
        }
    }

    /** Accepts a pending client and greets it; a failing client never affects the listener. */
    private void accept(ServerSocketChannel listener) throws IOException {
        SocketChannel client = listener.accept();
        if (client == null) {
            // Non-blocking accept with no pending connection.
            return;
        }
        try {
            client.configureBlocking(false);
            client.register(this.selector, SelectionKey.OP_READ);
            System.out.println("Server is listening from client :" + client.getRemoteAddress());
            client.write(charset.encode("Please input your name:"));
        } catch (IOException e) {
            // Only the new client is broken; keep the listening channel alive.
            System.err.println("failed to set up accepted client: " + e);
            client.close();
        }
    }

    /** Reads the pending data of a client and treats it as a registration or a chat message. */
    private void receive(SocketChannel channel) throws Exception {
        String content = content();
        if (Strings.isNullOrEmpty(content)) {
            return;
        }
        int split = content.indexOf(USER_CONTENT_SPLIT);
        String name = split < 0 ? content : content.substring(0, split);
        // Only the leading "name#@#" is stripped; the body may itself contain it.
        String body = split < 0 ? "" : content.substring(split + USER_CONTENT_SPLIT.length());
        if (name.isEmpty()) {
            // Malformed input such as a bare separator: ignore instead of failing.
            return;
        }
        if (body.isEmpty()) {
            register(name, channel);
        } else if (users.containsKey(name)) {
            dispatch(name + " said: " + body, channel);
        }
    }

    /** Registers a user name, or rejects it when a connected user already holds it. */
    private void register(String name, SocketChannel channel) throws IOException {
        SocketChannel existing = users.get(name);
        if (existing != null && existing.isConnected()) {
            channel.write(charset.encode(USER_EXISTS_ERROR));
        } else {
            users.put(name, channel);
            dispatch(String.format(WELCOME, name, countUsers()), null);
        }
    }

    /** Cancels the key, forgets and announces the user bound to its channel, then closes the channel. */
    private void disconnect(SelectionKey key, Exception cause) {
        System.err.println("closing channel after failure: " + cause);
        key.cancel();
        SelectableChannel closing = key.channel();
        if (closing == null) {
            return;
        }
        for (Map.Entry<String, SocketChannel> entry : users.entrySet()) {
            if (entry.getValue().equals(closing)) {
                // Drop the stale registration so the map does not grow forever.
                users.remove(entry.getKey(), entry.getValue());
                dispatch(entry.getKey() + " has gone.", closing);
                break;
            }
        }
        try {
            closing.close();
        } catch (IOException e1) {
            e1.printStackTrace();
        }
    }

    /**
     * @return the number of client connections whose key is still valid
     */
    private long countUsers() {
        return this.selector.keys().stream()
                .filter(SelectionKey::isValid)
                .filter(key -> key.channel() instanceof SocketChannel)
                .count();
    }

    /**
     * Sends a message to every connected client except one.
     *
     * @param content text to send
     * @param channel channel to skip, or {@code null} to send to everyone
     */
    private void dispatch(String content, SelectableChannel channel) {
        selector.keys().stream()
                // Cancelled keys stay in keys() until the next select; skip their closed channels.
                .filter(SelectionKey::isValid)
                .filter(key -> key.channel() instanceof SocketChannel)
                .filter(key -> key.channel() != channel)
                .forEach(key -> {
                    SocketChannel c = (SocketChannel) key.channel();
                    try {
                        c.write(charset.encode(content));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
    }

    public static void main(String[] args) throws Exception {
        new ChatServer(3000).run();
    }
}

  

ChatClient.java

package com.boot.demo.test.io.socket.chat;

import org.apache.logging.log4j.util.Strings;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Console chat client. The first line typed becomes the user name, every
 * following line is sent as a chat message. When the connection is lost the
 * client retries up to {@code retry} times, waiting {@code interval}
 * milliseconds before each attempt.
 *
 * @author braska
 * @date 2020/3/23
 **/
public class ChatClient extends AbstractChat {
    /** Name accepted for this session; blank until the user has typed one. */
    private volatile String username;
    private Integer retry = 20;
    private int interval = 1000;
    /** Consecutive failed connection attempts; reset on every successful connect. */
    private final AtomicInteger count = new AtomicInteger(0);
    /** Single permit taken by whoever starts the console reader, so it starts only once. */
    private final Semaphore semaphore = new Semaphore(1);
    /** Channel the console reader writes to; {@code null} while disconnected. */
    private volatile SocketChannel connected;

    /**
     * Starts connecting to a chat server on the given port.
     *
     * @param port TCP port of the chat server
     * @throws Exception if the selector or the channel cannot be opened
     */
    public ChatClient(int port) throws Exception {
        this.port = port;
        connect();
    }

    /**
     * @param retry maximum number of consecutive reconnect attempts
     */
    public void setRetry(Integer retry) {
        this.retry = java.util.Objects.requireNonNull(retry, "retry");
    }

    /**
     * @param inteval pause before each reconnect attempt, in milliseconds
     */
    public void setInteval(int inteval) {
        this.interval = inteval;
    }

    /**
     * Correctly spelled alias of {@link #setInteval(int)}.
     *
     * @param interval pause before each reconnect attempt, in milliseconds
     */
    public void setInterval(int interval) {
        setInteval(interval);
    }

    /**
     * Opens exactly one new non-blocking channel and registers it with the
     * (single, reused) selector. Completion or failure of the connect is
     * reported later through {@link #keyHandler(SelectionKey)}.
     */
    private void connect() throws Exception {
        if (this.selector == null || !this.selector.isOpen()) {
            this.selector = Selector.open();
        }
        SocketChannel channel = SocketChannel.open();
        try {
            channel.configureBlocking(false);
            boolean established = channel.connect(new InetSocketAddress(this.port));
            // A local connect may complete immediately; OP_CONNECT would then never fire.
            channel.register(selector, established ? SelectionKey.OP_READ : SelectionKey.OP_CONNECT);
            if (established) {
                onConnected(channel);
            }
        } catch (Exception e) {
            channel.close();
            throw e;
        }
    }

    /** Resets the session state for a freshly connected channel and starts the console reader once. */
    private void onConnected(SocketChannel channel) {
        count.set(0);
        username = "";
        connected = channel;
        if (semaphore.tryAcquire()) {
            startConsoleReader();
        }
    }

    /** Starts the thread that forwards console lines to the currently connected channel. */
    private void startConsoleReader() {
        new Thread(() -> {
            Scanner scanner = new Scanner(System.in);
            try {
                while (scanner.hasNextLine()) {
                    String content = scanner.nextLine();
                    if (content.trim().isEmpty()) continue;
                    SocketChannel target = connected;
                    if (target == null) {
                        System.out.println("not connected to chat server, message dropped");
                        continue;
                    }
                    if (!Strings.isNotBlank(username)) {
                        // First line of a session is the user name.
                        username = content;
                        content += USER_CONTENT_SPLIT;
                    } else {
                        content = username + USER_CONTENT_SPLIT + content;
                    }
                    try {
                        target.write(charset.encode(content));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            } finally {
                scanner.close();
            }
        }).start();
    }

    /**
     * Completes a pending connect or prints data received from the server.
     * Any failure closes the channel and triggers a reconnect attempt.
     *
     * @param key the key reported ready by the selector
     */
    @Override
    public void keyHandler(SelectionKey key) {
        if (key == null || !key.isValid()) {
            return;
        }
        try {
            if (key.isConnectable()) {
                SocketChannel channel = (SocketChannel) key.channel();
                // finishConnect throws when the server refused the connection.
                if (channel.finishConnect()) {
                    key.interestOps(SelectionKey.OP_READ);
                    onConnected(channel);
                }
            } else if (key.isReadable()) {
                String content = content();
                if (content.isEmpty()) {
                    // A readable key with nothing to read means the server closed the connection.
                    throw new java.io.EOFException("chat server closed the connection");
                }
                if (content.contains(USER_EXISTS_ERROR)) {
                    username = "";
                }
                System.out.println(content);
            }
        } catch (Exception e) {
            handleDisconnect(key, e);
        }
    }

    /** Releases the failed channel and schedules the next connection attempt. */
    private void handleDisconnect(SelectionKey key, Exception cause) {
        System.err.println("connection to chat server lost: " + cause);
        if (connected == key.channel()) {
            connected = null;
        }
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException e1) {
            e1.printStackTrace();
        }
        reconnect();
    }

    /**
     * Waits {@code interval} milliseconds and opens one new connection
     * attempt, as long as fewer than {@code retry} consecutive attempts have
     * failed. Runs on the selector thread; the outcome of the attempt arrives
     * through {@link #keyHandler(SelectionKey)}.
     */
    private void reconnect() {
        while (count.incrementAndGet() <= retry) {
            System.out.println("try to connect chat server " + count.get() + "time");
            try {
                Thread.sleep(interval);
                connect();
                return;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                System.err.println("connect attempt failed: " + e);
            }
        }
        System.err.println("giving up: chat server unreachable after " + retry + " attempts");
    }

    public static void main(String[] args) throws Exception {
        new ChatClient(3000).run();
    }
}

  

重连方法也不是不可用,就是有大bug:服务端启动以后,控制台日志会打印出两条客户端连进来的日志。在线人数就翻倍了。

因为有问题,所以注释掉了。

查看别人的客户端重连代码,发现根本不可用。可能是我应用的场景不对。

/**
 * Opens a non-blocking channel to local port 3000 and polls
 * {@code finishConnect()} once per second until the connect completes.
 *
 * <p>Note that {@code finishConnect()} throws once the connection attempt
 * has failed (for example when it is refused), so this loop only waits for
 * a pending connect; it does not retry a failed one.
 */
@Test
    public void test_finishConnect_connect4() throws Exception {
        // try-with-resources closes the channel on every exit path.
        try (SocketChannel socketChannel = SocketChannel.open()) {
            TimeUnit.SECONDS.sleep(1);
            socketChannel.configureBlocking(false);
            boolean result = socketChannel.connect(new InetSocketAddress(3000));
            if (!result) {
                while (!socketChannel.finishConnect()) {
                    System.out.println("一直在尝试连接");
                    TimeUnit.SECONDS.sleep(1);
                }
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

  

原文地址:https://www.cnblogs.com/braska/p/12559439.html