Spark SQL笔记

HDFS

HDFS架构

1、Master(NameNode/NN) 对应 N个Slaves(DataNode/NN)
2、一个文件会被拆分成多个块(Block)
默认:128M
例: 130M ==> 128M + 2M
3、NameNode、DataNode负责内容:
NN:
1)负责客户端请求的响应
2)负责元数据(文件名称、副本系数、Block存放的DN)的管理
DN:
1)存储用的文件对应的数据块(Block)
2)定期向NN发送心跳信息(默认3秒),汇报本身及其所有的Block信息,健康状况
4、 重要提示:
A typical deployment has a dedicated machine that runs only the NameNode software.
Each of the other machines in the cluster runs one instance of the DataNode software.
The architecture does not preclude running multiple DataNodes on the same machine
but in a real deployment that is rarely the case.
一个典型的部署有一个专用的机器,它只运行NAMENODE软件。
群集中的每个其他机器运行DATEAON软件的一个实例。
架构不排除在同一台机器上运行多个数据流。
但在实际部署中,情况很少。 #不建议这种操作
NameNode + N个DataNode
建议:NameNode、DataNode分布在单独的节点中
2)replication factor 副本系数、副本因子
3)All blocks in a file except the last block are the same size
除了最后一个块之外,文件中的所有块都是相同大小的

HDFS相关配置:
hadoop-env.sh,core-site.xml,hdfs-site.xml3个配置文件进行修改
机器参数配置
hostname:主机名
修改机器名:/etc/sysconfig/network
NETWORK=yes
HOSTNAME=主机名
设置IP和hostname的映射关系:IP地址:主机名
SSH免密码登录:
ssh-keygen -t rsa

5、HDFS优缺点
优点:
高容错
适合批处理
适合大数据处理
可构建在廉价机器上
缺点:
低延迟访数据访问
不适合小文件存储
6、MapReduce
1、特点
1)易于编程
2)良好的扩展性
3)高容错性
4)海量数据的离线处理
2、不擅长场景
1)实时计算
2)流式计算
3)DAG(有向无环图)计算

YARN

YARN架构:

1个RM(ResourceManager)主节点+N个NM(NodeManager)从节点
ResourceManager职责:一个集群active状态的RM只有一个,负责整个集群额资源管理和调度
1.处理客户端的请求(启动或杀死一个作业)
2.启动/监控ApplicationMaster(一个作业对应一个AM)
3.通过心跳监控NM
4.系统的资源分配和调度

NodeManager职责:整个集群中有N个节点,负责单个节点的资源管理和使用以及task的运行
1.定期向RM汇报本节点资源使用情况和各个Container的运行状态
2.接收并处理RM的Container启停的各种命令
3.单个节点的资源管理和任务管理

ApplicationMaster职责:每个作业/应用对应一个,负责 应用程序的管理
1.数据切分
2.为应用程序向RM申请资源(container),并分配给内部任务
3.与NM通信以启停task,task是运行在container中的
4.task的监控和容错
container职责:对任务运行情况的描述包括(cpu,memory,环境变量)

YARN执行流程:

1.用户向YARN提交作业
2.RM为该作业分配的第一个Container(启动AM)
3.RM会与对应的NM通信,要求NM在这个Container上启动应用程序的AM
4.AM首先向RM注册,然后AM将为各个任务申请资源,并监控运行情况
5.AM采用轮训的方式通过RPC协议向RM申请和领取资源
6.AM申请到资源后,便和相应的NM通信,要求NM启动任务
7.NM启动我们作业对应的task

YARN环境搭建:

mapred-site.xml配置
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
yarn-site.xml配置

<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>

启动yarn:sbin/start-yarn.sh
验证是否启动成功:jps
有如下进程:ResourceManager,NodeManager,Web访问:http://主机名:8088
停止yarn:sbin/stop-yarn.sh
提交MapReduce作业到yarn上运行:
需要找到jar包的位置例如:/home/hadoop/app/hadoop-2.6.0-cdh5.7.0/share/hadoop/mapreduce//hadoop-mapreduce-examples-2.x-cdh5.7.jar
在$HADOOP_HOME/bin执行:hadoop jar /home/hadoop/app/hadoop-2.6.0-cdh5.7.0/share/hadoop/mapreduce//hadoop-mapreduce-examples-2.x-cdh5.7.jar wordcount /input/wc/文件名 /output/wc/

将所需数据上传到hdfs上,

    a:hadoop fs -ls /(查看根目录)

    b:hadoop fs -mkdir -p /input/wc(创建文件夹,-p创建多个文件夹)

    c:上传数据文件到hadoop文件夹中,命令:hadoop fs -put hello.txt /input/wc

    d:hadoop fs -text /input/wc/hello.txt(查看文件内容)

    e:hadoop fs -rm -r /output/wc(删除文件夹)

再次执行该作业时会报错:

FileAlreadyExistsException:

Output directory hdfs://hadoop000:8020/ouput/wc already exists

需要执行删除文件夹的命令

Hive

Hive构建在Hadoop上的数据仓库,定义一种类SQL的HQL语言,通常用于离线数据处理
Hive底层的执行引擎有:MapReduce,Spark,Tez
压缩有Gzip,Lzo,SNappy,Bzip2
存储有:TextFile,SequenceFile,RCfile,ORCfILE,
UDF:自定义函数,自己发开

Hive环境搭建

1.下载地址:
2.解压:tar -zxvf hive-1.1.0-cdh5.7.0.tar.gz -C ~/app/
3.配置环境变量:vi ~/.bash_profile 添加export HIVE_HOME=/home/hadoop/app/hive-1.1.0-cdh5.7.0,export PATH=$HIVE_HOME/bin:$PATH
4.安装一个MySQL yum install mysql-server,输入service mysqld start 启动服务,输入:mysqladmin -u root -p password '123456'来设置密码
5.在conf文件夹的hive-env.sh配置hadoop的安装路径:HADOOP_HOME=/home/hadoop/app/hadoop-2.6.0-cdh5.7.0

6.配置mysql的数据信息及原数据:有4项
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<!--alterdb为要创建的数据库名,注意字符集设置-->
<value>jdbc:mysql://localhost:3306/sparksql?createDatabaseIfNotExist=true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<!--MySQL登录账户名-->
<value>root</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<!--MySQL登录密码-->
<value>root</value>
</property>
<property>
<name>kite.hive.allow-local-metastore</name>
<value>true</value>
</property>
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://hadoop000:9083</value>
<description>Thrift uri for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
</configuration>
7.拷贝mysql驱动到hive安装路径下lib文件夹中 cp ~/software/mysql-connector-java-5.1.27-bin.jar .
8.启动hive之前要先启动Hive的Metastore Server服务进程,在${HIVE_HOME/bin}执行hive --service metastore &后,再执行 ./hive

Hive基本使用

9.创建表:

创建数据表:
CREATE TABLE table_name
[(col_name data_type [COMMENT col_comment], ... [constraint_specification])]

例如:create table hive_worddcount(context string);

如果创建数据表出现如下问题:hive> create table ehr_base(id string);
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:For direct MetaStore DB connections, we don't support retries at the client level.)说明MySQL中的hive数据库字符编码有问题,需要连接mysql服务器,并执行SQL语句修改hive数据库的字符编码alter database '数据库' character set latin1;
登录mysql(mysql --uroot -proot)
show tables;
select * from TBLS;
表的字段在COLUMNS_V2 select * from COLUMNS_V2;
表创建完成后,需要将数据加载到表里面(将linux服务器目录下的data中的数据例如hello.txt加载到hive表里面去)

使用Hive进行wordcount统计

10.加载数据到hive表

命令:LOAD DATA LOCAL INPATH 'filepath' INTO TABLE tablename
例如:load data local inpath '/home/hadoop/data/hello.txt' into table hive_worddcount;

使用explode函数对split的结果集进行行拆列:

  select explode(split(context,' ')) as word from hive_wordcount;

11.查询统计词频出现的次数:
  select word,count(1) as count from (select explode(split(context,' ')) as word from hive_wordcount) word group by word order by count;

$hive> INSERT OVERWRITE LOCAL DIRECTORY ‘/tmp/wordcount_result’ SELECT word,count(1) FROM words GROUP BY word ORDER BY word;
LOCAL去掉表示导出到HDFS的目录;

lateral view explade()作用是把每行记录按照指定分隔符进行拆解

练习
1.创建员工表:
create table emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ';

2.创建部门表:
create table dept(
deptno int,
dname string,
location string
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ';

加载数据表到hive中,执行:如下的hive语句:

hive> load data local input '/home/hadoop/data/dept.txt' into table dept;

hive> load data local input '/home/hadoop/data/emp.txt' into table emp;

求每个部门人数?

select deptno,count(1) from emp group by deptno;

12.

hive sql 提交执行以后会生产mr作业,并在yarn运行

Spark SQL

1.Spark1.x中SparkSQL的入口点是SQLContext

2.Spark2.x中SparkSQL的入口点是SparkSession

已经存在hive中的数据表,sparkSQL如何进行访问?
在$SPARK_HOME/bin目录下启动./spark-shell --master local[2]
访问hive中的数据表有2个条件
1.将$HIVE_HOME/conf目录下的hive-site.xml拷贝到$SPARK_HOME/conf目录下
2.需要jars包,执行./spark-shell --master local[2] --jars ~/software/mysql-connector-java-5.1.27-bin.jar
在scala>spark.sql("show tables").show,就可以访问hive中的数据表了
例如执行:

1.scala>spark.sql("select * from dept").show

2.scala>spark.table("emp").show

spark是否可以和hive一样使用sql命令得到结果?
在$SPARK_HOME/bin目录下启动./spark-sql --master local[2] --jars ~/software/mysql-connector-java-5.1.27-bin.jar
通过web访问:hadoop000:4040
thriftserver/beeline的配合使用(启动thriftserver成功后,通过beeline连接到thriftserver)
1.在$SPARK_HOME/sbin目录下启动./start-thriftserver.sh --master local[2] --jars ~/software/mysql-connector-java-5.1.27-bin.jar
另:通过jps -m查看隐藏进程,默认端口是10000,该端口可更改
2.启动beeline,在$SPARK_HOME/bin目录下启动./beeline -u jdbc:hive2://localhost:10000 -n hadoop(-n是指用户名而不是主机名)

Beeline version 1.2.1.spark2 by Apache Hive
0: jdbc:hive2://localhost:10000> show tables;
+-----------+-----------------+--------------+--+
| database | tableName | isTemporary |
+-----------+-----------------+--------------+--+
| default | dept | false |
| default | emp | false |
| default | hive_wordcount | false |
+-----------+-----------------+--------------+--+
3 rows selected (2.45 seconds)
0: jdbc:hive2://localhost:10000>

3.可以启动多个beeline
4.停止./stop-thriftserver.sh

5.修改端口:在启动时候./start-thriftserver.sh
--master local[2] --jars ~/software/mysql-connector-java-5.1.27-bin.jar
hiveconf hive.server2.thrift..port=14000,启动beeline时候./beeline -u jdbc:hive2://localhost:14000 -n hadoop

thriftserver和普通的spark-shell/sparkSQL有什么区别?
1.每次启动一个spark-shell或sparkSQL都是一个spark application,都需要重新启动并申请资源
2.但是采用thriftserver不管启动多少个客户端(beeline)只要连接到一个server(服务端)上,永远都是一个spark application,无需重新申请资源,可以对缓存的数据进行直接使用,解决数据共享的问题

DataFrame基本API常用操作:
1、collect 和 collectAsList 将df中的数据转化成Array和List
2、count 统计df中的总记录数
3、first 获取df中的第一条记录,数据类型为Row
4、head 获取df的前几条记录
5、show 默认打印前20条数据
6、take 获取df中的前几条记录
7、cache 对df进行缓存
8、columns 显示所有的列的schema列名,类型为Array[String]
9、dtypes 显示所有的列的schema信息,类型为Array[(String, String)]
10、explain 显示当前df的执行计划
11、isLocal 当前spark sql的执行是否为本地,true为真,false为非本地
12、printSchema 打印schema以树的格式
13、registerTempTable
14、schema 返回DataFrame的schema为types.StructType
15、toDF 备注:toDF带有参数时,参数个数必须和调用这DataFrame的列个数据是一样的类似于sql中的:toDF:insert into t select * from t1;
16、intersect 返回两个DataFrame相同的Rows

原文地址:https://www.cnblogs.com/fenghuoliancheng/p/10483073.html