1.  构建拓扑代码

package demo;import backtype.storm.topology.TopologyBuilder;import backtype.storm.tuple.Fields;public class AreaAmtTopo {    public static void main(String[] args) {    TopologyBuilder builder = new TopologyBuilder();builder.setSpout("spout", new OrderBaseSpout(KafkaProperties.Order_topic),5);builder.setBolt("filter",new AreaFilterBolt(),5).shuffleGrouping("spout");builder.setBolt("areabolt",new AreaAmtBolt(),2).fieldsGrouping("filter",new Fields("area_id"));builder.setBolt("rsltbolt",new AreaRsltBolt(),1).shuffleGrouping("areabolt");    }}

2.一级过滤bolt

package demo;import java.util.Map;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;//一级的过滤boltpublic class AreaFilterBolt implements IBasicBolt {    @Override    public void declareOutputFields(OutputFieldsDeclarer declarer) {        // TODO Auto-generated method stub        declarer.declare(new Fields("area_id","order_amt","create_time"));//tuple里面每个value的对应name    }    @Override    public Map
 getComponentConfiguration() {        // TODO Auto-generated method stub        return null;    }    @Override    public void cleanup() {        // TODO Auto-generated method stub    }    @Override    public void execute(Tuple input, BasicOutputCollector collector) {        //order_id,order_amt,create_time,area_id        String order=input.getString(0);//取出集合values中的第一个value        if(order!=null){                        String orderArr[]=order.split("\\t");            collector.emit(new Values(orderArr[3],orderArr[1],DateFmt.getCountDate(orderArr[2], DateFmt.date_short)));//area_id,order_amt,create_time                    }     }    @Override    public void prepare(Map arg0, TopologyContext arg1) {        // TODO Auto-generated method stub    }}

3.局部汇总bolt(按日期和区域和汇总)

package demo;import java.util.HashMap;import java.util.Map;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;//局部汇总public class AreaAmtBolt implements IBasicBolt {        Map
 countsMap=null;    @Override    public void declareOutputFields(            OutputFieldsDeclarer declarer) {                declarer.declare(new Fields("date_area","amt"));    }    @Override    public Map
 getComponentConfiguration() {        // TODO Auto-generated method stub        return null;    }    @Override    public void prepare(Map paramMap, TopologyContext paramTopologyContext) {        // TODO Auto-generated method stub         countsMap =new HashMap
();    }    @Override    public void execute(Tuple input,            BasicOutputCollector collector) {                if(input!=null)//如果spout端没数据就会发空值,所以要做判断再往下发        {        String area_id=input.getString(0);        Double order_amt=input.getDouble(1);        String  order_date=input.getStringByField("order_date");                Double count=countsMap.get(area_id+"_"+order_date);        if (count==null){            count = 0.0;            }                count+=order_amt;        countsMap.put(area_id+"_"+order_date,count);        System.err.println("areaAmtBolt"+order_date+"_"+area_id+"="+count);        collector.emit(new Values(area_id+"_"+order_date,count));        }    }    @Override    public void cleanup() {        countsMap.clear();    }}

4. 最终结果写入Hbase

package demo;import java.util.HashMap;import java.util.HashSet;import java.util.Map;import java.util.Set;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;//结果定时写入hbase的boltpublic class AreaRsltBolt implements IBasicBolt {    Map
 countsMap=null;    long beginTime=System.currentTimeMillis();    long endTime=0L;    HBaseDao dao=null;    @Override    public void declareOutputFields(            OutputFieldsDeclarer paramOutputFieldsDeclarer) {        // TODO Auto-generated method stub    }    @Override    public Map
 getComponentConfiguration() {        // TODO Auto-generated method stub        return null;    }    @Override    public void prepare(Map paramMap, TopologyContext paramTopologyContext) {         countsMap =new HashMap
();         dao=new HBaseDAOImp();    }    @Override    public void execute(Tuple input,            BasicOutputCollector paramBasicOutputCollector) {        String date_areaid=input.getString(0);        double  order_amt=input.getDouble(1);         countsMap.put(date_areaid,order_amt);        endTime=System.currentTimeMillis();        if (endTime-beginTime>=5*1000){                   for(String key:countsMap.keySet()){              //put into hbase            //2014-05-05_1,amt              dao.insert("area_order","cf","order_amt",countsMap.get(key));              System.err.println("rsltBolt put hbase: key="+key+"; order_amt="+countsMap.get(key));            }            beginTime=System.currentTimeMillis();        }            }    @Override    public void cleanup() {        // TODO Auto-generated method stub    }}

5. DateFmt代码

package demo;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Calendar;import java.util.Date;public class DateFmt {    public static final String date_long="yyyy-MM-dd HH:mm:ss";    public static final String date_short="yyyy-MM-dd";        public static SimpleDateFormat sdf=new SimpleDateFormat(date_short);    public static String getCountDate(String date,String patton){        SimpleDateFormat sdf=new SimpleDateFormat(patton);        Calendar cal =Calendar.getInstance();        if (date!=null){                        try {                cal.setTime(sdf.parse(date));            } catch (ParseException e) {                                e.printStackTrace();            }        }                return sdf.format(cal.getTime());            }        public static Date parseDate(String dateStr) throws Exception{                return sdf.parse(dateStr);    }            public static void main(String[] args) {                System.out.println(DateFmt.getCountDate("2015-09-08 09:09:08 ", DateFmt.date_long));    }}