加入收藏 | 设为首页 | 会员中心 | 我要投稿 盐城站长网 (https://www.0515zz.cn/)- 运维、云管理、管理运维、智能数字人、AI硬件!
当前位置: 首页 > 站长资讯 > 评论 > 正文

为什么嫌弃 lambda 匿名函数?

发布时间:2021-02-12 13:07:07 所属栏目:评论 来源:互联网
导读:每个系统在执行消息的时候都会可能导致这条消息发不出去(可能是消息去重了,可能是用户的手机号不正确,可能是用户太久没有登录了等等都有可能)。我们在这些『关键位置』都打上日志,方便我们去排查。 这些「关键位置」我们都给它用简单的数字来命个名。比如

每个系统在执行消息的时候都会可能导致这条消息发不出去(可能是消息去重了,可能是用户的手机号不正确,可能是用户太久没有登录了等等都有可能)。我们在这些『关键位置』都打上日志,方便我们去排查。

这些「关键位置」我们都给它用简单的数字来命个名。比如说:我们用「11」来代表这个用户没有绑定手机号,用「12」来代表这个用户10分钟前收到了一条一模一样的消息,用「13」来代表这个用户屏蔽了消息.....

「11」「12」「13」「14」「15」「16」这些就叫做「点位」,把这些点位在关键的位置中打上日志,这个就叫做「埋点」

有了埋点,我们要做的就是将这些点位收集起来,然后统一处理成我们的数据格式,输出到数据源中。

  1. 收集日志
  2. 清洗日志
  3. 输出到数据源

有logAgent帮我们收集日志到Kafka,实时清洗日志我们用的是Flink,清洗完我们输出到Redis(实时)/Hive(离线)。

Hive表的数据样例(主要用于离线报表统计):
 

数据统计

我觉得这块是消息管理平台最最最精华的一部分。

梦回我们当初的接口设计环节,我们就是因为有“数据统计”的需求,才引入了模板的概念。现在我们已经有了一个模板Id了,在我们这边是怎么实现数据的统计的呢?我们对消息的统计都是基于模板的维度来实现的。

在创建模板时就会有一个模板Id生成,基于这个模板Id,我们生成了一个叫做umpId的值:第一位分为技术/运营推送,最后八位是日期,中间六位是模板Id
 

总结一下接口的实现:

  1. 调用方调用接口时,接口不会同步直接调用下游的API发送消息,而是放入消息队列上(支持高并发)
  2. 放入队列时,会根据不同渠道以及不同类型的消息进行分类,放到不同的topic(业务隔离)
  3. 消费队列时,会在本地使用阻塞队列来提高并发度(加快消费的速度)

Id转换

(扩展)在前面也提到了,发不同类型的消息会需要有不同的id类型:微信类需要openId、短信需要手机号、push通知栏推送需要did。

在大多数情况下,一般调用者就传入userId给到我,我这边需要根据不同的消息类型对userId进行转换。

那在我们这边是怎么实现该系统的呢?主要的步骤和逻辑有以下:

  1. 监听用户变更和微信公众号订阅/取关的topic,在Flink清洗出一个统一的数据模型,将清洗后的数据写到另一个的topic。
  2. Id映射系统监听Flink清洗出的topic,实时写到数据源(这里我们用的是搜索引擎)

看着也不会很难,对吧?

有没有想过一个问题,为什么要用一个Id映射系统去监听Flink洗出来的topic,而不是在Flink直接写到数据源呢?

其实通过Flink直接写到数据源也是完全没问题的,而封装了一个Id映射系统,就可以把这活做得更细致。

从描述可以发现的是:在上面只实现了实时增量。很多时候我们会担心增量存在问题,导致部分数据的不准确或者丢失,都会写一份全量,Id映射也是同样的。

那Id映射的全量是怎么做的呢?用户数据通过各种关联关系会在Hive形成一张表,而Id映射的全量就是基于这张Hive表来实现全量(每天凌晨会读取Hive表的信息,再写一遍数据源)。

基于上面这些逻辑,专门给Id映射做了个后台管理(可以手动触发全量、是否开启增量/全量、修改全量触发的时间)

(编辑:盐城站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读