大数据学习——mapreduce共同好友

数据 commonfriends.txt

A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>com.cyf</groupId>
  <artifactId>MapReduceCases</artifactId>
  <packaging>jar</packaging>
  <version>1.0</version>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.6.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.6.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.6.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>2.6.4</version>
    </dependency>

    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.1.40</version>
    </dependency>

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.36</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <appendAssemblyId>false</appendAssemblyId>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>cn.itcast.mapreduce.commonfriends.CommonFriendsStepOne</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>assembly</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

</project>
CommonFriendsStepOne.java打包并上传
package cn.itcast.mapreduce.commonfriends;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.itcast.mapreduce.index.IndexStepTwo;
import cn.itcast.mapreduce.index.IndexStepTwo.IndexStepTwoMapper;
import cn.itcast.mapreduce.index.IndexStepTwo.IndexStepTwoReducer;

public class CommonFriendsStepOne {

    public static class CommonFriendsStepOneMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String line = value.toString();
            String[] splits = line.split(":");
            String person = splits[0];
            String[] friends = splits[1].split(",");

            for (String fString : friends) {
                context.write(new Text(fString), new Text(person));
            }

        }
    }

    public static class CommonFriendsStepOneReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text friend, Iterable<Text> persons, Context context)
                throws IOException, InterruptedException {

            StringBuffer sBuffer = new StringBuffer();
            for (Text pText : persons) {
                sBuffer.append(pText).append("-");
            }

            context.write(friend, new Text(sBuffer.toString()));
        }
    }


    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

//        job.setJarByClass(CommonFriendsStepOne.class);
        //告诉框架,我们的程序所在jar包的位置
        job.setJar("/root/CommonFriendsStepOne.jar");
        //告诉程序,我们的程序所用的mapper类和reducer类是什么
        job.setMapperClass(CommonFriendsStepOneMapper.class);
        job.setReducerClass(CommonFriendsStepOneReducer.class);

        //告诉框架,我们程序输出的数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        //告诉框架,我们要处理的数据文件在那个路劲下
        FileInputFormat.setInputPaths(job, new Path("/commonfriends/input"));

        //告诉框架,我们的处理结果要输出到什么地方
        FileOutputFormat.setOutputPath(job, new Path("/commonfriends/output-1"));

        boolean res = job.waitForCompletion(true);

        System.exit(res ? 0 : 1);
    }


}

运行

 hadoop jar CommonFriendsStepOne.jar cn.itcast.mapreduce.index.CommonFriendsStepOne

生成结果

CommonFriendsStepTwo.java
package cn.itcast.mapreduce.commonfriends;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class CommonFriendsStepTwo {

    /**
     * A    I-K-C-B-G-F-H-O-D-
     * B    A-F-J-E-
     * C    A-E-B-H-F-G-K-
     */
    public static class CommonFriendsStepTwoMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String line = value.toString();
            String[] splits = line.split("	");
            String friend = splits[0];
            String[] persons = splits[1].split("-");

            Arrays.sort(persons);

            for (int i = 0; i < persons.length - 1; i++) {
                for (int j = i + 1; j < persons.length; j++) {
                    context.write(new Text(persons[i] + "-" + persons[j]), new Text(friend));
                }

            }

        }
    }


    public static class CommonFriendsStepTwoReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text person_pair, Iterable<Text> friends, Context context)
                throws IOException, InterruptedException {

            StringBuffer sBuffer = new StringBuffer();
            for (Text fText : friends) {
                sBuffer.append(fText).append(" ");
            }
            context.write(person_pair, new Text(sBuffer.toString()));
        }
    }


    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

//        job.setJarByClass(CommonFriendsStepTwo.class);
        //告诉框架,我们的程序所在jar包的位置
        job.setJar("/root/CommonFriendsStepTwo.jar");
        //告诉程序,我们的程序所用的mapper类和reducer类是什么
        job.setMapperClass(CommonFriendsStepTwoMapper.class);
        job.setReducerClass(CommonFriendsStepTwoReducer.class);

        //告诉框架,我们程序输出的数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        //告诉框架,我们要处理的数据文件在那个路劲下
        FileInputFormat.setInputPaths(job, new Path("/commonfriends/output-1"));

        //告诉框架,我们的处理结果要输出到什么地方
        FileOutputFormat.setOutputPath(job, new Path("/commonfriends/output-2"));

        boolean res = job.waitForCompletion(true);

        System.exit(res ? 0 : 1);
    }


}

修改pom.xml文件

打包并上传

运行

hadoop jar CommonFriendsStepTwo.jar cn.itcast.mapreduce.index.CommonFriendsStepTwo 

 运行结果

A-B    E C 
A-C    D F 
A-D    E F 
A-E    D B C 
A-F    O B C D E 
A-G    F E C D 
A-H    E C D O 
A-I    O 
A-J    O B 
A-K    D C 
A-L    F E D 
A-M    E F 
B-C    A 
B-D    A E 
B-E    C 
B-F    E A C 
B-G    C E A 
B-H    A E C 
B-I    A 
B-K    C A 
B-L    E 
B-M    E 
B-O    A 
C-D    A F 
C-E    D 
C-F    D A 
C-G    D F A 
C-H    D A 
C-I    A 
C-K    A D 
C-L    D F 
C-M    F 
C-O    I A 
D-E    L 
D-F    A E 
D-G    E A F 
D-H    A E 
D-I    A 
D-K    A 
D-L    E F 
D-M    F E 
D-O    A 
E-F    D M C B 
E-G    C D 
E-H    C D 
E-J    B 
E-K    C D 
E-L    D 
F-G    D C A E 
F-H    A D O E C 
F-I    O A 
F-J    B O 
F-K    D C A 
F-L    E D 
F-M    E 
F-O    A 
G-H    D C E A 
G-I    A 
G-K    D A C 
G-L    D F E 
G-M    E F 
G-O    A 
H-I    O A 
H-J    O 
H-K    A C D 
H-L    D E 
H-M    E 
H-O    A 
I-J    O 
I-K    A 
I-O    A 
K-L    D 
K-O    A 
L-M    E F 


原文地址:https://www.cnblogs.com/feifeicui/p/10222446.html