RDD Data Sources
Plain text files
sc.textFile("./dir/*.txt")
If you pass a directory, every file under it is read into the RDD; the path also supports wildcards.
Reading a large number of small files this way is inefficient, though. Use wholeTextFiles instead:
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
It returns an RDD[(String, String)] whose key is the file name and whose value is the file content.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  val config = new SparkConf().setAppName("DataSourceTest").setMaster("local[*]")
  val sc = new SparkContext(config)
  sc.setLogLevel("WARN")

  println("Reading small files")
  // Each element is (file name, file content); minPartitions controls the parallelism
  val filesRDD: RDD[(String, String)] = sc.wholeTextFiles("D:\\data\\spark\\files", minPartitions = 3)
  // Split each file's content into lines, then split the lines into words
  val linesRDD: RDD[String] = filesRDD.flatMap(_._2.split("\\r\\n"))
  val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(" "))
  // Classic word count over all the small files
  wordsRDD.map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
}
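The word count above throws the file names away. As the signature suggests, the key of each pair is the file name; a minimal hedged sketch that keeps it, using the same hypothetical directory as above:

// Hedged sketch: inspect the (fileName, content) pairs themselves.
sc.wholeTextFiles("D:\\data\\spark\\files")
  .map { case (fileName, content) => (fileName, content.length) }
  .collect()
  .foreach { case (fileName, chars) => println(s"$fileName -> $chars characters") }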
JDBC
Spark can access relational databases through Java JDBC; reading is done with JdbcRDD.
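For reference, a hedged sketch of the shape of the JdbcRDD constructor and what each argument does (argument names here are illustrative; the full read example appears further below):

// Hedged sketch of the JdbcRDD constructor (names are illustrative):
// new JdbcRDD(
//   sc,                      // the SparkContext
//   () => connection,        // a function that opens a java.sql.Connection
//   sql,                     // a query with exactly two ? placeholders for the id range
//   lowerBound, upperBound,  // the values bound to those two placeholders
//   numPartitions,           // how many partitions the id range is split into
//   resultSet => row         // maps each java.sql.ResultSet row to an RDD element
// )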
Writing data into a MySQL table:
import java.sql.DriverManager
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Write data to a MySQL database.
 * @param args
 */
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("JDBCDataSourceTest").setMaster("local[*]")
  val sc = new SparkContext(conf)
  sc.setLogLevel("warn")

  // Read the source data
  val data = sc.textFile("file:////E:\\user.txt")
  val data29 = sc.textFile("input20200407/users.txt")

  // Open one connection per partition rather than per record
  data.foreachPartition { data1 =>
    val connection = DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/bigdata0407?characterEncoding=UTF-8",
      "root", "root")
    val sql = "INSERT INTO `user` values (NULL,?,?,?,?)"
    data1.foreach { data2 =>
      val datas = data2.split(" ")
      val statement = connection.prepareStatement(sql)
      statement.setString(1, datas(0))
      statement.setString(2, datas(1))
      statement.setString(3, datas(2))
      statement.setString(4, datas(3))
      statement.executeUpdate()
      statement.close()
    }
    connection.close()
  }
}
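An optional optimization that is not part of the original example: prepare the statement once per partition and batch the inserts, which can cut down on round trips to MySQL. A hedged sketch assuming the same data RDD, table, and credentials as above:

import java.sql.DriverManager

// Hedged sketch; assumes the same `data` RDD, table and credentials as above.
data.foreachPartition { rows =>
  val connection = DriverManager.getConnection(
    "jdbc:mysql://localhost:3306/bigdata0407?characterEncoding=UTF-8",
    "root", "root")
  val statement = connection.prepareStatement("INSERT INTO `user` values (NULL,?,?,?,?)")
  rows.foreach { line =>
    val fields = line.split(" ")
    statement.setString(1, fields(0))
    statement.setString(2, fields(1))
    statement.setString(3, fields(2))
    statement.setString(4, fields(3))
    statement.addBatch()   // queue the insert instead of executing it right away
  }
  statement.executeBatch() // send the whole partition's inserts together
  statement.close()
  connection.close()
}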
Reading data from a MySQL table:
import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Read data from a MySQL table.
 * @param args
 */
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("JDBCDataSourceTest").setMaster("local[*]")
  val sc = new SparkContext(conf)
  sc.setLogLevel("warn")

  // The two ? placeholders are filled with the lower/upper bounds below
  val sql = "select * from `user` where id between ? and ?"
  val user = new JdbcRDD(sc,
    () => {
      DriverManager.getConnection(
        "jdbc:mysql://localhost:3306/bigdata0407?characterEncoding=UTF-8",
        "root", "root")
    },
    sql,
    0,   // lower bound of the id range
    8,   // upper bound of the id range
    2,   // the id range is split across 2 partitions
    result => {
      println("id=" + result.getInt(1) +
        ",username=" + result.getString(2) +
        ",birthday=" + result.getString(3) +
        ",sex=" + result.getString(4))
    }
  )
  // Trigger the computation
  user.collect()
}
HadoopAPI
Spark's entire ecosystem is fully compatible with Hadoop, so Spark also supports every file type and database type that Hadoop supports.
HadoopRDD, newAPIHadoopRDD, saveAsHadoopFile and saveAsNewAPIHadoopFile are the low-level APIs;
the other related APIs are provided for the convenience of Spark application developers and are built on top of these low-level interfaces.
Code demo:
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  val config = new SparkConf().setAppName("DataSourceTest").setMaster("local[*]")
  val sc = new SparkContext(config)
  sc.setLogLevel("WARN")

  System.setProperty("HADOOP_USER_NAME", "root")

  // 1. Hadoop API
  println("HadoopAPI")
  val dataRDD = sc.parallelize(Array((1, "hadoop"), (2, "hive"), (3, "spark")))

  // Write with the new (mapreduce) Hadoop API
  dataRDD.saveAsNewAPIHadoopFile("hdfs://node01:8020/spark_hadoop/",
    classOf[LongWritable],
    classOf[Text],
    classOf[TextOutputFormat[LongWritable, Text]])

  // Read it back with the new Hadoop API
  val inputRDD: RDD[(LongWritable, Text)] = sc.newAPIHadoopFile(
    "hdfs://node01:8020/spark_hadoop/*",
    classOf[TextInputFormat],
    classOf[LongWritable],
    classOf[Text],
    conf = sc.hadoopConfiguration
  )
  inputRDD.map(_._2.toString).foreach(println)
}
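The demo above uses the new (org.apache.hadoop.mapreduce) API. For completeness, a hedged sketch of the old (org.apache.hadoop.mapred) counterparts, saveAsHadoopFile and hadoopFile, reusing sc and dataRDD from above; the output path is hypothetical, and since the mapred TextInputFormat/TextOutputFormat share names with the mapreduce classes imported above, this sketch belongs in its own file or object:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{TextInputFormat, TextOutputFormat}

// Hedged sketch of the old-API write; the path is hypothetical.
dataRDD.saveAsHadoopFile("hdfs://node01:8020/spark_hadoop_old/",
  classOf[LongWritable],
  classOf[Text],
  classOf[TextOutputFormat[LongWritable, Text]])

// Old-API read of what was just written.
val oldApiRDD = sc.hadoopFile[LongWritable, Text, TextInputFormat](
  "hdfs://node01:8020/spark_hadoop_old/*")
oldApiRDD.map(_._2.toString).foreach(println)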
SequenceFile files
A SequenceFile is a flat file format designed by Hadoop for storing key-value pairs in binary form.
Read:  sc.sequenceFile[keyClass, valueClass](path)
Write: RDD.saveAsSequenceFile(path)
The keys and values must be automatically convertible to Hadoop Writable types.
def main(args: Array[String]): Unit = {
  val config = new SparkConf().setAppName("DataSourceTest").setMaster("local[*]")
  val sc = new SparkContext(config)
  sc.setLogLevel("WARN")

  // 3. Working with SequenceFiles
  println("SequenceFile")
  val dataRDD2: RDD[(Int, String)] = sc.parallelize(List((2, "aa"), (3, "bb"), (4, "cc"), (5, "dd"), (6, "ee")))
  // Write the pairs out as a SequenceFile
  dataRDD2.saveAsSequenceFile("D:\\data\\spark\\SequenceFile")
  // Read it back; the key and value types must be given explicitly
  val sdata: RDD[(Int, String)] = sc.sequenceFile[Int, String]("D:\\data\\spark\\SequenceFile\\*")
  sdata.collect().foreach(println)
}
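saveAsSequenceFile also accepts an optional Hadoop compression codec as its second argument; a hedged sketch reusing dataRDD2 from above with a hypothetical output directory:

import org.apache.hadoop.io.compress.GzipCodec

// Hedged sketch: write the same pairs as a gzip-compressed SequenceFile.
dataRDD2.saveAsSequenceFile("D:\\data\\spark\\SequenceFileGz", Some(classOf[GzipCodec]))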
Object files
An object file stores objects that have been serialized to disk.
Read:  sc.objectFile[T](path) // the element type T must be given because the data is serialized
Write: RDD.saveAsObjectFile(path)
def main(args: Array[String]): Unit = {
  val config = new SparkConf().setAppName("DataSourceTest").setMaster("local[*]")
  val sc = new SparkContext(config)
  sc.setLogLevel("WARN")

  println("ObjectFile")
  val dataRDD3 = sc.parallelize(List((2, "aa"), (3, "bb"), (4, "cc"), (5, "dd"), (6, "ee")))
  // Serialize the elements and write them out
  dataRDD3.saveAsObjectFile("D:\\data\\spark\\ObjectFile")
  // Read back, specifying the element type
  val objRDD = sc.objectFile[(Int, String)]("D:\\data\\spark\\ObjectFile\\*")
  objRDD.collect().foreach(println)
}
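Object files are not limited to tuples: any serializable class can be stored this way. A hedged sketch with a hypothetical Person case class (case classes are Serializable) and a hypothetical output path, reusing sc from above:

// Hypothetical case class, not part of the original post.
case class Person(name: String, age: Int)

val people = sc.parallelize(Seq(Person("aa", 20), Person("bb", 30)))
people.saveAsObjectFile("D:\\data\\spark\\PersonObjectFile")

// The element type must be given when reading back.
val loaded = sc.objectFile[Person]("D:\\data\\spark\\PersonObjectFile\\*")
loaded.collect().foreach(println)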
HBase
Thanks to the implementation of the org.apache.hadoop.hbase.mapreduce.TableInputFormat class, Spark can access HBase through the Hadoop input-format API.
This input format returns key-value pairs,
where the key has type org.apache.hadoop.hbase.io.ImmutableBytesWritable
and the value has type org.apache.hadoop.hbase.client.Result.
On to the code (create the HBase table and insert data):
import org.apache.hadoop.hbase.client.{HBaseAdmin, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Create the SparkConf and SparkContext
val config = new SparkConf().setAppName("sp").setMaster("local[*]")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")

// Connect to HBase via ZooKeeper
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")

// The "animal" table
val animaltable = TableName.valueOf("animal")
// Table descriptor
val TableDesc = new HTableDescriptor(animaltable)
// Add the "info" column family
TableDesc.addFamily(new HColumnDescriptor("info".getBytes))

// HBase admin client
val admin = new HBaseAdmin(conf)
// If the table already exists, disable and delete it first
if (admin.tableExists(animaltable)) {
  admin.disableTable(animaltable)
  admin.deleteTable(animaltable)
}
// Create the table
admin.createTable(TableDesc)

// Build a Put for one (rowkey, name, food) record
def convert(triple: (String, String, String)) = {
  // Create the Put object with the rowkey
  val put = new Put(Bytes.toBytes(triple._1))
  // Columns under the "info" column family
  put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
  put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("food"), Bytes.toBytes(triple._3))
  // TableOutputFormat expects (ImmutableBytesWritable, Put) pairs
  (new ImmutableBytesWritable, put)
}

// The data to insert
val dataRDD: RDD[(String, String, String)] = sc.parallelize(List(("1", "tiger", "meat"), ("2", "zebra", "grass"), ("3", "pig", "feed")))
// Convert every record into an (ImmutableBytesWritable, Put) pair
val targetRDD: RDD[(ImmutableBytesWritable, Put)] = dataRDD.map(convert)

// JobConf describing where to write
val jobConf = new JobConf(conf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "animal")

// Write the data into HBase
targetRDD.saveAsHadoopDataset(jobConf)
println("Data written successfully")
(Reading data from the HBase table)
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Read the data back
// Create the SparkConf and SparkContext
val config = new SparkConf().setAppName("sp").setMaster("local[*]")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")

// Connect to HBase via ZooKeeper
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")
// Tell TableInputFormat which table to read
conf.set(TableInputFormat.INPUT_TABLE, "animal")

// Each element is (rowkey, result row)
val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
  conf,
  classOf[TableInputFormat],       // the table input format
  classOf[ImmutableBytesWritable], // the rowkey
  classOf[Result]                  // the result set for one row
)

// Count the rows
val count = hbaseRDD.count()
println("hBaseRDD RDD Count:" + count)

// Iterate over the rows
hbaseRDD.foreach {
  case (_, result) =>
    // Get the rowkey
    val key = Bytes.toString(result.getRow)
    // Get the name column
    val name = Bytes.toString(result.getValue("info".getBytes, "name".getBytes))
    // Get the food column
    val food = Bytes.toString(result.getValue("info".getBytes, "food".getBytes))
    // Print the row
    println("Row key:" + key + " Name:" + name + " food:" + food)
}

// Stop the SparkContext
sc.stop()
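Rather than printing inside foreach, the rows can also be mapped into an ordinary tuple RDD for further processing. A hedged follow-up sketch using hbaseRDD from above (it would have to run before the sc.stop() call):

// Hedged sketch: turn each HBase Result into a plain (rowkey, name, food) tuple.
val animalRDD = hbaseRDD.map { case (_, result) =>
  val key  = Bytes.toString(result.getRow)
  val name = Bytes.toString(result.getValue("info".getBytes, "name".getBytes))
  val food = Bytes.toString(result.getValue("info".getBytes, "food".getBytes))
  (key, name, food)
}
animalRDD.collect().foreach(println)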
Reposted from: https://blog.csdn.net/weixin_44036154/article/details/105391368