简述storm的拓扑结构(storm拓扑原理)

技术storm怎么构建拓扑代码这篇文章主要讲解了“storm怎么构建拓扑代码”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“storm怎么构建拓扑代码”吧!1. 构建拓扑

这篇文章主要讲解了"风暴怎么构建拓扑代码",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"风暴怎么构建拓扑代码"吧!

storm怎么构建拓扑代码

storm怎么构建拓扑代码

storm怎么构建拓扑代码

1.构建拓扑代码

packagedemo

导入回类型。暴风雨。拓扑结构。拓扑生成器;

导入回类型。暴风雨。元组。字段;

publicclassAreaAmtTopo {

publicationstativitmain(String[]args){ 0

TopologyBuilderbuilder=newTopologyBuilder();

builder.setSpout('spout '),new orders baseboout(kafkapproperties .Order_topic),5);

builder.setBolt('filter ',newAreaFilterBolt(),5).无序分组(' spout ');

builder.setBolt('areabolt ',newAreaAmtBolt(),2).fieldsGrouping('filter ',新字段(' area _ id ');

builder.setBolt('rsltbolt ',newAreaRsltBolt(),1).无序分组('区域螺栓');

}

}2.一级过滤螺栓

packagedemo

导入Java。乌提尔。地图;

导入回类型。暴风雨。任务。topologycontext

导入回类型。暴风雨。拓扑结构。basicoutputcollector

导入回类型。暴风雨。拓扑结构。ibasicbolt

导入回类型。暴风雨。拓扑结构。OutPutfields clarer

导入回类型。暴风雨。元组。字段;

导入回类型。暴风雨。元组。元组;

导入回类型。暴风雨。元组。价值观;

//一级的过滤螺栓

public class arefilterbolt implementsibasicbolt {

@覆盖

public void declareoutputfield(outputfield claredclarer){ 0

//TODOAuto-generatedmethodstub

庄家。声明(NewFields(' area _ id ',' order_amt ',' create _ time ');//元组里面每个价值的对应名字

}

@覆盖

nbsp; public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void cleanup() {
        // TODO Auto-generated method stub

    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        //order_id,order_amt,create_time,area_id
        String order=input.getString(0);//取出集合values中的第一个value
        if(order!=null){
            
            String orderArr[]=order.split("\\t");
            collector.emit(new Values(orderArr[3],orderArr[1],DateFmt.getCountDate(orderArr[2], DateFmt.date_short)));//area_id,order_amt,create_time
            
        }

     }

    @Override
    public void prepare(Map arg0, TopologyContext arg1) {
        // TODO Auto-generated method stub

    }

}

3.局部汇总bolt(按日期和区域和汇总)

package demo;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

//局部汇总
public class AreaAmtBolt implements IBasicBolt {

    
    Map<String,Double> countsMap=null;
    @Override
    public void declareOutputFields(
            OutputFieldsDeclarer declarer) {
        
        declarer.declare(new Fields("date_area","amt"));

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void prepare(Map paramMap, TopologyContext paramTopologyContext) {
        // TODO Auto-generated method stub
         countsMap =new HashMap<String, Double>();
    }

    @Override
    public void execute(Tuple input,
            BasicOutputCollector collector) {
        
        if(input!=null)//如果spout端没数据就会发空值,所以要做判断再往下发
        {
        String area_id=input.getString(0);
        Double order_amt=input.getDouble(1);
        String  order_date=input.getStringByField("order_date");
        
        Double count=countsMap.get(area_id+"_"+order_date);
        if (count==null){
            count = 0.0;    
        }
        
        count+=order_amt;
        countsMap.put(area_id+"_"+order_date,count);
        System.err.println("areaAmtBolt"+order_date+"_"+area_id+"="+count);
        collector.emit(new Values(area_id+"_"+order_date,count));
        }
    }

    @Override
    public void cleanup() {
        countsMap.clear();
    }

}

4. 最终结果写入Hbase

package demo;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//结果定时写入hbase的bolt
public class AreaRsltBolt implements IBasicBolt {

    Map<String,Double> countsMap=null;
    long beginTime=System.currentTimeMillis();
    long endTime=0L;
    HBaseDao dao=null;
    @Override
    public void declareOutputFields(
            OutputFieldsDeclarer paramOutputFieldsDeclarer) {
        // TODO Auto-generated method stub

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void prepare(Map paramMap, TopologyContext paramTopologyContext) {
         countsMap =new HashMap<String, Double>();
         dao=new HBaseDAOImp();
    }

    @Override
    public void execute(Tuple input,
            BasicOutputCollector paramBasicOutputCollector) {
        String date_areaid=input.getString(0);
        double  order_amt=input.getDouble(1); 
        countsMap.put(date_areaid,order_amt);
        endTime=System.currentTimeMillis();
        if (endTime-beginTime>=5*1000){
        

           for(String key:countsMap.keySet()){
              //put into hbase
            //2014-05-05_1,amt
              dao.insert("area_order","cf","order_amt",countsMap.get(key));
              System.err.println("rsltBolt put hbase: key="+key+"; order_amt="+countsMap.get(key));
            }
            beginTime=System.currentTimeMillis();
        }
        
    }

    @Override
    public void cleanup() {
        // TODO Auto-generated method stub

    }

}

5. DateFmt代码

package demo;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class DateFmt {

    public static final String date_long="yyyy-MM-dd HH:mm:ss";
    public static final String date_short="yyyy-MM-dd";
    
    public static SimpleDateFormat sdf=new SimpleDateFormat(date_short);
    public static String getCountDate(String date,String patton){
        SimpleDateFormat sdf=new SimpleDateFormat(patton);
        Calendar cal =Calendar.getInstance();
        if (date!=null){
            
            try {
                cal.setTime(sdf.parse(date));
            } catch (ParseException e) {
                
                e.printStackTrace();
            }
        }
        
        return sdf.format(cal.getTime());
        
    }
    
    public static Date parseDate(String dateStr) throws Exception{
        
        return sdf.parse(dateStr);
    }
    
    
    public static void main(String[] args) {
        
        System.out.println(DateFmt.getCountDate("2015-09-08 09:09:08 ", DateFmt.date_long));
    }
}

感谢各位的阅读,以上就是“storm怎么构建拓扑代码”的内容了,经过本文的学习后,相信大家对storm怎么构建拓扑代码这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/156275.html

(0)

相关推荐

  • hadoop命令有哪些

    技术hadoop命令有哪些这篇文章主要为大家展示了“hadoop命令有哪些”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“hadoop命令有哪些”这篇文章吧。查看:hadoop

    攻略 2021年11月20日
  • ios怎么整个手机截屏(ios有哪几种截屏方式)

    技术IOS如何实现手机截屏小编给大家分享一下IOS如何实现手机截屏 ,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!IOS手机截屏 主要步骤1

    攻略 2021年12月24日
  • 美国服务器搭建游戏服务端有什么好USA-IDC

    技术美国服务器搭建游戏服务端有什么好USA-IDC通过服务器设置,您可以将其视为更像是为其玩家托管视频游戏的远程计算机。由于美国服务器不在游戏引擎上运行,因此它需要使用上述图形卡那样呈现任何内容。然而,它的作用是指示客户

    礼包 2021年12月23日
  • leetcode 数组出现最多的数(leetcode数组加减乘除)

    技术LeetCode如何调整数组顺序使得奇数位于偶数前面这篇文章主要介绍LeetCode如何调整数组顺序使得奇数位于偶数前面,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!题目:输入一个整数数组,实

    攻略 2021年12月15日
  • 盖的繁体字,情定豪气义盖天下繁体怎么写

    技术盖的繁体字,情定豪气义盖天下繁体怎么写“定”的繁体字就是“定”盖的繁体字。定,ding,从宀从正。正家而天下定。
    不动的,不变的:~额。~价。~律。~论。~期。~型。~义。~都(dū)。~稿。~数(shù)(a.规定

    生活 2021年10月21日
  • 配置化+Serverless 开发个人博客「附源码」

    技术配置化+Serverless 开发个人博客「附源码」 配置化+Serverless 开发个人博客「附源码」高清原画 链接: https://pan.baidu.com/s/1d6YONkCi4u7T

    礼包 2021年11月5日