Loading... # 消息中间件之RocketMQ(一) ## 什么是消息队列 消息队列是《数据结构》中先进先出的数据结构,在当前的架构中,作为中间件提供服务 ### 消息中间件功能 #### 应用解耦 在SOA模型中,使业务之间不再强依赖,可以异步处理,A与B都通过中间件交流,即使有一方下线了,也可以在上线之后继续处理。 #### 流量削峰 流量达到高峰的时候,通常使用限流算法来控制流量涌入系统,避免系统瘫痪。但是,这种方式损失了一部分请求,此时可以使用消息中间件来缓冲大量的请求,匀速消费,当消息队列中堆积消息过多时,可以动态增加消费端,来保证不丢失重要请求。 #### 大数据处理 消息中间件可以把各个模块中产生的管理员操作日志、用户行为、系统状态等数据文件作为消费收集到主题中,数据使用方可以订阅自己感兴趣的数据内容。 #### 异构系统 实现跨语言模块之间的数据交互。 ### RocketMQ角色 RocketMQ不遵循JMS协议,所以它的架构与ActiveMQ差别非常大。 ![webp.jpg][1] #### Broker - Broker面向producer和consumer接收和发送消息 - 向nameserver提交自己的信息 - 是消息中间件的消息存储、转发服务器 - 每个Broker节点,在启动时,都会遍历NameServer列表,与每个NameServer建立长连接,注册自己的信息,之后定时上报 #### Broker集群 - Broker高可用,可以配成Master/Slave架构,Master可写可读,Slave只可以读,Master将写入的数据同步给Slave - 一个Master可以对应多个Slave,但是一个Slave只能对应一个Master - Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave - Master多机负载,可以部署多个broker - 每个Broker与nameserver集群中的所有节点建立长连接,定时注册Topic信息到所有nameserver #### Producer - 消息的生产者 - 通过NameServer集群中的其中一个节点(随机选择)建立长连接,获得Topic的路由信息,包括Topic下面有哪些Queue,这些Queue分布在哪些Broker上等 - 接下来向提供Topic服务的Master建立长连接,且定时向Master发送心跳 #### Consumer 消息的消费者,通过NameServer集群获得Topic路由信息,连接到对应的Broker上消费消息。注意,由于Master和Slave都可以读取消息,因此Consumer会与Master和Slave都建立连接。 #### NameServer 底层由Netty实现,提供了路由管理、服务注册、服务发现的功能,是一个无状态节点。 **NameServer是服务发现者**,集群中各个角色(Producer,Broker,Consumer等)都需要定时向NameServer上报自己的状态,以便互相发现彼此,超时不上报,NameServer会把它从列表中剔除。 **NameServer可以部署多个**,当多个NameServer存在的时候,其它角色同时向它们上报信息,以保证高可用。 **NameServer集群互不通信**,没有主备的概念。 **NameServer内存式存储**,NameServer中的Broker、Topic等信息默认不会持久化。 #### 对比JMS中的Topic和Queue JMS标准中定义了两个物理上的概念,一个是Queue,基于P2P的消息传输,消息只能被获取一次。另一个是Topic,基于PUB/SUB的消息传输,消息向所有订阅者广播。而在RocketMQ中,所有消息都是以Queue的形式记录的,Topic是一个逻辑上的概念,一个Topic中可以包含多个Queue,消费者可以订阅一个Topic去消费其中所有的Queue,也可以订阅其中的某一个Queue,至于是P2P消费还是PUB/SUB广播,由客户端来决定 `consumer.setMessageModel(MessageModel.BROADCASTING);`。 ![1090617-20190626173042073-147043337.jpg][2] ## Rocket编译安装 **官方网站** > http://rocketmq.apache.org **GitHub** > https://github.com/apache/rocketmq RocketMQ4.6+需要jdk1.8环境编译和运行 **需要的环境** 1. 64bit OS,Linux/Unix/Mac is recommended 2. 64bit JDK 1.8+ 3. Maven 3.2.x 4. Git 5. 4g+ free disk for Broker server **各版本要求** | Version | **Client** | **Broker** | NameServer | | :-: | :-: | :-: | :-: | | 4.0.0-incubating | >=1.7 | >=1.8 | >=1.8 | | 4.1.0-incubating | >=1.6 | >=1.8 | >=1.8 | | 4.2.0 | >=1.6 | >=1.8 | >=1.8 | | 4.3.x | >=1.6 | >=1.8 | >=1.8 | | 4.4.x | >=1.6 | >=1.8 | >=1.8 | | 4.5.x | >=1.6 | >=1.8 | >=1.8 | | 4.6.x | >=1.6 | >=1.8 | >=1.8 | ### 从GitHub上下载源码并上传到服务器 ![image-20200218134855528.png][3] ### 在Linux上安装Maven #### 下载Maven ``` wget https://mirrors.tuna.tsinghua.edu.cn/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz ``` #### 添加阿里云镜像 修改 `maven/conf`目录下的 `settings.xml` 在 `mirrors`节点下添加 ``` <mirror> <id>aliyun-maven</id> <mirrorOf>*</mirrorOf> <name>aliyun maven</name> <url>http://maven.aliyun.com/nexus/content/groups/public</url> </mirror> ``` #### 配置Maven环境变量 修改 `/etc/profile` ``` export M2_HOME=/usr/local/maven export PATH=$PATH:$M2_HOME/bin ``` 环境变量修改完执行 `source /etc/profile`使之生效 #### 进入RocketMQ主目录编译项目 ``` mvn -Prelease-all -DskipTests clean install -U ``` #### 遇到的问题 由于我本地的环境是JDK14,因此编译的时候报Compiler版本错误,于是修改pom.xml文件 ``` <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>14</source> <target>14</target> <compilerVersion>${maven.compiler.source}</compilerVersion> <showDeprecation>true</showDeprecation> <showWarnings>true</showWarnings> </configuration> </plugin> ``` 修改之后重新编译,结果出现了新的错误: ![Snipaste][4] 进入TransientStorePool类中查看,发现它引入了sun.nio包下的内容 ![Snipaste][5] sun与jdk包在jdk9版本之后,已经被oracle修改成默认禁止外部引入了,于是再次修改pom.xml,添加: ``` <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>14</source> <target>14</target> <compilerVersion>${maven.compiler.source}</compilerVersion> <showDeprecation>true</showDeprecation> <showWarnings>true</showWarnings> <encoding>utf-8</encoding> <verbose>false</verbose> <fork>true</fork> <compilerArgs> <arg>--add-exports=java.base/sun.nio.ch=ALL-UNNAMED,hanlp</arg> <arg>--add-exports=java.base/sun.util.locale=ALL-UNNAMED,hanlp</arg> </compilerArgs> </configuration> </plugin> ``` 修改之后再次编译,编译成功! 编译之后的文件项目位于 `distribution/target/rocketmq-4.7.0`中 ### 启动NameServer 在bin目录执行 ``` ./mqnamesrv ``` 报错:CMS垃圾回收器被废弃,CMS已经在JDK9版本被废弃了,于是打开 `runserver.sh`文件查看,发现: 1. CMS垃圾回收器已经被废弃 2. -Xloggc已经被-Xlog:gc替代 3. -XX:+UseGCLogFileRotation已经被废弃 4. -Djava.ext.dirs已经被废弃 于是做出如下修改: ``` JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC" 替换为>>> JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:MaxGCPauseMillis=1 -XX:InitiatingHeapOccupancyPercent=70" ``` ``` -Xloggc: 替换为>>> -Xlog:gc: ``` ``` JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m" 整行删除 ``` ``` JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib" 整行删除 ``` ``` export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH} 替换为>>> export CLASSPATH=${BASE_DIR}/lib/rocketmq-namesrv-4.7.0.jar:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH} ``` 修改完成之后重新启动NameServer。 正常提示: ![image-20200218150050167.png][6] ### 启动Broker Broker同样需要修改启动文件 `runbroker.sh` ``` -Xloggc: 替换为>>> -Xlog:gc: 并删除 -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy ``` ``` JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m" 整行删除 ``` ``` JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib" 整行删除 ``` ``` export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH} 替换为>>> export CLASSPATH=${BASE_DIR}/lib/rocketmq-namesrv-4.7.0.jar:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH} ``` 添加 ``` JAVA_OPT="${JAVA_OPT} --add-exports=java.base/jdk.internal.ref=ALL-UNNAMED" ``` 修改之后启动Broker ``` ./mqbroker -n localhost:9876 # NameServer启动的默认端口是9876,Broker需要向NameServer注册 ``` 正常提示 ![image-20200218150114714.png][7] 如果出现如下错误: ``` [root@node-113b bin]# ./mqbroker -n localhost:9876 Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed ; error='Cannot allocate memory' (errno=12)# # There is insufficient memory for the Java Runtime Environment to continue. # Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory. # An error report file with more information is saved as: # /usr/local/rocketmq/bin/hs_err_pid1997.log ``` **原因:**jvm启动初始化内存分配大于物理内存 修改 `runbroker.sh` ``` JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g" ``` 默认设置的是8g,很大,改小即可。 如果遇到启动不起来,直接退出,而且不报错的情况,删除 `/root/store`文件夹,再试 ### 测试消息发送 使用 `tool.sh`脚本执行测试程序 修改 `tool.sh`文件 ``` JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext:${JAVA_HOME}/lib/ext" 整行删除 ``` ``` export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH} 替换为>>> export CLASSPATH=${BASE_DIR}/lib/rocketmq-namesrv-4.7.0.jar:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH} ``` 修改后,在bin目录下执行 ``` ./tools.sh org.apache.rocketmq.example.quickstart.Producer ``` 提示如下表示成功 ![image-20200218145049603.png][8] 如果出现如下报错: ``` ./tools.sh org.apache.rocketmq.example.quickstart.Producer 22:49:02.470 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. java.lang.IllegalStateException: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed ``` **原因:**不知道NameServer在哪儿 在 `tools.sh`脚本中添加 ``` export NAMESRV_ADDR=localhost:9876 ``` ### 接收消息 ``` ./tools.sh org.apache.rocketmq.example.quickstart.Consumer ``` ## 控制台rocketmq-console编译安装 ### 下载 > https://github.com/apache/rocketmq-externals ### 中文指南 > https://github.com/apache/rocketmq-externals/blob/master/rocketmq-console/doc/1_0_0/UserGuide_CN.md ### 上传到服务器并解压缩 ### 编译 进入 `rocketmq-console`目录 执行编译 ``` mvn clean package -Dmaven.test.skip=true ``` ### 启动 编译成功后在 `rocketmq-console/target`目录下执行 `rocketmq-console-ng-1.0.1.jar` 启动时,直接动态添加 `nameserver`地址或编辑 `application.properties`添加属性 ``` java -jar rocketmq-console-ng-1.0.1.jar --rocketmq.config.namesrvAddr=192.168.50.19:9876 # 不要用127.0.0.1 ``` 启动之后JDK14又来刷了一波存在,出现报错 ![Snipaste][9] 这个错误比较简单,原因是Java SE大于等于9的版本中将不再包含java EE 的Jar包。解决办法是手动添加依赖,进入 `rocketmq-console`目录,在pom.xml文件的 `dependencies`标签中添加: ``` <dependency> <groupId>javax.xml.bind</groupId> <artifactId>jaxb-api</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>com.sun.xml.bind</groupId> <artifactId>jaxb-impl</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>com.sun.xml.bind</groupId> <artifactId>jaxb-core</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>javax.activation</groupId> <artifactId>activation</artifactId> <version>1.1.1</version> </dependency> ``` 添加之后重新编译,启动。 启动成功后访问服务器的8080端口即可 ![image-20200218155021928.png][10] [1]: https://www.princelei.club/usr/uploads/2020/06/325312584.png [2]: https://www.princelei.club/usr/uploads/2020/06/2058571528.jpg [3]: https://www.princelei.club/usr/uploads/2020/06/3938566374.png [4]: https://www.princelei.club/usr/uploads/2020/06/853461034.png [5]: https://www.princelei.club/usr/uploads/2020/06/3073186664.png [6]: https://www.princelei.club/usr/uploads/2020/06/662252766.png [7]: https://www.princelei.club/usr/uploads/2020/06/4004939558.png [8]: https://www.princelei.club/usr/uploads/2020/06/3230970608.png [9]: https://www.princelei.club/usr/uploads/2020/06/2540032264.png [10]: https://www.princelei.club/usr/uploads/2020/06/718683209.png Last modification:November 27th, 2020 at 10:51 pm © 允许规范转载