RocketMQ最佳实践

阅读 48

2021-09-30

核心概念

Producer 生产者

A producer sends messages generated by the business application systems to brokers. RocketMQ provides multiple paradigms of sending: synchronous, asynchronous and one-way.
生产者负责发送业务应用生成的消息给broker,RoketMQ提供多种消息发送方式:同步、异步、以及one-way

Producer Group 生产者组

Producers of the same role are grouped together. A different producer instance of the same producer group may be contacted by a broker to commit or roll back a transaction in case the original producer crashed after the transaction.
相同功能的producer组成一个group。另外,一旦producer在事务之后崩溃,group中的其他producer实例可以被broker通知去commit或rollback事务。
Warning: Considering the provided producer is sufficiently powerful at sending messages, only one instance is allowed per producer group to avoid unnecessary initialization of producer instances.
注意:在每个进程内、同一个producer group只允许创建一个producer实例。

Consumer 消费者

A Consumer pulls messages from brokers and feeds them into application. In perspective of user application, two types of consumers are provided:
消费者从broker拉取消息,并提供给其所在应用使用。从用户应用视角来看,有两种类型的消费者:

PullConsumer

Pull consumer actively pulls messages from brokers. Once batches of messages are pulled, user application initiates consuming process.
PullConsumer主动的从broker拉取消息。一批消息拉取成功之后,用户应用就可以开始消费过程了。

PushConsumer

Push consumer, on the other hand, encapsulates message pulling, consuming progress and maintaining other work inside, leaving a callback interface to end user to implement which will be executed on message arrival.
另一方面,PushConsumer封装了消息的pulling、消费的处理过程、以及其他一些内部工作。提供给用户应用一个回调接口,用户应用在其中实现当消息到达时需要回调的业务逻辑。
译者注:所以PullConsumer和PushConsumer本质上其实是一样的?都是pull,只不过前者需要手工自己处理不断循环轮询的过程,后者通过封装提供给用户通知回调的编程形式。

Consumer Group 消费者组

Similar to previously mentioned producer group, consumers of the exactly same role are grouped together and named Consumer Group.
跟producer group类似,具有相同功能的consumer组成consumer group。
Consumer Group is a great concept with which achieving goals of load-balance and fault-tolerance, in terms of message consuming, is super easy.
Consumer group是一个非常好的概念设计,有了它,消息消费过程中的负载均衡、失败容错等等,变得易于设计。
Warning: consumer instances of a consumer group must have exactly the same topic subscription(s). 注意:同一个consumer group的consumer实例必须订阅相同的topic。

Topic 主题

Topic is a category in which producers deliver messages and consumers pull messages. Topics have very loose relationship with producers and consumers. Specifically, a topic may have zero, one or multiple producers that sends messages to it; conversely, a producer can send messages of different topics. In consumer’s perspective, a topic may be subscribed by zero, one or multiple consumer groups. And a consumer group, similarly, may subscribe to one or more topics as long as instances of this group keep their subscription consistent.
Topic其实是一个producer投递消息、consumer拉取消息的分类。topic与生产者和消费者的关系非常松散。具体地说,topic可以有零、一个或多个生产者,它们向其发送消息;反过来,生产者也可以发送消息到不同主题。而从消费者的角度看,主题可以由零、一个或多个消费者组订阅。类似地,消费者组也可以订阅一个或多个主题,只要该组的实例保持其订阅。

Message 消息

Message is the information to be delivered. A message must have a topic, which can be interpreted as address of your letter to mail to. A message may also have an optional tag and extra key-value pairs. For example, you may set a business key to your message and look up the message on a broker server to diagnose issues during development.
消息就是需要投递的信息。一个message必须要有一个topic——可以认为是信的地址。message还可以有可选的tag和key-value对。例如,你可能会想把业务主键与一个消息一一对应起来,以便于在broker上查找这个消息来定位开发中遇到的问题。

Message Queue 消息队列

Topic is partitioned into one or more sub-topics, “message queues”.
每个Topic(的消息)被partition到一或多个message queue中。

Tag 标签

Tag, in other words sub-topic, provides extra flexibility to users. With tag, messages with different purposes from the same business module may have the same topic and different tag. Tags would be helpful to keep your code clean and coherent, and tags also can facilitate the query system RocketMQ provides.
Tag,也叫sub-topic,提供了用户额外的灵活性。来自同一业务模块的不同目的的消息可能具有相同的topic和不同的tag。 tag有助于保持代码的整洁和连贯,tag也可以促进RocketMQ提供的查询系统。

Broker

Broker is a major component of the RocketMQ system. It receives messages sent from producers, store them and prepare to handle pull requests from consumers. It also stores message related meta data, including consumer groups, consuming progress offsets and topic / queue info.
Broker是RocketMQ的核心组件,它接收Producer发来的消息,存储消息,然后处理来自consumer的pull请求。 它也存储消息相关的meta元数据,包括consumer group、消费进度的偏移量offset、以及topic和queue的信息。

Name Server

Name server serves as the routing information provider. Producer/Consumer clients look up topics to find the corresponding broker list.
Name server负责提供路由信息,producer和consumer通过name server来查找topic对应的broker列表。

Message Model 消息模式

RocketMQ支持两种消息:Clustering集群消息Broadcasting广播消息

Message Order 消息顺序

When DefaultMQPushConsumer is employed, you may decide to consume messages orderly or concurrently. 当使用DefaultMQPushConsumer 这个类的时候,需要选择是按顺序消费消息还是并发的消费消息。

  • Orderly 有序消费
    Consuming messages orderly means messages are consumed the same order they are sent by producers for each message queue. If you are dealing with scenario that global order is mandatory, make sure the topic you use has only one message queue. 有序消费消息意味着消息的消费顺序与生产者为每个消息队列发送消息的顺序相同。如果您处理的是强制全局必须有序的场景,请确保您使用的主题只有一个消息队列。
    Warn: If consuming orderly is specified, the maximum concurrency of message consuming is the number of message queues subscribed by the consumer group. 如果指定了按顺序消费,那消息消费的最大并发能力其实就是consumer group订阅的(这个topic对应的)message queue队列数。
  • Concurrently 并发消费
    When consuming messages concurrently, maximum concurrency of message consuming is only limited by thread pool specified for each consumer client. 并发的消息消费模式,这种情况下消息消费最大并发能力只取决于消费者客户端的thread pool。
    Warn: Message order is no longer guaranteed in this mode. 并发消费模式下不保证消息的顺序性。

Producer最佳实践

SendStatus 发送状态

When sending a message, you will get SendResult which contains SendStatus. Firstly, we assume that Message’s isWaitStoreMsgOK=true(default is true). If not, we will always get SEND_OK if no exception is thrown. Below is a list of descriptions about each status:
发送消息的时候,我们会得到返回值SendResult,里边包含SendStatus。首先我们假设消息的isWaitStoreMsgOK=true(默认值)。如果不是,在没有Exception抛出的情况下仍然会得到 SEND_OK状态。下面是关于每个状态的列表:

FLUSH_DISK_TIMEOUT 刷盘超时

If the Broker set MessageStoreConfig’s FlushDiskType=SYNC_FLUSH(default is ASYNC_FLUSH), and the Broker doesn’t finish flushing the disk within MessageStoreConfig’s syncFlushTimeout(default is 5 secs), you will get this status.

FLUSH_SLAVE_TIMEOUT 同步SLAVE超时

If the Broker’s role is SYNC_MASTER(default is ASYNC_MASTER), and the slave Broker doesn’t finish synchronizing with the master within the MessageStoreConfig’s syncFlushTimeout(default is 5 secs), you will get this status.

SLAVE_NOT_AVAILABLE 同步的Master没配置Slave

If the Broker’s role is SYNC_MASTER(default is ASYNC_MASTER), but no slave Broker is configured, you will get this status.

SEND_OK 发送成功

SEND_OK does not mean it is reliable. To make sure no message would be lost, you should also enable SYNC_MASTER or SYNC_FLUSH.

Duplication or Missing 重新投递或丢失

If you get FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT and the Broker happens to shutdown right the moment, you can find your message missing. At this time, you have two choices, one is to let it go, which may cause this message to be lost; another is to resend the message, which may get message duplication. Often we suggest resend and find a way to handle the duplication removal when consuming. Unless you feel it doesn’t matter when some messages are lost. But keep in mind that resending is useless when you get SLAVE_NOT_AVAILABLE. If this happens, you should keep the scene and alert the Cluster Manager.
如果返回了FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT,然后刚好这时候Broker( master)挂了,消息这时候就丢失了。这时候有两个选择:1、随他去吧,这个消息丢了就丢了。2、重发这个消息,需要消息的重新投递。 一般我们建议重发消息、并且找到一种办法来处理可能发生的消费时的重复处理问题(译者注:确保幂等性),除非你觉得丢一些消息没关系。但是请注意,返回SLAVE_NOT_AVAILABLE也就是slave不可用的时候,重发是无用的,如果发生了这种情况,应该保存现场然后向集群管理员发出警报。

Timeout 超时

The Client sends requests to Broker, and wait for the responses, but if the max wait time has elapsed and no response is returned, the Client will throw a RemotingTimeoutException. The default wait time is 3 seconds. You can also pass timeout argument using send(msg, timeout) instead of send(msg). Note that we do not suggest the wait time to be too small, as the Broker needs some time to flush the disk or synchronize with slaves. Also the value may have little effect if it exceeds syncFlushTimeout by a lot as Broker may return a response with FLUSH_SLAVE_TIMEOUT or FLUSH_SLAVE_TIMEOUT before the timeout.
客户端发送请求到Broker,然后等待响应,但是如果超过最大等待时间仍然没有收到返回,那么客户端会抛出一个RemotingTimeoutException异常。默认的等待时间是3秒。你可以通过send(msg, timeout) 来自己设定这个超时时间。 注意,我们不建议把超时时间设置的太小,因为Broker需要一些时间来刷盘和与slave做同步。此外,如果该值超过syncFlushTimeout太多,那么也意义不大,因为这时候Broker会在这个超时之前就已经返回给客户端FLUSH_SLAVE_TIMEOUT 或FLUSH_SLAVE_TIMEOUT的响应了。

Message Size 消息大小

We suggest the size of message should be no more than 512K.
我们建议消息的大小应不大于512K

Async Sending 异步发送

Default send(msg) will block until the response is returned. So if you care about performance, we suggest you use send(msg, callback) which will act in the async way.
默认的send(msg) 是同步发送,会阻塞直到响应返回。如果你对性能比较在意,我们建议使用send(msg, callback)做异步发送。

Producer Group 生产者组

Normally, the producer group has no effects. But if you are involved in a transaction, you should pay attention to it. By default, you can only create only one producer with the same producer group in the same JVM, which is usually enough.
通常,producer group没有什么用处。但是一旦你开启了一个事务,那么就应该关注producer group了。 默认的,在同一个JVM里,一个producer group中只能创建一个producer,这其实就够了。

Thread Safety 线程安全

The producer is thread-safe, you can just use it in your business solution.
producer是线程安全的,你可以放心在业务逻辑代码里用它。

Performance 性能

如果你想在一个JVM里创建多个producer来做大数据处理,我们建议:
If you want more than one producer in one JVM for big data processing, we suggest:

  • use async sending with a few producers (3~5 is enough) 每个JVM创建3-5个使用异步发送的producer就足够了
  • setInstanceName for each producer 为每个producer设置实例名

Consumer最佳实践

Consumer Group and Subscriptions 消费者组和订阅

The first thing you should be aware of is that different Consumer Group can consume the same topic independently, and each of them will have their own consuming offsets. Please make sure each Consumer within the same Group to subscribe the same topics.
首先,不同的consumer group可以消费相同的topic而互不影响,consumer group都有自己的标识消费进度的offset。而需要确保的是consumer group内的consumer都需要订阅相同的topic。

MessageListener 消息listener

Orderly

The Consumer will lock each MessageQueue to make sure it is consumed one by one in order. This will cause a performance loss, but it is useful when you care about the order of the messages. It is not recommended to throw exceptions, you can return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT instead.
选择了Orderly的MessageListener的话,Consumer会lock每一个MessageQueue(译者注:跟NameServer连上的时候,每个consumer分配了对应的自己去拿消息的MessageListener),这样是为了确保消费的时候是一个接一个的按顺序消费(译者注:应该说是进程内lock,这样当前jvm内如果有多个consumer都订阅了这个topic、不管是属于同一个group还是不同的group,这样都可以确保每次只有一个consumer去消费,其他等待、然后按顺序一个一个消费。如果是想整个集群中的所有相关consumer全局有序消费,只能去为这个topic设置唯一的MessageQueue这个办法了)。
有序消费方式会带来一些性能上的损失,但是如果你在意消息的有序性这就非常有用。
不建议在消费异常的时候抛exception,可以用返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT的方式代替。

Concurrently

As the name tells, the Consumer will consume the messages concurrently. It is recommended to use this for good performance. It is not recommended to throw exceptions, you can return ConsumeConcurrentlyStatus.RECONSUME_LATER instead.
Concurrently Listener,Consumer会并发的消费消息。建议追求性能的场景使用。我们同样不建议抛异常出来,而是返回ConsumeConcurrentlyStatus.RECONSUME_LATER代替。

Consume Status

For MessageListenerConcurrently, you can return RECONSUME_LATER to tell the consumer that you can not consume it right now and want to reconsume it later. Then you can continue to consume other messages. For MessageListenerOrderly, because you care about the order, you can not jump over the message, but you can return SUSPEND_CURRENT_QUEUE_A_MOMENT to tell the consumer to wait for a moment.
使用MessageListenerConcurrently的情况下,你可以返回 RECONSUME_LATER状态来告知consumer、业务应用程序现在没法消费,想稍后再消费。这样你可以继续的去消费其他的消息。如果是用的MessageListenerOrderly,因为在意顺序,你没法跳过当前的这个消息,但是仍然可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT ,来告知consumer稍等一段时间。

Blocking

It is not recommend to block the Listener, because it will block the thread pool, and eventually may stop the consuming process. 不建议阻塞Listener,因为这会阻塞线程池,最后挂掉整个消费处理过程。

Thread Number (consumer消费)线程的数量

The consumer use a ThreadPoolExecutor to process consuming internally, so you can change it by setting setConsumeThreadMin or setConsumeThreadMax.
consumer内部是使用一个ThreadPoolExecutor 线程池去处理消费,你可以通过 setConsumeThreadMin或setConsumeThreadMax来修改消费线程数量。

ConsumeFromWhere

When a new Consumer Group is established, it will need to decide whether it needs to consume the historical messages which had already existed in the Broker. CONSUME_FROM_LAST_OFFSET will ignore the historical messages, and consume anything produced after that. CONSUME_FROM_FIRST_OFFSET will consume every message existed in the Broker. You can also use CONSUME_FROM_TIMESTAMP to consume messages produced after the specified timestamp.
当一个新的Consumer Group被建立,需要确定是否去消费已经存在于Broker里的那些历史消息。CONSUME_FROM_LAST_OFFSET选项会忽视历史消息,而选择消费从此刻之后生产的消息。CONSUME_FROM_FIRST_OFFSET选项则是消费每一个存在于Broker的消息。你也可以使用CONSUME_FROM_TIMESTAMP去指定消费从某一个timestamp之后的消息。

Duplication 重新投递

Many circumstances could cause duplication, such as:
很多情况都会触发重新投递,比如:

  • Producer resend messages(i.e, in case of FLUSH_SLAVE_TIMEOUT) 生产者重发消息(例如,发生了FLUSH_SLAVE_TIMEOUT)
  • Consumer shutdown with some offsets not updated to the Broker in time. 一些个offset还没有及时修改回broker时消费者就挂了
    So you may need to do some external work to handle this if your application cannot tolerate duplication. For example, you may check the primary key of your DB.
    所以你可能需要做些额外的工作来处理这些情况,如果你的应用不能容忍重新投递。比如你可以校验数据库的主键。(译者注:说的还是RocketMQ可能会重复投递消息,消费程序要自己确保幂等性。)

Broker最佳实践

Broker Role

Broker Role is ASYNC_MASTER, SYNC_MASTER or SLAVE. If you cannot tolerate message missing, we suggest you deploy SYNC_MASTER and attach a SLAVE to it. If you feel ok about missing, but you want the Broker to be always available, you may deploy ASYNC_MASTER with SLAVE. If you just want to make it easy, you may only need a ASYNC_MASTER without SLAVE.
Broker有ASYNC_MASTER, SYNC_MASTER 和SLAVE几个角色。如果你不能容忍消息的丢失,我们建议部署 同步的SYNC_MASTER,并且给它附加1个SLAVE。如果觉得消息丢失也行,但是要求Broker高可用,那么可以部署ASYNC_MASTER加1个SLAVE。如果想简单点搞搞,那么就只部署1个 ASYNC_MASTER就行了。

FlushDiskType 刷盘方式

ASYNC_FLUSH is recommended, for SYNC_FLUSH is expensive and will cause too much performance loss. If you want reliability, we recommend you use SYNC_MASTER with SLAVE.
建议使用ASYNC_FLUSH 异步刷盘,因为SYNC_FLUSH同步刷盘开销巨大、且会带来非常大的性能损耗。如果对可靠性有要求,我们建议使用SYNC_MASTER加SLAVE的部署。

ReentrantLock vs CAS

to be finished

os.sh

to be finished


NameServer最佳实践

In Apache RocketMQ, name servers are designed to coordinate each component of the distributed system and the coordination is mainly achieved through managing topic routing information. Apache RocketMQ中的NameServer是被设计用来协调分布式系统的各个组件,协调主要是通过管理topic路由信息来实现。
Management consists of two parts: Topic路由的管理由两部分组成:

  • Brokers periodically renew meta data kept in every name server. Broker定期更新保存在每个NameServer里的meta元数据。
  • Name servers are serving clients, including producers, consumers and command line clients with the latest routing information. NameServer使用最新的路由信息为客户端提供服务,包括producer、concumer、以及命令行客户端。
    Therefore, before launching brokers and clients, we need to tell them how to reach name servers by feeding them with a name server address list. In Apache RocketMQ, this can be done in four ways.
    因此,在加载broker和客户端之前,我们需要告知它们如何获取namer server的服务器列表,有4种方式:

Programmatic Way 代码方式

For brokers, we can specify namesrvAddr=name-server-ip1:port;name-server-ip2:port in broker configuration file.
For producers and consumers, we can feed name server address list to them as follows:

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port");

If you use admin command line from shell, you can also specify this way:

sh mqadmin command-name -n name-server-ip1:port;name-server-ip2:port -X OTHER-OPTION

A simple example is: sh mqadmin -n localhost:9876 clusterList assuming to query cluster info on the name server node.
If you have integrated admin tool into your own dashboard, you can:

DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt("please_rename_unique_group_name");
defaultMQAdminExt.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port");

Java Options 通过java option

Name server address list may also be fed to your application through specifying the sequel java option rocketmq.namesrv.addr before launching.

Environment Variable 通过环境变量

You can export NAMESRV_ADDR environment variable. Brokers and clients will examine and use its value if set.

HTTP Endpoint 通过http Endpoint

If you do not specify name server address list using previously mentioned methods, Apache RocketMQ will access the following HTTP end point to acquire and update name server address list every two minutes with initial delay of ten seconds.
By default, the end point is:
http://jmenv.tbsite.net:8080/rocketmq/nsaddr
You may override jmenv.tbsite.net using this Java option: rocketmq.namesrv.domain, You may also override nsaddr part using this Java option: rocketmq.namesrv.domain.subgroup
If you are running Apache RocketMQ in production, this method is recommended because it gives you maximum flexibility – you can dynamically add or remove name server nodes without necessity of rebooting your brokers and clients according to your name servers’ system load.

Priority 这几种方式的优先级

Programmatic Way > Java Options > Environment Variable > HTTP Endpoint


JVM/Linux调优配置

介绍针对RocketMQ broker的JVM参数和操作系统参数的调优建议,一些需要在部署RoketMQ集群之前进行考虑的一些配置工作。

JVM Options

建议使用最新的JDK1.8版本,以-server参数启动server compiler,并分配8g堆内存空间。将Xmx和Xms设置一样大来避免JVM自动重置堆内存大小,这样可以提高性能。一个简单的JVM启动参数配置如下所示:

-server -Xms8g -Xmx8g -Xmn4g

如果不介意RocketMQ broker的启动时间变慢,可以配置堆pre-touch来确保每个内存页都在JVM启动的时候由操作系统真正分配(注:进程启动的时候、操作系统是不会把JVM声明的内存一下子真正给到进程的,一般会在ygc之后给到、这样会使得gc性能变差一些):If you don’t care about the boot time of RocketMQ broker, pre-touch the Java heap to make sure that every page will be allocated during JVM initialization is a better choice. Those who don’t care about the boot time can enable it:

-XX:+AlwaysPreTouch

禁用偏向锁可以减少JVM暂停(注:stop the world):Disable biased locking may reduce JVM pauses:

-XX:-UseBiasedLocking

建议使用JDK1.8的G1垃圾收集器:As for garbage collection, G1 collector with JDK 1.8 is recommended:

-XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30

上述GC选项可能看起来有一些激进,但是通过我们的生产环境证明这样配置有不错的性能。不要把-XX:MaxGCPauseMillis设置的太小,否则JVM为了达到设置的这样小的gc允许时长、只能触发多次频繁的minor GC。
然后,建议使用滚动gc日志文件:

-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m

如果写gc日志文件增加了broker的延迟,可以考虑把gc日志文件输出到内存文件系统上:

-Xloggc:/dev/shm/mq_gc_%p.log

Linux内核参数配置

RocketMQ自带了一个os.sh脚本文件,里边列出了许多内核参数的调优,基本可以直接使用在生产上。
如下几个参数需要关注一下,更多细节请参考文档 /proc/sys/vm/*[1].
(译者注:vm开头的都是关于虚拟内存的)

vm.extra_free_kbytes, 告诉VM在后台回收(kswapd)开始的阈值和直接回收(通过分配进程)开始的阈值之间保留额外的可用内存。 RocketMQ通过调整这个参数来避免在内存分配时的高延迟。tells the VM to keep extra free memory between the threshold where background reclaim (kswapd) kicks in, and the threshold where direct reclaim (by allocating processes) kicks in.RocketMQ uses this parameter to avoid high latency in memory allocation.

vm.min_free_kbytes, 如果这个值被设置低于1024KB,你的系统将会被微妙的破坏,在高负载时更容易发生死锁。if you set this to lower than 1024KB, your system will become subtly broken, and prone to deadlock under high loads.

vm.max_map_count, 表示一个进程可用的最多内存映射区mmap数量。RocketMQ使用mmap(内存映射)来加载CommitLog和ConsumeQueue,所以建议把这个参数设置的大一些。limits the maximum number of memory map areas a process may have. RocketMQ will use mmap to load CommitLog and ConsumeQueue, so set a bigger value for this parameter is recommended.

vm.swappiness, 定义内核交换内存页行为的激进程度。这个值越大、则越容易发生内存页的交换,越小则减少交换次数。为避免交换延迟,建议将该值设置为10。define how aggressive the kernel will swap memory pages. Higher values will increase agressiveness, lower values decrease the amount of swap. 10 for this value to avoid swap latency is recommended.

File descriptor limits, RocketMQ需要打开fd:比如文件(CommitLog 和ConsumeQueue)和网络连接。所以我们建议设置最大fd打开数量为655350.

Disk scheduler, 建议为RocketMQ使用最晚执行时间(deadline)类型的 I/O调度器,这样可以尝试确保请求的延迟在一个可控范围之内[2]。

Reference 参考

  1. https://www.kernel.org/doc/Documentation/sysctl/vm.txt
  2. https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_Linux/6/html/Performance_Tuning_Guide/ch06s04s02.html

精彩评论(0)

0 0 举报