大数据导论思维第12章-SPARK-SQL课件

上传人:9** 文档编号:243155106 上传时间:2024-09-17 格式:PPTX 页数:62 大小:7MB
返回 下载 相关 举报
大数据导论思维第12章-SPARK-SQL课件_第1页
第1页 / 共62页
大数据导论思维第12章-SPARK-SQL课件_第2页
第2页 / 共62页
大数据导论思维第12章-SPARK-SQL课件_第3页
第3页 / 共62页
点击查看更多>>
资源描述
单击此处编辑母版标题样式,单击此处编辑母版文本样式,第二级,第三级,第四级,第五级,大数据导论,第十二章,CONTENTS,目录,PART 01,SPARK SQL,简介,PART 02,SPARK SQL,执行流程,PART 03,基础数据模型,DATAFRAME,PART 04,使用,Spark SQL,的方式,PART 05,SPARK SQL,数据源,PART 06,SPARK SQL CLI,介绍,PART 07,在,Pyspark,中使用,Spark SQL,PART 08,在,Java,中连接,Spark SQL,PART 09,习题,PART 01,Spark SQL,简介,Spark SQL,是一个用来处理结构化数据的,Spark,组件,为,Spark,提供了查询结构化数据的能力。,Spark SQL,可被视为一个分布式的,SQL,查询引擎,可以实现对多种数据格式和数据源进行,SQL,操作,包括,Parquet,,,Hive,,,MongoDB,,,JSON,、,HDFS,、,JDBC,、,S3,和,RDD,等。,Spark SQL,简介,Spark SQL,介绍:,Spark,SQL,是为了处理结构化数据的一个,Spark,模块,。,不同,于,Spark RDD,的基本,API,,,Spark SQL,接口拥有更多关于数据结构本身与执行计划等更多信息,。,在,Spark,内部,,Spark SQL,可以利用这些信息更好地对操作进行,优化。,Spark,SQL,提供了三种访问接口:,SQL,,,DataFrame API,和,Dataset,API,。,当,计算引擎被用来执行一个计算时,有不同的,API,和语言种类可供,选择。,这种,统一性意味着开发人员可以来回轻松切换各种最熟悉的,API,来完成同一个计算工作,。,Spark SQL,简介,Spark SQL,具有如下,特点,数据兼容,方面,:能加载和查询来自各种来源的数据。,性能优化方面,:除了采取内存列存储、代码生成等优化技术外,还引进成本模型对查询进行动态评估、获取最佳物理计划等;,组件扩展,方面,:无论,是,SQL,的语法解析器、分析器还是优化器都可以重新定义,进行扩展,。,标准连接,:,Spark SQL,包括具有行业标准,JDBC,和,ODBC,连接的服务器模式,。,Spark SQL,简介,Spark SQL,具有如下,特点,集成,:无缝地将,SQL,查询与,Spark,程序混合。,Spark SQL,允许将结构化数据作为,Spark,中的分布式数据集(,RDD,)进行查询,在,Python,,,Scala,和,Java,中集成了,API,。这种紧密的集成使得,SQL,查询以及复杂的分析算法可以轻松地运行,。,可扩展性,:对于交互式查询和长查询使用相同的引擎。,Spark SQL,利用,RDD,模型来支持查询容错,使其能够扩展到大型作业,不需担心为历史数据使用不同的引擎,。,PART 02,Spark SQL,执行流程,Spark SQL,执行流程,类似于,关系型数据库,,Spark SQL,语句也是由,Projection,(,a1,,,a2,,,a3,)、,Data Source,(,tableA,)、,Filter,(,condition,)三部分组成,分别对应,SQL,查询过程中的,Result,、,Data Source,、,Operation,,也就是说,SQL,语句按,Result-Data Source-Operation,的次序来描述的。,Spark SQL,执行流程,解析,(,Parse,),对读入的,SQL,语句进行解析,分辨出,SQL,语句中哪些词是关键词(如,SELECT,、,FROM,、,WHERE,),哪些是表达式、哪些是,Projection,、哪些是,Data Source,等,从而判断,SQL,语句是否规范;,绑定,(,Bind,),将,SQL,语句和数据库的数据字典(列、表、视图等)进行绑定,如果相关的,Projection,、,Data Source,等都存在,则这个,SQL,语句是可以执行的;,Spark SQL,执行流程,优化(,Optimize,),一般,的数据库会提供几个执行计划,这些计划一般都有运行统计数据,数据库会在这些计划中选择一个最优计划;,执行(,Execute,),按,Operation-Data Source-Result,的次序来执行计划。在执行过程有时候甚至不需要读取物理表就可以返回结果,比如重新运行刚运行过的,SQL,语句,可能直接从数据库的缓冲池中获取返回结果,。,PART 03,基础数据模型,DataFrame,DataFrame,是由“命名列”(类似关系表的字段定义)所组织起来的一个分布式数据集合,可以把它看成是一个关系型数据库的表。,基础数据模型,DataFrame,DataFrame,是,Spark SQL,的核心,它将数据保存为行构成的集合,行对应列有相应的列名。,DataFrame,与,RDD,的主要区别在于,,DataFrame,带有,Schema,元信息,即,DataFrame,所表示的二维表数据集的每一列都带有名称和类型,。,这使得,Spark SQL,可以掌握更多的结构信息,从而能够对,DataFrame,背后的数据源以及作用于,DataFrame,之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标,。,基础数据模型,DataFrame,DataFrame,与,RDD,的,对比:,PART 04,使用,Spark SQL,的方式,使用,Spark SQL,的方式,使用,Spark SQL,,首先利用,sqlContext,从外部数据源加载数据为,DataFrame,;然后,利用,DataFrame,上丰富的,API,进行查询、转换;最后,将结果进行展现或存储为各种外部数据形式,。,Spark,SQL,为,Spark,提供了查询结构化数据的能力,查询时既可以使用,SQL,也可以使用,DataFrameAPI,(,RDD,)。通过,Thrift Server,,,SparkSQL,支持多语言编程包括,Java,、,Scala,、,Python,及,R,。,使用,Spark SQL,的方式,使用,Spark SQL,的方式,加载,数据,.,从,Hive,中的,users,表构造,DataFrame,:,users = sqlContext.table(users),.,加载,S3,上的,JSON,文件:,logs = sqlContext.load(s3n:/path/to/data.json, json),.,加载,HDFS,上的,Parquet,文件,:,clicks = sqlContext.load(hdfs:/path/to/data.parquet, parquet),使用,Spark SQL,的方式,加载,数据,.,通过,JDBC,访问,MySQL,:,comments = sqlContext.jdbc(jdbc:mysql:/localhost/comments, user),.,将,普通,RDD,转变为,DataFrame,:,rdd = sparkContext.textFile(“article.txt”) .flatMap(_.split( ) .map(_, 1) .reduceByKey(_+_),wordCounts = sqlContext.createDataFrame(rdd, word, count),使用,Spark SQL,的方式,加载,数据,.,将,本地数据容器转变为,DataFrame,:,data,= (Alice, 21), (Bob, 24),people = sqlContext.createDataFrame(data, name, age,),.,将,PandasDataFrame,转变为,SparkDataFrame,(,PythonAPI,特有功能):,sparkDF=sqlContext.createDataFrame(pandasDF,),使用,Spark SQL,的方式,使用,DataFrame,.,创建,一个只包含,年轻,用户的,DataFrame,:,young = users.filter(users.age 21),.,也,可以使用,Pandas,风格的,语法:,young = usersusers.age = 13 AND age = 19),teenagers.show,(),Parquet,文件数据源,JSON DataSets,数据源,JSON DataSets,数据源,Spark SQL,可以自动根据,JSON DataSet,的格式把其上载为,DataFrame,。,用路径指定,JSON dataset,;路径下可以是一个文件,也可以是多个,文件:,sc,=,spark.sparkContext,path,= examples/src/main/resources/people.json,peopleDF = spark.read.json(path,),使用的结构可以调用,printSchema(),方法,打印:,peopleDF.printSchema(),利用,DataFrame,创建一个临时,表:,使用,Spark,的,sql,方法进行,SQL,查询:,peopleDF.createOrReplaceTempView,(people,),teenagerNamesDF,= spark.sql(SELECT name FROM people WHERE age BETWEEN 13 AND 19),teenagerNamesDF.show,(),JSON DataSets,数据源,JSON dataset,的,DataFrame,也可以是,RDDString,格式,每个,JSON,对象为一个,string,:,jsonStrings,= name:Yin,address:city:Columbus,state:Ohio,otherPeopleRDD = sc.parallelize(jsonStrings),otherPeople = spark.read.json(otherPeopleRDD),otherPeople.show,(),JSON DataSets,数据源,Hive,表数据源,Hive,表数据源,Spark SQL,支持对,Hive,中的数据进行读写。首先创建一个支持,Hive,的,SparkSession,对象,包括与,Hive metastore,的连接,支持,Hive,的序列化和反序列化操作,支持用户定义的,Hive,操作等。,warehouse_location,= abspath(spark-warehouse),spark = SparkSession .builder ,.appName(Python Spark SQL Hive integration example) ,.config(spark.sql.warehouse.dir, warehouse_location) ,.enableHiveSupport() .getOrCreate,(),warehouse_location,指定数据库和表的缺省,位置:,Hive,表数据源,spark.sql,(CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive),spark.sql(LOAD DATA LOCAL INPATH examples/src/main/resources/kv1.txt INTO TABLE src,),基于新创建的,SparkSession,创建表和上载数据到表,中:,spark.sql,(SELECT * FROM src).show(),spark.sql(SELECT COUNT(*) FROM src).show,(),使用,HiveQL,进行,查询:,Hive,表数据源,sqlDF,= spark.sql(SELECT key, value FROM src WHERE key val sqlContext = new org.apache.spark.sql.SQLContext(sc),sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext1943a343,scala import sqlContext.implicits._,import sqlContext.implicits,._,SQLContext,Spark SQL CLI,介绍,下面的操作基于一个简单的数据文件,people.json,,文件的内容如下,:,name:Michael,name:Andy, age:30,name:Justin, age:19,数据文件,下面语句从本地文件,people.json,读取数据创建,DataFrame,:,val df = sqlContext.read.json(file:/data/people. json),df: org.apache.spark.sql.DataFrame = age: bigint, name: string,创建,DataFrames,Pyspark,是针对,Spark,的,Python API,。,Spark,使用,py4j,来实现,Python,与,Java,的互操作,从而实现使用,Python,编写,Spark,程序。,Spark,也同样提供了,Pyspark,,一个,Spark,的,Python Shell,,可以以交互的方式使用,Python,编写,Spark,程序。,PART 07,在,Pyspark,中使用,Spark SQL,在,Pyspark,中使用,Spark SQL,在终端上启动,PythonSpark,Shell,:,./bin/pyspark,使用,JSON,文件作为数据源,创建,JSON,文件,/home/sparksql/courses.json,,并输入下面的内容,:,实例描述,name:Linux, type:basic, length:10,name:TCPIP, type:project, length:15,name:Python, type:project, length:8,name:GO, type:basic, length:2,name:Ruby, type:basic, length:5,在,Pyspark,中使用,Spark SQL,首先使用,SQLContext,模块,其作用是提供,Spark SQL,处理的功能。在,Pyspark Shell,中逐步输入下面步骤的,内容:,引入,pyspark.sql,中的,SQLContext,:,from,pyspark.sql import,SQLContext,创建,SQLContext,对象,使用,pyspark,的,SparkContext,对象,创建,SQLContext,对象:,sqlContext,= SQLContext(sc,),在,Pyspark,中使用,Spark SQL,DataFrame,对象可以由,RDD,创建,也可以从,Hive,表或,JSON,文件等数据源创建,。,创建,DataFrame,,指明来源自,JSON,文件:,df,= sqlContext.read.json(/home/shiyanlou/courses.json,),创建,DataFrame,对象,在,Pyspark,中使用,Spark SQL,首先打印当前,DataFrame,里的内容和数据表的格式,:,df.select,(name).show()#,展示了所有的课程名,df.select(name, length).show()#,展示了所有的课程名及课程,长度,对,DataFrame,进行操作,show(),函数将打印出,JSON,文件中存储的数据表;使用,printSchema(),函数打印数据表的格式,。,然后对,DataFrame,的数据进行各种操作,:,df.show,(),df.printSchema(),在,Pyspark,中使用,Spark SQL,df.filter(df,type = basic).select(name, type).show()#,展示了课程类型为基础课(,basic,)的课程名和课程类型,df.groupBy(type).count().show()#,计算所有基础课和项目课的数量,。,首先需要将,DataFrame,注册为,Table,才可以在该表上执行,SQL,语句:,df.registerTempTable,(courses),coursesRDD = sqlContext.sql(SELECT name FROM courses WHERE length = 5 and length = 10),names = coursesRDD.rdd.map(lambda p: Name: + p.name),for name in names.collect():,print,name,执行,SQL,语句,在,Pyspark,中使用,Spark SQL,Parquet,是,Spark SQL,读取的默认数据文件格式,把从,JSON,中读取的,DataFrame,保存为,Parquet,格式,只保存课程名称和长度两项数据:,df.select,(name, length).write.save(/tmp/courses.parquet, format=parquet,),保存,DataFrame,为其他格式,将创建,hdfs:/master:9000/tmp/courses.parquet,文件夹并存入课程名称和长度数据。,Spark SQL,实现了,Thrift JDBC/ODBC server,,所以,Java,程序可以通过,JDBC,远程连接,Spark SQL,发送,SQL,语句并执行。,PART,08,在,Java,中,连接,Spark,SQL,在,Java,中,连接,Spark,SQL,首先将,$HIVE_HOME/conf/hive-site.xml,拷贝到,$SPARK_HOME/conf,目录下。另外,因为,Hive,元数据信息存储在,MySQL,中,所以,Spark,在访问这些元数据信息时需要,MySQL,连接驱动的支持。添加驱动的方式有三种,:,在,$SPARK_HOME/conf,目录下的,spark-defaults.conf,中添加:,spark.jars /opt/lib2/mysql-connector-java-5.1.26-bin.jar,;,可以实现添加多个依赖,jar,比较方便:,spark.driver.extraClassPath /opt/lib2/mysql-connector-java-5.1.26-bin.jar,;,设置配置,在运行时添加,-jars/,opt/lib2/mysql-connector-java-5.1.26-bin.jar,做完上面的准备工作后,,Spark SQL,和,Hive,就继承在一起了,,Spark SQL,可以读取,Hive,中的数据,。,设置配置,启动,Thrift,在,Spark,根目录下执行:,./sbin/start-thriftserver.sh,开启,thrift,服务器,它可以接受所有,spark-submit,的参数,并且还可以接受,-hiveconf,参数。不添加任何参数表示以,local,方式运行,默认的监听端口为,10000,在,Java,中,连接,Spark,SQL,添加依赖,打开,Eclipse,用,JDBC,连接,Hive Server2,。新建一个,Maven,项目,在,pom.xml,添加以下依赖:,org.apache.hive,hive-jdbc,1.2.1,org.apache.hadoop,hadoop-common,2.4.1,在,Java,中,连接,Spark,SQL,添加依赖,jdk.tools,jdk.tools,1.6,system,$JAVA_HOME/lib/tools.jar,在,Java,中,连接,Spark,SQL,JDBC,连接,Hive Server2,的相关参数,:,驱动:,org.apache.hive.jdbc.HiveDriver,url,:,jdbc:hive2:/192.168.1.131:10000/default,用户名:,hadoop (,启动,thriftserver,的,linux,用户名,),密码:“”(默认密码为空,),JDBC,连接参数,在,Java,中,连接,Spark,SQL,importjava.sql.Connection;,importjava.sql.DriverManager;,importjava.sql.ResultSet;,importjava.sql.SQLException;,importjava.sql.Statement;,publicclassTest1,publicstaticvoidmain(Stringargs)throwsSQLException,Stringurl=jdbc:hive2:/192.168.1.131:10000/default;,try,Class.forName(org.apache.hive.jdbc.HiveDriver);,catch(ClassNotFoundExceptione),e.printStackTrace();,Connectionconn=DriverManager.getConnection(url,hadoop,);,Statementstmt=conn.createStatement();,Stringsql=SELECT*FROMdoc1limit10;,System.out.println(Running+sql);,ResultSetres=stmt.executeQuery(sql);,while(res.next(),System.out.println(id:+res.getInt(1)+ttype:+res.getString(2)+tauthors:+res.getString(3)+ttitle:+res.getString(4)+tyear:+res.getInt(5);,JDBC,连接参数,在,Java,中,连接,Spark,SQL,PART 09,作业,作业,作业:,什么,是,Spark SQL,?其主要目的是什么,?,Spark,SQL,的执行流程有哪几个步骤,?,在,Spark SQL,中,什么是,DataFrame,?使用,DataFrame,的优势是什么?,DataFrame,与,RDD,的主要区别是什么,?,使用,Spark SQL,的方式有哪几种?使用,Spark SQL,的步骤是什么,?,常用,的,Spark SQL,的数据源有哪些,?,Parquet,文件格式是什么?它的主要特点是什么,?,为了,使,Java,程序可以通过,JDBC,远程连接,Spark SQL,,需要做哪些准备工作?连接数据库的语句是什么?有哪些参数?,作业,作业:,请,按下述要求写出相应的,Spark SQL,语句:,从,一个本地,JSON,文件创建,DataFrame,;,打印,DataFrame,元数据,;,按,照列属性过滤,DataFrame,的数据,;,返回,某列满足条件的数据,;,把,DataFrame,注册成数据库表。,谢谢,FOR YOUR LISTENING,Handge CO. LTD.,2016.12.09,
展开阅读全文
相关资源
正为您匹配相似的精品文档
相关搜索

最新文档


当前位置:首页 > 办公文档 > 教学培训


copyright@ 2023-2025  zhuangpeitu.com 装配图网版权所有   联系电话:18123376007

备案号:ICP2024067431-1 川公网安备51140202000466号


本站为文档C2C交易模式,即用户上传的文档直接被用户下载,本站只是中间服务平台,本站所有文档下载所得的收益归上传人(含作者)所有。装配图网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。若文档所含内容侵犯了您的版权或隐私,请立即通知装配图网,我们立即给予删除!