资源描述
单击此处编辑母版标题样式,单击此处编辑母版文本样式,第二级,第三级,第四级,第五级,小象科技,让你的数据产生价值,LOGO,单击此处编辑母版标题样式,单击此处编辑母版文本样式,第二级,第三级,第四级,第五级,小象科技,让你的数据产生价值,单击此处编辑母版标题样式,单击此处编辑母版文本样式,第二级,第三级,第四级,第五级,小象科技,让你的数据产生价值,单击此处编辑母版标题样式,单击此处编辑母版文本样式,第二级,第三级,第四级,第五级,公开课主题:浅析,Storm,流式计算,主讲人:肖康,主要内容,Storm,简介,Storm,原理和架构,Storm,实战,2,Storm,简介,-,案例分析,统计某个服务被访问的客户端地域分布情况,日志中记录了客户端,IP,把,IP,转换成地域,按照地域进行统计,Storm,简介,-,案例分析,Hadoop,貌似就可以轻松搞定,日志存,HDFS,运行,MapReduce,程序,map,做,ip,提取,转换成地域,reduce,以地域为,key,聚合,计数统计,从,HDFS,取出结果,Storm,简介,-,案例分析,如果有时效性要求呢?,小时级:还行,每小时跑一个,MapReduce Job,10,分钟:还凑合能跑,5,分钟,:够呛了,等槽位可能要几分钟呢,1,分钟,:算了吧,启动,Job,就要几十秒呢,秒级,:,分析,MapReduce,不满足时效性要求的原因,一批数据启动一次,处理完进程停止,启动本身是需要时间的:输入切分、调度、起进程,共享集群,Job,比较杂,可能需要等待资源,所有数据都需要读写磁盘,Storm,简介,-,案例分析,解决方案,进程常驻运行,数据在内存中,Storm,正好适合这种需求,log,MQ,从,MQ,取日志,解析,ip,转成地域,内存累加计数定期输出,redis,redis,Storm,简介,-,是什么,Storm,是一个,分布式实时流式计算平台,分布式,水平扩展:通过加机器、提高并发数就提高处理能力,自动容错:自动处理进程、机器、网络异常,实时:数据不写磁盘,延迟低(毫秒级),流式:不断有数据流入、处理、流出,开源:,twitter,开源,社区很活跃,Storm,简介,和其他大数据计算平台,对比,Storm vs.MapReduce,常驻运行,流式处理:数据来一点处理一点,实时处理:数据在内存中不写磁盘,DAG,模型:可以组合多个阶段,Storm vs.queue+worker,系统,维护简单:无需维护,queue,,,queue,和,worker,对应关系,扩展简单:加机器,提高并发,重新提交,自动容错:进程、机器、网络异常,消息可重发,Storm,简介,-,典型应用场景,请求应答(同步),DRPC,实时图片处理,实时网页分析,流式处理(异步),逐条处理,数据之间无关系:如实时日志格式标准化入库,分析统计,数据之间有关系(聚合等):如日志,pv/uv,统计、访问热点统计,9,Client,DRPC,Server,Spout,Bolt,Return,图片,X,图片,X,图片,X,图片,Y,图片,Y,图片,Y,Client,MQ,Spout,Bolt1,Storage,N,行日志,N,行日志,N,行日志,Bolt2,ip,pv/uv,received,主要内容,Storm,简介,Storm,原理和架构,Storm,实战,10,Storm,原理和架构,-,计算模型,DAG,计算模型,Tuple,:数据处理单元,一个,Tuple,由多个,字段,组成,Stream,:持续的,Tuple,流,Spout,:,从外部获取数据,输出原始,Tuple,Bolt,:,接收,Spout/Bolt,输出的,Tuple,,处理,输出新,Tuple,11,Storm,原理和架构,-,计算模型,DAG,计算模型(续),Grouping,Tuple,从上游到某个下游多个并发,task,的分组方式,shuffleGrouping,:随机发给某个下游,task,fieldsGrouping,:按照某几个字段做,hash,取模,发给对应,task,allGrouping,:发给下游全部,task,Topology,一个应用的,spout,bolt,grouping,组合,Storm,原理和架构,-,架构,nimbus,:集群的,master,,负责管理,supervisor,、调度,topology,supervisor,:负责运行,topology,的,worker,worker,:负责实际的计算和网络通信,zookeeper,:负责存储以上模块的状态,做到高可用,13,nimbus,zookeeper,zookeeper,zookeeper,supervisor,supervisor,supervisor,supervisor,worker,worker,Storm,原理和架构,-,数据流程,executor,执行,spout.nextTuple(),或,bolt.execute(),,调用,emit,生成新的,tuple,,放到,executor,的,transfer queue,executor transfer thread,把自己,transfer queue,里面的,tuple,放到,worker transfer queue,worker transfer thread,把,transfer queue,里面的,tuple,序列化发送到远程的,worker,worker receive thread,分别从网络收数据,反序列化成,tuple,放到对应,executor,的,receive queue,executor receive thread,从自己的,receive queue,取出,tuple,,调用,bolt.execute(),14,主要内容,Storm,简介,Storm,原理和架构,Storm,实战,15,Storm,实战,-,集群部署,依赖包,java 6+,集群规划,1,个,nimbus+3/5,个,zookeeper+n,个,supervisor,16,Storm,实战,-,集群部署,部署步骤,修改,$STORM_HOME/conf/storm.yaml,配置,至少要修改两项,zk,地址:,nimbus,地址:,nimbus.host,启动,zookeeper,启动,drpc,:,$STORM_HOME/bin/storm drpc,启动,nimbus,:,$STORM_HOME/bin/storm nimbus,启动,ui,:,$STORM_HOME/bin/storm ui,启动,supervisor,:,$STORM_HOME/bin/storm supervisor,启动,logviewer,:,$STORM_HOME/bin/storm log viewer,在浏览器中打开,webui,查看集群状态,http:/nimbus.host:8080,更多参考,storm,官网的详细文档,17,Storm,实战,-,应用开发,API,Java API,Spout,nextTuple(),:回调函数,循环触发,ack(id),:回调函数,消息成功处理时触发,fail(id),:回调函数,消息超时时触发,Bolt,execute(Tuple input),:回调函数,数据触发,collector.emit(tuple),:通过,collector,向下游发送,tuple,collector.ack(tuple),:通过,collector,确认已经成处理输入,tuple,一定要用,Java,吗?,NO,ShellBolt,18,Storm,实战,-,应用开发,public class MD5Topology,public static class,MD5Bolt,extends BaseBasicBolt,Override,public void,execute,(Tuple tuple,BasicOutputCollector collector),String input=tuple.getString(0);,/,获取来自,DRPCSpout,的实际输入数据,String output=MD5Util.getMD5Str(input);,/,往下游,ReturnBolt emit,数据,/,第一个字段是计算的结果,这里是,md5,串,/,第二个字段是来自,DRPCSpout,的,return-info,,是一个,json,串,包括,drpc request id,,,server host,、,port,collector.emit(new Values(output,tuple.getString(1);,Override,public void,declareOutputFields,(OutputFieldsDeclarer declarer),declarer.declare(new Fields(result,return-info);,/,声明输出两个字段,和,emit,是对应的,public static void main(String args)throws Exception,TopologyBuilder builder=new TopologyBuilder();,builder.setSpout,(DRPCSpout,new DRPCSpout(args0),2);,builder.setBolt,(MD5Bolt,new MD5Bolt(),4),/,参数依次是,spout/bolt id,,,spout/bolt,对象,并发度,.shuffleGrouping(DRPCSpout);,/,指定上游以及,grouping,方式,builder.setBolt,(ReturnBolt,new ReturnResults(),2),.shuffleGrouping(MD5Bolt);,Config conf=new Config();,conf.setNumWorkers(4);,/,设置,worker,个数,StormSubmitter.submitTopology,(args0,conf,builder.createTopology();,19,Storm,实战,-,应用开发,编译打包成,md5.jar,提交,topology,$STORM_HOME/bin/storm jar md5.jar,MD5Topology md5,在,webui,查看,topology,状态,发送,drpc,请求,incubator-storm/storm-core/src/py/storm/,DistributedRPC-remote-h host:port-f execute,md5 abcd,20,联系我们:,新浪微博:,ChinaHadoop,微信公号:,ChinaHadoop,让你的数据产生价值!,
展开阅读全文