0 引言

最近找了许多 RocketMQ 的资料,以为会很齐全,但翻了一下市面上的资料,包括极客时间上的消息队列课程、阿里云的文档以及网上零零散散的博文,总觉得有些意犹未尽,还是觉得再翻翻源码,自己消化整理一下理解会更透彻。这种时候就觉得有些贪心,恨不得阅尽天下书。

开卷有益,那从功利主义的角度来讨论,阅读 NameServer 能够获得什么知识?我觉得主要包括以下几个方面:

  • 了解路由中心架构设计的取舍:RocketMQ 做为 Kafka 的后继者,肯定有对前辈的借鉴。RocketMQ 的 路由中心出于轻量化的考虑,设计了 NameServer,并没有直接使用 ZooKeeper。
  • 了解常用 JVM 参数作用:RocektMQ 对不同的JDK 版本使用了不同的垃圾收集器,在启动脚本中会做判断。
  • 了解 NameSever 的核心配置与启动流程

1 NameServer 设计者的思考

先看看 RocektMQ 的组件架构图吧,直接参考官方 doc:https://rocketmq.apache.org/docs/rmq-arc/,原图截取下来比较糊,我就直接贴地址了。

RocektMQ 没有使用业界常用的 ZooKeeper管理路由信息,而是使用了自研的NameServer,看《RocketMQ 技术内幕》的作者们在阐述NameServer设计理念时讲道:追求最终一致,容忍分钟级的不一致, NameServer 之间不通信,极大降低 NameSever 的实现复杂度。

了解 ZooKeeper 的同学应该都知道 paxos 和 zab协议的复杂性,实现过程很复杂,而且消息广播时,网络的开销也很大,简洁的系统优雅且易维护,从这个角度上来说,NameSever 的设计无疑是正确的。

在正式开始阅读源码之前,我一般会习惯性自己捋一下思路,如果让我自己设计一个新的注册中心,我该如何设计。首先是 NameServer,作为一个路由中心,首先是在单机模式下,需要具备什么功能:路由表、与数据的生产者通信、与数据消费者通信、还有与存储服务器间的通信,其中可能又涉及到心跳机制。然后再分布式系统中,如何保证可用性呢,肯定需要集群部署,好了,分布式系统的经典问题CAP又出现了,需要像 zk 一样吗?采用什么协议呢?如果某个存储机器挂了,怎么进行路由更新?总之就是一大堆问题。还是接着看 NameSever 的实现源码吧。

2 隔靴搔痒不如直面源码

下载 RocketMQ 最新的 release 版本(目前最新的release 是 4.9.3),namesever 对应的模块:

看到代码狂喜,NameServer 的代码量好像很少呀,先跑起来看看。NameServer 的启动类是 org.apache.rocketmq.namesrv.NameServerStartup,配置好环境之后,就可以正常启动了。

Connected to the target VM, address: '127.0.0.1:60554', transport: 'socket'
The Name Server boot success. serializeType=JSON
  • 注:nameserver 启动之前需要配置一下初始环境,包括一些日志文件和默认的本机配置,因为网络上随便搜索都能找到 step-by-step 的教程,这些不是本文的侧重点,故不在此赘述。

接着启动 broker 模块下的 BrokerStartup 类,启动成功之后,控制台也会打印成功的日志:

Connected to the target VM, address: '127.0.0.1:55019', transport: 'socket'
The broker[broker-a, 127.0.0.1:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876

Broker 和 NameServer 启动之后,可以创建一个debug 用的 topic,我本地命名为“TopicTest”。我这里使用控制台新建了一个Topic。

修改 org.apache.rocketmq.example.quickstart.Producer 类,我们发送一条消息到 broker 里面去。

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        /*
         * Instantiate with a producer group name.
         */
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */

        /*
         * Launch the instance.
         */
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        for (int i = 0; i < 1; i++) {
            try {

                /*
                 * Create a message instance, specifying topic, tag and message body.
                 */
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

                /*
                 * Call send message to deliver message to one of brokers.
                 */
                SendResult sendResult = producer.send(msg);
                /*
                 * There are different ways to send message, if you don't care about the send result,you can use this way
                 * {@code
                 * producer.sendOneway(msg);
                 * }
                 */

                /*
                 * if you want to get the send result in a synchronize way, you can use this send method
                 * {@code
                 * SendResult sendResult = producer.send(msg);
                 * System.out.printf("%s%n", sendResult);
                 * }
                 */

                /*
                 * if you want to get the send result in a asynchronize way, you can use this send method
                 * {@code
                 *
                 *  producer.send(msg, new SendCallback() {
                 *  @Override
                 *  public void onSuccess(SendResult sendResult) {
                 *      // do something
                 *  }
                 *
                 *  @Override
                 *  public void onException(Throwable e) {
                 *      // do something
                 *  }
                 *});
                 *
                 *}
                 */

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        /*
         * Shut down once the producer instance is not longer in use.
         */
        producer.shutdown();
    }
}

接着便会看到rocketMq 的工作目录多了许多文件:

熟悉RocketMQ 的同学一定一下就看出了相应文件的作用,可能会在下一篇与 RocketMQ 存储相关的文章里再展开详述,这里不赘述。
同理,修改 consumer:

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        /*
         * Instantiate with specified consumer group name.
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */

        consumer.setNamesrvAddr("127.0.0.1:9876");

        /*
         * Specify where to start in case the specific consumer group is a brand-new one.
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        /*
         * Subscribe one more topic to consume.
         */
        consumer.subscribe("TopicTest", "*");

        /*
         *  Register callback to execute on arrival of messages fetched from brokers.
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        /*
         *  Launch the consumer instance.
         */
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

至此,我们在本地搭建好了一套消息生产、路由、存储、消费的流程。我们可以开始真正的源码 debug 流程。

3 聊聊启动脚本中的JVM 知识

NameServer 的启动会调用 distribution 模块下的 runserver.sh ,以下是脚本中的 JVM 启动参数,我们来看看每一个 JVM 参数的含义。

choose_gc_options()
{
    # Example of JAVA_MAJOR_VERSION value : '1', '9', '10', '11', ...
    # '1' means releases befor Java 9
    JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -r -n 's/.* version "([0-9]*).*$/\1/p')
    if [ -z "$JAVA_MAJOR_VERSION" ] || [ "$JAVA_MAJOR_VERSION" -lt "9" ] ; then
      JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
      JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
      JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps"
      JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
    else
      JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
      JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
      JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log:time,tags:filecount=5,filesize=30M"
    fi
}
  • -server: 在 server 模式下启动,性能会更好一些,具体的信息可以参考:Real differences between "java -server" and "java -client"?
  • -Xms4g -Xmx4g -Xmn2g:分别是初始堆大小、最大堆大小、年轻代大小(包括eden 和两个 survivor),为了防止申请内存抖动,一般都会把初始堆和最大堆大小设置为等大。
  • -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m:分别是元空间的 FullGC 阈值、元空间占用内存的最大值,具体可以参考:JVM参数MetaspaceSize的误解
  • -XX:+UseConcMarkSweepGC :老年代使用 CMS 垃圾收集器
  • -XX:+UseCMSCompactAtFullCollection :CMS 是基于“标记-清除”算法的,也就意味着会出现内存碎片,导致明明老年代有足够的空间,但无法找到足够大的连续空间来分配对象,不得不提前触发一次 FullGC,开启该开关后,可以再 FullGC 前先进行一次内存碎片的合并整理。
  • -XX:CMSInitiatingOccupancyFraction=70 :还是由于 CMS 浮动垃圾的存在,导致不能等老年代满了再进行垃圾回收,必须预留一部分空间放置浮动垃圾, 可以根据业务需要调整该数值。
  • -XX:+CMSParallelRemarkEnabled:CMS 包括初始标记、并发标记、重新标记、并发清理四个阶段,开启该参数,可以加快重新标记阶段的速度。
  • -XX:SoftRefLRUPolicyMSPerMB=0:代表每MB空闲内存空间可以允许SoftReference对象存活多久(对象不可达之后开始计时)。
  • -XX:+CMSClassUnloadingEnabled :对永久代进行回收
  • -XX:SurvivorRatio=8 :最常见的配置,设置新生代各区域比例,eden : survivor = 8
  • -XX:-UseParNewGC:新生代使用 ParNewGC
  • -XX:+UseG1GC:从 shell 脚本中可以看到 JDK 9之后改为使用 G1垃圾收集器了。
  • -XX:G1HeapRegionSize=16m:每一个 region 设置为16MB
  • -XX:G1ReservePercent=25:空闲空间的预留内存百分比,防止溢出
  • -XX:InitiatingHeapOccupancyPercent=30 可以参考关于G1收集器参数InitiatingHeapOccupancyPercent的正确认知

呼,看完了一大堆JVM 参数,我们回过头来继续阅读 NameServer 的源码。

4 NameServerController——通信的核心

服务的执行入口是在 org.apache.rocketmq.namesrv.NamesrvStartup

/**
     * NameServer 启动其实就是启动 NamesrvController
     * @param args
     * @return
     */
    public static NamesrvController main0(String[] args) {

        try {
            // 1. 创建 NamesrvController
            NamesrvController controller = createNamesrvController(args);
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }
    ```
NameServer 做为路由中心,需要接收很多网络请求,单从 `Controller` 后缀就可以推知是该类大概就是路由的核心类了。

```java
    public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
        //PackageConflictDetect.detectFastjson();

        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
            return null;
        }

        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(9876);
        //  读取 -c 对应的配置文件,解析到  namesrvConfig 和 nettyServerConfig 中去
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);

                namesrvConfig.setConfigStorePath(file);

                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }

        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
            MixAll.printObjectProperties(console, namesrvConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            System.exit(0);
        }

        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

        if (null == namesrvConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

        log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

        MixAll.printObjectProperties(log, namesrvConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);

        //重点关注:  配置完 nettyServerConfig 之后,就可以监听 9876 端口。之后 broker 和 客户端有请求过来,就可以处理了。
        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);

        return controller;
    }

创建 NamesrvController 的方法涉及到许多配置的加载,我觉得读源码不要线性阅读,因为这样很容易陷入无穷无尽的细节,导致自己大脑堆栈溢出,看到后面完全忘记上下文了。我们先来关注最核心的部分: 一个 NamesrvController 由哪几个部分组成,方法出口已经我已经加了注释:

final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

一共两个核心配置,一个是 NameServerConfig,一个是 NettyServerConfig。
NameServerConfig 的配置如下:

public class NamesrvConfig {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    // ROCKET_HOME 环境变量
    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
    // kv 配置的路径
    private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
    // NameServer 自己的配置存储路径
    private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
    // 生产环境的名称
    private String productEnvName = "center";
    // 是否启动了 clusterTest 测试集群
    private boolean clusterTest = false;
    // 是否支持有序消息
    private boolean orderMessageEnable = false;

    // 省略 get set 
    ……
}

NettyServerConfig 的配置如下:

public class NettyServerConfig implements Cloneable {

    // NettyServer 默认的监听端口号 8888
    private int listenPort = 8888;
    // NettyServer 工作线程的数量,默认是8
    private int serverWorkerThreads = 8;
    // netty 回调线程池数量
    private int serverCallbackExecutorThreads = 0;
    // netty io 线程数量,默认为3,负责解析网络请求, 解析之后会交由work线程处理
    private int serverSelectorThreads = 3;
    // broker 端参数,broker 端基于 netty 构建网络服务器的时候会使用以下两参数
    private int serverOnewaySemaphoreValue = 256;
    private int serverAsyncSemaphoreValue = 64;

    // 如果一个网络连接空闲时间超过120s,就会被关闭
    private int serverChannelMaxIdleTimeSeconds = 120;

    // socket send buffer 缓冲区 以及 receive buffer 缓冲区的大小
    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;


    private int writeBufferHighWaterMark = NettySystemConfig.writeBufferHighWaterMark;
    private int writeBufferLowWaterMark = NettySystemConfig.writeBufferLowWaterMark;
    private int serverSocketBacklog = NettySystemConfig.socketBacklog;
    private boolean serverPooledByteBufAllocatorEnable = true;

    /**
     * make make install
     *
     *
     * ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \
     * --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd
     */
    // 是否启动 epoll io 模型,默认是不开启的。
    private boolean useEpollNativeSelector = false;
}

配置有点多,看到这里可以大概总结一下:
NettyServerConfig 和 NameServerConfig 组成了 NamesrvController,NameServer负责对外提供Broker 的注册功能。

以上就是 NameSeverController 的配置部分。我们接着往下看:

start(controller);
public static NamesrvController start(final NamesrvController controller) throws Exception {

        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }

        //  here : 方法里面创建了 new ServerBootstrap(),也就是 netty 的核心组件
        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }

        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));

        controller.start();

        return controller;
    }
    ```


    ```java

    public boolean initialize() {
        // 前文所说的 kv 配置
        this.kvConfigManager.load();
        // 初始化 netty 服务器
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
         
         // netty 服务器的工作线程
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

        // 把工作线程池给netty服务器
        this.registerProcessor();

        // 核心线程: 定时任务线程,定时扫描哪些broker 没发送心跳
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
        
        // 启动一个后台线程执行定时任务,定时打印 kv 配置
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            // Register a listener to reload SslContext
            try {
                fileWatchService = new FileWatchService(
                    new String[] {
                        TlsSystemConfig.tlsServerCertPath,
                        TlsSystemConfig.tlsServerKeyPath,
                        TlsSystemConfig.tlsServerTrustCertPath
                    },
                    new FileWatchService.Listener() {
                        boolean certChanged, keyChanged = false;
                        @Override
                        public void onChanged(String path) {
                            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                log.info("The trust certificate changed, reload the ssl context");
                                reloadServerSslContext();
                            }
                            if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                certChanged = true;
                            }
                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                keyChanged = true;
                            }
                            if (certChanged && keyChanged) {
                                log.info("The certificate and private key changed, reload the ssl context");
                                certChanged = keyChanged = false;
                                reloadServerSslContext();
                            }
                        }
                        private void reloadServerSslContext() {
                            ((NettyRemotingServer) remotingServer).loadSslContext();
                        }
                    });
            } catch (Exception e) {
                log.warn("FileWatchService created error, can't load the certificate dynamically");
            }
        }

        return true;
    }
    ```

    看完初始化的代码之后,我们再回过头来看看:
    ```
    ```java
public static NamesrvController start(final NamesrvController controller) throws Exception {

        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }

        //  here : 方法里面创建了 new ServerBootstrap(),也就是 netty 的核心组件
        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }

        // 钩子,JVM 关闭的时候会回调该函数
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));

        controller.start();

        return controller;
    }

其实这是一种很常见的编程技巧,通过注册一个 JVM 钩子函数,在 JVM 关闭之前先关闭线程池,释放那个资源。

public void shutdown() {
        this.remotingServer.shutdown();
        this.remotingExecutor.shutdown();
        this.scheduledExecutorService.shutdown();

        if (this.fileWatchService != null) {
            this.fileWatchService.shutdown();
        }
}

最后再看一下 NettyRemotingServer 的启动:

    public void start() {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });

        prepareSharableHandlers();
        // netty 配置相关
        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                    // 设置了一大堆的网络连接请求处理器
                    // HandShakeHandler 负责连接握手, NettyDecoder是负责编解码的,
                    // IdleStateHandler是负责网络连接管理的,serverHandler 是负责最关键的网络请求处理的。
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                connectionManageHandler,
                                serverHandler
                            );
                    }
                });
        if (nettyServerConfig.getServerSocketSndBufSize() > 0) {
            log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());
            childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize());
        }
        if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {
            log.info("server set SO_RCVBUF to {}", nettyServerConfig.getServerSocketRcvBufSize());
            childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize());
        }
        if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && nettyServerConfig.getWriteBufferHighWaterMark() > 0) {
            log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",
                    nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark());
            childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
                    nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()));
        }

        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            // 设置netty 要监听的端口,默认是9876
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }

        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }