Kafka网络模型基础-Reactor(下)

接上篇《Kafka网络模型基础-Reactor(上)

Reactor Java NIO 的实现

从Java 1.4 开始,提供了NIO(非阻塞IO)。NIO 提供了 SocketChannel 和 ServerSocketChannel 两种不同的套接字实现。这两种新增的通道都支持阻塞和非阻塞两种模式。当然我们通常使用非阻塞的 NIO 来处理网络请求响应。NIO 里面新增的 Selector 等相关的API 可以提供给我们实现 Reactor 模式的网络请求响应模型。

Server 端实现

首先对于 Server 端的实现时序图如下所示:

(1)首先打开 ServerSocketChannel,用于监听客户端的连接。

(2)绑定监听端口,设置为非阻塞模式。

(3)创建 Reactor 线程,创建多路复用器并启动线程。

(4)将 ServerSocketChannel 注册到 Reactor 线程的多路复用器 Selector 上,监听 ACCEPT 事件。

(5)多路复用器在线程中,无限循环准备就绪的 Key。

(6)多路复用器监听到有新的客户端接入,则处理新的接入请求,完成TCP 3次握手,建立物理连接。

(7)设置客户端链路为非阻塞。

(8)将新接入的客户端连接注册到 Reactor 线程的多路复用器上,监听读操作,用来读取客户端发送的消息。

(9)异步读取客户端消息到 ByteBuffer 中。

(10)对ByteBuffer 进行解码,读取对应的消息数据。

其中,Server 端的代码实现如下:

MultiplexerTimeServer

public class MultiplexerTimeServer implements Runnable {

    private Selector selector;
    private ServerSocketChannel serverChannel;
    private volatile boolean stop;

    public MultiplexerTimeServer(int port) {
        try {
            selector = Selector.open();
            serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            serverChannel.socket().bind(new InetSocketAddress(port), 1024);
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("The time server is start in port : " + port);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    public void stop() {
        this.stop = true;
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                selector.select(1000);
                Set selectedKeys = selector.selectedKeys();
                Iterator iterator = selectedKeys.iterator();
                SelectionKey key = null;
                while (iterator.hasNext()) {
                    key = iterator.next();
                    iterator.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        if (selector != null) {
            try {
                selector.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void handleInput(SelectionKey key) throws Exception {
        if (key.isValid()) {
            // 处理新接入的请求消息
            if (key.isAcceptable()) {
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                SocketChannel sc = ssc.accept();
                sc.configureBlocking(false);
                sc.register(selector, SelectionKey.OP_READ);
            }
            if (key.isReadable()) {
                // read data
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("The time server receive order : " + body);
                    String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
                    doWrite(sc, currentTime);
                } else if (readBytes < 0) {
                    // 对端链路关闭
                    key.cancel();
                    sc.close();
                } else {
                    // 忽略
                }
            }
        }
    }

    private void doWrite(SocketChannel channel, String response) throws Exception {
        if (response != null && response.trim().length() > 0) {
            byte[] bytes = response.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer);
        }
    }

}

TimeServer

public class TimeServer {

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (Exception e) {
            }
        }

        MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
        new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
    }

}

Client 端实现

Client 端的时序图如下所示:

(1)首先打开SocketChannel。

(2)设置SocketChannel为非阻塞,同时设置客户端连接的TCP 参数。

(3)异步连接服务端。

(4)判断是否连接成功,如果连接成功,直接注册读事件到Selector 中;如果没有连接成功,则注册连接事件,重新连接服务端。

(5)向Reactor 线程的多路复用器注册 OP_CONNECT 状态位,监听服务端的TCP 应答。

(6)创建Reactor 线程,创建多路复用器并启动线程。

(7)多路复用器在线程中,无限循环准备就绪的 Key。

(8)接收connect 事件进行处理。

(9)判断连接结果,如果连接成功,则注册读事件到Selector 上面。

(10)读取服务端消息到ByteBuffer中。

(11)对ByteBuffer中的数据进行解码操作。

其中,Client 端的代码如下所示:

TimeClientHandle

public class TimeClientHandle implements Runnable {

    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean stop;

    public TimeClientHandle(String host, int port) {
        this.host = host == null ? "127.0.0.1" : host;
        this.port = port;
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    @Override
    public void run() {
        try {
            doConnect();
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }

        while (!stop) {
            try {
                selector.select(1000);
                Set selectionKeys = selector.selectedKeys();
                Iterator iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e1) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(1);
            }
        }

        // 多路复用器关闭后, 所有注册在上面的Channel 和 Pipe 等资源都会自动去注册并关闭, 所以不需要重复释放资源
        if (selector != null) {
            try {
                selector.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void handleInput(SelectionKey key) throws Exception {
        if (key.isValid()) {
            // 判断是否连接成功
            SocketChannel sc = (SocketChannel) key.channel();
            if (key.isConnectable()) {
                if (sc.finishConnect()) {
                    sc.register(selector, SelectionKey.OP_READ);
                }
                doWrite(sc);
            } else {
                System.exit(1);
            }

            if (key.isReadable()) {
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("Now is : " + body);
                    this.stop = true;
                } else if (readBytes < 0) {
                    // 对端链路关闭
                    key.cancel();
                    sc.close();
                } else {
                    // 忽略
                }
            }
        }

    }

    private void doConnect() throws Exception {
        // 如果连接成功, 则注册到多路复用器上, 发送请求消息, 读取应答消息
        if (socketChannel.connect(new InetSocketAddress(host, port))) {
            socketChannel.register(selector, SelectionKey.OP_READ);
            doWrite(socketChannel);
        } else {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    }

    private void doWrite(SocketChannel socketChannel) throws Exception {
        byte[] req = "QUERY TIME ORDER".getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        socketChannel.write(writeBuffer);
        if (!writeBuffer.hasRemaining()) {
            System.out.println("Send order 2 server succeed.");
        }
    }

}

TimeClient

public class TimeClient {

    public static void main(String[] args) {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (Exception e) {
            }
        }

        new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001").start();
    }

}

Reactor 模式的三种实现

Reactor 模式可以大致分为:单Reactor单线程、单Reactor多线程、多Reactor多进程。上面我们用Java NIO实现的Reactor 模式中,在Server端多路复用器轮询到网络事件后,使用的当前线程处理的业务逻辑。此时就是前面提到的 单Reactor单线程。

单Reactor单线程


优点:模型简单,没有多线程之间的竞争问题。

缺点:只有一个线程,无法发挥多核CPU的硬件优势;在Handle 处理某个事件时候,无法同时处理其他就绪的事件。

单Reactor多线程


优点:能够充分利用多核CPU的硬件能力。

缺点:多线程之间的竞争问题提高了实现的复杂度;单Reactor 在高并发的场景下可能会是网络处理的瓶颈。

多Reactor多进程


主进程和子进程的职责非常明确,主进程只负责接收新连接,子进程负责完成后续的业务处理;主进程和子进程的交互很简单,主进程只需要把新的连接传递给子进程,子进程无需返回数据;子进程之间是相互独立的,无需同步共享之类的处理(这里仅限于网络模型相关的 select,read,send等无须同步共享,"业务处理"还是有可能需要同步共享的)。


参考:《Netty权威指南》、https://www.jianshu.com/p/188ef8462100