hive中shuffle的优化
-
压缩
压缩可以使磁盘上存储的数据量变小,通过降低I/O来提高查询速度。对hive产生的一系列MR中间过程启用压缩
-
set hive.exec.compress.intermediate= true;
-
set mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
对最终输出结果压缩(写到hdfs、本地磁盘的文件)
-
set hive.exec.compress.output= true;
-
set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
-
- join优化
-
如果关联查询两张表中有一张小表默认map join,将小表加入内存
-
hive.mapjoin.smalltable.filesize= 25000000 默认大小
-
hive. auto.convert. join= true 默认开启
-
如果没有开启使用mapjoin,使用语句制定小表使用mapjoin
-
```sql
-
select /*+ MAPJOIN(time_dim) */ count( 1) from
-
store_sales join time_dim on (ss_sold_time_sk = t_time_sk)
-
-
3. 倾斜连接
-
```xml
-
<!-- hive.optimize.skewjoin:是否为连接表中的倾斜键创建单独的执行计划。它基于存储在元数据中的倾斜键。在编译时,Hive为倾斜键和其他键值生成各自的查询计 划。 -->
-
<property>
<name>hive.optimize.skewjoin
</name>
-
<value>true
</value>
-
</property>
<property>
-
-
<!-- hive.skewjoin.key:决定如何确定连接中的倾斜键。在连接操作中,如果同一键值所对应的数据行数超过该参数值,则认为该键是一个倾斜连接键。 -->
-
<name>hive.skewjoin.key
</name>
-
<value>100000
</value>
-
</property>
-
-
<!-- hive.skewjoin.mapjoin.map.tasks:指定倾斜连接中,用于Map连接作业的任务数。该参数应该与hive.skewjoin.mapjoin.min.split一起使用,执行细粒度的控制。 -->
-
<property>
<name>hive.skewjoin.mapjoin.map.tasks
</name>
-
<value>10000
</value>
-
</property>
-
-
<!-- hive.skewjoin.mapjoin.min.split:通过指定最小split的大小,确定Map连接作业的任务数。该参数应该与hive.skewjoin.mapjoin.map.tasks一起使用,执行细粒度的控制。 -->
-
<property>
-
<name>hive.skewjoin.mapjoin.min.split
</name>
-
<value>33554432
</value>
-
</property>
-
```
- Hive在集群过程中怎么解决数据倾斜
本质原因:key的分布不均导致的
Map 端部分聚合,相当于Combiner
hive.map.aggr=true
hive.groupby.skewindata=true
当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。
sqoop要将数据库中的所有表执行导入,怎么操作?哪些参数?增量导入?
-
[hadoop
@linux03 sqoop-
1.4.5-cdh5.3.6]$ bin/sqoop
import \
-
> --connect jdbc:mysql:
//linux03.ibf.com:3306/mydb \
-
> --username root \
-
> --password
123456 \
-
> --table user
-
bin/sqoop import \
-
--connect jdbc:mysql://linux03.ibf.com:3306/mydb \
-
--username root \
-
--password 123456 \
-
--table user \
-
--fields-terminated-by '\t' \
-
--target-dir /sqoop/incremental \
-
-m 1 \
-
--direct \
-
--check-column id \
-
--incremental append \
-
--last-value 3
hive导致数据倾斜的可能性(哪些操作会导致) -->分桶 join key分布不均匀 大量空值导致如何解决?
根据key操作到时结果分布不均都可能导致数据倾斜,如group by key
order by 使用全局排序最终只会在一个reducer上运行所有数据,导致数据倾斜
大量NULL
hive的NULL有时候是必须的:
1)hive中insert语句必须列数匹配,不支持不写入,没有值的列必须使用null占位。
2)hive表的数据文件中按分隔符区分各个列。空列会保存NULL(n)来保留列位置。但外部表加载某些数据时如果列不够,如表13列,文件数据只有2列,则在表查询时表中的末尾剩余列无数据对应,自动显示为NULL。
所以,NULL转化为空字符串,可以节省磁盘空间,实现方法有几种
-
a、用语句
-
ROW FORMAT SERDE ‘org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe’
-
with serdeproperties(
'serialization.null.format' =
'')
-
实现,注意两者必须一起使用,如
-
CREATE
TABLE hive_tb (
id
int,
name
STRING)
-
PARTITIONED
BY (
`day`
string,
`type` tinyint
COMMENT
'0 as bid, 1 as win, 2 as ck',
`hour` tinyint)
-
ROW
FORMAT SERDE ‘org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe’
-
WITH SERDEPROPERTIES (
-
‘field.delim’=
'/t’,
-
‘escape.delim’='//’,
-
‘serialization.null.format
'=''
-
) STORED AS TEXTFILE;
hive中如何增加一列数据?
新增一列
-
hive >
alter table log_messages add coloumns(
-
app_name string comment 'Application name',
-
session_id long comment 'The current session id'
-
);
-
-- 增加列的表的最后一个字段之后,在分区字段之前添加。
如果新增一列是分区,则可以新增数据到该分区下
insert into table clear partition(date='20150828',hour='18') select id,url,guid from tracklogs where date='20150828' and hour='18';
- 运行spark
-
有没有hive处理过json?有哪些函数?
-
建表时制定jar包处理json数据
- 首先添加jar包
-
-
2. 建表
-
```
-
hive (
default)> ADD JAR hcatalog/share/hcatalog/hive-hcatalog-core-
1.1.
0-cdh5.
14.2.jar;
-
Added [hcatalog/share/hcatalog/hive-hcatalog-core-
1.1.
0-cdh5.
14.2.jar]
to
class path
-
Added resources: [hcatalog/share/hcatalog/hive-hcatalog-core-
1.1.
0-cdh5.
14.2.jar]
-
hive (
default)> create table spark_people_json(
-
>
-
> `
name`
string,
-
>
-
> `age` int)
-
>
-
> ROW FORMAT SERDE
'org.apache.hive.hcatalog.data.JsonSerDe'
-
>
-
>
STORED
AS TEXTFILE;
-
OK
-
Time taken:
4.445 seconds
-
2. 记录下如果只是某个字段为json,想要获取里面的某个值怎么操作?
-
1. get_json_object()
-
-
只能获取一个字段
-
```sql
-
select get_json_object('{
"shop":{
"book":[{
"price":
43.3,
"type":
"art"},{
"price":
30,
"type":
"technology"}],
"clothes":{
"price":
19.951,
"type":
"shirt"}},
"name":
"jane",
"age":
"23"}', '$.shop.book[
0].type');
-
```
-
-
2. json_tuple()
-
-
可以获取多个字段
-
```sql
-
select json_tuple('{
"name":
"jack",
"server":
"www.qq.com"}','server','name')
-
```
-
-
3. 自行编写
UDF
- sparkstreaming和kafka集成中 精确一次的数据消费如何实现?
使用直接连接方式
-
消息语义有几种?
- at least once -- 消息绝不会丢,但可能会重复传输
- at most once -- 消息可能会丢,但绝不会重复传输
- exactly once -- 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的
- kafka的消费者怎么去保证精确一次的消费?
-
sparkstreaming和kafka集成有几种方式?
- Receiver-based Approach
- Direct Approach (No Receivers) native Offsets
- 怎么实现sparkstreaming?
初始化 StreamingContext
通过创建输入DStreams来定义输入源。
通过将转换和输出操作应用于DStream来定义流式计算。
开始接收数据并使用它进行处理streamingContext.start()。
等待处理停止(手动或由于任何错误)使用streamingContext.awaitTermination()。
可以使用手动停止处理streamingContext.stop()。
- 项目中有几个人 如何分配?
- 所在项目中的存储架构?
- 开发工具用的是什么?(什么情况用什么工具/xshell/idea)
- 代码怎么去做的管理(git)?
-
hive中如何去统计每周一,每个月的第一天的pv?
获取指定日期月份的第一天、年份的第一天
-
select trunc('2019-02-24', 'YYYY');
-
-
select trunc('2019-02-24', 'MM');
指定日期下周的指定周几
select next_day('2019-02-24', 'TU');
按指定格式返回指定日期增加几个月后的日期
-
select add_months('2019-02-28', 1);
-
-
select add_months('2019-02-24 21:15:16', 2, 'YYYY-MM-dd HH:mm:ss');
select count(guid) from table group by trunc(date, 'MM')
select count(guid) from table group by next_day('2019-06-08', 'MONDAY');
-
- dataset和dataframe区别?
Spark2.x之后,官方已经将 ( DataFrame ) /Dataset (数据集)API的进行了 统一 ,DataFrame仅 是Dataset中每个元素为Row类型的时候
不同之处在于 Dataset 是 strongly typed (强类型的) ,而dataframe则是 untypedrel (弱类型的)
-
项目中hive的元数据在哪儿保存?
- 若没有制定将metastore保存到指定的数据库,则metastore默认存放在hive自带的deybe数据库中,这就是安装hive的嵌入模式
- 若在设置中制定外部数据库,则保存在该数据库中,本地模式和远程模式使用这种方式保存metastore
-
元数据怎么保证他的安全性?
修改元数据所用的用户名和密码
-
<property>
-
<name>javax.jdo.option.ConnectionUserName </name>
-
<value>root </value>
-
</property>
-
<property>
-
<name>javax.jdo.option.ConnectionPassword </name>
-
<value>123456 </value>
-
</property>
在mysql端设置metastore数据库的访问权限
-
-
sqoop导入导出有几种方式?增量导出?
导入
-
全量导入
-
```
-
[hadoop@linux03 sqoop-1.4.5-cdh5.3.6]$ bin/sqoop import \
-
> --connect jdbc:mysql://linux03.ibf.com:3306/test_db \
-
> --username root \
-
> --password root \
-
> --table toHdfs \
-
> --target-dir /toHdfs \
-
> --direct \
-
> --delete-target-dir \
-
> --fields-terminated-by '\t' \
-
> -m 1
-
```
-
-
增量导入append
-
```sh
-
bin/sqoop import \
-
--connect jdbc:mysql://linux03.ibf.com:3306/mydb \
-
--username root \
-
--password 123456 \
-
--table user \
-
--fields-terminated-by '\t' \
-
--target-dir /sqoop/incremental \
-
-m 1 \
-
--direct \
-
--check-column id \
-
--incremental append \
-
--last-value 3
-
```
-
-
增量导入lastmodified
-
表中必须有一列指示时间
-
```
-
sqoop import \
-
--connect jdbc:mysql://master:3306/test \
-
--username hive \
-
--password 123456 \
-
--table customertest \
-
--check-column last_mod \
-
--incremental lastmodified \
-
--last-value "2016-12-15 15:47:29" \
-
-m 1 \
-
--append
-
```
-
导出
-
插入
-
默认情况下,sqoop-export将新行添加到表中;每行输入记录都被转换成一条 INSERT语句,将此行记录添加到目标数据库表中。如果数据库中的表具有约束条件(例如,其值必须唯一的主键列)并且已有数据存在,则必须注意避免插入违反这些约束条件的记录。如果 INSERT语句失败,导出过程将失败。此模式主要用于将记录导出到可以接收这些结果的空表中。
-
-
更新
-
如果指定了 --update-key参数,则Sqoop将改为修改数据库中表中现有的数据。每个输入记录都将转化为UPDATE语句修改现有数据。语句修改的行取决于--update-key指定的列名,如果数据库中的表中不存在的数据,那么也不会插入。
-
根据目标数据库,如果要更新已存在于数据库中的行,或者如果行尚不存在则插入行,则还可以 --update-mode 使用allowinsert模式
-
sparkstreaming窗口函数的三个参数是怎么设计的?
一般窗口长度大于滑动间隔
增加窗口宽度,对一个窗口中的数据操作就可以做到用sparkstreaming跑批
https://segmentfault.com/a/1190000019420556
转载:https://blog.csdn.net/xiamaocheng/article/details/104544942