Hadoop平台简介-肖韬南京大学计算机系.ppt

上传人:tia****nde 文档编号:12706629 上传时间:2020-05-14 格式:PPT 页数:44 大小:1.32MB
返回 下载 相关 举报
Hadoop平台简介-肖韬南京大学计算机系.ppt_第1页
第1页 / 共44页
Hadoop平台简介-肖韬南京大学计算机系.ppt_第2页
第2页 / 共44页
Hadoop平台简介-肖韬南京大学计算机系.ppt_第3页
第3页 / 共44页
点击查看更多>>
资源描述
Hadoop平台简介,肖韬南京大学计算机科学与技术系2010,使用Hadoop的JavaAPI接口,在Hadoop文件系统中的文件是由一个HadoopPath对象来表示的,可以把一个Path对象想象成一个Hadoop文件系统的URI,例如hdfs:/localhost:9000/user/xt/input/text.dat,通过2个静态工厂方法从抽象的Hadoop文件系统中抽取出一个具体的实现的实例。publicstaticFileSystemget(Configurationconf)throwsIOException;返回默认的文件系统(在conf/core-site.xml中指定),或者本地的文件系统(如果该文件中没有指定)publicstaticFileSystemget(URIuri,Configurationconf)throwsIOException;返回由uri决定的文件系统,或者默认的文件系统(如果uri无效),新旧API变化的对比,以0.20.0版本为分水岭,有一些API在新的版本中被舍弃了,且推荐不使用,而是改为使用新的API下面将以WordCount程序为例进行说明,0.20.0之前的WordCount程序,publicWordCountpublicstaticvoidmain(Stringargs)throwsThrowableJobConfconf=newJobConf(WordCount.class);conf.setJobName(“ASampleWordCountExample”);FileInputFormat.addInputPath(conf,newPath(args0);FileInputFormat.setOutputPath(conf,newPath(args1);conf.setMapperClass(WordCountMapper.class);conf.setReducerClass(WordCountReducer.class);conf.setOutputKeyClass(Text.class);conf.setOutputValueClass(IntWritable.class);JobClient.runJob(conf);,classWordCountMapperextendsMapReduceBaseimplementsMapperpublicvoidmap(LongWritableoffset,Textline,OutputCollectorcollector,Reporterreporter)throwsIOExceptionStringTokenizertokenzier=newStringTokenizer(line.toString();while(tokenizer.hasMoreTokens()collector.collect(newText(tokenizer.nextToken(),newIntWritable(1);,classWordCountReducerextendsMapReduceBaseimplementsReducerpublicvoidreduce(Textword,Iteratorcounts,OutputCollectorcollector,Reporterreporter)throwsIOExceptionintsum=0;while(counts.hasNext()sum+=counts.next().get();collector.collect(word,newIntWritable(sum);,0.20.0之后的WordCount程序,publicclassWordCountpublicstaticvoidmain(Stringargs)throwExceptionConfigurationconf=newConfiguration();Jobjob=newJob(conf,“ASampleWordCountExample”);job.setJarByClass(WordCount.class);job.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job,newPath(args0);FileOutputFormat.setOutputPath(job,newPath(args1);job.waitForCompletion(true);,classWordCountMapperextendsMapperpublicvoidmap(LongWritableoffset,Textline,Contextcontext)throwsIOException,InterruptedExceptionStringTokenizertokenizer=newStringTokenizer(line.toString();while(tokenizer.hasMoreTokens()context.write(newText(tokenizer.nextToken(),newIntWritable(1);,classWordCountReducerextendsReducerpublicvoidreduce(Textword,Iteratorcounts,Contextcontext)throwsIOException,InterruptedExceptionintsum=0;while(counts.hasNext()sum+=counts.next().get();context.write(word,newIntWritable(sum);,ShuffleandSort,MapReduce保证每一个reducetask的输入基于key排序的。(MapReducemakestheguaranteethattheinputtoeveryreducerissortedbykey).系统进行排序的过程(包括将map的输出转换为reduce的输入)被称为shuffle.Theprocessbywhichthesystemperformsthesortandtransfersthemapoutputstothereducerasinputsisknownastheshuffle.,shuffle过程maptask中生成了3个spillfile,每个spillfile中有3个partition,shuffle过程:maptaskside,当一个maptask开始产生它的输出时,输出并非不经处理被直接就写到磁盘上去的。每一个maptask都有一个circularmemorybuffer,缺省大小为100MB,maptask会将它产生的输出(key-valuepairs)写入到它的memorybuffer中去。当maptask写入到memorybuffer的数据占memorybuffer的大小百分比到达一个阈值(缺省为80%)时,一个backgroundthread(记为thread)将开始把memorybuffer中的内容spill到磁盘上去。在thread将memorybuffer中的数据spill到磁盘中之前,thread首先将这些数据分成若干partition,每一个partition将被发送至一个reducer。,在每一个partition内,thread将根据key对该partition内的数据(即key-valuepairs)进行in-memorysort,如果指定了combinerfunction,那么该combinerfunction将会被作用于上述in-memorysort的输出。每当memorybuffer中的数据达到一个阈值时,就会产生一个spillfile,所以在maptask输出了所有的record之后,就会存在多个spillfiles.(1个record即1个key-valuepair)在maptask结束之前,所有的spillfiles将被merge到一个单独的outputfile中,该outputfile在结构上由多个partition组成,每一个partition内的数据都是排好序的,且每一个partition将被送至对应的一个reducetask.如果指定了combinerfunction并且spill的数量不低于3个,那么在生成outputfile之前,combinerfunction将会作用于将要被写入到outputfile里的每一个partition内的数据。,reducetaskside,maptask的输出存储在maptask节点所在机器的本地文件系统中,reducetask会自己所需的某个partition数据复制到自己所在的HDFS中,且一个reducetask将会从多个maptask复制其所需要的partition(这些partition都是同一类的)。reducer怎样知道从哪些maptasktracker那里去取自己所需要的partition(亦即maptask的输出)?当maptask成功完成后,它会将状态更新通知它所属的tasktracker,该tasktracker进而又会通知其所属的jobtracker。这些通知是通过heartbeat通信机制实现的。这样,对于一个job而言,jobtracker知道mapoutput与tasktracker之间的映射关系。reducer中的一个线程会周期性地向jobtracker询问mapoutput所在的位置,直到该reducer接收了所有的mapoutput.,combinerfunction与partitionerfunction,当存在多个reducer时,maptasks将会对它们的输出进行partition,每一个masktask都会为每一个reducetask生成一个partition.在每一个partition内都可能会有很多keys(以及相应的values),但是对于任一个key而言,它的records都在一个partition内。partition的过程可以由用户定义的partitioning函数来控制,但是一般来说,默认的partitioner函数(根据key进行hash映射)已经可以令人满意。,存在多个reducetask时的partitioningpartition的数量与reducer的数量是一致的,定制个性化的partitioner,自定义的partitionerfunction需要继承于一个抽象类Partitionercontrolsthepartitioninigofthekeysoftheintermediatemap-outputs.Thekey(orasubsetofkey)isusedtoderivethepartition,typicallybyahashfunction.Thiscontrolswhichofthemreducetaskstheintermediatekey(andhencetherecord)issentforreduction.,实现Partitioner中的getPartition函数原型abstractintgetPartition(KEYkey,VALUEvalue,intnumPatitions);其中,key和value是mapper输出的intermediateoutput。例如,在WordCount例子中就分别是word与1。numPartitions是reducers的数量。返回值是该record将被发送至的reducer的编号(0,1,m-1),指定多个reducers,bin/hadoopjarWordCountDmapred.reduce.tasks=3inputoutput这样,在reduce阶段会有3个reducetasks运行。,speculativeexecution(默认打开),当多个task并行运行时,可能若干个task运行明显比其他task要慢。这种情况下,Hadoop将会为这些运行较慢的task启动一个相同的backuptask,称为speculativeexecution.一个task及其speculativetask不会同时运行,以避免竞争。在一个job的所有task都已经启动的情况下,对于那些同时满足1)已经运行了一段时间(至少1分钟)2)运行的速度明显慢于其余task的平均速度的task,一个speculativetask才会被启动。对于originaltask及其speculativetask而言,如果任何一方先运行结束,则另一方将被killed.,Skippingbadrecords,当一个task失败时(原因可能是硬件故障、待处理数据非法等),该task将会被retried,但是如果该task失败的次数达到4次,那么该task所属的整个job就将被标记为failed。当maptask读到一个badrecord时,可能会因为抛出异常而失败,进而整个job可能会失败。有时,第三方的库可能有bug,导致task因读取了某个badrecord而失败,而这个第三方的库又无法修改。这时,可以使用Hadoop的skipmode,以使得读取输入文件使自动地跳过badrecords.,在打开了skippingmode之后,task会将其所处理的records报告给tasktracker。当task失败时,tasktracker会retry该task,并跳过引起失败的records。为了减少skippingmode带来的带宽及记账信息(bookkeeping)的消耗,当一个task失败达到2次时,才会开启skippingmode。,如果一个task因为某个badrecord而持续地失败,那么tasktracker将会以下列的结果执行taskattempts:task失败.task再次失败.skippingmode被打开.task仍然失败,但是badrecord被tasktracker记录下来.skippingmode处于使能状态,task因为跳过了前面导致失败的badrecord而成功.skippingmode是默认关闭的。注意,对于每一个taskattempt,skippingmode只能发现一个badrecord.,Taskside-effectfiles,要保证一个task的多个instance不会试图向同一个文件进行写操作:1)如果某个task失败了(失败前已经向输出文件中写了一部分数据),那么当其再次运行(retry)时,必须先将旧的文件删掉。2)当speculativeexecution被使能时,某个originaltask与它的speculativetask可能会试图向同一个文件进行写操作。Hadoop为每一个taskattempt指定了一个临时目录,每一个taskattempt的输出就会被写到这个目录中去,从而避免了上述的问题。这个目录就是$mapred.output.dir,InputFormat,map:(k1,v1)list(k2,v2)combine:(k2,list(v2)list(k2,v2)reduce:(k2,list(v2)list(k3,v3)可以看出,如果使用combiner,那么它的输入/输出格式与reducer是完全一样的(同时也是Reducer的子类),只不过combiner的输出是intermediatekey-valuepairs(这将是reducer的输入)。,Inputtypes由Inputformat决定,例如TextInputFormat决定了输入的key的类型是LongWritable(首字符在文件中的偏移量),value的类型是Text(一行文本内容).如果希望产生其他类型的输入,可以显式地调用JobConf的方法。否则,若不显式地(setexplicitly)设置,则不论是否使用combiner,intermediatetypes默认与最终的输出类型相同(即LongWritable与Text)。所以,若k2和k3相同,则不需要调用setMapKeyOutputClass(),因为intermediatekeytype已经被setOutputKeyClass()设置好了。同理,若v2和v3相同,则只需要调用setOutputValueClass()即可。,为什么要为intermediatekey-valuepairs和最终的output指定类型?,似乎通过mapper与reducer就可以确定intermediatekey-valuepairs和最终的output的类型了。原因:Java的泛型机制中的typeerasure使得这些类型信息在运行时是不可知的,所以必须显式地为Hadoop指定这些类型。,InputFormatclasshierarchy,InputSplit,什么是inputsplit?1个inputsplit是inputfile中的1个chunk,该chunk将被1个单独的map进行处理。每一个map处理一个inputsplit.每一个split可被划分为若干records,1个record即1个key-valuepair,map依次处理每一个record.Inputsplit由一个Java抽象类代表,即org.apache.hadoop.mapreduce;abstractclassInputSplit,InputSplitrepresentsthedatatobeprocessedbyanindividualmapper.Typically,itpresentsabyte-orientedviewontheinputandistheresponsibilityofRecordReaderofthejobtoprocessthisandpresentarecord-orientedview.注意,InputSplit并不包含inputdata,而只是指向inputdata的一个reference。Map/Reduce系统利用getLocations()所得到的storagelocations信息来将maptasks放置在尽可能靠近inputsplit数据的地方;利用getLength()得到的size信息对splits进行排序,使得最大的spilt先被处理,试图来最小化job的运行时间。,Inputfile,inputsplitandrecord,inputfile,Inputsplit,record,key-valuepair,MapReduce应用程序开发者不需要直接处理InputSplit,因为它是由一个InputFormat生成的。InputFormat负责生成inputsplits,并把它们划分为records.,0.20.0之前的定义如下publicinterfaceInputFormatInputSplitgetSplits(JobConfjob,intnumSplits)throwsIOException;RecordReadergetRecordReader(InputSplitsplit,JobConfjob,Reporterreporter)throwsIOException;其实跟新的还是很类似的。,对于旧版InputFormat的解释,TheJobClientcallsthegetSplits()method,passingthedesirednumberofmaptasksasthenumSplitsargument.Thisnumberistreatedasahint,asInputFormatimplementationsarefreetoreturnadifferentnumberofsplitstothenumberspecifiedinnumSplits.Havingcalculatedthesplits,theclientsendsthemtothejobtracker,whichusestheirstoragelocationstoschedulemaptaskstoprocessthemonthetasktrackers.Onatasktracker,themaptaskpassesthesplittothegetRecordReader()methodonInputFormattoobtainaRecordReaderforthatsplit.ARecordReaderislittlemorethananiteratoroverrecords,andthemaptaskusesonetogeneraterecordkey-valuepairs,whichitpassestothemapfunction.,TheabstractInputFormatclass,TheMap/ReduceframworkreliesontheInputFormatofthejobto:1.Validatetheinput-specificationofthejob.2.Split-uptheinputfile(s)intologicalInputSplits,eachofwhichisthenassignedtoanindividualMapper.ProvidetheRecordReaderimplementationtobeusedtogleaninputrecordsfromlogicalInputSplitforprocessingbyaMapper.,org.apache.hadoop.mapredInterfaceRecordReader,RecordReaderreadspairsfromanInputSplit.RecordReader,typically,convertsthebyte-orientedviewoftheinputprovidedbytheInputSplit,andpresentsarecord-orientedviewfortheMapper&Reducertasksforprocessing.Itthusassumestheresponsibilityofprocessingboundariesandpresentingthetaskswithkeys&values.,MapRunnable,MaptasksarerunbyMapRunner,thedefaultimplementationofMapRunnablethatcallstheMappersmap()methodsequentiallywitheachrecord.NotethatMapRunnerisonlywayofrunningmappers.MultithreadedMapRunnerisanotherimplementationoftheMapRunnableinterfacethatrunsmappersconcurrentlyinaconfigurablenumberofthreads(setbymapred.map.multithreadedrunner.threads),FileInputFormat,FileInputFormat提供了:1)对一个job的输入路径的定义2)为inputfiles产生splits的实现注意:输入路径不应该包含子目录,而只包含文件,因为InputFormat不会自动解析子目录,而是将其当作一个文件。对于给定的若干文件,FileInputFormat怎样将它们变为splits呢?FileInputFormat只对“大文件”进行split,这里的”大”是指比HDFS的一个block还要大。Splitsize通常就等于一个HDFSblock的大小。,MapReduce中的所有数据元素都是不可修改的,AlldataelementsinMapReduceareimmutable,meaningthattheycannotbeupdated.Ifinamappingtaskyouchangeaninput(key,value)pair,itdoesnotgetreflectedbackintheinputfiles.Communicationoccursonlybygeneratingnewoutput(key,value)pairswhicharethenforwardedbytheHadoopsystemintothenextphaseofexecution.,
展开阅读全文
相关资源
相关搜索

当前位置:首页 > 图纸专区 > 课件教案


copyright@ 2023-2025  zhuangpeitu.com 装配图网版权所有   联系电话:18123376007

备案号:ICP2024067431-1 川公网安备51140202000466号


本站为文档C2C交易模式,即用户上传的文档直接被用户下载,本站只是中间服务平台,本站所有文档下载所得的收益归上传人(含作者)所有。装配图网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。若文档所含内容侵犯了您的版权或隐私,请立即通知装配图网,我们立即给予删除!