917每日博客

今天将昨天课上做的练习的代码完善一下:

新建一个类用来封装操作Hadoop的方法:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.awt.*;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class HDFSMethod {
private static FileSystem fs;
private static void init() throws URISyntaxException, IOException, InterruptedException{
Configuration configuration = new Configuration();
configuration.set("dfs.replication","1");
// FileSystem fs = FileSystem.get(new
URI uri=new URI("hdfs://hadoop102:8020");
String user="wang";
fs = FileSystem.get(uri, configuration,user);
}
private static void close() throws IOException {
// 3 关闭资源
fs.close();
}
public void mkdir() throws IOException, URISyntaxException, InterruptedException {
init();
fs.mkdirs(new Path("/deleted/"));
close();
}
public void addtext()throws IOException, URISyntaxException, InterruptedException{
init();
Path path = new Path("/shuguo1.txt");
//通过fs的append方法实现对文件的追加操作
FSDataOutputStream fos = fs.append(path);

fos.write("\n新输入".getBytes());

fos.close();
close();
}
public void insertext()throws IOException, URISyntaxException, InterruptedException{
init();
Path path = new Path("/shuguo1.txt");
//通过fs的append方法实现对文件的追加操作
FSDataOutputStream fos = fs.create(path,true);
fos.write("新输入111\n新输入222".getBytes());
fos.close();
close();
}
public void readtext()throws IOException, URISyntaxException, InterruptedException{
init();
Path path = new Path("/input/word.txt");
FSDataInputStream fis = fs.open(path);
int len = 0;
byte[] buf = new byte[4096];
while ((len = fis.read(buf)) != -1){
System.out.println(new String(buf, 0, len));
}
close();
}


public void HDFSInsert(String FPath,String oneitem)throws IOException, URISyntaxException, InterruptedException{
init();
Path path = new Path(FPath);
FSDataOutputStream fos = fs.append(path);
fos.write((oneitem+"\n").getBytes());
fos.close();
close();
}
//删除的方法在参数中在加入一个List的Bean类,然后循环执行写入的语句,遇到要删除的索引就不写入完成删除,把每一个Bean类都写一个对应的删除方法
//在参数中在加入一个Bean类用来存储数据减少读取次数
//在参数中在加入一个Bean类用来存储数据减少读取次数
//在参数中在加入一个Bean类用来存储数据减少读取次数
public void HDFSDelete(String FPath, int num)throws IOException, URISyntaxException, InterruptedException{
init();
Path path = new Path(FPath);
FSDataOutputStream fos = fs.create(path,true);

fos.write("待输入\n".getBytes());


fos.close();
close();
}
//修改的方法要在参数中再加入一个Bean类,然后循环执行写入的语句,遇到要修改的索引就是用第三个参数代表修改后的语句的,把每一个Bean类都写一个对应的修改方法
//在参数中在加入一个Bean类用来存储数据减少读取次数
//在参数中在加入一个Bean类用来存储数据减少读取次数
//在参数中在加入一个Bean类用来存储数据减少读取次数
public void HDFSChange(String FPath,int num,String ChangeStr)throws IOException, URISyntaxException, InterruptedException{
init();
Path path = new Path(FPath);


FSDataOutputStream fos = fs.create(path,true);
fos.write("待输入\n".getBytes());

fos.close();
close();
}
//查找所有的方法方法类型转换成Bean类,用来返回不同的读取,把每一个Bean类都写一个与之对应的查找所有的方法
public void HDFSFindAll(String FPath)throws IOException, URISyntaxException, InterruptedException{
init();
Path path = new Path(FPath);
FSDataInputStream fis = fs.open(path);
int len = 0;
byte[] buf = new byte[4096];
while ((len = fis.read(buf)) != -1){
System.out.println(new String(buf, 0, len));
}
fis.close();
close();
}
//查找某一条的方法根据需要在读取到每一行的时候进行判断,如果符合要求就将其加入到Bean的list中,把每一个精确查询的要求都写一个方法
public void HDFSSelect(String FPath,String need)throws IOException, URISyntaxException, InterruptedException{
init();
Path path = new Path(FPath);
FSDataInputStream fis = fs.open(path);
int len = 0;
byte[] buf = new byte[4096];
while ((len = fis.read(buf)) != -1){
System.out.println(new String(buf, 0, len));
}
fis.close();
close();
}
/*
init();
close();
*/


public void HDFSmkdir()throws IOException, URISyntaxException, InterruptedException{
init();
fs.mkdirs(new Path("/xiyou/huaguoshan2"));
close();
}


public void HDFSUpFile()throws IOException, URISyntaxException, InterruptedException{
init();
//参数1:是否删除本地的 参数二:是否覆盖已有同名的 参数三:原文件路径 参数四:目的路径
fs.copyFromLocalFile(false,true,new Path("D:\\hello.txt"),new Path("/input"));
close();
}

public void HDFSDownLoadFile()throws IOException, URISyntaxException, InterruptedException{
init();
//参数一:原文件是否删除 参数二:HDFS文件的路径 参数三:目标地址路径 参数四:是否开启校验
fs.copyToLocalFile(false,new Path("/input/hello.txt"),new Path("D:\\HDFSDownLoad"),true);
close();
}

public void HDFSDelete()throws IOException, URISyntaxException, InterruptedException{
init();
//删除文件
//参数一:HDFS文件的路径 参数二:是否递归删除
fs.delete(new Path("/input/hello.txt"),false);
close();
//删除空目录
fs.delete(new Path(""),false);
//删除非空目录
fs.delete(new Path(""),true);
}

//文件更名和移动
public void HDFSRenamMove()throws IOException, URISyntaxException, InterruptedException{
init();
//参数一: 源文件路径 参数二:目标地址路径
fs.rename(new Path("/input/hello.txt"),new Path("/input/hello1.txt"));

//移动并且更名
fs.rename(new Path("/input/hello.txt"),new Path("/hello1.txt"));



close();
}

//传入路径进行文件夹创建
public void hdfsMkdir(String MyPath)throws IOException, URISyntaxException, InterruptedException{
init();
fs.mkdirs(new Path(MyPath));
close();
}

//传入参数路径上传文件
public void hdfsUpFile(String LoaclPath,String UpPath)throws IOException, URISyntaxException, InterruptedException{
init();
//参数1:是否删除本地的 参数二:是否覆盖已有同名的 参数三:原文件路径 参数四:目的路径
fs.copyFromLocalFile(false,true,new Path(LoaclPath),new Path(UpPath));
close();
}

//传入参数删除文件
public void hdfsDeleteFile(String HPath)throws IOException, URISyntaxException, InterruptedException{
init();
fs.delete(new Path(HPath),false);
close();
}

//下载文件到本地
public void hdfsDownLoadFile(String HPath,String LocalPath)throws IOException, URISyntaxException, InterruptedException{
init();
//参数一:原文件是否删除 参数二:HDFS文件的路径 参数三:目标地址路径 参数四:是否开启校验
fs.copyToLocalFile(false,new Path(HPath),new Path(LocalPath),true);
close();
}

//读取文件并打印
public void hdfsShow(String HPath)throws IOException, URISyntaxException, InterruptedException{
init();
Path path = new Path(HPath);
FSDataInputStream fis = fs.open(path);
int len = 0;
byte[] buf = new byte[4096];
while ((len = fis.read(buf)) != -1){
System.out.println(new String(buf, 0, len));
}
fis.close();
close();
}
}
然后创建一个Main客户端使用入口:
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Scanner;

public class Main {

public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException {
HDFSMethod hdfs = new HDFSMethod();
System.out.println("选择要进行的操作:");
System.out.println("1:创建文件夹");
System.out.println("2:上传本地文件");
System.out.println("3:删除文件");
System.out.println("4:下载到本地");
System.out.println("5读取某一文件内容");
System.out.println("6:退出");
Scanner sc = new Scanner(System.in);
int input = sc.nextInt();
int stop = 1;
while (stop != 0) {
switch (input) {
case 1: {
System.out.println("输入新建文件夹的地址");
String MyPath = sc.next();
hdfs.hdfsMkdir(MyPath);
}
break;
case 2: {
System.out.println("输入本地文件地址");
String LocalPath = sc.next();
System.out.println("输入上传到的地址");
String HPath = sc.next();
hdfs.hdfsUpFile(LocalPath, HPath);
}
break;
case 3: {
System.out.println("输入删除的文件地址");
String HPath = sc.next();
hdfs.hdfsDeleteFile(HPath);
}
break;
case 4: {
System.out.println("输入HDFS上文件的地址");
String HPath = sc.next();
System.out.println("输入下载到的地址");
String LocalPath = sc.next();
hdfs.hdfsDownLoadFile(HPath, LocalPath);
}
break;
case 5: {
System.out.println("输入读取的文件地址");
String HPath = sc.next();
hdfs.hdfsShow(HPath);
}
break;
case 6:{
stop=0;
break;
}
}
System.out.println("选择接下来的操作");
input=sc.nextInt();
}
}
}
系统即可根据用户的选择完成相应的操作。

原文地址:https://www.cnblogs.com/ruangongwangxiansheng/p/14161598.html