通过FileSystem API读取数据

参照前面提到的通过Hadoop URL读取数据,有时根本不可能在应用中设置URLStreamHandlerFactory实例,在这种情况下,需要使用FileSystem API来打开一个文件的输入流

FIleSystem是一个通用文件系统的API,所以第一步是检索我们需要的文件系统的实例,这里是HDFS,获取FileSystem实例有两种方法:

public static FileSystem get(Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf) throws IOException

Configuration对象封装了客户端或者服务器的配置,通过设置配置文件读取路径来实现,如conf/core-site.xml,第一个方法返回的是默认文件系统(在conf/core-site.xml中指定的,没指定则默认的)。第二个方法通过给定的URI方案和权限来确定要使用的文件系统,如果给定URI中没有指定方案,则返回默认文件系统。有了FileSystem实例后,调用open()函数来获取文件的输入流:

public FSDataInputStream open(Path f) throws IOException
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException

使用FileSystem以标准输出格式显示Hadoop文件系统中的文件

 

程序如下:

package com.lcy.hadoop.file;

import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class FileSystemCat {

    public static void main(String[] args) throws Exception{
        // TODO Auto-generated method stub
        String uri=args[0];
        Configuration conf=new Configuration();
        FileSystem fs=FileSystem.get(URI.create(uri),conf);
        InputStream in=null;
        try{
            in=fs.open(new Path(uri));
            IOUtils.copyBytes(in, System.out, 4096,false);
        }finally{
            IOUtils.closeStream(in);
        }
    }
}

 

运行示例:

 

 

实际上FileSystem对象中的open()方法返回的是FSDataInputStream对象,而不是标准的java.io类对象,这个类支持随机访问

 

使用seek()方法,将Hadoop文件系统中的一个文件在标准输出上显示2次

 

程序如下:

package com.lcy.hadoop.file;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class FileSystemDoubleCat {

    public static void main(String[] args) throws Exception{
        // TODO Auto-generated method stub
        String uri=args[0];
        Configuration conf=new Configuration();
        FileSystem fs=FileSystem.get(URI.create(uri),conf);
        FSDataInputStream in=null;
        try{
            in=fs.open(new Path(uri));
            IOUtils.copyBytes(in, System.out,4096,false);
            in.seek(0);
            IOUtils.copyBytes(in, System.out, 4096,false);
        }finally{
            IOUtils.closeStream(in);
        }
    }
}

 

运行示例:

原文地址:https://www.cnblogs.com/Murcielago/p/4318492.html