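Study notes on <Scalable IO in Java>

Original: <Scalable IO in Java>

Written by Doug Lea, the author of the concurrent package. It walks through what is roughly the evolution of NIO-style server design; either way, it helps with learning NIO.
To make the material stick, I picked some parts and translated them myself. After a lot of effort, many of my renderings were still less elegant than Youdao Dictionary's, so a good number of the translations come straight from Youdao.

Mind map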

Scalable network services

Classic Service Design

Each handler may be started in its own thread.

Classic ServerSocket Loop

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

class Server implements Runnable {
    // Assumed values; the slides leave PORT and MAX_INPUT undefined.
    static final int PORT = 8080;
    static final int MAX_INPUT = 1024;

    public void run() {
        try {
            // Open the listening socket.
            ServerSocket ss = new ServerSocket(PORT);
            // Loop until the thread is interrupted.
            while (!Thread.interrupted())
                // accept() blocks; each accepted connection gets its own handler thread.
                new Thread(new Handler(ss.accept())).start();
            // or, single-threaded, or a thread pool
        } catch (IOException ex) { /* ... */ }
    }

    static class Handler implements Runnable {
        final Socket socket;

        Handler(Socket s) {
            socket = s;
        }

        public void run() {
            try {
                byte[] input = new byte[MAX_INPUT];
                // Read the request (blocking).
                socket.getInputStream().read(input);
                byte[] output = process(input);
                // Write the reply (blocking).
                socket.getOutputStream().write(output);
            } catch (IOException ex) { /* ... */ }
        }

        private byte[] process(byte[] cmd) { /* ... */ }
    }
}
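
The comment in the accept loop mentions a thread pool as one alternative to a thread per connection. The following is a minimal sketch of that variant, assuming `java.util.concurrent.ExecutorService`; the class name `PooledServer`, the pool size of 4, the `port()` and `close()` helpers, and the upper-casing `process` body are all hypothetical stand-ins rather than anything from the slides.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical variant of the classic loop: a fixed pool bounds the number of handler threads.
class PooledServer implements Runnable {
    private final ServerSocket ss;
    private final ExecutorService pool = Executors.newFixedThreadPool(4); // assumed size

    PooledServer(int port) throws IOException {
        ss = new ServerSocket(port); // port 0 lets the OS pick a free port
    }

    int port() { return ss.getLocalPort(); }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                Socket s = ss.accept();        // still blocks until a client connects
                pool.execute(() -> handle(s)); // the handler runs on a pooled thread
            }
        } catch (IOException ex) {
            // accept() fails once close() is called; fall through and stop
        } finally {
            pool.shutdown();
        }
    }

    void close() {
        try { ss.close(); } catch (IOException ignored) { }
    }

    // Blocking read, process, blocking write, as in the thread-per-connection handler.
    private static void handle(Socket socket) {
        try (Socket s = socket) {
            byte[] input = new byte[1024];
            int n = s.getInputStream().read(input);
            if (n > 0) {
                s.getOutputStream().write(process(input, n));
            }
        } catch (IOException ex) { /* ... */ }
    }

    // Placeholder service logic (an assumption): upper-case the request bytes.
    static byte[] process(byte[] cmd, int len) {
        return new String(cmd, 0, len, StandardCharsets.US_ASCII)
                .toUpperCase(Locale.ROOT)
                .getBytes(StandardCharsets.US_ASCII);
    }
}
```

Reads and writes still block a pooled thread here, so the pool caps the thread count but does not remove the per-connection blocking that the rest of these notes try to avoid.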

Scalability Goals


  • Graceful degradation under increasing load (more clients)

  • Continuous improvement with increasing resources (CPU, memory, disk, bandwidth)

  • Also meet availability and performance goals

    • Short latencies
    • Meeting peak demand
    • Tunable quality of service
  • Divide-and-conquer is usually the best approach for achieving any scalability goal


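  • Under increasing load (more clients connecting), the service degrades gracefully. This presumably means responses get slower, rather than the service simply falling over as before.

  • Performance keeps improving as resources (CPU, memory, disk, bandwidth) are added.

  • Availability and performance goals are met at the same time.

    • Low latency
    • Capacity for peak demand
    • Tunable quality of service
  • Divide-and-conquer is usually the best way to reach any scalability goal.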

Divide and Conquer


  • Divide processing into small tasks

    • Each task performs an action without blocking
  • Execute each task when it is enabled

    • Here, an IO event usually serves as trigger

[Figure: tasks triggered by an IO event]

  • Basic mechanisms supported in java.nio

    • Non-blocking reads and writes
    • Dispatch tasks associated with sensed IO events
  • Endless variation possible

    • A family of event-driven designs

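  • Split processing into small tasks, each performing one action without blocking, for example the familiar {read request, decode request, process service, encode reply, send reply}.

  • Run each task once it becomes enabled; the figure shows an IO event acting as the trigger.

  • Basic mechanisms supported in java.nio

    • Non-blocking reads and writes
    • Dispatching the tasks associated with sensed IO events. This presumably refers to the Selector: it senses IO events, and we then dispatch each event for handling.
  • Endless variations are possible; as the saying goes, the only constant is change.

    • A family of event-driven designs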

Event-driven Designs


  • Usually more efficient than alternatives

    • Fewer resources : Don't usually need a thread per client.
    • Less overhead : Less context switching, often less locking.
    • But dispatching can be slower : Must manually bind actions to events
  • Usually harder to program

    • Must break up into simple non-blocking actions

      • Similar to GUI event-driven actions
      • Cannot eliminate all blocking: GC, page faults, etc
    • Must keep track of logical state of service


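  • Usually more efficient than the alternatives

    • Fewer resources: a thread per client is usually unnecessary.
    • Less overhead: less context switching and often less locking.
    • But dispatching can be slower: actions have to be bound to events by hand.
  • Usually harder to program

    • Processing must be broken up into simple non-blocking actions.

      • Similar to GUI event-driven actions
      • Not all blocking can be eliminated: GC, page faults, and so on
    • The logical state of the service must be tracked. I did not understand this point.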
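
The bullet on tracking logical state is easier to see with a concrete case. With a thread per client, the position within a request lives implicitly on that thread's stack. An event-driven handler returns after every event, so it has to store that position itself (the `state` field in the Handler code later in these notes plays this role). The sketch below is one possible illustration, not code from the slides: `RequestState`, its `Phase` enum, and the newline-terminated request format are all assumptions.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: remembers how much of a request has arrived so far.
class RequestState {
    enum Phase { READING, SENDING }

    // Fixed capacity and no overflow handling, to keep the sketch short.
    private final ByteBuffer input = ByteBuffer.allocate(1024);
    private Phase phase = Phase.READING;

    // Called once per read event with whatever bytes arrived; this may be a fragment.
    // Bytes after the first newline are ignored in this sketch.
    void onBytes(byte[] chunk) {
        if (phase != Phase.READING) {
            return;
        }
        for (byte b : chunk) {
            if (b == '\n') {
                phase = Phase.SENDING; // request complete: the next event should be a write
                return;
            }
            input.put(b);
        }
    }

    Phase phase() { return phase; }

    // The request text accumulated so far, without the terminating newline.
    String request() {
        return new String(input.array(), 0, input.position(), StandardCharsets.US_ASCII);
    }
}
```

Feeding the request in two fragments, such as `"GET /in"` followed by `"dex\n"`, leaves the object in `READING` after the first call and in `SENDING` after the second; that remembered phase is the logical state the slide refers to.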

Reactor Pattern


  • Reactor responds to IO events by dispatching the appropriate handler

    • Similar to AWT thread
  • Handlers perform non-blocking actions

    • Similar to AWT ActionListeners
  • Manage by binding handlers to events

    • Similar to AWT addActionListener

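  • The Reactor responds to IO events and dispatches each one to the appropriate handler.

    • Similar to the AWT thread
  • Handlers carry out the non-blocking actions.

    • Similar to AWT ActionListeners
  • Handlers are managed by binding them to events.

    • Similar to AWT addActionListener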
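
The three roles above (reactor, handlers, binding) can be shown without any IO at all. The toy below is only an analogy, under the assumption that string event names stand in for IO readiness events; `MiniReactor`, `bind`, `dispatch`, and `demo` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy dispatcher: string event names stand in for IO readiness events.
class MiniReactor {
    private final Map<String, Runnable> handlers = new HashMap<>();

    // "Manage by binding handlers to events", in the spirit of addActionListener.
    void bind(String event, Runnable handler) {
        handlers.put(event, handler);
    }

    // "Reactor responds to IO events by dispatching the appropriate handler".
    void dispatch(String event) {
        Runnable h = handlers.get(event);
        if (h != null) {
            h.run(); // a handler should return quickly and never block
        }
    }

    // Binds two handlers, fires three events, and returns what the handlers recorded.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniReactor reactor = new MiniReactor();
        reactor.bind("accept", () -> log.add("accepted"));
        reactor.bind("read", () -> log.add("read"));
        reactor.dispatch("read");
        reactor.dispatch("accept");
        reactor.dispatch("unknown"); // no handler bound: silently ignored
        return log;
    }
}
```

In the real design the map lookup is replaced by the attachment stored on each SelectionKey, as the Basic Reactor Design code shows.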

java.nio Support


  • Channels: Connections to files, sockets etc that support non-blocking reads

  • Buffers: Array-like objects that can be directly read or written by Channels

  • Selectors: Tell which of a set of Channels have IO events

  • SelectionKeys: Maintain IO event status and bindings


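  • Channels: connections to files, sockets, and so on, with support for non-blocking reads

  • Buffers: array-like objects that channels can read from or write into directly

  • Selectors: report which channels in a set have IO events pending

  • SelectionKeys: hold the IO event status and the binding information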
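
To see the four pieces working together without a network, the sketch below pushes a message through a `java.nio.channels.Pipe` and uses a Selector to learn when the read end is ready. It is an illustration under assumptions: the class name `NioTour`, the method `roundTrip`, the one-second select timeout, and the 256-byte buffer are all made up for this example, and it assumes the message fits in one read.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

class NioTour {
    // Writes msg into a pipe, waits for readiness on a Selector, then reads it back.
    static String roundTrip(String msg) {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                // Channel: switched to non-blocking mode so it can be registered.
                source.configureBlocking(false);
                // SelectionKey: records the interest set and carries an attachment.
                SelectionKey key = source.register(selector, SelectionKey.OP_READ);
                key.attach("pipe-source");

                sink.write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)));

                // Selector: reports which registered channels are ready.
                selector.select(1000);
                StringBuilder out = new StringBuilder();
                for (SelectionKey k : selector.selectedKeys()) {
                    if (k.isReadable()) {
                        // Buffer: filled by the channel, then flipped for reading.
                        ByteBuffer buf = ByteBuffer.allocate(256);
                        source.read(buf);
                        buf.flip();
                        out.append(StandardCharsets.UTF_8.decode(buf));
                    }
                }
                selector.selectedKeys().clear();
                return out.toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```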

Basic Reactor Design

Single threaded version

[Figure: basic Reactor design, single threaded version]

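import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;

public class BasicReactorDesign implements Runnable {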
    final Selector selector;
    final ServerSocketChannel serverSocket;

    /**
     * Reactor : Setup
     *
     * @param port port to listen on
     * @throws IOException if the selector or the server channel cannot be opened
     */
    BasicReactorDesign(int port) throws IOException {
        // Reactor start-up: open the selector
        selector = Selector.open();
        // Open the server channel and bind it to the listening port
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        // Switch to non-blocking mode
        serverSocket.configureBlocking(false);
        // Register for accept events; the returned key carries the Acceptor as its attachment
        SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        sk.attach(new Acceptor());
    }
    /*
    Alternatively, use explicit SPI provider:
    SelectorProvider p = SelectorProvider.provider();
    selector = p.openSelector();
    serverSocket = p.openServerSocketChannel();
    */

    /**
     * Reactor : Dispatch Loop
     */
    public void run() { // normally in a new Thread
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                for (SelectionKey selectionKey : selected) {
                    dispatch(selectionKey);
                }
                selected.clear();
            }
        } catch (IOException ex) { /* ... */ }
    }

    void dispatch(SelectionKey k) {
        Runnable r = (Runnable) (k.attachment());
        if (r != null) {
            r.run();
        }
    }

    /**
     * Reactor : Acceptor
     */
    class Acceptor implements Runnable { // inner

        @Override
        public void run() {
            try {
                SocketChannel c = serverSocket.accept();
                if (c != null) {
                    new Handler(selector, c);
                }
            } catch (IOException ex) { /* ... */ }
        }
    }

    /**
     * Reactor : Handler setup
     */
    final class Handler implements Runnable {

        public static final int MAXIN = 1024;
        public static final int MAXOUT = 1024;

        final SocketChannel socket;
        final SelectionKey sk;
        ByteBuffer input = ByteBuffer.allocate(MAXIN);
        ByteBuffer output = ByteBuffer.allocate(MAXOUT);
        static final int READING = 0, SENDING = 1;
        int state = READING;

        Handler(Selector sel, SocketChannel c) throws IOException {
            socket = c;
            c.configureBlocking(false);
            // Optionally try first read now
            sk = socket.register(sel, 0);
            sk.attach(this);
            sk.interestOps(SelectionKey.OP_READ);
            sel.wakeup();
        }

        boolean inputIsComplete() {/* ... */}

        boolean outputIsComplete() {/* ... */}

        void process() {/* ... */}

        //Reactor 5: Request handling
        @Override
        public void run() {
            try {
                if (state == READING) {
                    read();
                } else if (state == SENDING) {
                    send();
                }
            } catch (IOException ex) { /* ... */ }
        }

        void read() throws IOException {
            socket.read(input);
            if (inputIsComplete()) {
                process();
                state = SENDING;
                // Normally also do first write now
                sk.interestOps(SelectionKey.OP_WRITE);
            }
        }

        void send() throws IOException {
            socket.write(output);
            if (outputIsComplete()) {
                sk.cancel();
            }
        }

    }

}

Multithreaded Designs


  • Strategically add threads for scalability

    • Mainly applicable to multiprocessors
  • Worker Threads

    • Reactors should quickly trigger handlers

      • Handler processing slows down Reactor
    • Offload non-IO processing to other threads

  • Multiple Reactor Threads

    • Reactor threads can saturate doing IO

    • Distribute load to other reactors

      • Load-balance to match CPU and IO rates

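  • Add threads strategically to gain scalability

    • Mainly applicable to multiprocessors (multi-core machines)
  • Worker threads

    • Reactors should be able to trigger handlers quickly.

      • Time spent in handler processing slows the Reactor down (this is my understanding).
    • Move non-IO processing to other threads.

  • Multiple Reactor threads

    • Reactor threads can become saturated doing IO.

    • Spread the load across other reactors (I am not sure this is the right translation).

      • Load-balance to match CPU and IO rates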

Worker Threads


  • Offload non-IO processing to speed up reactor thread

    • Similar to POSA2 Proactor designs
  • Simpler than reworking compute-bound processing into event-driven form

    • Should still be pure nonblocking computation

      • Enough processing to outweigh overhead
  • But harder to overlap processing with IO

    • Best when can first read all input into a buffer
  • Use thread pool so can tune and control

    • Normally need many fewer threads than clients

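  • Offload non-IO processing to speed up the reactor thread

    • Similar to the Proactor designs in POSA2
  • Simpler than reworking compute-bound processing into event-driven form

    • The processing should still be pure non-blocking computation

      • with enough work to outweigh the overhead this introduces
  • But overlapping processing with IO is harder

    • Works best when all input can first be read into a buffer
  • Use a thread pool so the design can be tuned and controlled

    • Normally far fewer threads are needed than there are clients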

Worker Thread Pools

[Figure: worker thread pools]

Handler with Thread Pool

This handler can be dropped into the basic design above, which turns it directly into the structure shown in the preceding figure.

  class Handler implements Runnable {
        // uses util.concurrent thread pool
        static PooledExecutor pool = new PooledExecutor(...);
        static final int PROCESSING = 3;
        // ...
        synchronized void read() { // ...
            socket.read(input);
            if (inputIsComplete()) {
                state = PROCESSING;
                pool.execute(new Processer());
            }
        }
        synchronized void processAndHandOff() {
            process();
            state = SENDING; // or rebind attachment
            sk.interestOps(SelectionKey.OP_WRITE);
        }
        class Processer implements Runnable {
            public void run() { processAndHandOff(); }
        }
    }

Coordinating Tasks


  • Handoffs

    • Each task enables, triggers, or calls next one
    • Usually fastest but can be brittle
  • Callbacks to per-handler dispatcher

    • Sets state, attachment, etc
    • A variant of GoF Mediator pattern
  • Queues

    • For example, passing buffers across stages
  • Futures

    • When each task produces a result
    • Coordination layered on top of join or wait/notify

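  • Handoffs (I could not find a good translation for this term)

    • Each task enables, triggers, or calls the next one.
    • Usually the fastest approach, but it can be brittle.
  • Callbacks to a per-handler dispatcher

    • Set state, attachment, and so on
    • A variant of the GoF Mediator pattern
  • Queues

    • For example, passing buffers across stages
  • Futures

    • For when each task produces a result
    • Coordination is layered on top of join or wait/notify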
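
As a rough illustration of two of these styles, the sketch below passes data between stages through a queue and lets the caller wait for the final result through a Future. It uses `java.util.concurrent` rather than the older util.concurrent library the slides refer to; the class name `Coordination`, the two stages (trim, then upper-case), and the pool size are assumptions made for the example.

```java
import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

class Coordination {
    // Queue style: stage 1 hands its output to stage 2 through a queue.
    // Future style: the caller blocks on the result that stage 2 produces.
    static String run(String raw) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        BlockingQueue<String> decoded = new LinkedBlockingQueue<>();
        try {
            // Stage 1 ("decode"): trim the raw input and enqueue it.
            pool.execute(() -> decoded.add(raw.trim()));

            // Stage 2 ("process"): take from the queue and produce the reply.
            Callable<String> stage2 = () -> decoded.take().toUpperCase(Locale.ROOT);
            Future<String> reply = pool.submit(stage2);

            return reply.get(); // blocks until stage 2 has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

A direct handoff would instead have stage 1 call stage 2 itself, which avoids the queue but ties the two stages together; that coupling is presumably what the slide means by "brittle".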

Using PooledExecutor

Judging from the slides, this part appears to introduce how the thread pool is used.


  • A tunable worker thread pool

  • Main method execute(Runnable r)

  • Controls for:

    • The kind of task queue (any Channel)

    • Maximum number of threads

    • Minimum number of threads

    • "Warm" versus on-demand threads

    • Keep-alive interval until idle threads die

      • to be later replaced by new ones if necessary
    • Saturation policy

      • block, drop, producer-runs, etc


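  • A tunable worker thread pool

  • The main method is execute(Runnable r)

  • Controls for:

    • The kind of task queue (any Channel)

    • Maximum number of threads

    • Minimum number of threads

    • "Warm" (started ahead of time) versus on-demand threads

    • Keep-alive interval until idle threads die

      • to be replaced later by new ones if necessary
    • Saturation policy

      • block, drop, producer-runs (the thread that submitted the task runs it itself), and so on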
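
`PooledExecutor` belongs to Doug Lea's earlier util.concurrent library; in the standard `java.util.concurrent` package the closest class is `ThreadPoolExecutor`. The sketch below shows how the controls listed above might map onto its constructor, and it demonstrates the "producer-runs" policy, which `java.util.concurrent` calls `CallerRunsPolicy`. The mapping is my own reading, and the class name `PoolTuning`, the pool sizes, and the 30-second keep-alive are assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolTuning {
    static ThreadPoolExecutor newPool() {
        // Threads are created on demand here; calling prestartAllCoreThreads()
        // on the result would give "warm" threads instead.
        return new ThreadPoolExecutor(
                1,                                   // minimum (core) number of threads
                1,                                   // maximum number of threads
                30, TimeUnit.SECONDS,                // keep-alive for idle threads beyond the core size
                new SynchronousQueue<Runnable>(),    // kind of task queue: direct handoff, no buffering
                new ThreadPoolExecutor.CallerRunsPolicy()); // saturation policy: "producer-runs"
    }

    // Returns the name of the thread that ran a task submitted while the pool was saturated.
    static String saturatedTaskThread() {
        ThreadPoolExecutor pool = newPool();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        String[] ranOn = new String[1];
        try {
            pool.execute(() -> {                     // occupies the only worker thread
                started.countDown();
                try { release.await(); } catch (InterruptedException ignored) { }
            });
            started.await();                         // the worker is now busy
            // The pool is full, so the policy runs this task on the submitting thread.
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
            return ranOn[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

With this policy a saturated pool slows the submitter down instead of dropping work, which acts as a simple form of back-pressure.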

Multiple Reactor Threads


  • Using Reactor Pools

    • Use to match CPU and IO rates

    • Static or dynamic construction

      • Each with own Selector, Thread, dispatch loop
    • Main acceptor distributes to other reactors


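  • Use reactor pools

    • to match CPU and IO rates (I was unsure how to translate "IO rates")

    • Static or dynamic construction (I was unsure whether "construction" here means how the pool is built or how it is structured)

      • Each one (presumably each reactor thread) has its own Selector, Thread, and dispatch loop
    • The main acceptor (presumably the reactor responsible for accept) distributes connections to the other reactors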

    Selector[] selectors; // also create threads
    int next = 0;
    class Acceptor { // ...
        public synchronized void run() { ...
            SocketChannel connection = serverSocket.accept();
            if (connection != null)
                new Handler(selectors[next], connection);
            if (++next == selectors.length) next = 0;
        }
    }
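
The acceptor above cycles through the selectors with a `next` counter guarded by `synchronized`. One possible alternative, shown here only as a sketch, is to keep the counter in an `AtomicInteger` so that choosing a reactor needs no lock; the class name `RoundRobin` and its methods are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: picks the index of the next reactor in round-robin order.
class RoundRobin {
    private final int size;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobin(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.size = size;
    }

    int next() {
        // floorMod keeps the index non-negative even after the counter overflows.
        return Math.floorMod(counter.getAndIncrement(), size);
    }
}
```

An acceptor could then write `new Handler(selectors[chooser.next()], connection)`, where `chooser` is a `RoundRobin` sized to `selectors.length`.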

Using Multiple Reactors

[Figure: using multiple reactors]

Using other java.nio features


  • Multiple Selectors per Reactor

    • To bind different handlers to different IO events
    • May need careful synchronization to coordinate
  • File transfer

    • Automated file-to-net or net-to-file copying
  • Memory-mapped files

    • Access files via buffers
  • Direct buffers

    • Can sometimes achieve zero-copy transfer
    • But have setup and finalization overhead
    • Best for applications with long-lived connections

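  • Multiple Selectors per Reactor

    • Bind different handlers to different IO events
    • May need careful synchronization to coordinate
  • File transfer

    • Automated file-to-net or net-to-file copying
  • Memory-mapped files

    • Access files through buffers
  • Direct buffers

    • Can sometimes achieve zero-copy transfer
    • But they carry setup and finalization overhead
    • Best suited to applications with long-lived connections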
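
Two of these features can be tried with temporary files alone. The sketch below copies one file to another with `FileChannel.transferTo` and then reads the copy through a memory mapping. It copies file-to-file only to stay self-contained; `transferTo` accepts any `WritableByteChannel`, so a `SocketChannel` target would be passed the same way. The class name `FileFeatures`, the method `copyAndMap`, and the temp-file prefixes are assumptions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class FileFeatures {
    // Copies src to dst with transferTo, then reads dst back through a memory mapping.
    static String copyAndMap(String text) {
        try {
            Path src = Files.createTempFile("nio-src", ".txt");
            Path dst = Files.createTempFile("nio-dst", ".txt");
            src.toFile().deleteOnExit();
            dst.toFile().deleteOnExit();
            Files.write(src, text.getBytes(StandardCharsets.UTF_8));

            // File transfer: the channel moves the bytes without a user-level copy loop.
            try (FileChannel in = FileChannel.open(src, StandardOpenOption.READ);
                 FileChannel out = FileChannel.open(dst, StandardOpenOption.WRITE)) {
                long pos = 0;
                long size = in.size();
                while (pos < size) { // transferTo may move fewer bytes than requested
                    pos += in.transferTo(pos, size - pos, out);
                }
            }

            // Memory-mapped file: the file contents appear as a buffer.
            try (FileChannel ch = FileChannel.open(dst, StandardOpenOption.READ)) {
                MappedByteBuffer map = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
                return StandardCharsets.UTF_8.decode(map).toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```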

Connection-Based Extensions

I did not fully follow what this part is about; it may be describing further ways to extend the model.


  • Instead of a single service request

    • Client connects
    • Client sends a series of messages/requests
    • Client disconnects
  • Examples

    • Databases and Transaction monitors
    • Multi-participant games, chat, etc
  • Can extend basic network service patterns

    • Handle many relatively long-lived clients
    • Track client and session state (including drops)
    • Distribute services across multiple hosts

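  • Rather than a single service request

    • The client connects
    • The client sends a series of messages/requests
    • The client disconnects
  • Examples

    • Databases and transaction monitors
    • Multi-participant games, chat, and so on
  • The basic network service patterns can be extended to

    • Handle many relatively long-lived clients
    • Track client and session state (including dropped connections)
    • Distribute services across multiple hosts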
Original post: https://www.cnblogs.com/jiangxiewei/p/13253860.html