Hadoop平台简介-肖韬南京大学计算机系.ppt

上传人:max****ui 文档编号:6349373 上传时间:2020-02-23 格式:PPT 页数:44 大小:1.32MB
返回 下载 相关 举报
Hadoop平台简介-肖韬南京大学计算机系.ppt_第1页
第1页 / 共44页
Hadoop平台简介-肖韬南京大学计算机系.ppt_第2页
第2页 / 共44页
Hadoop平台简介-肖韬南京大学计算机系.ppt_第3页
第3页 / 共44页
点击查看更多>>
资源描述
Hadoop平台简介 肖韬南京大学计算机科学与技术系2010 使用Hadoop的JavaAPI接口 在Hadoop文件系统中的文件是由一个HadoopPath对象来表示的 可以把一个Path对象想象成一个Hadoop文件系统的URI 例如hdfs localhost 9000 user xt input text dat 通过2个静态工厂方法从抽象的Hadoop文件系统中抽取出一个具体的实现的实例 publicstaticFileSystemget Configurationconf throwsIOException 返回默认的文件系统 在conf core site xml中指定 或者本地的文件系统 如果该文件中没有指定 publicstaticFileSystemget URIuri Configurationconf throwsIOException 返回由uri决定的文件系统 或者默认的文件系统 如果uri无效 新旧API变化的对比 以0 20 0版本为分水岭 有一些API在新的版本中被舍弃了 且推荐不使用 而是改为使用新的API下面将以WordCount程序为例进行说明 0 20 0之前的WordCount程序 publicWordCount publicstaticvoidmain String args throwsThrowable JobConfconf newJobConf WordCount class conf setJobName ASampleWordCountExample FileInputFormat addInputPath conf newPath args 0 FileInputFormat setOutputPath conf newPath args 1 conf setMapperClass WordCountMapper class conf setReducerClass WordCountReducer class conf setOutputKeyClass Text class conf setOutputValueClass IntWritable class JobClient runJob conf classWordCountMapperextendsMapReduceBaseimplementsMapper publicvoidmap LongWritableoffset Textline OutputCollectorcollector Reporterreporter throwsIOException StringTokenizertokenzier newStringTokenizer line toString while tokenizer hasMoreTokens collector collect newText tokenizer nextToken newIntWritable 1 classWordCountReducerextendsMapReduceBaseimplementsReducer publicvoidreduce Textword Iteratorcounts OutputCollectorcollector Reporterreporter throwsIOException intsum 0 while counts hasNext sum counts next get collector collect word newIntWritable sum 0 20 0之后的WordCount程序 publicclassWordCount publicstaticvoidmain String args throwException Configurationconf newConfiguration Jobjob newJob conf ASampleWordCountExample job setJarByClass WordCount class job setMapperClass WordCountMapper class job setReducerClass WordCountReducer class job setOutputKeyClass Text class job setOutputValueClass IntWritable class FileInputFormat addInputPath job newPath args 0 FileOutputFormat setOutputPath job newPath args 1 job waitForCompletion true classWordCountMapperextendsMapper publicvoidmap LongWritableoffset Textline Contextcontext throwsIOException InterruptedException StringTokenizertokenizer newStringTokenizer line toString while tokenizer hasMoreTokens context write newText tokenizer nextToken newIntWritable 1 classWordCountReducerextendsReducer publicvoidreduce Textword Iteratorcounts Contextcontext throwsIOException InterruptedException intsum 0 while counts hasNext sum counts next get context write word newIntWritable sum ShuffleandSort MapReduce保证每一个reducetask的输入基于key排序的 MapReducemakestheguaranteethattheinputtoeveryreducerissortedbykey 系统进行排序的过程 包括将map的输出转换为reduce的输入 被称为shuffle Theprocessbywhichthesystemperformsthesort andtransfersthemapoutputstothereducerasinputs isknownastheshuffle shuffle过程maptask中生成了3个spillfile 每个spillfile中有3个partition shuffle过程 maptaskside 当一个maptask开始产生它的输出时 输出并非不经处理被直接就写到磁盘上去的 每一个maptask都有一个circularmemorybuffer 缺省大小为100MB maptask会将它产生的输出 key valuepairs 写入到它的memorybuffer中去 当maptask写入到memorybuffer的数据占memorybuffer的大小百分比到达一个阈值 缺省为80 时 一个backgroundthread 记为thread 将开始把memorybuffer中的内容spill到磁盘上去 在thread将memorybuffer中的数据spill到磁盘中之前 thread首先将这些数据分成若干partition 每一个partition将被发送至一个reducer 在每一个partition内 thread将根据key对该partition内的数据 即key valuepairs 进行in memorysort 如果指定了combinerfunction 那么该combinerfunction将会被作用于上述in memorysort的输出 每当memorybuffer中的数据达到一个阈值时 就会产生一个spillfile 所以在maptask输出了所有的record之后 就会存在多个spillfiles 1个record即1个key valuepair 在maptask结束之前 所有的spillfiles将被merge到一个单独的outputfile中 该outputfile在结构上由多个partition组成 每一个partition内的数据都是排好序的 且每一个partition将被送至对应的一个reducetask 如果指定了combinerfunction并且spill的数量不低于3个 那么在生成outputfile之前 combinerfunction将会作用于将要被写入到outputfile里的每一个partition内的数据 reducetaskside maptask的输出存储在maptask节点所在机器的本地文件系统中 reducetask会自己所需的某个partition数据复制到自己所在的HDFS中 且一个reducetask将会从多个maptask复制其所需要的partition 这些partition都是同一类的 reducer怎样知道从哪些maptasktracker那里去取自己所需要的partition 亦即maptask的输出 当maptask成功完成后 它会将状态更新通知它所属的tasktracker 该tasktracker进而又会通知其所属的jobtracker 这些通知是通过heartbeat通信机制实现的 这样 对于一个job而言 jobtracker知道mapoutput与tasktracker之间的映射关系 reducer中的一个线程会周期性地向jobtracker询问mapoutput所在的位置 直到该reducer接收了所有的mapoutput combinerfunction与partitionerfunction 当存在多个reducer时 maptasks将会对它们的输出进行partition 每一个masktask都会为每一个reducetask生成一个partition 在每一个partition内都可能会有很多keys 以及相应的values 但是对于任一个key而言 它的records都在一个partition内 partition的过程可以由用户定义的partitioning函数来控制 但是一般来说 默认的partitioner函数 根据key进行hash映射 已经可以令人满意 存在多个reducetask时的partitioningpartition的数量与reducer的数量是一致的 定制个性化的partitioner 自定义的partitionerfunction需要继承于一个抽象类Partitionercontrolsthepartitioninigofthekeysoftheintermediatemap outputs Thekey orasubsetofkey isusedtoderivethepartition typicallybyahashfunction Thiscontrolswhichofthemreducetaskstheintermediatekey andhencetherecord issentforreduction 实现Partitioner中的getPartition函数原型abstractintgetPartition KEYkey VALUEvalue intnumPatitions 其中 key和value是mapper输出的intermediateoutput 例如 在WordCount例子中就分别是word与1 numPartitions是reducers的数量 返回值是该record将被发送至的reducer的编号 0 1 m 1 指定多个reducers bin hadoopjarWordCount Dmapred reduce tasks 3inputoutput这样 在reduce阶段会有3个reducetasks运行 speculativeexecution 默认打开 当多个task并行运行时 可能若干个task运行明显比其他task要慢 这种情况下 Hadoop将会为这些运行较慢的task启动一个相同的backuptask 称为speculativeexecution 一个task及其speculativetask不会同时运行 以避免竞争 在一个job的所有task都已经启动的情况下 对于那些同时满足1 已经运行了一段时间 至少1分钟 2 运行的速度明显慢于其余task的平均速度的task 一个speculativetask才会被启动 对于originaltask及其speculativetask而言 如果任何一方先运行结束 则另一方将被killed Skippingbadrecords 当一个task失败时 原因可能是硬件故障 待处理数据非法等 该task将会被retried 但是如果该task失败的次数达到4次 那么该task所属的整个job就将被标记为failed 当maptask读到一个badrecord时 可能会因为抛出异常而失败 进而整个job可能会失败 有时 第三方的库可能有bug 导致task因读取了某个badrecord而失败 而这个第三方的库又无法修改 这时 可以使用Hadoop的skipmode 以使得读取输入文件使自动地跳过badrecords 在打开了skippingmode之后 task会将其所处理的records报告给tasktracker 当task失败时 tasktracker会retry该task 并跳过引起失败的records 为了减少skippingmode带来的带宽及记账信息 bookkeeping 的消耗 当一个task失败达到2次时 才会开启skippingmode 如果一个task因为某个badrecord而持续地失败 那么tasktracker将会以下列的结果执行taskattempts task失败 task再次失败 skippingmode被打开 task仍然失败 但是badrecord被tasktracker记录下来 skippingmode处于使能状态 task因为跳过了前面导致失败的badrecord而成功 skippingmode是默认关闭的 注意 对于每一个taskattempt skippingmode只能发现一个badrecord Taskside effectfiles 要保证一个task的多个instance不会试图向同一个文件进行写操作 1 如果某个task失败了 失败前已经向输出文件中写了一部分数据 那么当其再次运行 retry 时 必须先将旧的文件删掉 2 当speculativeexecution被使能时 某个originaltask与它的speculativetask可能会试图向同一个文件进行写操作 Hadoop为每一个taskattempt指定了一个临时目录 每一个taskattempt的输出就会被写到这个目录中去 从而避免了上述的问题 这个目录就是 mapred output dir InputFormat map k1 v1 list k2 v2 combine k2 list v2 list k2 v2 reduce k2 list v2 list k3 v3 可以看出 如果使用combiner 那么它的输入 输出格式与reducer是完全一样的 同时也是Reducer的子类 只不过combiner的输出是intermediatekey valuepairs 这将是reducer的输入 Inputtypes由Inputformat决定 例如TextInputFormat决定了输入的key的类型是LongWritable 首字符在文件中的偏移量 value的类型是Text 一行文本内容 如果希望产生其他类型的输入 可以显式地调用JobConf的方法 否则 若不显式地 setexplicitly 设置 则不论是否使用combiner intermediatetypes默认与最终的输出类型相同 即LongWritable与Text 所以 若k2和k3相同 则不需要调用setMapKeyOutputClass 因为intermediatekeytype已经被setOutputKeyClass 设置好了 同理 若v2和v3相同 则只需要调用setOutputValueClass 即可 为什么要为intermediatekey valuepairs和最终的output指定类型 似乎通过mapper与reducer就可以确定intermediatekey valuepairs和最终的output的类型了 原因 Java的泛型机制中的typeerasure使得这些类型信息在运行时是不可知的 所以必须显式地为Hadoop指定这些类型 InputFormatclasshierarchy InputSplit 什么是inputsplit 1个inputsplit是inputfile中的1个chunk 该chunk将被1个单独的map进行处理 每一个map处理一个inputsplit 每一个split可被划分为若干records 1个record即1个key valuepair map依次处理每一个record Inputsplit由一个Java抽象类代表 即org apache hadoop mapreduce abstractclassInputSplit InputSplitrepresentsthedatatobeprocessedbyanindividualmapper Typically itpresentsabyte orientedviewontheinputandistheresponsibilityofRecordReaderofthejobtoprocessthisandpresentarecord orientedview 注意 InputSplit并不包含inputdata 而只是指向inputdata的一个reference Map Reduce系统利用getLocations 所得到的storagelocations信息来将maptasks放置在尽可能靠近inputsplit数据的地方 利用getLength 得到的size信息对splits进行排序 使得最大的spilt先被处理 试图来最小化job的运行时间 Inputfile inputsplitandrecord inputfile Inputsplit record key valuepair MapReduce应用程序开发者不需要直接处理InputSplit 因为它是由一个InputFormat生成的 InputFormat负责生成inputsplits 并把它们划分为records 0 20 0之前的定义如下publicinterfaceInputFormat InputSplit getSplits JobConfjob intnumSplits throwsIOException RecordReadergetRecordReader InputSplitsplit JobConfjob Reporterreporter throwsIOException 其实跟新的还是很类似的 对于旧版InputFormat的解释 TheJobClientcallsthegetSplits method passingthedesirednumberofmaptasksasthenumSplitsargument Thisnumberistreatedasahint asInputFormatimplementationsarefreetoreturnadifferentnumberofsplitstothenumberspecifiedinnumSplits Havingcalculatedthesplits theclientsendsthemtothejobtracker whichusestheirstoragelocationstoschedulemaptaskstoprocessthemonthetasktrackers Onatasktracker themaptaskpassesthesplittothegetRecordReader methodonInputFormattoobtainaRecordReaderforthatsplit ARecordReaderislittlemorethananiteratoroverrecords andthemaptaskusesonetogeneraterecordkey valuepairs whichitpassestothemapfunction TheabstractInputFormatclass TheMap ReduceframworkreliesontheInputFormatofthejobto 1 Validatetheinput specificationofthejob 2 Split uptheinputfile s intologicalInputSplits eachofwhichisthenassignedtoanindividualMapper ProvidetheRecordReaderimplementationtobeusedtogleaninputrecordsfromlogicalInputSplitforprocessingbyaMapper org apache hadoop mapredInterfaceRecordReader RecordReaderreadspairsfromanInputSplit RecordReader typically convertsthebyte orientedviewoftheinputprovidedbytheInputSplit andpresentsarecord orientedviewfortheMapper Reducertasksforprocessing Itthusassumestheresponsibilityofprocessingboundariesandpresentingthetaskswithkeys values MapRunnable MaptasksarerunbyMapRunner thedefaultimplementationofMapRunnablethatcallstheMapper smap methodsequentiallywitheachrecord NotethatMapRunnerisonlywayofrunningmappers MultithreadedMapRunnerisanotherimplementationoftheMapRunnableinterfacethatrunsmappersconcurrentlyinaconfigurablenumberofthreads setbymapred map multithreadedrunner threads FileInputFormat FileInputFormat提供了 1 对一个job的输入路径的定义2 为inputfiles产生splits的实现注意 输入路径不应该包含子目录 而只包含文件 因为InputFormat不会自动解析子目录 而是将其当作一个文件 对于给定的若干文件 FileInputFormat怎样将它们变为splits呢 FileInputFormat只对 大文件 进行split 这里的 大 是指比HDFS的一个block还要大 Splitsize通常就等于一个HDFSblock的大小 MapReduce中的所有数据元素都是不可修改的 AlldataelementsinMapReduceareimmutable meaningthattheycannotbeupdated Ifinamappingtaskyouchangeaninput key value pair itdoesnotgetreflectedbackintheinputfiles Communicationoccursonlybygeneratingnewoutput key value pairswhicharethenforwardedbytheHadoopsystemintothenextphaseofexecution
展开阅读全文
相关资源
正为您匹配相似的精品文档
相关搜索

最新文档


当前位置:首页 > 图纸专区 > 课件教案


copyright@ 2023-2025  zhuangpeitu.com 装配图网版权所有   联系电话:18123376007

备案号:ICP2024067431-1 川公网安备51140202000466号


本站为文档C2C交易模式,即用户上传的文档直接被用户下载,本站只是中间服务平台,本站所有文档下载所得的收益归上传人(含作者)所有。装配图网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。若文档所含内容侵犯了您的版权或隐私,请立即通知装配图网,我们立即给予删除!