小言_互联网的博客

HDFS文件系统操作与编程

491人阅读  评论(0)

由于这是自己的个人博客迁移过来的,就没有截图了,主要提供一些代码的参考。

HDFS常用Shell命令操作

我这里已经添加环境变量

gedit ~/.bashrc
export PATH=$PATH:/usr/local/hadoop/sbin:/usr/local/hadoop/bin:/usr/local/hbase/bin

所以直接运行hadoop:start-dfs.sh
说明一下:

  1. hadoop fs
  2. hadoop dfs
  3. hdfs dfs
    hadoop fs适用于任何不同的文件系统,比如本地文件系统和HDFS文件系统
    hadoop dfs只能适用于HDFS文件系统
    hdfs dfs跟hadoop dfs的命令作用一样,也只能适用于HDFS文件系统
    查看put命令如何使用:
hadoop fs -help put
hdfs dfs -help put

创建文件目录/user/hadoop,-p表示多级目录一起创建

hdfs dfs -mkdir -p /user/hadoop

查看:

hdfs dfs -ls

创建目录:

hdfs dfs -mkdir /input
hdfs dfs -mkdir input
  • 至于两个的区别嘛,自己进入web管理界面去发现咯。3.X的端口为9870
http://localhost:9870/explorer.html#/

删除目录:

hdfs dfs -rm -r /input
  • 这里给大家讲一个小知识:/home/wangyang其实可以写成~/,你可以打开桌面的终端输入pwd查看当前的目录看看
    上传文件:(test.txt自己创建)
hdfs dfs -put ~/test.txt input

下载到本地的下载目录:

hdfs dfs -get input/test.txt ~/下载

复制文件的命令:

hdfs dfs -cp input/test.txt /input

Java,HDFS文件数据载入

如何新建项目我就不过多阐述,主要是导入jar包,直接导入全部,避免出错,导入如下目录的所有包:

  • /usr/local/hadoop/share/hadoop/common”该目录下的所有JAR包(不包括子目录下面的);
  • /usr/local/hadoop/share/hadoop/common/lib”目录下的所有JAR包;
  • /usr/local/hadoop/share/hadoop/hdfs”目录下的所有JAR包(不包括子目录下面的);
  • /usr/local/hadoop/share/hadoop/hdfs/lib”目录下的所有JAR包。

1.判定文件是否存在

代码如下:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class HDFSFileIfExist {
    public static void main(String[] args){
        try{
            String fileName = "/user/wangyang/input/test.txt";//你的文件路径,没有就显示不存在
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
            FileSystem fs = FileSystem.get(conf);
            if(fs.exists(new Path(fileName))){
                System.out.println("文件存在");
            }else{
                System.out.println("文件不存在");
            }
 
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

2.Linux本地文件上传到HDFS

代码如下:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class LinuxToHdfs {
	public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        String localDir = "/home/wangyang/test2.txt";//本地路径
        String hdfsDir = "/user/wangyang/input/";//HDFS文件路径
        try{
            Path localPath = new Path(localDir);
            Path hdfsPath = new Path(hdfsDir);
            FileSystem hdfs = FileSystem.get(conf);
            hdfs.copyFromLocalFile(localPath,hdfsPath);
            System.out.println("上传成功");
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}

3.将文本写入HDFS文件

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
public class InputFile {
	public static void main(String[] args) {
		try {
                Configuration conf = new Configuration();
                conf.set("fs.defaultFS","hdfs://localhost:9000");
                conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
                FileSystem fs = FileSystem.get(conf);
                byte[] buff = "www.wangyang0821.club".getBytes(); // 要写入的内容
                String filename = "/user/wangyanng/input/test.txt"; //要写入的文件名
                FSDataOutputStream os = fs.create(new Path(filename));
                os.write(buff,0,buff.length);
                System.out.println("Create:"+ filename);
                os.close();
                fs.close();
        } catch (Exception e) {
                e.printStackTrace();
        }
   }
}

4.读取HDFS文本文件内容

import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataInputStream;
public class OutFile {
    public static void main(String[] args) {
        try {
                Configuration conf = new Configuration();
                conf.set("fs.defaultFS","hdfs://localhost:9000");
                conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
                FileSystem fs = FileSystem.get(conf);
                Path file = new Path("/user/wangyang/input/test.txt"); 
                FSDataInputStream getIt = fs.open(file);
                BufferedReader d = new BufferedReader(new InputStreamReader(getIt));
                String content = d.readLine(); //读取文件一行
                System.out.println(content);
                d.close(); //关闭文件
                fs.close(); //关闭hdfs
        } catch (Exception e) {
                e.printStackTrace();
        }
    }
}

5.文件的过滤与合并

import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
public class FilterMergeFile {
    Path inputPath = null;  //待合并的文件所在的目录的路径
    Path outputPath = null; //输出文件的路径
    public FilterMergeFile(String input, String output){
        this.inputPath = new Path(input);
        this.outputPath = new Path(output);
    }
    public void doMerge() throws IOException{
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","hdfs://localhost:9000" );
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        FileSystem fsSource = FileSystem.get(URI.create(inputPath.toString()),conf);
        FileSystem fsDst = FileSystem.get(URI.create(outputPath.toString()),conf);
        FileStatus[] sourceStatus = fsSource.listStatus(inputPath, new myPathFilter(".*\\.abc"));   //过滤掉目录中后缀为.abc的文件
        FSDataOutputStream fsdos = fsDst.create(outputPath);
        //下面分别读取过滤之后的每个文件的内容,并输出到同一个文件中
        for(int i=0;i<sourceStatus.length;i++){
            System.out.println("路径: " + sourceStatus[i].getPath()+ "   文件大小: " + sourceStatus[i].getLen() + "   权限: " + sourceStatus[i].getPermission() + "  内容: ");
            FSDataInputStream fsdis = fsSource.open(sourceStatus[i].getPath());
            byte[] data = new byte[1024];
            int read = -1;
            PrintStream ps = new PrintStream(System.out);
            while((read = fsdis.read(data)) > 0){
                ps.write(data, 0, read);
                fsdos.write(data, 0, read);
            }
        }
        fsdos.close();
    }
    public static void main(String args[]) throws IOException{
        FilterMergeFile merge = new FilterMergeFile("/user/wangyang/input", "/user/wangyang/input/Merge.txt");
        merge.doMerge();
    }
}
class myPathFilter implements PathFilter{  //过滤掉文件名满足特定条件的文件
    String reg = null;
    myPathFilter(String reg){
        this.reg = reg;
    }
    public boolean accept(Path path) {
        if(!(path.toString().matches(reg)))
            return true;
        return false;
    }
}


转载:https://blog.csdn.net/qq_45213986/article/details/105861855
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场