HDFS基本操作

HDFS基本操作

一、hadoop搭建     

1、修改主机名     

2、ip等     

3、主机映射     

4、关闭防火墙(两种)     

5、ssh免密登录(免密脚本)     

6、安装jdk 配置环境变量     

7、安装hadoop         

配置文件的修改         

  hadoop-env.sh    JAVA_HOME         

  core-site.xml    客户端参数    namenode在哪里         

  hdfs-site.xml    namenode和datanode存放的目录         

  mapred-site.xml    经过重命名之后才得到的文件,提交任务到哪里         

  yarn-site.xml    配置resourcemanager在哪里,资源的多少         

  HADOOP环境变量     

8、安装程序分发集群         

  hosts文件         

  jdk安装文件    /etc.profile         

  hadoop安装文件     

9、namenode进行格式化         

  hadoop namenode -format     

10、启动测试         

  start-dfs.sh         

  hadoop-deams.sh    单独启动单台机器的进程         

  start-yarn        是在resourcemaneger启动的

二、shell命令操作hdfs

     启动hadoop :start-dfs.sh

常用命令参数介绍     

  Shell客户端启动  

  Hadoop fs                 hdfs dfs

-help                      

  功能:输出这个命令参数手册     

-ls                           

  功能:显示目录信息         

  示例: hadoop fs -ls hdfs://hadoop-server01:9000/         

  备注:这些参数中,所有的hdfs路径都可以简写         

  -->hadoop fs -ls /   等同于上一条命令的效果     

-mkdir                       

  功能:在hdfs上创建目录         

  示例:hadoop fs  -mkdir  -p  /aaa/bbb/cc/dd     

-moveFromLocal                     

  功能:从本地剪切粘贴到hdfs         

  示例:hadoop  fs  - moveFromLocal  /home/hadoop/a.txt  /aaa/bbb/cc/dd     

-moveToLocal                       

  功能:从hdfs剪切粘贴到本地         

  示例:hadoop  fs  - moveToLocal   /aaa/bbb/cc/dd  /home/hadoop/a.txt      

--appendToFile           

  功能:追加一个文件到已经存在的文件末尾         

  示例:hadoop  fs  -appendToFile  ./hello.txt  hdfs://hadoop-server01:9000/hello.txt         

  可以简写为:         Hadoop  fs  -appendToFile  ./hello.txt  /hello.txt

-cat           

  功能:显示文件内容           

  示例:hadoop fs -cat  /hello.txt

-tail                          

  功能:显示一个文件的末尾         

  示例:hadoop  fs  -tail  /weblog/access_log.1     

-text                           

  功能:以字符形式打印一个文件的内容         

  示例:hadoop  fs  -text  /weblog/access_log.1     

-chgrp      

-chmod     

-chown         

  功能:linux文件系统中的用法一样,对文件所属权限         

  示例:         hadoop  fs  -chmod  666  /hello.txt         hadoop  fs  -chown  someuser:somegrp   /hello.txt     

-copyFromLocal             

  功能:从本地文件系统中拷贝文件到hdfs路径去         

  示例:hadoop  fs  -copyFromLocal  ./jdk.tar.gz  /aaa/     

-copyToLocal               

  功能:从hdfs拷贝到本地         

  示例:hadoop fs -copyToLocal /aaa/jdk.tar.gz     

-cp                       

  功能:从hdfs的一个路径拷贝hdfs的另一个路径         

  示例: hadoop  fs  -cp  /aaa/jdk.tar.gz  /bbb/jdk.tar.gz.2

-mv                              

  功能:在hdfs目录中移动文件         

  示例: hadoop  fs  -mv  /aaa/jdk.tar.gz  /     

-get                       

  功能:等同于copyToLocal,就是从hdfs下载文件到本地         

  示例:hadoop fs -get  /aaa/jdk.tar.gz     

-getmerge                      

  功能:合并下载多个文件         

  示例:比如hdfs的目录 /aaa/下有多个文件:log.1, log.2,log.3,...     hadoop fs -getmerge /aaa/log.* ./log.sum     

-put                         

  功能:等同于copyFromLocal         

  示例:hadoop  fs  -put  /aaa/jdk.tar.gz  /bbb/jdk.tar.gz.2

-rm                         

  功能:删除文件或文件夹         

  示例:hadoop fs -rm -r /aaa/bbb/

-rmdir                          

  功能:删除空目录         

  示例:hadoop  fs  -rmdir   /aaa/bbb/ccc     

-df                        

  功能:统计文件系统的可用空间信息         

  示例:hadoop  fs  -df  -h  /

-du          

  功能:统计文件夹的大小信息         

  示例:         hadoop  fs  -du  -s  -h /aaa/*

-count                  

  功能:统计一个指定目录下的文件节点数量         

  示例:hadoop fs -count /aaa/

-setrep                         

  功能:设置hdfs中文件的副本数量     

  示例:hadoop fs -setrep 3 /aaa/jdk.tar.gz

三、 java操作hdfs(Maven)

1.pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>cn.pengpeng</groupId>
	<artifactId>bigdata36-hadoop</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<dependencies>
		
		<!-- https://mvnrepository.com/artifact/junit/junit -->
		<dependency>
		    <groupId>junit</groupId>
		    <artifactId>junit</artifactId>
		    <version>4.12</version>
		    <scope>test</scope>
		</dependency>
		
		<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>2.8.3</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/log4j/log4j -->
		<dependency>
		    <groupId>log4j</groupId>
		    <artifactId>log4j</artifactId>
		    <version>1.2.17</version>
		</dependency>

	</dependencies>
	
	<!-- maven打包插件jdk的版本  -->
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.1</version>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
				</configuration>
			</plugin>
		</plugins>
	</build>


</project>

2.简单的增删改查操作

package hdfs.test;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import javax.annotation.WillClose;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class hdfsDemo {
	FileSystem fs =null;
	@Before
	public void init() throws Exception{
		//连接hdfs
		Configuration conf = new Configuration();
		fs = FileSystem.get(new URI("hdfs://hadoop01:9000"),conf,"root");
		
	}
	/**
	 * 将本地文件上传到hdfs
	 * @throws IOException 
	 * @throws IllegalArgumentException 
	 */
	@Test
	public void testUpLoad() throws Exception{
		fs.copyFromLocalFile(new Path("D:\data\http.log"), new Path("/"));
	}
	
	/**
	 * 从hdfs下载文件
	 * @throws Exception 
	 * @throws IllegalArgumentException 
	 */
	@Test
	public void testDownLoad() throws  Exception{
		fs.copyToLocalFile(new Path("/http.log"), new Path("d:/http"));
	}

	/**
	 * 删除hdfs文件
	 * @throws Exception 
	 * @throws IllegalArgumentException 
	 */
	@Test
	public  void TestDel() throws Exception{
		fs.delete(new Path("/http.log"),true);
	}
	
	
	/**
	 * 创建文件夹
	 * @throws IOException 
	 * @throws IllegalArgumentException 
	 * @throws Exception 
	 */
	@Test
	public void testMkdir() throws IllegalArgumentException, IOException{
		fs.mkdirs(new Path("/files"));
	}
	/**
	 * 改名字和移动文件
	 * @throws IOException 
	 * @throws IllegalArgumentException 
	 * @throws Exception
	 */
	@Test
	public void rename() throws IllegalArgumentException, IOException{
		//如果文件夹不存在,移动不会成功,也不会报错。
		//fs.rename(new Path("/test.sh"), new Path("/test2.sh"));
		fs.rename(new Path("/test2.sh"), new Path("/files/test.sh"));
	}
	/**
	 * 查看文件状态
	 * @throws IOException 
	 * @throws IllegalArgumentException 
	 * @throws Exception
	 */
	@Test
	public void testStatus() throws IllegalArgumentException, IOException{
		/*FileStatus status = fs.getFileStatus(new Path("/files"));
		System.out.println(status);*/
		
		RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
		while(listFiles.hasNext()){
			LocatedFileStatus file = listFiles.next();
			System.out.println(file.getLen());
			System.out.println(file.getBlockSize());
			System.out.println(file.getPath());
			System.out.println(file.getAccessTime());	//最后修改时间 
			System.out.println(file.getReplication());	//副本
			System.out.println("******************");
			//输出每个块的存储位置
			BlockLocation[] blockLocations = file.getBlockLocations();
			for (BlockLocation blockLocation : blockLocations) {
				System.out.println(blockLocation);
			}
			System.out.println("++++++++++++++++++++++++");
		}
	}
	
	/**
	 * 遍历文件夹
	 * @throws IOException 
	 * @throws IllegalArgumentException 
	 * @throws FileNotFoundException 
	 * @throws Exception
	 */
	@Test
	public void listStatus() throws FileNotFoundException, IllegalArgumentException, IOException{
		FileStatus[] fileStatus = fs.listStatus(new Path("/"));
		for (FileStatus fileStatus2 : fileStatus) {
			if(fileStatus2.isDirectory())
				System.out.println("文件夹");
			if(fileStatus2.isFile())
				System.out.println("文件");
		}
		
		
		
	}
	@After
	public void close() throws Exception{
		fs.close();
	}
	
	
	
	
}

3.简单的流输入与输出

package hdfs.test;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class StreamTest {
	FileSystem fs =null;
	@Before
	public void init() throws IOException{
		System.setProperty("HADOOP_USER_NAME", "root");
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
		fs = FileSystem.get(conf);
	}
	/**
	 * 向文件中插入数据
	 * @throws IOException 
	 * @throws IllegalArgumentException 
	 */
	@Test
	public void writeFile() throws IllegalArgumentException, IOException{
		FSDataOutputStream outputStream = fs.create(new Path("/writeFile.txt"));
		outputStream.writeDouble(3.1415);
		outputStream.writeUTF("嘿嘿");
		outputStream.write("哈哈".getBytes());
		outputStream.close();
		
	}
	
	/**
	 * 读取文件数据
	 * @throws IllegalArgumentException 
	 * @throws IOException
	 */
	@Test
	public void readFile() throws IllegalArgumentException, IOException{
		FSDataInputStream inputStream = fs.open(new Path("/writeFile.txt"));
		double readDouble = inputStream.readDouble();
		String string = inputStream.readUTF();
		byte[] b = new byte[6];
		int len = 0;
		while((len = inputStream.read(b))!=-1){
			System.out.println(new String(b));
		}
		System.out.println(readDouble);
		
		System.out.println(string);
		
		inputStream.close();
	}
	
	
	@After
	public void after() throws IOException{
		fs.close();
	}
}

4.通过流操作将本地文件上传到hdfs,并下载。

package hdfs.test;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.Test;

public class homework {
	
	FileSystem fs =null;
	@Before
	public void before() throws IOException, InterruptedException, URISyntaxException{
		Configuration conf = new Configuration();
		fs = FileSystem.get(new URI("hdfs://hadoop01:9000"),conf,"root");
	}
	/**
	 * �������ļ�������hdfs
	 * @throws Exception
	 */
	
	@Test
	public void UpLoad() throws Exception{
		FSDataOutputStream stream = fs.create(new Path("/.txt"));
		FileReader fileReader = new FileReader(new File("D:\data\好友.txt"));
		int len = 0;
		while((len=fileReader.read())!=-1){
			stream.write(len);
		}
		stream.close();
		fileReader.close();
	}
	/**
	 * ������好友
	 * @throws Exception
	 */
	@Test
	public void downlod() throws Exception{
		//打开
		FSDataInputStream stream = fs.open(new Path("/好友.txt"));
		FileWriter fw = new FileWriter("d:/好友.txt");
		int len = 0;
		while((len = stream.read())!=-1){
			fw.write(len);
		}
		fw.close();
		stream.close();
	}
}

5.定时上传日志;日志超过24小时删除

TimerTest.java(定时控制)

package hdfs.test.day03.log;

import java.util.Timer;

public class TimerTest {
	public static void main(String[] args) {
		Timer timer = new Timer();
		timer.schedule(new CollactionTask(), 0, 2*60*1000);
		timer.schedule(new CleanTask(), 0, 24*60*60*1000);
	}
}

CollactionTask.java(上传日志)

package hdfs.test.day03.log;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimerTask;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
 * 日志收集步骤:
 * 1、从日志目录查看哪些文件需要上传
 * 2、把需要上传的文件移动到待上传目录
 * 3、上传到hdfs上
 * 4、移动到备份目录
 * @author hasee
 *
 */
public class CollactionTask extends TimerTask{

	@Override
	public void run() {
		try {
			
			SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd-HH-mm");
			String date = format.format(new Date());
			
		// TODO Auto-generated method stub
		//1、查看上传文件
		File logDir = new File("d:/testlog/");
		File[] listFiles = logDir.listFiles(new FilenameFilter() {
			//FileNameFilter 哪些文件是需要获取的
			@Override
			public boolean accept(File dir, String name) {
				// TODO Auto-generated method stub
				return name.startsWith("test.log.");
			}
		});
		//2、将文件移动到待上传目录
		for (File file : listFiles) {
			
				FileUtils.moveFileToDirectory(file, new File("d:/waitUpload"), true);
			}
		
		//3、将待上传的文件逐个上传到hdfs上并移动到备份目录.
		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), new Configuration(),"root");
		Path depath = new Path("/log/"+date.substring(0, 10));
		boolean exists = fs.exists(depath);
		//3 判断待上传的目录是否已经存在 不存在则创建一个
		if(!exists){
			fs.mkdirs(depath);
			
		}
		
		//判断备份目录是否存在
		File backDir = new File("d:/backDir/"+date);
		boolean exists2 = backDir.exists();
		if(!exists2){
			backDir.mkdirs();
		}
		
		
		//得到上传的是哪一个服务上的日志文件
		String hostName = InetAddress.getLocalHost().getHostName();
		//4 遍历待上传的目录
		File file = new File("d:/waitUpload");
		File[] list = file.listFiles();
		
		for (File f : list) {
			//上传到hdfs上
			fs.copyFromLocalFile(new Path(f.getPath()), new Path(depath,hostName+"_"+f.getName()+"_"+System.currentTimeMillis()));
			//cp到备份目录
			FileUtils.moveFileToDirectory(f, backDir, true);
		}
		
		
		}catch (Exception e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		
		
	}

删除超时日志

package hdfs.test.day03.log;

import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimerTask;

import org.apache.commons.io.FileUtils;

public class CleanTask extends TimerTask {
	/**
	 * 清理备份日志
	 * 1.遍历出来所有的日志记录文件夹
	 * 2.把文件夹名字 转化为时间
	 * 3.如果文件夹时间与当前时间 时间差大于24小时,则删除
	 */
	@Override
	public void run() {
		try{
		SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd-HH-mm");
		File file = new File("d:/backDir");
		Date date = new Date();
		File[] files = file.listFiles();
		for (File dir : files) {
			String name = dir.getName();
			Date date2 = format.parse(name);
			if(date.getTime()-date2.getTime() > 24*60*60*1000){
				FileUtils.deleteDirectory(dir);
			}
		}
		}catch(Exception e){
			e.printStackTrace();
		}
	}

}

6.统计词频(分布式统计)

第一步,分类:将相同的单词交给同一个节点。

MapTask.java

执行的时候,先右键run as-->java application一次,出错。然后右键 run as-->run configurations 配置参数。 1 /word.txt 0 40

package hdfs.test.day04;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MapTask {

	public static void main(String[] args) throws Exception {
		/**
		 * taskId 标识哪台机器运行
		 * file 统计哪个文件
		 * startOffSet 从哪个位置开始
		 * lenth 读多长
		 */
		
		int taskId = Integer.parseInt(args[0]);
		String file = args[1];
		long startOffSet = Long.parseLong(args[2]);
		long lenth = Long.parseLong(args[3]);
		//连接hdfs
		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), new Configuration(),"root");
		FSDataInputStream inputStream = fs.open(new Path(file));
		
		//创建输出文件
		FSDataOutputStream out_tem_1 = fs.create(new Path("/wordcount/tmp/part-m"+taskId+"-1"),true);
		FSDataOutputStream out_tmp_2 = fs.create(new Path("/wordcount/tmp/part-m"+taskId+"-2"),true);
		//定位从哪里读
		inputStream.seek(startOffSet);
		//创建字符缓冲输入流
		BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
		
		//除了taskId=1的能读第一行,后面的task都需要跳过一行。
		if(taskId != 1){
			reader.readLine();
		}
		//读取并且写入
		long count = 0;
		String line = null;
		while((line = reader.readLine())!=null){
			String[] split = line.split(" ");
			for (String word : split) {
				if(word.hashCode()%2 == 0){
					out_tem_1.write((word+"	"+1+"
").getBytes());
				}else{
					out_tmp_2.write((word+"	"+1+"
").getBytes());
				}
				//累加每行的数据个数
				count += line.length();
				if(count > lenth){
					break;
				}
			}
		}
		
		reader.close();
		out_tem_1.close();
		out_tmp_2.close();
		fs.close();
			
		
	}

}

第二步,分布式计算。

Reduce.java

package hdfs.test.day04;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.yarn.webapp.hamlet.HamletSpec.BR;

public class ReduceTask {

	public static void main(String[] args) throws Exception {
		// TODO Auto-generated method stub
		//获取运行机器代号
		int taskId = Integer.parseInt(args[0]);
		//创建map用于存储数据
		Map<String,Integer> map = new HashMap<>();
		//连接hdfs
		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"),new Configuration(),"root");
		//遍历hdfs文件
		RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/wordcount/tmp/"), true);
		//开始计算
		while(listFiles.hasNext()){
			LocatedFileStatus file = listFiles.next();
			//判断是否是自己需要计算的文件
			if(file.getPath().getName().endsWith("-"+taskId)){
				//创建读文件对象
				FSDataInputStream inputStream = fs.open(file.getPath());
				//缓冲流
				BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
				String line = null;
				while ((line = reader.readLine()) != null){
					String[] word = line.split("	");
					//计算单词的个数
					Integer count = map.getOrDefault(word[0], 0);
					count += Integer.parseInt(word[1]);
					map.put(word[0], count);				
				}
				inputStream.close();
				reader.close();
			}
			
			
			//将结果写入到hdfs
			FSDataOutputStream outputStream = fs.create(new Path("/wordcount/result/part-r-"+taskId));
			Set<Entry<String,Integer>> entrySet = map.entrySet();
			for (Entry<String, Integer> entry : entrySet) {
				outputStream.write((entry.getKey()+"="+entry.getValue()+"
").getBytes());
			}
			outputStream.close();
			
			
		}
		
		
		fs.close();
		
		
		
		
		
	}

}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于 2018-10-18
原文地址:https://www.cnblogs.com/drunkPullBreeze/p/11653435.html