Flink原理与实践-DataStream-API的介绍和使用ppt课件

上传人:29 文档编号:244353111 上传时间:2024-10-04 格式:PPTX 页数:39 大小:1.24MB
返回 下载 相关 举报
Flink原理与实践-DataStream-API的介绍和使用ppt课件_第1页
第1页 / 共39页
Flink原理与实践-DataStream-API的介绍和使用ppt课件_第2页
第2页 / 共39页
Flink原理与实践-DataStream-API的介绍和使用ppt课件_第3页
第3页 / 共39页
点击查看更多>>
资源描述
单击此处编辑母版标题样式,编辑母版文本样式,第二级,第三级,第四级,第五级,2020年11月23日 Monday,#,第四章,DataStream,API,的介绍和使用,Flink,程序的骨架结构,初始化运行环境,读取一到多个,Source,数据源,根据业务逻辑对数据流进行,Transformation,转换,将结果输出到,Sink,调用作业执行函数,执行环境是作业与集群交互的入口,设置并行度,关闭算子链,时间、,Checkpoint,流处理和批处理的执行环境不一样,Java,、,Scala,两套,API,设置执行环境,/,创建,Flink,执行环境,StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();,env.setParallelism(2),;,env.disableOperatorChaining();,Source,、,Transformation,和,Sink,Source,读取数据源统称为,Source,文件系统、消息队列、数据库等,Transformation,使用,Flink,提供的各类函数,进行有状态的计算,数据流的分组、窗口和聚合操作等,Sink,将计算结果输出到外部系统,统称为,Sink,目的地可以是文件系统、消息队列、数据库等,Flink,是延迟执行,(,Lazy Evaluation,),的,调用,execute(),方法,,Flink,才会真正执行,否则无法得到计算结果,字符串参数为当前作业名,执行,/ execute,env.execute(kafka streaming word count);,单数据流转换,基于,Key,的分组转换,多数据流转换,数据重分布转换,DataStream,泛型,T,为数据流中每个元素的类型,四类,Tranformation,转换,每个输入元素对应一个输出元素,重写,MapFunction,或,RichMapFunction,MapFunction,T,为输入类型,O,为输出类型,实现其中的,map(),虚方法,主逻辑中调用该函数,单数据流转换,-,map,FunctionalInterface,public,interface,MapFunction,extends,Function,Serializable,/,调用这个,API,就是继承并实现这个虚函数,O,map,(T value),throws,Exception,;,/,第一个泛型是输入类型,第二个泛型是输出类型,public,static,class,DoubleMapFunction,implements,MapFunction, ,Override,public,String,map,(Integer input),return,function input : ,+ input +, output : ,+ (input *,2,);,DataStream functionDataStream = dataStream.map(,new,DoubleMapFunction();,MapFunction,源代码,一个,MapFunction,的实现,直接继承接口类并实现,map,虚方法,上页所示,使用匿名类,使用,Lambda,表达式,单数据流转换,-,map,/,匿名类,DataStream anonymousDataStream = dataStream.map(,new,MapFunction() ,Override,public,String,map,(Integer input),throws,Exception,return,anonymous function input : ,+ input +, output : ,+ (input *,2,);,);,/,使用,Lambda,表达式,DataStream lambdaStream = dataStream,.map(input -,lambda input : ,+ input +, output : ,+ (input *,2,);,匿名类实现,MapFunction,Lambda,表达式实现,MapFunction,对输入元素进行过滤,继承并实现,FilterFunction,或,RichFilterFunction,重写,filter,虚方法,True,保留,False,过滤,单数据流转换,-,filter,DataStream dataStream = senv.fromElements(,1,2, -,3,0,5, -,9,8,);,/,使用,-,构造,Lambda,表达式,DataStream lambda = dataStream.filter ( input - input ,0,);,public,static,class,MyFilterFunction,extends,RichFilterFunction, ,/ limit,参数可以从外部传入,private,Integer limit;,public,MyFilterFunction,(Integer limit),this,.limit = limit; ,Override,public,boolean,filter,(Integer input),return,input ,this,.limit;,Lambda,表达式实现,FilterFunction,实现,FilterFunction,与,map(),相似,输出零个、一个或多个元素,可对列表结果展平,单数据流转换,-,flatMap,苹果,梨,香蕉,.map(,去皮,),去皮,苹果,,去皮,梨,,去皮,香蕉,map,flatMap,苹果,梨,香蕉,.flatMap(,切碎,),苹果碎片,1,苹果碎片,2, ,梨碎片,1,,梨碎片,2,梨碎片,3,,,香蕉碎片,1,苹果碎片,1,苹果碎片,2,梨碎片,1,,梨碎片,2,梨碎片,3,,香蕉碎片,1,使用,Lambda,表达式,Collector,用来收集元素,flatMap(),虚方法中不使用,return,返回数据,使用,Collector,收集返回数据,Collector,中的泛型,String,为返回数据类型,将,flatMap(),看做,map(),和,filter(),更一般的形式,map(),和,filter(),的语义更明确,单数据流转换,-,flatMap,DataStream dataStream = senv.fromElements(,Hello World,Hello this is Flink,);,/ split,函数的输入为,Hello World,输出为,Hello,和,World,组成的列表,Hello, World,/ flatMap,将列表中每个元素提取出来,/,最后输出为,Hello, World, Hello, this, is, Flink,DataStream words = dataStream.flatMap (,(String input, Collector collector) - ,for,(String word : input.split(, ,) ,collector.collect(word);,).returns(Types.STRING);,数据分组后可进行聚合操作,keyBy(),将一个,DataStream,转化为一个,KeyedStream,聚合操作将,KeyedStream,转化为,DataStream,KeyedStream,继承自,DataStream,基于,Key,的分组转换,根据某种属性或数据的某些字段对数据进行分组,对一个分组内的数据进行处理,股票:相同股票代号的数据分组到一起,相同,Key,的数据被分配到同一算子实例上,需要指定,Key,数字位置,字段名,KeySelector,基于,Key,的分组转换,-,keyBy,DataStream dataStream = senv.fromElements(,Tuple2.of(,1,1.0,), Tuple2.of(,2,3.2,),Tuple2.of(,1,5.5,), Tuple2.of(,3,10.0,), Tuple2.of(,3,12.5,);,/,使用数字位置定义,Key,按照第一个字段进行分组,DataStream keyedStream = dataStream.keyBy(,0,).sum(,1,);,KeySelector,重写,getKey(),方法,单数据流转换,-,keyBy,/ IN,为数据流元素,,KEY,为所选择的,Key,FunctionalInterface,public,interface,KeySelector,extends,Function,Serializable,/,选择一个字段作为,Key,KEY,getKey,(IN value),throws,Exception,;,public,class,Word,public,String word;,public,int,count;,/,使用,KeySelector,DataStream keySelectorStream = wordStream.keyBy(,new,KeySelector () ,Override,public,String,getKey,(Word in),return,in.word;,).sum(,count,);,KeySelector,源码,一个,KeySelector,的实现,sum(),、,max(),、,min(),等,指定字段,对该字段进行聚合,KeySelector,流数据上的聚合,实时,不断输出到下游,状态存储中间数据,单数据流转换,Aggregations,将某个字段加和,结果保存到该字段上,不关心其他字段的计算结果,单数据流转换,sum,DataStream tupleStream =,senv.fromElements(,Tuple3.of(,0,0,0,), Tuple3.of(,0,1,1,),Tuple3.of(,0,2,2,), Tuple3.of(,1,0,6,),Tuple3.of(,1,1,7,), Tuple3.of(,1,0,8,);,/,按第一个字段分组,对第二个字段求和,打印出来的结果如下:,/ (0,0,0),/ (0,1,0),/ (0,3,0),/ (1,0,6),/ (1,1,6),/ (1,1,6),DataStream sumStream = tupleStream.keyBy(,0,).sum(,1,);,max(),对该字段求最大值,结果保存到该字段上,不保证其他字段的计算结果,maxBy(),对该字段求最大值,其他字段保留最大值元素的值,单数据流转换,max,/,maxBy,DataStream tupleStream =,senv.fromElements(,Tuple3.of(,0,0,0,), Tuple3.of(,0,1,1,),Tuple3.of(,0,2,2,), Tuple3.of(,1,0,6,),Tuple3.of(,1,1,7,), Tuple3.of(,1,0,8,);,/,按第一个字段分组,对第三个字段求最大值,max,,打印出来的结果如下:,/ (0,0,0),/ (0,0,1),/ (0,0,2),/ (1,0,6),/ (1,0,7),/ (1,0,8),DataStream maxStream = tupleStream.keyBy(,0,).max(,2,);,/,按第一个字段分组,对第三个字段求最大值,maxBy,,打印出来的结果如下:,/ (0,0,0),/ (0,1,1),/ (0,2,2),/ (1,0,6),/ (1,1,7),/ (1,0,8),DataStream maxByStream = tupleStream.keyBy(,0,).maxBy(,2,);,比,Aggregation,更通用,在,KeyedStream,上生效,接受两个输入,生成一个输出,两两合一地汇总操作,基于,Key,的分组转换,-,reduce,实现,ReduceFunction,基于,Key,的分组转换,-,reduce,public,static,class,MyReduceFunction,implements,ReduceFunction,Override,public,Score,reduce,(Score s1, Score s2),return,Score.of(s1.name,Sum, s1.score + s2.score);,DataStream dataStream = senv.fromElements(,Score.of(,Li,English,90,), Score.of(,Wang,English,88,),Score.of(,Li,Math,85,), Score.of(,Wang,Math,92,),Score.of(,Liu,Math,91,), Score.of(,Liu,English,87,);,/,实现,ReduceFunction,DataStream sumReduceFunctionStream = dataStream,.keyBy(,name,),.reduce(,new,MyReduceFunction();,/,使用,Lambda,表达式,DataStream sumLambdaStream = dataStream,.keyBy(,name,),.reduce(s1, s2) - Score.of(s1.name,Sum, s1.score + s2.score);,将多个同类型的,DataStream,合并为一个,DataStream,数据按照先进先出(,FIFO,)合并,多数据流转换,-,union,DataStream shenzhenStockStream = .,DataStream hongkongStockStream = .,DataStream shanghaiStockStream = .,DataStream unionStockStream = shenzhenStockStream.union(hongkongStockStream, shanghaiStockStream);,只能连接两个,DataStream,数据流,两个数据流类型可以不一致,两个,DataStream,经过,connect(),之后转化为,ConnectedStreams,,,ConnectedStreams,会对两个流的数据应用不同的处理方法,且双流之间可以共享状态,应用场景为:使用一个控制流对另一个数据流进行控制,多数据流转换,-,connect,重写,CoMapFunction,或,CoFlatMapFunction,三个泛型,分别对应第一个输入流的数据类型、第二个输入流的数据类型和输出流的数据类型,对于,CoFlatMapFunction,,,flatMap1(),方法处理第一个流的数据,,flatMap2(),方法处理第二个流的数据,可以做到类似,SQL,Join,的效果,多数据流转换,-,connect,/ IN1,为第一个输入流的数据类型,/ IN2,为第二个输入流的数据类型,/ OUT,为输出类型,public,interface,CoFlatMapFunction,extends,Function,Serializable,/,处理第一个流的数据,void,flatMap1,(IN1 value, Collector out),throws,Exception,;,/,处理第二个流的数据,void,flatMap2,(IN2 value, Collector out),throws,Exception,;,/ CoMapFunction,三个泛型分别对应第一个流的输入、第二个流的输入,,map,之后的输出,public,static,class,MyCoMapFunction,implements,CoMapFunction, ,Override,public,String,map1,(Integer input1),return,input1.toString();,Override,public,String,map2,(String input2),return,input2;,CoFlatMapFunction,源代码,一个,CoFlatMapFunction,实现,并行度,逻辑视图中的算子被切分为多个算子子任务,每个算子子任务处理一部分数据,可以在整个作业的执行环境层面设置,也可以对某个算子单独设置,并行度,StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();,/,获取当前执行环境的默认并行度,int,defaultParalleism = senv.getParallelism();,/,设置所有算子的并行度为,4,,表示所有算子的并行执行的实例数为,4,senv.setParallelism(,4,);,在执行环境中设置并行度:,对某个算子单独设置:,dataStream.map(,new,MyMapper().setParallelism(defaultParallelism *,2,);,默认情况下,数据自动分布到多个实例(或者称之为分区)上,手动在多个实例上进行数据分配,避免数据倾斜,输入是,DataStream,,输出也是,DataStream,数据重分布,dataStream.shuffle();,基于正态分布,将数据随机分配到下游各算子实例上,:,dataStream.broadcast();,数据会被复制并广播发送给下游的所有实例上,:,dataStream.global();,将所有数据发送给下游算子的第一个实例上,:,rebalance(),使用,Round-Ribon,思想将数据均匀分配到各实例上,rescale(),就近发送给下游每个实例,数据重分布,rebalance(),将数据轮询式地分布到下游子任务上,当上游有,2,个子任务、下游有,4,个子任务时使用,rescale(),partitionCustom(),自定义数据重分布逻辑,PartitionerK,中泛型,K,为根据哪个字段进行分区,对一个,Score,类型数据流重分布,希望按照,id,均匀分配到下游各实例,那么泛型,K,就为,id,的数据类型,Long,重写,partition(),方法,数据重分布,FunctionalInterface,public,interface,Partitioner,extends,java,.,io,.,Serializable,Function,/,根据,key,决定该数据分配到下游第几个分区(实例),int,partition,(K key,int,numPartitions),;,/*,* Partitioner,其中泛型,T,为指定的字段类型,* 重写,partiton,函数,并根据,T,字段对数据流中的所有元素进行数据重分配,* *,/,public,static,class,MyPartitioner,implements,Partitioner,private,Random rand =,new,Random();,private,Pattern pattern = Ppile(,.*d+.*,);,/*,* key,泛型,T,即根据哪个字段进行数据重分配,本例中是,Tuple2(Int, String),中的,String,* numPartitons,为当前有多少个并行实例,* 函数返回值是一个,Int,为该元素将被发送给下游第几个实例,* *,/,Override,public,int,partition,(String key,int,numPartitions),int,randomNum = rand.nextInt(numPartitions /,2,);,Matcher m = pattern.matcher(key);,if,(m.matches() ,return,randomNum; ,else,return,randomNum + numPartitions /,2,; ,/,对,(Int, String),中的第二个字段使用,MyPartitioner,中的重分布逻辑,DataStream partitioned = dataStream.partitionCustom(,new,MyPartitioner(),1,);,Partitioner,源码,一个,Partitioner,的实现,数据传输、持久化,序列化:将内存对象转换成二进制串、网络可传输或可持久化,反序列化:将二进制串转换为内存对象,可直接在编程语言中读写和操作,常见序列化方式:,JSON,Java,、,Kryo,、,Avro,、,Thrift,、,Protobuf,Flink,开发了自己的序列化框架,更早地完成类型检查,节省数据存储空间,序列化和反序列化,基础类型,Java,、,Scala,基础数据类型,数组,复合类型,Scala,case,class,Java,POJO,Tuple,辅助类型,Option,、,List,、,Map,泛型和其他类型,Generic,Flink,支持的数据类型,TypeInformaton,用来表示数据类型,创建序列化器,每种数据类型都对应一个,TypeInfomation,TupleTypeInfo,、,PojoTypeInfo,TypeInformation,Flink,会自动推断类型,调用对应的序列化器,对数据进行序列化和反序列化,类型推断和序列化,package,mon.typeinfo;,public,class,Types,/ java.lang.Void,public,static,final,TypeInformation VOID = BasicTypeInfo.VOID_TYPE_INFO;,/ java.lang.String,public,static,final,TypeInformation STRING = BasicTypeInfo.STRING_TYPE_INFO;,/ java.lang.Boolean,public,static,final,TypeInformation BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO;,/ java.lang.Integer,public,static,final,TypeInformation INT = BasicTypeInfo.INT_TYPE_INFO;,/ java.lang.Long,public,static,final,TypeInformation LONG = BasicTypeInfo.LONG_TYPE_INFO;,.,一些基础类型的,TypeInformation,:,Types.STRING,是用来表示,java.lang.String,的,TypeInformation,Types.STRING,被定义为,BasicTypeInfo.STRING_TYPE_INFO,STRING_TYPE_INFO,:使用何种序列化器和比较器,类型推断和序列化,public,static,final,BasicTypeInfo,STRING_TYPE_INFO =,new,BasicTypeInfo(,String.class,new,Class,StringSerializer.INSTANCE,StringComparator.class);,STRING_TYPE_INFO,定义使用何种序列化器和比较器:,在声明式文件中定义,Schema,使用工具将,Schema,转换为,Java,可用的类,Avro,Specific,生成的类与,POJO,类似,有,getter,、,setter,方法,在,Flink,中可以像使用,POJO,一样使用,Avro,Specific,模式,Avro,Generic,不生成具体的类,用,GenericRecord,封装所有用户定义的数据结构,必须给,Flink,提供,Schema,信息,Avro,namespace,:,org.apache.flink.tutorials.avro,type,:,record,name,:,MyPojo,fields,: ,name,:,id,type,:,int,name,:,name,type,:,string,Avro,声明式文件:,Kryo,是大数据领域经常使用的序列化框架,Flink,无法推断出数据类型时,将该数据类型定义为,GenericTypeInfo,,使用,Kryo,作为后备选项进行序列化,最好实现自己的序列化器,并对数据类型和序列化器进行注册,Kryo,在有些场景效率不高,env.getConfig.disableGenericTypes(),禁用,Kryo,,可以定位到具体哪个类型无法被,Flink,自动推断,然后针对该类型创建更高效的序列化器,Kryo,注册数据类型和序列化器:,/,将,MyCustomType,类进行注册,env.getConfig().registerKryoType(MyCustomType.class);,/,或者使用下面的方式并且实现自定义序列化器,env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomSerializer.class);,static,class,MyClassSerializer,extends,Serializer,implements,Serializable,private,static,final,long,serialVersionUID = .,Override,public,void,write,(Kryo kryo, Output output, MyCustomType myCustomType), . ,Override,public,MyCustomType,read,(Kryo kryo, Input input, Class type), . ,与,Avro,Specific,模式相似,使用声明式语言定义,Schema,,使用工具将声明式语言转化为,Java,类,有人已经实现好,Kryo,的序列化器,案例:,MyCustomType,是使用,Thrift,工具生成的,Java,类,,TBaseSerializer,是,com.twitter:chill-thrift,包中别人实现好的序列化器,该序列化器基于,Kryo,的,Serializer,。,注意在,pom.xml,中添加相应的依赖,Thrift,、,Protobuf,/ Google Protobuf,/ MyCustomType,类是使用,Protobuf,生成的,Java,类,/ ProtobufSerializer,是别人实现好的序列化器,env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, ProtobufSerializer.class);,/ Apache Thrift,/ MyCustomType,是使用,Thrift,生成的,Java,类,/ TBaseSerializer,是别人实现好的序列化器,env.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);,Flink,的数据类型:,Java,、,Scala,、,Table,API,分别有自己的数据类型体系,绝大多数情况下,程序员不需要关心使用何种,TypeInformation,,只需要使用自己所需的数据类型,Flink,会做类型推断、选择对应的序列化器,当自动类型推断失效,用户需要关注,TypeInformation,数据类型选择:,需要考虑:上下游的数据结构、序列化器的性能、状态数据的持续迭代能力,POJO,和,Tuple,等内置类型性能更好,Avro,、,Thrift,和,Protobuf,对上下游数据的兼容性更好,不需要在,Flink,应用中重新设计一套,POJO,POJO,和,Avro,对,Flink,状态数据的持续迭代更友好,数据类型小结,用户自定义函数的三种方式:,继承并实现函数类,使用,Lambda,表达式,继承并实现,Rich,函数类,用户自定义函数,对于,map(),、,flatMap(),、,reduce(),等函数,我们可以实现,MapFunction,、,FlatMapFunction,、,ReduceFunction,等,interface,接口。,以,FlatMapFunction,函数式接口为例:,继承了,Flink,的,Function,函数式接口,函数在运行过程中要发送到各个实例上,发送前后要进行序列化和反序列化,一定要保证函数内的所有内容都可以被序列化,两个泛型,T,和,O,,,T,是输入,,O,是输出,要设置好输入和输出数据类型,否则会报错,重写虚方法,flatMap(),Collector,收集输出数据,函数类,package,mon.functions;,FunctionalInterface,public interface FlatMapFunction,extends,Function, Serializable,void flatMap(T value, Collector out),throws,Exception;,/,使用,FlatMapFunction,实现过滤逻辑,只对字符串长度大于,limit,的内容进行词频统计,public static,class,WordSplitFlatMap,implements,FlatMapFunction,private,Integer limit;,public WordSplitFlatMap(Integer limit) ,this,.limit = limit; ,Override,public void flatMap(String input, Collector collector),throws,Exception,if,(input.length() limit),for,(String word: input.split(, ,),collector.collect(word);,DataStream dataStream = senv.fromElements(,Hello World,Hello this is Flink,);,DataStream functionStream = dataStream.flatMap(,new,WordSplitFlatMap(,10,);,FlatMapFunction,源码,一个,FlatMapFunction,实现,简洁紧凑,Scala,对,Lambda,表达式支持更好,Java,8,之后也开始支持,Lambda,表达式,有类型擦除问题,使用,returns,提供类型信息,Lambda,表达式,DataStream words = dataStream.flatMap (,(String input, Collector collector) -,for,(String word : input.split(, ,),collector.collect(word);,),/,提供类型信息以解决类型擦除问题,.returns(Types.STRING);,val,lambda = dataStream.flatMap,(value: String, out: CollectorString) =,if,(value.size ,10,),value.split(, ,).foreach(out.collect),Scala,:,Java,:,RichMapFunction,、,RichFlatMapFunction,、,RichReduceFunction,增加了更多功能:,open(),方法:初始化,close(),方法:算子最后执行这个方法,可以释放一些资源,getRuntimeContext(),方法:获取算子子任务的运行时上下文,累加器例子:分布式计算环境下,计算是分布在多台节点上的,每个节点处理一部分数据,使用,for,循环无法满足累加器功能,Rich,函数类,/,实现,RichFlatMapFunction,类,/,添加了累加器,Accumulator,public,static,class,WordSplitRichFlatMap,extends,RichFlatMapFunction,private,int,limit;,/,创建一个累加器,private,IntCounter numOfLines =,new,IntCounter(,0,);,public,WordSplitRichFlatMap,(Integer limit),this,.limit = limit; ,Override,public,void,open,(Configuration parameters),throws,Exception,super,.open(parameters);,/,在,RuntimeContext,中注册累加器,getRuntimeContext().addAccumulator(,num-of-lines,this,.numOfLines);,Override,public,void,flatMap,(String input, Collector collector),throws,Exception,/,运行过程中调用累加器,this,.numOfLines.add(,1,);,if,(input.length() limit),for,(String word: input.split(, ,),collector.collect(word);,
展开阅读全文
相关资源
正为您匹配相似的精品文档
相关搜索

最新文档


当前位置:首页 > 办公文档 > 教学培训


copyright@ 2023-2025  zhuangpeitu.com 装配图网版权所有   联系电话:18123376007

备案号:ICP2024067431-1 川公网安备51140202000466号


本站为文档C2C交易模式,即用户上传的文档直接被用户下载,本站只是中间服务平台,本站所有文档下载所得的收益归上传人(含作者)所有。装配图网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。若文档所含内容侵犯了您的版权或隐私,请立即通知装配图网,我们立即给予删除!