为什么嫌弃 lambda 匿名函数?
|
每个系统在执行消息的时候都会可能导致这条消息发不出去(可能是消息去重了,可能是用户的手机号不正确,可能是用户太久没有登录了等等都有可能)。我们在这些『关键位置』都打上日志,方便我们去排查。 这些「关键位置」我们都给它用简单的数字来命个名。比如说:我们用「11」来代表这个用户没有绑定手机号,用「12」来代表这个用户10分钟前收到了一条一模一样的消息,用「13」来代表这个用户屏蔽了消息..... 「11」「12」「13」「14」「15」「16」这些就叫做「点位」,把这些点位在关键的位置中打上日志,这个就叫做「埋点」 有了埋点,我们要做的就是将这些点位收集起来,然后统一处理成我们的数据格式,输出到数据源中。
有logAgent帮我们收集日志到Kafka,实时清洗日志我们用的是Flink,清洗完我们输出到Redis(实时)/Hive(离线)。
Hive表的数据样例(主要用于离线报表统计): 数据统计 我觉得这块是消息管理平台最最最精华的一部分。 梦回我们当初的接口设计环节,我们就是因为有“数据统计”的需求,才引入了模板的概念。现在我们已经有了一个模板Id了,在我们这边是怎么实现数据的统计的呢?我们对消息的统计都是基于模板的维度来实现的。
在创建模板时就会有一个模板Id生成,基于这个模板Id,我们生成了一个叫做umpId的值:第一位分为技术/运营推送,最后八位是日期,中间六位是模板Id 总结一下接口的实现:
Id转换 (扩展)在前面也提到了,发不同类型的消息会需要有不同的id类型:微信类需要openId、短信需要手机号、push通知栏推送需要did。 在大多数情况下,一般调用者就传入userId给到我,我这边需要根据不同的消息类型对userId进行转换。 那在我们这边是怎么实现该系统的呢?主要的步骤和逻辑有以下:
看着也不会很难,对吧? 有没有想过一个问题,为什么要用一个Id映射系统去监听Flink洗出来的topic,而不是在Flink直接写到数据源呢? 其实通过Flink直接写到数据源也是完全没问题的,而封装了一个Id映射系统,就可以把这活做得更细致。 从描述可以发现的是:在上面只实现了实时增量。很多时候我们会担心增量存在问题,导致部分数据的不准确或者丢失,都会写一份全量,Id映射也是同样的。 那Id映射的全量是怎么做的呢?用户数据通过各种关联关系会在Hive形成一张表,而Id映射的全量就是基于这张Hive表来实现全量(每天凌晨会读取Hive表的信息,再写一遍数据源)。
基于上面这些逻辑,专门给Id映射做了个后台管理(可以手动触发全量、是否开启增量/全量、修改全量触发的时间) (编辑:盐城站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
