大数据学习——mapreduce倒排索引

数据

a.txt

hello jerry
hello tom

b.txt

allen tom
allen jerry
allen hello

c.txt

hello jerry
hello tom

1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>com.cyf</groupId>
  <artifactId>MapReduceCases</artifactId>
  <packaging>jar</packaging>
  <version>1.0</version>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.6.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.6.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.6.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>2.6.4</version>
    </dependency>

    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.1.40</version>
    </dependency>

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.36</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <appendAssemblyId>false</appendAssemblyId>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>cn.itcast.mapreduce.index.IndexStepOne</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>assembly</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

</project>

 2   IndexStepOne.java

package cn.itcast.mapreduce.index;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class IndexStepOne {
    
    public static class IndexStepOneMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
        
        Text k = new Text();
        IntWritable v = new IntWritable(1);
        
        @Override
        protected void map(LongWritable key, Text value,Context context)
                throws IOException, InterruptedException {

            String line = value.toString();
            String[] words = line.split(" ");
            
            FileSplit Split = (FileSplit)context.getInputSplit();
            String filename = Split.getPath().getName();
            
            //输出key :单词--文件名  value:1
            for(String word : words){
                k.set(word +"--"+ filename);
                
                context.write(k, v);
            }
        
        }
    }
    
    public static class IndexStepOneReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
        
        IntWritable v = new IntWritable();
        
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {

            int count = 0;
            for(IntWritable value : values){
                count += value.get();
            }
            
            v.set(count);
            context.write(key, v);
        }
    }

    
    
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        
//        job.setJarByClass(IndexStepOne.class);
        //告诉框架,我们的程序所在jar包的位置
        job.setJar("/root/IndexStepOne.jar");
        //告诉程序,我们的程序所用的mapper类和reducer类是什么
        job.setMapperClass(IndexStepOneMapper.class);
        job.setReducerClass(IndexStepOneReducer.class);
        
        //告诉框架,我们程序输出的数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        //这里可以进行combiner组件的设置
        job.setCombinerClass(IndexStepOneReducer.class);
        
        //告诉框架,我们程序使用的数据读取组件 结果输出所用的组件是什么
        //TextInputFormat是mapreduce程序中内置的一种读取数据组件  准确的说 叫做 读取文本文件的输入组件
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        
        //告诉框架,我们要处理的数据文件在那个路劲下
        FileInputFormat.setInputPaths(job, new Path("/index/input"));
        
        //告诉框架,我们的处理结果要输出到什么地方
        FileOutputFormat.setOutputPath(job, new Path("/index/output-1"));
        
        boolean res = job.waitForCompletion(true);
        
        System.exit(res?0:1);
         
        
    }
}

 打包重命名并把该jar上传到hdfs

创建文件夹,并把a.txt  b.txt  c.txt传到该路径 

 hadoop fs -mkdir -p /index/input

运行

hadoop jar IndexStepOne.jar cn.itcast.mapreduce.index.IndexStepOne

输出结果

修改pom文件


打包并上传到hdfs

IndexStepTwo.java

package cn.itcast.mapreduce.index;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class IndexStepTwo {

    public static class IndexStepTwoMapper extends Mapper<LongWritable, Text, Text, Text> {

        Text k = new Text();
        Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String line = value.toString();
            String[] fields = line.split("	");
            String word_file = fields[0];
            String count = fields[1];
            String[] split = word_file.split("--");
            String word = split[0];
            String file = split[1];

            k.set(word);
            v.set(file + "--" + count);

            context.write(k, v);

        }
    }


    public static class IndexStepTwoReducer extends Reducer<Text, Text, Text, Text> {

        Text v = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            StringBuffer sBuffer = new StringBuffer();
            for (Text value : values) {
                sBuffer.append(value.toString()).append(" ");
            }
            v.set(sBuffer.toString());
            context.write(key, v);
        }

    }


    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

//        job.setJarByClass(IndexStepTwo.class);
        job.setJar("/root/IndexStepTwo.jar");
        //告诉程序,我们的程序所用的mapper类和reducer类是什么
        job.setMapperClass(IndexStepTwoMapper.class);
        job.setReducerClass(IndexStepTwoReducer.class);

        //告诉框架,我们程序输出的数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        //这里可以进行combiner组件的设置
        job.setCombinerClass(IndexStepTwoReducer.class);

        //告诉框架,我们要处理的数据文件在那个路劲下
        FileInputFormat.setInputPaths(job, new Path("/index/output-1"));

        //告诉框架,我们的处理结果要输出到什么地方
        FileOutputFormat.setOutputPath(job, new Path("/index/output-2"));

        boolean res = job.waitForCompletion(true);

        System.exit(res ? 0 : 1);

    }


}
IndexStepTwo.jar
运行 hadoop jar IndexStepTwo.jar cn.itcast.mapreduce.index.IndexStepTwo 

运行结果如下:





原文地址:https://www.cnblogs.com/feifeicui/p/10222398.html