小言_互联网的博客

spark -- RDD数据源 (读取小文件 数据写入MySQL并读取 spark-HadoopAPI SequenceFile 对象文件 数据写入hbase并读取 )

305人阅读  评论(0)

 

RDD数据源

 普通文本文件

sc.textFile("./dir/*.txt")

如果传递目录,则将目录下的所有文件读取作为RDD。文件路径支持通配符。

但是这样对于大量的小文件读取效率并不高,应该使用wholeTextFiles

def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)])

返回值RDD[(String, String)],其中Key是文件的名称Value是文件的内容


  
  1. def main(args: Array[ String]): Unit = {
  2. val config = new SparkConf().setAppName( "DataSourceTest").setMaster( "local[*]")
  3. val sc = new SparkContext(config)
  4. sc.setLogLevel( "WARN")
  5. println( "读取小文件")
  6. val filesRDD: RDD[( String, String)] = sc.wholeTextFiles( "D:\\data\\spark\\files", minPartitions = 3)
  7. val linesRDD: RDD[ String] = filesRDD.flatMap(_._2. split( "\\r\\n"))
  8. val wordsRDD: RDD[ String] = linesRDD.flatMap(_. split( " "))
  9. wordsRDD.map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
  10. }

 

JDBC

Spark支持通过Java JDBC访问关系型数据库。需要使用JdbcRDD

数据入MySQL表代码演示


  
  1. /**
  2. * 写入数据到mysql数据库
  3. * @param args
  4. */
  5. def main(args: Array[String]): Unit = {
  6. val conf = new SparkConf().setAppName( "JDBCDataSourceTest").setMaster( "local[*]")
  7. val sc = new SparkContext(conf)
  8. sc.setLogLevel( "warn")
  9. //读取数据
  10. val data = sc.textFile( "file:////E:\\user.txt")
  11. val data29 = sc.textFile( "input20200407/users.txt")
  12. data.foreachPartition{
  13. data1=>
  14. val connection = DriverManager.getConnection(
  15. "jdbc:mysql://localhost:3306/bigdata0407?characterEncoding=UTF-8",
  16. "root", "root")
  17. val sql = "INSERT INTO `user` values (NULL,?,?,?,?)"
  18. data1.foreach{
  19. data2=>{
  20. val datas = data2.split( " ")
  21. val statement = connection.prepareStatement(sql)
  22. statement.setString( 1, datas( 0))
  23. statement.setString( 2, datas( 1))
  24. statement.setString( 3, datas( 2))
  25. statement.setString( 4, datas( 3))
  26. statement.executeUpdate()
  27. statement.close()
  28. }
  29. }
  30. }
  31. }

读取MySQL数据库中表数据代码演示


  
  1. /**
  2. * 读取MySQL表数据
  3. * @param args
  4. */
  5. def main(args: Array[String]): Unit = {
  6. val conf = new SparkConf().setAppName( "JDBCDataSourceTest").setMaster( "local[*]")
  7. val sc = new SparkContext(conf)
  8. sc.setLogLevel( "warn")
  9. var sql = "select * from `user` where id between ? and ?"
  10. val user = new JdbcRDD(sc,
  11. () => {
  12. DriverManager.getConnection( "jdbc:mysql://localhost:3306/bigdata0407?characterEncoding=UTF-8", "root", "root")
  13. },
  14. sql,
  15. 0,
  16. 8,
  17. 2,
  18. result => {
  19. println( "id=" + result.getInt( 1) +
  20. ",username=" + result.getString( 2) +
  21. ",birthday=" + result.getString( 3) +
  22. ",sex=" + result.getString( 4)
  23. )
  24. }
  25. )
  26. user.collect()
  27. }

 

 HadoopAPI

Spark的整个生态系统与Hadoop是完全兼容的,所以对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持。

HadoopRDD、newAPIHadoopRDD、saveAsHadoopFile、saveAsNewAPIHadoopFile 是底层API

其他的API接口都是为了方便最终的Spark程序开发者而设置的,是这两个接口的高效实现版本.

代码演示:


  
  1. def main(args: Array[String]): Unit = {
  2. val config = new SparkConf().setAppName( "DataSourceTest").setMaster( "local[*]")
  3. val sc = new SparkContext(config)
  4. sc.setLogLevel( "WARN")
  5. System.setProperty( "HADOOP_USER_NAME", "root")
  6. //1.HadoopAPI
  7. println( "HadoopAPI")
  8. val dataRDD = sc.parallelize(Array(( 1, "hadoop"), ( 2, "hive"), ( 3, "spark")))
  9. dataRDD.saveAsNewAPIHadoopFile( "hdfs://node01:8020/spark_hadoop/",
  10. classOf[LongWritable],
  11. classOf[Text],
  12. classOf[TextOutputFormat[LongWritable, Text]])
  13. val inputRDD: RDD[(LongWritable, Text)] = sc.newAPIHadoopFile(
  14. "hdfs://node01:8020/spark_hadoop/*",
  15. classOf[TextInputFormat],
  16. classOf[LongWritable],
  17. classOf[Text],
  18. conf = sc.hadoopConfiguration
  19. )
  20. inputRDD. map(_._2.toString).foreach( println)
  21. }

 

SequenceFile文件

SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。

读sc.sequenceFile[ keyClass, valueClass](path)

写RDD.saveAsSequenceFile(path)

要求键和值能够自动转为Writable类型。


  
  1. def main(args: Array[String]): Unit = {
  2. val config = new SparkConf().setAppName( "DataSourceTest").setMaster( "local[*]")
  3. val sc = new SparkContext(config)
  4. sc.setLogLevel( "WARN")
  5. //3.操作SequenceFile
  6. println( "SequenceFile")
  7. val dataRDD2: RDD[( Int, String)] = sc.parallelize(List(( 2, "aa"), ( 3, "bb"), ( 4, "cc"), ( 5, "dd"), ( 6, "ee")))
  8. dataRDD2.saveAsSequenceFile( "D:\\data\\spark\\SequenceFile")
  9. val sdata: RDD[( Int, String)] = sc.sequenceFile[ Int, String]( "D:\\data\\spark\\SequenceFile\\*")
  10. sdata.collect().foreach(println)
  11. }

 

对象文件

对象文件是将对象序列化后保存的文件

读sc.objectFile[k,v](path) //因为是序列化所以要指定类型

写RDD.saveAsObjectFile()


  
  1. def main(args: Array[ String]): Unit = {
  2. val config = new SparkConf().setAppName( "DataSourceTest").setMaster( "local[*]")
  3. val sc = new SparkContext(config)
  4. sc.setLogLevel( "WARN")
  5. println( "ObjectFile")
  6. val dataRDD3 = sc.parallelize(List(( 2, "aa"), ( 3, "bb"), ( 4, "cc"), ( 5, "dd"), ( 6, "ee")))
  7. dataRDD3.saveAsObjectFile( "D:\\data\\spark\\ObjectFile")
  8. val objRDD = sc.objectFile[( Int, String)]( "D:\\data\\spark\\ObjectFile\\*")
  9. objRDD.collect().foreach(println)
  10. }

 

HBase

由于 org.apache.hadoop.hbase.mapreduce.TableInputFormat 类的实现,Spark 可以通过Hadoop输入格式访问HBase。

这个输入格式会返回键值对数据,

其中键的类型为org. apache.hadoop.hbase.io.ImmutableBytesWritable,

而值的类型为org.apache.hadoop.hbase.client.Result。

上代码:(创建hbase表,添加数据)


  
  1. //创建SparkConf
  2. val config = new SparkConf().setAppName( "sp").setMaster( "local[*]")
  3. val sc = new SparkContext(config)
  4. sc.setLogLevel( "WARN")
  5. //连接hbase
  6. val conf = HBaseConfiguration.create()
  7. conf.set( "hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")
  8. //创建animal表
  9. val animaltable = TableName.valueOf( "animal")
  10. //设置表结构
  11. val TableDesc = new HTableDescriptor(animaltable)
  12. //设置列族
  13. TableDesc.addFamily( new HColumnDescriptor( "info".getBytes))
  14. //创建hbase管理员
  15. val admin = new HBaseAdmin(conf)
  16. //判断是否存在表
  17. if (admin.tableExists(animaltable)){
  18. //废除表
  19. admin.disableTable(animaltable)
  20. //删除表
  21. admin.deleteTable(animaltable)
  22. }
  23. //创建表
  24. admin.createTable(TableDesc)
  25. //插入数据
  26. def convert(triple: ( String, String, String))={
  27. //创建put对象,并指定rowkey
  28. val put = new Put(Bytes.toBytes(triple._1))
  29. //创建列族下列名
  30. put.addImmutable(Bytes.toBytes( "info"),Bytes.toBytes( "name"),Bytes.toBytes(triple._2))
  31. put.addImmutable(Bytes.toBytes( "info"),Bytes.toBytes( "food"),Bytes.toBytes(triple._3))
  32. //执行put
  33. ( new ImmutableBytesWritable,put)
  34. }
  35. //创建数据
  36. val dataRDD: RDD[( String, String, String)] = sc.parallelize(List(( "1", "老虎", "肉"), ( "2", "斑马", "草"), ( "3", "猪", "饲料")))
  37. //添加数据到方法
  38. val targetRDD: RDD[(ImmutableBytesWritable, Put)] = dataRDD.map(convert)
  39. //创建JobConf
  40. val jobConf = new JobConf(conf)
  41. //创建输出流
  42. jobConf.setOutputFormat(classOf[TableOutputFormat])
  43. //输入流名
  44. jobConf.set(TableOutputFormat.OUTPUT_TABLE, "animal")
  45. //写入数据
  46. targetRDD.saveAsHadoopDataset(jobConf)
  47. println( "写入数据成功")

(读取hbase表数据)


  
  1. //读取数据
  2. //创建SparkConf
  3. val config = new SparkConf().setAppName( "sp").setMaster( "local[*]")
  4. val sc = new SparkContext(config)
  5. sc.setLogLevel( "WARN")
  6. //连接hbase
  7. val conf = HBaseConfiguration.create()
  8. conf. set( "hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")
  9. //设置输入流,读取表
  10. conf. set( TableInputFormat. INPUT_TABLE, "animal")
  11. //
  12. val hbaseRDD: RDD[( ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
  13. conf,
  14. classOf[ TableInputFormat], //table
  15. classOf[ ImmutableBytesWritable], //table rowkey
  16. classOf[ Result] //resultset
  17. )
  18. //个数
  19. val count = hbaseRDD. count()
  20. println( "hBaseRDD RDD Count:"+ count)
  21. //便利
  22. hbaseRDD.foreach {
  23. case ( _, result) =>
  24. //获取rowkey
  25. val key = Bytes. toString(result.getRow)
  26. //获取name
  27. val name = Bytes. toString(result.getValue( "info".getBytes, "name".getBytes))
  28. //获取food
  29. val food = Bytes. toString(result.getValue( "info".getBytes, "food".getBytes))
  30. //拼接输出
  31. println( "Row key:" + key + " Name:" + name + " food:" + food)
  32. }
  33. //关闭sc
  34. sc.stop()

 


转载:https://blog.csdn.net/weixin_44036154/article/details/105391368
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场