RocketMQ NameServer 源码阅读
0 引言
最近找了许多 RocketMQ 的资料,以为会很齐全,但翻了一下市面上的资料,包括极客时间上的消息队列课程、阿里云的文档以及网上零零散散的博文,总觉得有些意犹未尽,还是觉得再翻翻源码,自己消化整理一下理解会更透彻。这种时候就觉得有些贪心,恨不得阅尽天下书。
开卷有益,那从功利主义的角度来讨论,阅读 NameServer 能够获得什么知识?我觉得主要包括以下几个方面:
- 了解路由中心架构设计的取舍:RocketMQ 做为 Kafka 的后继者,肯定有对前辈的借鉴。RocketMQ 的 路由中心出于轻量化的考虑,设计了 NameServer,并没有直接使用 ZooKeeper。
- 了解常用 JVM 参数作用:RocektMQ 对不同的JDK 版本使用了不同的垃圾收集器,在启动脚本中会做判断。
- 了解 NameSever 的核心配置与启动流程
1 NameServer 设计者的思考
先看看 RocektMQ 的组件架构图吧,直接参考官方 doc:https://rocketmq.apache.org/docs/rmq-arc/,原图截取下来比较糊,我就直接贴地址了。
RocektMQ 没有使用业界常用的 ZooKeeper管理路由信息,而是使用了自研的NameServer,看《RocketMQ 技术内幕》的作者们在阐述NameServer设计理念时讲道:追求最终一致,容忍分钟级的不一致, NameServer 之间不通信,极大降低 NameSever 的实现复杂度。
了解 ZooKeeper 的同学应该都知道 paxos 和 zab协议的复杂性,实现过程很复杂,而且消息广播时,网络的开销也很大,简洁的系统优雅且易维护,从这个角度上来说,NameSever 的设计无疑是正确的。
在正式开始阅读源码之前,我一般会习惯性自己捋一下思路,如果让我自己设计一个新的注册中心,我该如何设计。首先是 NameServer,作为一个路由中心,首先是在单机模式下,需要具备什么功能:路由表、与数据的生产者通信、与数据消费者通信、还有与存储服务器间的通信,其中可能又涉及到心跳机制。然后再分布式系统中,如何保证可用性呢,肯定需要集群部署,好了,分布式系统的经典问题CAP又出现了,需要像 zk 一样吗?采用什么协议呢?如果某个存储机器挂了,怎么进行路由更新?总之就是一大堆问题。还是接着看 NameSever 的实现源码吧。
2 隔靴搔痒不如直面源码
下载 RocketMQ 最新的 release 版本(目前最新的release 是 4.9.3),namesever 对应的模块:
看到代码狂喜,NameServer 的代码量好像很少呀,先跑起来看看。NameServer 的启动类是 org.apache.rocketmq.namesrv.NameServerStartup,配置好环境之后,就可以正常启动了。
Connected to the target VM, address: '127.0.0.1:60554', transport: 'socket'
The Name Server boot success. serializeType=JSON
- 注:nameserver 启动之前需要配置一下初始环境,包括一些日志文件和默认的本机配置,因为网络上随便搜索都能找到 step-by-step 的教程,这些不是本文的侧重点,故不在此赘述。
接着启动 broker 模块下的 BrokerStartup 类,启动成功之后,控制台也会打印成功的日志:
Connected to the target VM, address: '127.0.0.1:55019', transport: 'socket'
The broker[broker-a, 127.0.0.1:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
Broker 和 NameServer 启动之后,可以创建一个debug 用的 topic,我本地命名为“TopicTest”。我这里使用控制台新建了一个Topic。
修改 org.apache.rocketmq.example.quickstart.Producer
类,我们发送一条消息到 broker 里面去。
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
/*
* Instantiate with a producer group name.
*/
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
/*
* Specify name server addresses.
* <p/>
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
* <pre>
* {@code
* producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
* </pre>
*/
/*
* Launch the instance.
*/
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 1; i++) {
try {
/*
* Create a message instance, specifying topic, tag and message body.
*/
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
/*
* Call send message to deliver message to one of brokers.
*/
SendResult sendResult = producer.send(msg);
/*
* There are different ways to send message, if you don't care about the send result,you can use this way
* {@code
* producer.sendOneway(msg);
* }
*/
/*
* if you want to get the send result in a synchronize way, you can use this send method
* {@code
* SendResult sendResult = producer.send(msg);
* System.out.printf("%s%n", sendResult);
* }
*/
/*
* if you want to get the send result in a asynchronize way, you can use this send method
* {@code
*
* producer.send(msg, new SendCallback() {
* @Override
* public void onSuccess(SendResult sendResult) {
* // do something
* }
*
* @Override
* public void onException(Throwable e) {
* // do something
* }
*});
*
*}
*/
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
/*
* Shut down once the producer instance is not longer in use.
*/
producer.shutdown();
}
}
接着便会看到rocketMq 的工作目录多了许多文件:
熟悉RocketMQ 的同学一定一下就看出了相应文件的作用,可能会在下一篇与 RocketMQ 存储相关的文章里再展开详述,这里不赘述。
同理,修改 consumer:
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
/*
* Instantiate with specified consumer group name.
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
/*
* Specify name server addresses.
* <p/>
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
* <pre>
* {@code
* consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
* </pre>
*/
consumer.setNamesrvAddr("127.0.0.1:9876");
/*
* Specify where to start in case the specific consumer group is a brand-new one.
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/*
* Subscribe one more topic to consume.
*/
consumer.subscribe("TopicTest", "*");
/*
* Register callback to execute on arrival of messages fetched from brokers.
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/*
* Launch the consumer instance.
*/
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
至此,我们在本地搭建好了一套消息生产、路由、存储、消费的流程。我们可以开始真正的源码 debug 流程。
3 聊聊启动脚本中的JVM 知识
NameServer 的启动会调用 distribution 模块下的 runserver.sh ,以下是脚本中的 JVM 启动参数,我们来看看每一个 JVM 参数的含义。
choose_gc_options()
{
# Example of JAVA_MAJOR_VERSION value : '1', '9', '10', '11', ...
# '1' means releases befor Java 9
JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -r -n 's/.* version "([0-9]*).*$/\1/p')
if [ -z "$JAVA_MAJOR_VERSION" ] || [ "$JAVA_MAJOR_VERSION" -lt "9" ] ; then
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
else
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log:time,tags:filecount=5,filesize=30M"
fi
}
- -server: 在 server 模式下启动,性能会更好一些,具体的信息可以参考:Real differences between "java -server" and "java -client"?
- -Xms4g -Xmx4g -Xmn2g:分别是初始堆大小、最大堆大小、年轻代大小(包括eden 和两个 survivor),为了防止申请内存抖动,一般都会把初始堆和最大堆大小设置为等大。
- -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m:分别是元空间的 FullGC 阈值、元空间占用内存的最大值,具体可以参考:JVM参数MetaspaceSize的误解
- -XX:+UseConcMarkSweepGC :老年代使用 CMS 垃圾收集器
- -XX:+UseCMSCompactAtFullCollection :CMS 是基于“标记-清除”算法的,也就意味着会出现内存碎片,导致明明老年代有足够的空间,但无法找到足够大的连续空间来分配对象,不得不提前触发一次 FullGC,开启该开关后,可以再 FullGC 前先进行一次内存碎片的合并整理。
- -XX:CMSInitiatingOccupancyFraction=70 :还是由于 CMS 浮动垃圾的存在,导致不能等老年代满了再进行垃圾回收,必须预留一部分空间放置浮动垃圾, 可以根据业务需要调整该数值。
- -XX:+CMSParallelRemarkEnabled:CMS 包括初始标记、并发标记、重新标记、并发清理四个阶段,开启该参数,可以加快重新标记阶段的速度。
- -XX:SoftRefLRUPolicyMSPerMB=0:代表每MB空闲内存空间可以允许SoftReference对象存活多久(对象不可达之后开始计时)。
- -XX:+CMSClassUnloadingEnabled :对永久代进行回收
- -XX:SurvivorRatio=8 :最常见的配置,设置新生代各区域比例,eden : survivor = 8
- -XX:-UseParNewGC:新生代使用 ParNewGC
- -XX:+UseG1GC:从 shell 脚本中可以看到 JDK 9之后改为使用 G1垃圾收集器了。
- -XX:G1HeapRegionSize=16m:每一个 region 设置为16MB
- -XX:G1ReservePercent=25:空闲空间的预留内存百分比,防止溢出
- -XX:InitiatingHeapOccupancyPercent=30 可以参考关于G1收集器参数InitiatingHeapOccupancyPercent的正确认知
呼,看完了一大堆JVM 参数,我们回过头来继续阅读 NameServer 的源码。
4 NameServerController——通信的核心
服务的执行入口是在 org.apache.rocketmq.namesrv.NamesrvStartup
中
/**
* NameServer 启动其实就是启动 NamesrvController
* @param args
* @return
*/
public static NamesrvController main0(String[] args) {
try {
// 1. 创建 NamesrvController
NamesrvController controller = createNamesrvController(args);
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
```
NameServer 做为路由中心,需要接收很多网络请求,单从 `Controller` 后缀就可以推知是该类大概就是路由的核心类了。
```java
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
// 读取 -c 对应的配置文件,解析到 namesrvConfig 和 nettyServerConfig 中去
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
//重点关注: 配置完 nettyServerConfig 之后,就可以监听 9876 端口。之后 broker 和 客户端有请求过来,就可以处理了。
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
return controller;
}
创建 NamesrvController 的方法涉及到许多配置的加载,我觉得读源码不要线性阅读,因为这样很容易陷入无穷无尽的细节,导致自己大脑堆栈溢出,看到后面完全忘记上下文了。我们先来关注最核心的部分: 一个 NamesrvController 由哪几个部分组成,方法出口已经我已经加了注释:
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
一共两个核心配置,一个是 NameServerConfig,一个是 NettyServerConfig。
NameServerConfig 的配置如下:
public class NamesrvConfig {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
// ROCKET_HOME 环境变量
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
// kv 配置的路径
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
// NameServer 自己的配置存储路径
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
// 生产环境的名称
private String productEnvName = "center";
// 是否启动了 clusterTest 测试集群
private boolean clusterTest = false;
// 是否支持有序消息
private boolean orderMessageEnable = false;
// 省略 get set
……
}
NettyServerConfig 的配置如下:
public class NettyServerConfig implements Cloneable {
// NettyServer 默认的监听端口号 8888
private int listenPort = 8888;
// NettyServer 工作线程的数量,默认是8
private int serverWorkerThreads = 8;
// netty 回调线程池数量
private int serverCallbackExecutorThreads = 0;
// netty io 线程数量,默认为3,负责解析网络请求, 解析之后会交由work线程处理
private int serverSelectorThreads = 3;
// broker 端参数,broker 端基于 netty 构建网络服务器的时候会使用以下两参数
private int serverOnewaySemaphoreValue = 256;
private int serverAsyncSemaphoreValue = 64;
// 如果一个网络连接空闲时间超过120s,就会被关闭
private int serverChannelMaxIdleTimeSeconds = 120;
// socket send buffer 缓冲区 以及 receive buffer 缓冲区的大小
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
private int writeBufferHighWaterMark = NettySystemConfig.writeBufferHighWaterMark;
private int writeBufferLowWaterMark = NettySystemConfig.writeBufferLowWaterMark;
private int serverSocketBacklog = NettySystemConfig.socketBacklog;
private boolean serverPooledByteBufAllocatorEnable = true;
/**
* make make install
*
*
* ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \
* --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd
*/
// 是否启动 epoll io 模型,默认是不开启的。
private boolean useEpollNativeSelector = false;
}
配置有点多,看到这里可以大概总结一下:
NettyServerConfig 和 NameServerConfig 组成了 NamesrvController,NameServer负责对外提供Broker 的注册功能。
以上就是 NameSeverController 的配置部分。我们接着往下看:
start(controller);
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// here : 方法里面创建了 new ServerBootstrap(),也就是 netty 的核心组件
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
controller.start();
return controller;
}
```
```java
public boolean initialize() {
// 前文所说的 kv 配置
this.kvConfigManager.load();
// 初始化 netty 服务器
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// netty 服务器的工作线程
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 把工作线程池给netty服务器
this.registerProcessor();
// 核心线程: 定时任务线程,定时扫描哪些broker 没发送心跳
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 启动一个后台线程执行定时任务,定时打印 kv 配置
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
return true;
}
```
看完初始化的代码之后,我们再回过头来看看:
```
```java
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// here : 方法里面创建了 new ServerBootstrap(),也就是 netty 的核心组件
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 钩子,JVM 关闭的时候会回调该函数
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
controller.start();
return controller;
}
其实这是一种很常见的编程技巧,通过注册一个 JVM 钩子函数,在 JVM 关闭之前先关闭线程池,释放那个资源。
public void shutdown() {
this.remotingServer.shutdown();
this.remotingExecutor.shutdown();
this.scheduledExecutorService.shutdown();
if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}
}
最后再看一下 NettyRemotingServer 的启动:
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
prepareSharableHandlers();
// netty 配置相关
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
// 设置了一大堆的网络连接请求处理器
// HandShakeHandler 负责连接握手, NettyDecoder是负责编解码的,
// IdleStateHandler是负责网络连接管理的,serverHandler 是负责最关键的网络请求处理的。
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
if (nettyServerConfig.getServerSocketSndBufSize() > 0) {
log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());
childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize());
}
if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {
log.info("server set SO_RCVBUF to {}", nettyServerConfig.getServerSocketRcvBufSize());
childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize());
}
if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && nettyServerConfig.getWriteBufferHighWaterMark() > 0) {
log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",
nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark());
childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()));
}
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
// 设置netty 要监听的端口,默认是9876
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}