
hadoop-hdfs-API

Deploy the Hadoop package on Windows

Configure the Windows environment variables:

add Hadoop's bin directory to PATH, and set HADOOP_USER_NAME to root

Configure Eclipse: pull the core-site.xml and hdfs-site.xml files from the Linux cluster into the project

Java code

Configuration conf;  // loads the configuration files

FileSystem fs;       // the client

fs = FileSystem.get(conf);
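
As a minimal sketch (not from the original notes) of how these two fields might be wired up in a JUnit test class, assuming the core-site.xml/hdfs-site.xml pulled above are on the classpath; the class and method names here are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.junit.After;
import org.junit.Before;

public class HdfsApiTest {
    Configuration conf;
    FileSystem fs;

    @Before
    public void setUp() throws Exception {
        // true: load the *-site.xml configuration resources from the classpath
        conf = new Configuration(true);
        // client bound to fs.defaultFS; the user can also be given explicitly, e.g.
        // fs = FileSystem.get(URI.create(conf.get("fs.defaultFS")), conf, "root");
        fs = FileSystem.get(conf);
    }

    @After
    public void tearDown() throws Exception {
        fs.close();  // release the client connection
    }
}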

Create a directory

@Test
public void mkdir() throws Exception {
    Path ifile = new Path("/ooxx");
    if (fs.exists(ifile)) {
        fs.delete(ifile, true);
    }
    fs.mkdirs(ifile);
}

Upload a file

@Test
public void upload() throws Exception {
    Path f = new Path("/ooxx/hello.txt");
    FSDataOutputStream output = fs.create(f);
    InputStream input = new BufferedInputStream(new FileInputStream(new File("D:\\reqlog.txt")));
    IOUtils.copyBytes(input, output, conf, true);
}

View block information

@Test
public void blks() throws Exception {
    Path i = new Path("/ooxx/hello.txt");
    FileStatus ifile = fs.getFileStatus(i);
    BlockLocation[] blks = fs.getFileBlockLocations(ifile, 0, ifile.getLen());
    for (BlockLocation b : blks) {
        System.out.println(b);
    }
}

Command line: hdfs dfs -mkdir -p /user/root creates the user's home directory; files can then be uploaded from the command line (e.g. with hdfs dfs -put)

Hadoop YARN

Resource management framework

The user's analysis program ---- compiled to class files ---- packaged into a jar ---->

ResourceManager (resource management): monitors the resources of all NodeManagers (NodeManager1, NodeManager2, ...), allocates resources, and tells a NodeManager to start the AppMaster process (which runs the program in the jar)

The AppMaster requests resources from the RM and submits the split locations; the RM returns a resource list following the data-locality principle (e.g. 1, node1, 1 CPU, 1 GB memory; 2, node2, 2, 2). Following the list, the AppMaster tells the corresponding nodes (every DataNode also runs a NodeManager), e.g. node01, to start the YarnChild processes

The AppMaster handles resource scheduling by launching containers (a container is a bundle of constrained resources)

Each job is scheduled by its own AppMaster, so jobs do not interfere with one another

NodeManager (supervises the resources of its own machine) : DataNode = 1 : 1

Start the ResourceManager with: yarn-daemon.sh start resourcemanager

The user's compiled program --(jar)--> ResourceManager (monitors resource usage on every NodeManager) --(based on the number and location of input splits, tells a NodeManager with free resources to start the AppMaster process)--> AppMaster --(requests resources for the map tasks)--> ResourceManager (assigns resources following the data-locality principle) --(returns a resource list)--> the AppMaster uses the list to tell the DataNode/NodeManager nodes to start the corresponding YarnChild processes, which run the map tasks; tasks are preferentially scheduled on the machines where their data resides. [Data-locality principle: move the computation, not the data; the machines that hold the data are the first to have their resources assigned.]

WordCount

public class RunJob {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            Job job = Job.getInstance(conf);
            FileSystem fs = FileSystem.get(conf);
            job.setJobName("wc");
            job.setJarByClass(RunJob.class);
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            // set the number of reducers
            job.setNumReduceTasks(1);
            FileInputFormat.addInputPath(job, new Path("/ooxx/test.txt"));
            // set the job output directory (it must not already exist; the framework errors out if it does)
            Path outpath = new Path("/output/wc");
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);
            // run the job
            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("MapReduce job completed successfully!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] wordsArray = value.toString().split(" ");
        for (int i = 0; i < wordsArray.length; i++) {
            String word = wordsArray[i];
            Text outKey = new Text(word);
            IntWritable outValue = new IntWritable(1);
            context.write(outKey, outValue);
        }
    }
}

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> iter, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable i : iter) {
            sum += i.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

job.setNumReduceTasks(1);

KEYIN: the key type of the input data

VALUEIN: the value type of the input data

LongWritable: Hadoop's rewritten long type; it can be serialized and deserialized

Text: Hadoop's rewritten String type
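
As a small illustration (not from the original notes) of what "can be serialized and deserialized" means for these Writable types, the sketch below round-trips a LongWritable and a Text through a byte stream; the class name WritableRoundTrip is made up:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class WritableRoundTrip {
    public static void main(String[] args) throws IOException {
        // serialize: every Writable implements write(DataOutput)
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        new LongWritable(42L).write(out);
        new Text("hello").write(out);
        out.flush();

        // deserialize: every Writable implements readFields(DataInput)
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        LongWritable key = new LongWritable();
        Text value = new Text();
        key.readFields(in);
        value.readFields(in);
        System.out.println(key.get() + " " + value);  // prints: 42 hello
    }
}

This byte-level write/readFields contract is what lets MapReduce move keys and values between map and reduce tasks.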

 

WordCountMapper extends Mapper<KEYIN (LongWritable), VALUEIN (Text), KEYOUT (Text), VALUEOUT (IntWritable)> {

    protected void map(LongWritable key, Text value, Context context) {
        String[] wordArray = value.toString().split(" ");
        for (...) {
            ...
        }
    }
}

hadoop jar ./my_wc.jar

Original post: https://www.cnblogs.com/huzicourenao/p/11037795.html