如何使用Java API读写HDFS

如题所述

package com.wyc.hadoop.fs;
 
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.Date;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
 
public class FSOptr {
 
    /**
     * @param args
     */
    public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();
        makeDir(conf);
        rename(conf);
        delete(conf);
 
    }
 
    // åˆ›å»ºæ–‡ä»¶ç›®å½•
    private static void makeDir(Configuration conf) throws Exception {
        FileSystem fs = FileSystem.get(conf);
        Path dir = new Path("/user/hadoop/data/20140318");
        boolean result = fs.mkdirs(dir);// åˆ›å»ºæ–‡ä»¶å¤¹
        System.out.println("make dir :" + result);
 
        // åˆ›å»ºæ–‡ä»¶ï¼Œå¹¶å†™å…¥å†…容
        Path dst = new Path("/user/hadoop/data/20140318/tmp");
        byte[] buff = "hello,hadoop!".getBytes();
        FSDataOutputStream outputStream = fs.create(dst);
        outputStream.write(buff, 0, buff.length);
        outputStream.close();
        FileStatus files[] = fs.listStatus(dst);
        for (FileStatus file : files) {
            System.out.println(file.getPath());
        }
        fs.close();
    }
 
    // é‡å‘½åæ–‡ä»¶
    private static void rename(Configuration conf) throws Exception {
 
        FileSystem fs = FileSystem.get(conf);
        Path oldName = new Path("/user/hadoop/data/20140318/1.txt");
        Path newName = new Path("/user/hadoop/data/20140318/2.txt");
        fs.rename(oldName, newName);
 
        FileStatus files[] = fs.listStatus(new Path(
                "/user/hadoop/data/20140318"));
        for (FileStatus file : files) {
            System.out.println(file.getPath());
        }
        fs.close();
    }
 
    // åˆ é™¤æ–‡ä»¶
    @SuppressWarnings("deprecation")
    private static void delete(Configuration conf) throws Exception {
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/user/hadoop/data/20140318");
        if (fs.isDirectory(path)) {
            FileStatus files[] = fs.listStatus(path);
            for (FileStatus file : files) {
                fs.delete(file.getPath());
            }
        } else {
            fs.delete(path);
        }
 
        // æˆ–者
        fs.delete(path, true);
 
        fs.close();
    }
 
    /**
     * ä¸‹è½½,将hdfs文件下载到本地磁盘
     * 
     * @param localSrc1
     *            æœ¬åœ°çš„文件地址,即文件的路径
     * @param hdfsSrc1
     *            å­˜æ”¾åœ¨hdfs的文件地址
     */
    public boolean sendFromHdfs(String hdfsSrc1, String localSrc1) {
 
        Configuration conf = new Configuration();
        FileSystem fs = null;
        try {
            fs = FileSystem.get(URI.create(hdfsSrc1), conf);
            Path hdfs_path = new Path(hdfsSrc1);
            Path local_path = new Path(localSrc1);
 
            fs.copyToLocalFile(hdfs_path, local_path);
 
            return true;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return false;
    }
 
    /**
     * ä¸Šä¼ ï¼Œå°†æœ¬åœ°æ–‡ä»¶copy到hdfs系统中
     * 
     * @param localSrc
     *            æœ¬åœ°çš„文件地址,即文件的路径
     * @param hdfsSrc
     *            å­˜æ”¾åœ¨hdfs的文件地址
     */
    public boolean sendToHdfs1(String localSrc, String hdfsSrc) {
        InputStream in;
        try {
            in = new BufferedInputStream(new FileInputStream(localSrc));
            Configuration conf = new Configuration();// å¾—到配置对象
            FileSystem fs; // æ–‡ä»¶ç³»ç»Ÿ
            try {
                fs = FileSystem.get(URI.create(hdfsSrc), conf);
                // è¾“出流,创建一个输出流
                OutputStream out = fs.create(new Path(hdfsSrc),
                        new Progressable() {
                            // é‡å†™progress方法
                            public void progress() {
                                // System.out.println("上传完一个设定缓存区大小容量的文件!");
                            }
                        });
                // è¿žæŽ¥ä¸¤ä¸ªæµï¼Œå½¢æˆé€šé“,使输入流向输出流传输数据,
                IOUtils.copyBytes(in, out, 10240, true); // in为输入流对象,out为输出流对象,4096为缓冲区大小,true为上传后关闭流
                return true;
            } catch (IOException e) {
                e.printStackTrace();
            }
 
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
        return false;
    }
 
    /**
     * ç§»åŠ¨
     * 
     * @param old_st原来存放的路径
     * @param new_st移动到的路径
     */
    public boolean moveFileName(String old_st, String new_st) {
 
        try {
 
            // ä¸‹è½½åˆ°æœåŠ¡å™¨æœ¬åœ°
            boolean down_flag = sendFromHdfs(old_st, "/home/hadoop/文档/temp");
            Configuration conf = new Configuration();
            FileSystem fs = null;
 
            // åˆ é™¤æºæ–‡ä»¶
            try {
                fs = FileSystem.get(URI.create(old_st), conf);
                Path hdfs_path = new Path(old_st);
                fs.delete(hdfs_path);
            } catch (IOException e) {
                e.printStackTrace();
            }
 
            // ä»ŽæœåŠ¡å™¨æœ¬åœ°ä¼ åˆ°æ–°è·¯å¾„
            new_st = new_st + old_st.substring(old_st.lastIndexOf("/"));
            boolean uplod_flag = sendToHdfs1("/home/hadoop/文档/temp", new_st);
 
            if (down_flag && uplod_flag) {
                return true;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
 
    // copy本地文件到hdfs
    private static void CopyFromLocalFile(Configuration conf) throws Exception {
        FileSystem fs = FileSystem.get(conf);
        Path src = new Path("/home/hadoop/word.txt");
        Path dst = new Path("/user/hadoop/data/");
        fs.copyFromLocalFile(src, dst);
        fs.close();
    }
 
    // èŽ·å–给定目录下的所有子目录以及子文件
    private static void getAllChildFile(Configuration conf) throws Exception {
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/user/hadoop");
        getFile(path, fs);
    }
 
    private static void getFile(Path path, FileSystem fs)throws Exception {
        FileStatus[] fileStatus = fs.listStatus(path);
        for (int i = 0; i < fileStatus.length; i++) {
            if (fileStatus[i].isDir()) {
                Path p = new Path(fileStatus[i].getPath().toString());
                getFile(p, fs);
            } else {
                System.out.println(fileStatus[i].getPath().toString());
            }
        }
    }
     
     
    //判断文件是否存在
    private static boolean isExist(Configuration conf,String path)throws Exception{
        FileSystem fileSystem = FileSystem.get(conf);
        return fileSystem.exists(new Path(path));
    }
     
    //获取hdfs集群所有主机结点数据
    private static void getAllClusterNodeInfo(Configuration conf)throws Exception{
        FileSystem fs = FileSystem.get(conf);
        DistributedFileSystem hdfs = (DistributedFileSystem)fs;
        DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();
        String[] names = new String[dataNodeStats.length];
        System.out.println("list of all the nodes in HDFS cluster:"); //print info
 
        for(int i=0; i < dataNodeStats.length; i++){
            names[i] = dataNodeStats[i].getHostName();
            System.out.println(names[i]); //print info
 
        }
    }
     
    //get the locations of a file in HDFS
    private static void getFileLocation(Configuration conf)throws Exception{
        FileSystem fs = FileSystem.get(conf);
        Path f = new Path("/user/cluster/dfs.txt");
        FileStatus filestatus = fs.getFileStatus(f);
        BlockLocation[] blkLocations = fs.getFileBlockLocations(filestatus,0,filestatus.getLen());
        int blkCount = blkLocations.length;
        for(int i=0; i < blkCount; i++){
            String[] hosts = blkLocations[i].getHosts();
            //Do sth with the block hosts
 
            System.out.println(hosts);
        }
    }
     
     //get HDFS file last modification time
    private static void getModificationTime(Configuration conf)throws Exception{
        FileSystem fs = FileSystem.get(conf);
        Path f = new Path("/user/cluster/dfs.txt");
        FileStatus filestatus = fs.getFileStatus(f);
         
        long modificationTime = filestatus.getModificationTime(); // measured in milliseconds since the epoch
 
        Date d = new Date(modificationTime);
        System.out.println(d);
    }
     
}
温馨提示:内容为网友见解,仅供参考
无其他回答

使用Java API操作HDFS时,_方法用于获取文件列表?
当使用 Java API 操作 HDFS 时,可以使用 FileSystem.listFiles() 方法来获取文件列表。该方法接受一个 Path 对象,表示要列举文件的目录,并返回一个 RemoteIterator<LocatedFileStatus> 对象,该对象可用于迭代目录中的文件。例如,下面的代码演示了如何使用 listFiles() 方法来获取 HDFS 上的文件列表:\/...

Hadoop--HDFS的API环境搭建、在IDEA里对HDFS简单操作
在Windows系统中,首先安装Hadoop。安装完成后,可以利用Maven将其与Hadoop集成,便于管理和操作。在项目的resources目录中,创建一个名为"log4j.properties"的配置文件,以配置日志相关设置。接着,在Java项目中,创建一个名为"hdfs"的包,然后在其中创建一个类。这个类将用于执行对HDFS的基本操作,例如创建...

java api 连接HDFS出现报错
050 public static void renameFileOrDirectoryOnHDFS()throws Exception{ 051 052 FileSystem fs=FileSystem.get(conf);053 Path p1 =new Path("hdfs:\/\/10.2.143.5:9090\/root\/myfile\/my.txt");054 Path p2 =new Path("hdfs:\/\/10.2.143.5:9090\/root\/myfile\/my2.txt");055...

sqoop2 1.99.7 java api 如何设置file format
\/\/流读入和写入InputStreamin=null;\/\/获取HDFS的conf\/\/读取HDFS上的文件系统FileSystemhdfs=FileSystem.get(conf);\/\/使用缓冲流,进行按行读取的功能BufferedReaderbuff=null;\/\/获取日志文件的根目录Pathlistf=newPath("hdfs:\/\/10

HDFS 系统架构
应用访问HDFS有很多方式。原生的,HDFS 提供了 FileSystem Java API 来给应用调用。还提供了 C language wrapper for this Java API 和 REST API 。另外,还支持HTTP浏览器查看HDFS实例的文件。 通过使用 NFS gateway ,HDFS还可以挂载到客户端作为本地文件系统的一部分。 HDFS的用户数据是以文件和目录的形式组织的...

Hadoop读写文件时内部工作机制是怎样的
例如,使用java API写一个本地文件,我们可以保证在调用flush()和同步化后可以看到已写入的内容: FileOutputStream out = new FileOutputStream(localFile); out.write("content".getBytes("UTF-8")); out.flush(); \/\/ flush to operating system out.getFD().sync(); \/\/ sync to disk (getFD()返回与该...

如何使用Hadoop读写数据库
如何直接使用Hadoop1.2.0的MR来读写操作数据库,hadoop的API提供了DBOutputFormat和 DBInputFormat这两个类,来进行与数据库交互,除此之外,我们还需要定义一个类似JAVA Bean的实体类,来与数据库的每行记录进行对应,通常这个类要实现Writable和DBWritable接口,来重写里面的4个方法以对应获取每行记 ...

hadoop c api里面有支持读写一行的函数吗
可以查阅手册。FSDataOutputStream不允许在文件中定位(而FSDataInputStream可以),这是因为hadoop只允许在一个已打开文件顺序写入或在文件尾追加数据,不允许在结尾之外其他文件写入数据。在使用append的操作时可能返回异常dfs.support.append未设置为true,只要才hdfs-site.xml中把该属性设置为true ...

体系里表明与远程支持场所关系和接口的文件有哪些
(1)接口 hadoop是使用Java编写的。而Hadoop中不同文件系统之间的交互是由Java API进行调节的。事实上,前面使用的文件系统的shell就是一个java应用,它使用java文件系统来提供文件系统操作。即使其他文件系统比如FTP、S3都有自己的访问工具,这些接口在HDFS中还是广泛使用,主要用来进行hadoop文件系统之间的...

hadoop学习需要java的哪些基础知识
熟悉对大数据的 分析\/使用 方法(spark\/map-reduce技术,都有scala 和 java两种api)。因此,学习hadoop必须掌握scala或者java中的一门开发语言,然后在学习hadoop数据操作命令,api,spark\/map-reduce分析技术。另外,还可以学习hbase这种基于hdfs的结构化大数据存储技术,和flume大数据采集技术。

相似回答