这篇文章主要讲解了"Storm怎么构建拓扑代码",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Storm怎么构建拓扑代码"吧!
1.构建拓扑代码
packagedemo
导入回类型。暴风雨。拓扑结构。拓扑生成器;
导入回类型。暴风雨。元组。字段;
publicclassAreaAmtTopo {
publicationstativitmain(String[]args){ 0
TopologyBuilderbuilder=newTopologyBuilder();
builder.setSpout('spout '),new orders baseboout(kafkapproperties .Order_topic),5);
builder.setBolt('filter ',newAreaFilterBolt(),5).无序分组(' spout ');
builder.setBolt('areabolt ',newAreaAmtBolt(),2).fieldsGrouping('filter ',新字段(' area _ id ');
builder.setBolt('rsltbolt ',newAreaRsltBolt(),1).无序分组('区域螺栓');
}
}2.一级过滤螺栓
packagedemo
导入Java。乌提尔。地图;
导入回类型。暴风雨。任务。topologycontext
导入回类型。暴风雨。拓扑结构。basicoutputcollector
导入回类型。暴风雨。拓扑结构。ibasicbolt
导入回类型。暴风雨。拓扑结构。OutPutfields clarer
导入回类型。暴风雨。元组。字段;
导入回类型。暴风雨。元组。元组;
导入回类型。暴风雨。元组。价值观;
//一级的过滤螺栓
public class arefilterbolt implementsibasicbolt {
@覆盖
public void declareoutputfield(outputfield claredclarer){ 0
//TODOAuto-generatedmethodstub
庄家。声明(NewFields(' area _ id ',' order_amt ',' create _ time ');//元组里面每个价值的对应名字
}
@覆盖
nbsp; public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
@Override
public void cleanup() {
// TODO Auto-generated method stub
}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
//order_id,order_amt,create_time,area_id
String order=input.getString(0);//取出集合values中的第一个value
if(order!=null){
String orderArr[]=order.split("\\t");
collector.emit(new Values(orderArr[3],orderArr[1],DateFmt.getCountDate(orderArr[2], DateFmt.date_short)));//area_id,order_amt,create_time
}
}
@Override
public void prepare(Map arg0, TopologyContext arg1) {
// TODO Auto-generated method stub
}
}
3.局部汇总bolt(按日期和区域汇总)
package demo;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Partial aggregation bolt: keeps a running order-amount total per
 * (date, area) and emits the updated total for every tuple it receives.
 *
 * <p>Input values are read by position: area id, order amount, order date.
 */
public class AreaAmtBolt implements IBasicBolt {

    /** Running totals keyed by {@code <order_date>_<area_id>}; created in {@link #prepare}. */
    Map<String,Double> countsMap=null;

    /**
     * Declares the two emitted values: the date/area key and its running total.
     *
     * @param declarer receives the output field names
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date_area","amt"));
    }

    /**
     * @return {@code null}; this bolt supplies no component-specific configuration
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /**
     * Creates the empty totals map for this task.
     *
     * @param paramMap             topology configuration (unused)
     * @param paramTopologyContext task context (unused)
     */
    @Override
    public void prepare(Map paramMap, TopologyContext paramTopologyContext) {
        countsMap =new HashMap<String, Double>();
    }

    /**
     * Adds the tuple's amount to the total of its (date, area) key and emits
     * {@code (key, total)}. Tuples whose amount is missing or not numeric are
     * skipped.
     *
     * @param input     tuple carrying area id, order amount and order date
     * @param collector collector used to emit the updated total
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Original author's note: the spout may send an empty value when it has no
        // data, so check before processing.
        if (input == null) {
            return;
        }

        String areaId = input.getString(0);

        // The upstream filter emits the amount as text, so getDouble() would fail
        // with ClassCastException; accept either a number or numeric text.
        Object rawAmount = input.getValue(1);
        Double orderAmt = toAmount(rawAmount);
        if (orderAmt == null) {
            System.err.println("areaAmtBolt skip tuple with invalid order_amt: " + rawAmount);
            return;
        }

        // Read by position: the upstream filter names this value "create_time",
        // so a lookup by the name "order_date" would not find it.
        String orderDate = input.getString(2);

        // Date first, matching the declared field name "date_area" and the log line.
        String key = orderDate + "_" + areaId;

        Double count = countsMap.get(key);
        if (count == null) {
            count = 0.0;
        }
        count += orderAmt;
        countsMap.put(key, count);

        System.err.println("areaAmtBolt"+orderDate+"_"+areaId+"="+count);
        collector.emit(new Values(key, count));
    }

    /** Drops the accumulated totals; safe to call even if {@link #prepare} never ran. */
    @Override
    public void cleanup() {
        if (countsMap != null) {
            countsMap.clear();
        }
    }

    /**
     * Converts a tuple value to an amount.
     *
     * @param raw a {@link Number}, numeric text, or {@code null}
     * @return the amount, or {@code null} when {@code raw} is absent or not numeric
     */
    private static Double toAmount(Object raw) {
        if (raw == null) {
            return null;
        }
        if (raw instanceof Number) {
            return ((Number) raw).doubleValue();
        }
        try {
            return Double.valueOf(raw.toString().trim());
        } catch (NumberFormatException ignored) {
            // Not numeric: reported by the caller, which skips the tuple.
            return null;
        }
    }
}
4. 最终结果写入Hbase
package demo;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

/**
 * Result bolt: remembers the latest amount per key and, once at least five
 * seconds have passed since the previous flush, hands every remembered amount
 * to the HBase DAO.
 */
public class AreaRsltBolt implements IBasicBolt {

    /** Latest amount received for each key; created in {@link #prepare}. */
    Map<String,Double> countsMap=null;
    /** Wall-clock time (ms) of construction or of the last flush. */
    long beginTime=System.currentTimeMillis();
    /** Wall-clock time (ms) at which the most recent tuple was handled. */
    long endTime=0L;
    /** DAO used for the writes; created in {@link #prepare}. */
    HBaseDao dao=null;

    /**
     * This bolt emits nothing, so no output fields are declared.
     *
     * @param paramOutputFieldsDeclarer unused
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer paramOutputFieldsDeclarer) {
    }

    /**
     * @return {@code null}; this bolt supplies no component-specific configuration
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /**
     * Creates the empty result map and the DAO instance.
     *
     * @param paramMap             topology configuration (unused)
     * @param paramTopologyContext task context (unused)
     */
    @Override
    public void prepare(Map paramMap, TopologyContext paramTopologyContext) {
        countsMap = new HashMap<String, Double>();
        dao = new HBaseDAOImp();
    }

    /**
     * Stores the tuple's amount under its key, then flushes all stored amounts
     * if the five-second interval has elapsed.
     *
     * @param input                    tuple carrying the key and its amount
     * @param paramBasicOutputCollector unused; nothing is emitted
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector paramBasicOutputCollector) {
        final String dateAreaKey = input.getString(0);
        final double latestAmount = input.getDouble(1);
        countsMap.put(dateAreaKey, latestAmount);

        endTime = System.currentTimeMillis();
        final boolean flushDue = endTime - beginTime >= 5 * 1000;
        if (!flushDue) {
            return;
        }

        for (Map.Entry<String, Double> entry : countsMap.entrySet()) {
            // Original author's example of a stored pair: 2014-05-05_1,amt
            // NOTE(review): the key is not passed to insert(), so every entry looks
            // like it is written without a row key - confirm against HBaseDao.
            dao.insert("area_order", "cf", "order_amt", entry.getValue());
            System.err.println("rsltBolt put hbase: key=" + entry.getKey() + "; order_amt=" + entry.getValue());
        }
        beginTime = System.currentTimeMillis();
    }

    /** Nothing to release. */
    @Override
    public void cleanup() {
    }
}
5. DateFmt代码
package demo;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

/**
 * Date formatting helpers.
 *
 * <p>Every method builds its own {@link SimpleDateFormat}, so the methods can
 * be called from several threads at once.
 */
public class DateFmt {

    /** Pattern with date and time of day. */
    public static final String date_long="yyyy-MM-dd HH:mm:ss";
    /** Pattern with the date only. */
    public static final String date_short="yyyy-MM-dd";

    /**
     * Shared formatter kept only so existing references still compile.
     * {@link SimpleDateFormat} is not thread-safe; the methods of this class no
     * longer use this instance.
     */
    public static SimpleDateFormat sdf=new SimpleDateFormat(date_short);

    /**
     * Parses {@code date} with {@code patton} and formats it back with the same
     * pattern, which truncates the text to the precision of the pattern.
     *
     * <p>When {@code date} is {@code null} or cannot be parsed, the current time
     * is formatted instead; a parse failure is reported on standard error.
     *
     * @param date   text to normalise, may be {@code null}
     * @param patton {@link SimpleDateFormat} pattern used for parsing and formatting
     * @return the formatted date
     */
    public static String getCountDate(String date,String patton){
        SimpleDateFormat formatter = new SimpleDateFormat(patton);
        Calendar cal = Calendar.getInstance();
        if (date != null) {
            try {
                cal.setTime(formatter.parse(date));
            } catch (ParseException e) {
                // Keep the historical fallback to "now", but say which input failed.
                System.err.println("DateFmt: cannot parse '" + date + "' with pattern '"
                        + patton + "', using current time: " + e.getMessage());
            }
        }
        return formatter.format(cal.getTime());
    }

    /**
     * Parses text in the {@link #date_short} pattern.
     *
     * @param dateStr text such as {@code 2014-05-05}
     * @return the parsed date
     * @throws Exception a {@link ParseException} when the text does not match
     */
    public static Date parseDate(String dateStr) throws Exception{
        // A fresh formatter per call: a shared SimpleDateFormat can return wrong
        // results when used from several threads.
        return new SimpleDateFormat(date_short).parse(dateStr);
    }

    /**
     * Prints one sample conversion.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        System.out.println(DateFmt.getCountDate("2015-09-08 09:09:08 ", DateFmt.date_long));
    }
}
感谢各位的阅读,以上就是“Storm怎么构建拓扑代码”的内容了,经过本文的学习后,相信大家对Storm怎么构建拓扑代码这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。小编将为大家推送更多相关知识点的文章,欢迎关注!
内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/156275.html