目录
1.前提2.选择性订阅部分核心数据3。RabbitMQ的queue与exchange绑定回顾4.direct exchange实现消息路由5。按需订阅数的代码实现6。按需订阅更强大、更灵活。1.前提
上一篇文章《你以为架构师每天都在画画PPT我们已经给出了一套完整的数据一致性保证方案,告诉你还有很多其他~。
我们从以下三个角度给出了如何实现计划。并通过数据平台和电子商务系统进行例子分析。
核心数据监控数据链路跟踪自动化数据链路分析
到目前为止,我们的架构图大致如下:
我们以前是如何基于这种架构的?MQ还详细说明了解耦的实现。
对于不清楚的学生,可以具体看前三篇文章:
当我第一次成为一名架构师时,我设计了一个高并发架构,发现了N个痛点。。。如何设计100亿流量的系统架构?今天我要教你!我和同事们谈到了架构设计。让我们听听…
然后,在本文中,我们将进一步解释基于此架构的数据一致性。同样,我们RabbitMQ以消息中间件为例。
二、选择性订阅部分核心数据
第一个基础MQ实现的细节是,例如,对于数据监控系统,它可能只是从MQ订阅部分数据消费。
这是什么意思?比如实时计算平台,他会把自己计算的所有数据指标都投递给自己MQ里去的。
但这些数据指标可能多达几十个甚至几百个,不可能所有的数据指标都是核心数据吗?
基本上根据我们过去的经验,这类数据系统的核心数据指标约占10%。
那么对于数据查询平台来说,他可能需要消耗所有的数据指标,然后落地到自己的存储中。
但对于数据监控系统来说,他只需要过滤10%的核心数据指标,所以他需要有选择地订阅数据。
让我们看看下面的图片,立刻明白它是什么意思。
三、RabbitMQ的queue与exchange的绑定
不知道大家还记不记得以前的解释RabbitMQ多系统订阅相同数据的场景。
我们使用的是每个系统使用自己的一个queue,但都绑定到一个fanout exchange然后生产者直接将数据发送到上面fanout exchange。
fanout exchange它将分发数据并绑定到所有自己queue然后每个系统都会从自己开始queue拿到同样的数据。
再看下图回顾一下。
以下是一个关键代码:
也就是说,创造自己queue绑定到exchange上去,这种绑定关系在RabbitMQ有一个专业术语叫:binding。
四、direct exchange实现新闻路由
若只使用以前的fanout exchange,因此,不同的系统无法按需订阅数据。如果允许不同的系统按需订阅数据,则需要使用direct exchange。
direct exchange允许你在发送信息时给每个信息打一个routing key。同时direct exchange还允许binding到自己的queue指定一个binding key。
这样,direct exchange根据消息routing key这个消息路由同一个binding key对应的queue这样,不同的系统就可以根据需要订阅数据。
说了这么多,是不是觉得有点晕?老规矩,我们来一张图,直观的感受一下发生了什么:
而且一个queue可以使用多个binding key例如,使用k1”和“k2”两个binding key的话,那么routing key为“k1”和“k2消息会路过那个queue里去。
同时不同的queue也可以指定相同的ruoting key,此时跟随fanout exchange其实是一样的,一个消息会同时路过多个queue里去。
五、实现按需订阅的代码
首先,在生产者中,比如实时计算平台,他应该定义一个direct exchange了。
如下代码所示,所有数据都投递到此处exchange例如,我们在这里使用它exchange名字就是“rt_data意思是实时数据计算结果,类型为direct”:
channel.exchangeDeclare(
"rt_data",
"direct");
而且,在发送消息时,给消息贴上标签,也就是他的routing key,表明该消息是普通数据还是核心数据,以实现路由,如下代码所示:
上面的第一个参数是指向哪个投递exchange进去,第二个参数是routing key,这里的“common_data代表普通数据,也可以使用core_data实时计算平台代表核心数据,根据自身情况指定普通或核心数据。
然后消费者在做queue和exchange的binding需要指定时间binding key,代码如下:
以上第一行是在消费者身上定义的,比如数据监控系统。direct exchange。
然后第二行是定义rt_data_monitor“这个queue。
第三行是对的queue和exchange绑定,指定binding key是“core_data”。
如果是数据查询系统,需要普通数据和核心数据,那么就可以了binding key指定多个值,用逗号隔开,如下所示:
channel.queueBind(
"rt_data_query",
"rt_data",
"common_data, core_data");
在这里,我们将了解如何标记不同的数据(即routing key),然后让不同的系统按需订阅所需的数据(即指定binding key),这样就用到了direct exchange这种类型非常灵活。
最后,看看之前画的图片,大家再来感受一下:
六、按需订阅更强大、更灵活
RabbitMQ 还支持更强大、更灵活的按需数据订阅,即使用topic exchange,其实跟direct exchange类似,只是功能更强大。
比如你定义一个topic exchange,然后routing key多个单词需要指定为用点号分开,如下所示:
然后,你在设置binding key的时候,他是支持通配符的。 * 匹配一个单词,# 匹配0个或者多个单词,比如说你的binding key可以这么来设置:
这个product.*.* ,就会跟“product.common.data”匹配上,意思就是,可能某个系统就是对商品类的数据指标感兴趣,不管是普通数据还是核心数据。
所以到这里,大家就应该很容易明白了,通过RabbitMQ的direct、topic两种exchange,我们可以轻松实现各种强大的数据按需订阅的功能。
通过本文,我们就将最近讲的数据一致性保障方案里的一些MQ中间件落地的细节给大家说明白了。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至827202335@qq.com 举报,一经查实,本站将立刻删除。文章链接:https://www.eztwang.com/dongtai/65192.html