Accessing HDFS through the Java API in pseudo-distributed mode

The program below merges all the files in a local directory into a single file on HDFS, printing a dot for each write-progress callback.

 1 package com.bq.pro;
 2 
 3 import java.io.IOException;
 4 
 5 import org.apache.hadoop.conf.Configuration;
 6 import org.apache.hadoop.fs.FSDataInputStream;
 7 import org.apache.hadoop.fs.FSDataOutputStream;
 8 import org.apache.hadoop.fs.FileSystem;
 9 import org.apache.hadoop.fs.FileStatus;
10 import org.apache.hadoop.fs.Path;
11 import org.apache.hadoop.util.GenericOptionsParser;
12 import org.apache.hadoop.util.Progressable;
13 
14 public class MergeFiles {
15     @SuppressWarnings("deprecation")
16     public static void main(String[] args) throws IOException {
 17         Configuration conf=new Configuration();
 18         FileSystem localInputFs=FileSystem.getLocal(conf);   // local filesystem for reading the input files
 19         //FileSystem hdfsOutputFs=FileSystem.get(conf);      // fails with "Wrong FS": see below
 20         String[] otherargs=new GenericOptionsParser(conf,args).getRemainingArgs();
 21         Path inputLocalDir=new Path(otherargs[0]);           // local directory of small files
 22         Path hdfsPathOutput=new Path(otherargs[1]);          // merged target file on HDFS
 23         FileSystem hdfsOutputFs=hdfsPathOutput.getFileSystem(conf); // resolved from the path's hdfs:// scheme
24         try {
 25             FileStatus[] inputFiles=localInputFs.listStatus(inputLocalDir); // every file in the input directory
26             FSDataOutputStream out=hdfsOutputFs.create(hdfsPathOutput,new Progressable() {
27                 
28                 @Override
29                 public void progress() {
30                     System.out.print(".");
31                     
32                 }
33             });
34             for(int i=0;i<inputFiles.length;i++)
35             {
36                 FSDataInputStream in=localInputFs.open(inputFiles[i].getPath());
37                 byte[] buffer=new byte[100];
38                 int byteRead=0;
39                 while((byteRead=in.read(buffer))>0)
40                 {
41                     out.write(buffer, 0, byteRead);
42                     
43                 }
44                 in.close();
45             }
46             out.close();
 47             //localInputFs.delete(inputLocalDir); // optionally remove the local inputs once merged
48             
49             
50         } catch (IOException e) {
 51             // report the failure; note the streams opened above are not closed on this path
52             e.printStackTrace();
53         }
54     }
55 
56 }
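
As an aside, Hadoop's FileUtil class already provides this merge as a single call, so the read/write loop above can be replaced entirely. A minimal sketch under the same two arguments (the class name is mine; copyMerge exists in Hadoop 1.x/2.x but was removed in 3.x):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;

    public class MergeWithCopyMerge {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Path srcDir  = new Path(args[0]);  // local directory holding the small files
            Path dstFile = new Path(args[1]);  // merged target file on HDFS
            FileSystem srcFs = FileSystem.getLocal(conf);
            FileSystem dstFs = dstFile.getFileSystem(conf); // resolve from the hdfs:// scheme
            // deleteSource=false keeps the local files; addString=null adds no separator between them
            FileUtil.copyMerge(srcFs, srcDir, dstFs, dstFile, false, conf, null);
        }
    }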

To run MergeFiles, pass these two program arguments: /home/haduser/workspace/MergeCDFiles/resources/    hdfs://localhost:9000/opt/hadoop/tmp/test/a.txt
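
A note on GenericOptionsParser: it first consumes Hadoop's generic options (-D, -fs, -conf and so on), applies them to conf, and getRemainingArgs() returns only what is left, so the two paths above arrive as otherargs[0] and otherargs[1]. A minimal standalone sketch of that behavior (the class name and the hard-coded argument list are mine):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class ArgsDemo {
        public static void main(String[] ignored) throws Exception {
            Configuration conf = new Configuration();
            String[] args = {
                "-fs", "hdfs://localhost:9000",                    // generic option: sets the default filesystem in conf
                "/home/haduser/workspace/MergeCDFiles/resources/", // becomes otherargs[0]
                "hdfs://localhost:9000/opt/hadoop/tmp/test/a.txt"  // becomes otherargs[1]
            };
            String[] otherargs = new GenericOptionsParser(conf, args).getRemainingArgs();
            System.out.println(otherargs.length);            // 2: the -fs pair has been stripped
            System.out.println(conf.get("fs.default.name")); // hdfs://localhost:9000 (fs.defaultFS in Hadoop 2.x+)
        }
    }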

Fetching the HDFS handle with FileSystem.get(conf) in pseudo-distributed mode fails with:

Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: hdfs://localhost:9000/opt/hadoop/tmp/test, expected: file:///

    at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:390)
    at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:55)
    at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:340)
    at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:492)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:377)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:364)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:564)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:464)
    at com.bq.pro.MergeFiles.main(MergeFiles.java:26)

Using FileSystem hdfsOutputFs=hdfsPathOutput.getFileSystem(conf); instead avoids the exception. The cause is not pseudo-distributed mode itself. FileSystem.get(conf) returns the filesystem named by the configuration's default (fs.default.name, later renamed fs.defaultFS); when core-site.xml is not on the program's classpath, that default is file:///, so the call hands back the local filesystem, which then rejects the hdfs:// output path. That is exactly what "Wrong FS: ... expected: file:///" means. Path.getFileSystem(conf), by contrast, resolves the filesystem from the path's own URI scheme (hdfs://localhost:9000) and works regardless of the configured default. On a cluster node the configuration files are normally on the classpath, which is why the FileSystem.get(conf) version appears to work there.
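
A minimal sketch that makes the difference visible (the class name is mine; run it without core-site.xml on the classpath):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class WrongFsDemo {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Path out = new Path("hdfs://localhost:9000/opt/hadoop/tmp/test/a.txt");

            // No core-site.xml on the classpath, so the default is file:///;
            // create(out) on this filesystem throws the "Wrong FS" exception above.
            System.out.println(FileSystem.get(conf).getUri());    // file:///

            // Fix 1: resolve the filesystem from the path's own URI scheme.
            System.out.println(out.getFileSystem(conf).getUri()); // hdfs://localhost:9000

            // Fix 2: set the default filesystem explicitly (what core-site.xml normally does).
            conf.set("fs.default.name", "hdfs://localhost:9000"); // key is fs.defaultFS in Hadoop 2.x+
            System.out.println(FileSystem.get(conf).getUri());    // hdfs://localhost:9000
        }
    }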




Original post (in Chinese): https://www.cnblogs.com/wzyj/p/3437059.html