0
点赞
收藏
分享

微信扫一扫

016-Storm借助tick消息定时器统计统计周期性业务


在实际业务中,经常需要定时做一些业务逻辑,如每1分钟做一些统计数值。普通业务做法是启动一个Timer线程或者使用Quartz来做定时触发。在Storm中,可以通过让Topology的系统组件定时发送tick消息,Bolt接收到消息后,触发相应的逻辑来完成



使用Storm组件的定时器需要为bolt重写下面的方法:


public Map<String, Object> getComponentConfiguration() {

 

     Map<String, Object> conf = new HashMap<String, Object>();

 

     conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);//每60s持久化一次数据

 

     return conf;

 

 }



其中:Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS 定时消息发送的频率,单位为秒。




我们判断是否为tick消息。可以使用TupleHelpers类中的isTickTuple方法,具体代码:


public static boolean isTickTuple(Tuple tuple) {

 

 return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(

 

     Constants.SYSTEM_TICK_STREAM_ID);

 

 }




下面通过统计uv来看一下tick消息的使用,通过tick消息,同时也解决每个bolt中map数据过大的问题。


1、业务需求:实时统计每分钟访客数,页面效果如图所示:



016-Storm借助tick消息定时器统计统计周期性业务_数据




2. 平台架构:主要包括日志的传输和采集部分、数据传输和计算平台、持久化数据服务以及在线数据服务部分



2.1日志数据源一般包括以下几个日志数据:


用户浏览网页点击行为和鼠标悬停等会触发相应的日志数据,并实时传递给后端的服务器。


APP框架中内涵所有页面安妮、页面滑动以及页面切换等的埋点,只要用户有相应的操作,就会记录日志,批量发送日志服务器



2.2 storm是实时平台的核心


2.3 Topology中的bolt计算的结果数据和中间交换数据根据业务需求存放到redis、hbase、或者mysql中


2.4 数据持久化到相应的数据库中后,由RPC服务器提供对外统一的访问服务,用户不用关心数据存储的细节



3. 系统架构一般如图所示:


016-Storm借助tick消息定时器统计统计周期性业务_kafka_02




4. 统计UV逻辑计算如图所示:



016-Storm借助tick消息定时器统计统计周期性业务_kafka_03




其中: DeepVisitUVBolt、AggreatorUVBolt通过map来保持中间结果,通过Storm的组件tick定时处理消息。


5.storm程序代码


5.1 Topoloy代码


public class PortalUVTopology {

    private static final String KAFKA_SPOUT_ID = "CustomQueueSpout";
    private static final String LOG_PARSER_ID = "LogParserBolt";
    private static final String DEEP_VISIT_UV_ID = "DeepVisitUVBolt";
    private static final String AGGREGATOR_UV_ID = "AggregatorUVBolt";
    private static final String PERSISTENCE_UV_ID = "PersistenceBoltUVBolt";

    public static void main(String[] args) throws Exception, Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // 读取kafka消息,单线程
        KafkaSpout kafkaSpout = KafkaUtils.getKafkaSpout();
        builder.setSpout(KAFKA_SPOUT_ID, kafkaSpout, 1);
        //builder.setSpout(KAFKA_SPOUT_ID,new  CustomQueueSpout(), 1);
        // 多线程解析消息,输出: yyyyMMddHHmm uid ,即: 201602221411 1000
        builder.setBolt(LOG_PARSER_ID, new LogParserBolt(), 3).shuffleGrouping(KAFKA_SPOUT_ID);
        // 多线程统计每分钟每个uid的uv数,即: yyyyMMddHHmm_uid 4
        builder.setBolt(DEEP_VISIT_UV_ID, new DeepVisitUVBolt(), 3).fieldsGrouping(LOG_PARSER_ID,
                new Fields("date", "uid"));
        // 汇总统计
        builder.setBolt(AGGREGATOR_UV_ID, new AggregatorUVBolt(), 1).noneGrouping(DEEP_VISIT_UV_ID);
        // 持久化存储
        builder.setBolt(PERSISTENCE_UV_ID, new PersistenceUVBolt(),2).shuffleGrouping(
                AGGREGATOR_UV_ID);

        Config conf = new Config();
        conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
        conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);
        conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
        conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
        //集群模式
        //StormSubmitter.submitTopology("PortalUV", conf , builder.createTopology());
        //本地模式
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("PortalUV" + System.currentTimeMillis(), conf ,builder.createTopology());
    }
}




举报

相关推荐

0 条评论