实现计算出用户间的共同好友和二度好友

问题需求:下面是用户的好友关系列表,每一行代表一个用户和他的好友列表。求出任意两个人之间的共同好友都有谁(好友关系是单向的,也就是说1的好友里面有2,但是2的好友里面不一定有1)。

1    2,3,4,5,6
3    1,5,6,7,9
2    3,5,7,9,11,12

思路:

1.首先把用户作为value,他好友列表拆分后作为key,类似倒排操作,如下:

2     1  
3     1  
4     1  
5     1  
6     1  
1     3  
5     3  
6     3  
7     3  
9     3  
3     2  
5     2  
7     2  
9     2  
11    2  
12    2 

2.按key进行reduceByKey聚合,并对聚合后的value(用户列表)排序;这样value中的任意两个用户,都以key对应的那个人作为共同好友:

11  2
1   3
7   2,3
9   2,3
4   1
6   1,3
3   1,2
5   1,2,3
2   1
12  2

3.如果求两两之间的共同好友,就对value做双重for循环,把两两组合(如1_2)作为下一步的key,原来的key作为value;求三个人的共同好友原理相同,用三层for循环即可:

1_2  3
1_2  5
2_3  9
2_3  7
1_3  6
1_3  5
2_3  5

4.再次进行reduceByKey操作就可以得到共同好友了。

1_3    5,6        
2_3    7,5,9      
1_2    3,5  

MapReduce实现代码:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FindFrients1 {
    public static class FindFrients1Mapper extends Mapper<LongWritable, Text, Text, Text> {
        /**
         * map输出的key:Text     guid
         * map输出的value:Text   0_1 和 1_1   是否点击_曝光
         */
        Text keyOut = new Text();
        Text valueOut = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();

            String uid = line.split("	")[0];

            String[] friends = line.split("	")[1].split(",");

            for (String fre: friends) {
                context.write(new Text(fre),new Text(uid));
            }
        }
    }


    public static class FindFrients1Reducer extends Reducer<Text, Text, Text, Text> {
        /**
         * reduce输出的key: Text
         * reduce输出的value: NullWritable
         */
        Text valueOut = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // guid   [0_1,0_1,1_1]   ---> guid  1  3

            StringBuilder sb = new StringBuilder();
            for (Text fre: values) {
                sb.append(fre.toString()).append(",");
            }
            String re=sb.substring(0,sb.length()-1);

            context.write(key,new Text(re));

        }
    }


    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "FindFrients1");
        job.setMapperClass(FindFrients1Mapper.class);
        job.setReducerClass(FindFrients1Reducer.class);
//        //设置combiner运行的reducer类【重要】
//        job.setCombinerClass(AvgrageScoreCombiner.AvgScoreCombiner.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(1);


        FileInputFormat.addInputPath(job, new Path("E:\tmp\badou\data\hadoop_test\findFriend\test_friend.txt"));
        Path outputPath = new Path("E:\tmp\badou\data\hadoop_test\findFriend\output1");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);//递归删除
            System.out.println("outputpath:【" + outputPath.toString() + "】 delete success!");
        }


        FileOutputFormat.setOutputPath(job, outputPath);
        boolean isSuccess = job.waitForCompletion(true);
        int status = isSuccess ? 0 : 1;
        System.exit(status);

    }

}


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.Arrays;

public class FindFrients2 {
    public static class FindFrients2Mapper extends Mapper<LongWritable, Text, Text, Text> {
        /**
         * map输出的key:Text     guid
         * map输出的value:Text   0_1 和 1_1   是否点击_曝光
         */
        Text keyOut = new Text();
        Text valueOut = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            /// 孙初丹   刘领会_李四_王五

            String line = value.toString();

            String[] lines= line.split("	");
            //拿到朋友
            String uid = lines[0];
            //拿到朋友
            String[] fri = lines[1].split(",");
            //针对朋友一个排序
//        [刘领会,李四,王五]
            //  刘灵会,李四   孙初丹
            Arrays.sort(fri);
            for (int i = 0; i <fri.length-1 ; i++) {
                for (int j = i+1; j <fri.length ; j++) {
                    context.write(new Text(fri[i]+"_"+fri[j]),new Text(uid));
                }
            }

        }
    }


    public static class FindFrients2Reducer extends Reducer<Text, Text, Text, Text> {
        /**
         * reduce输出的key: Text
         * reduce输出的value: NullWritable
         */
        Text valueOut = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // guid   [0_1,0_1,1_1]   ---> guid  1  3

            StringBuilder sb = new StringBuilder();
            for (Text fre:values) {
                sb.append(fre+",");
            }
            String re = sb.substring(0,sb.length()-1);
            context.write(new Text(key),new Text(re));

        }
    }


    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "FindFrients2");
        job.setMapperClass(FindFrients2Mapper.class);
        job.setReducerClass(FindFrients2Reducer.class);
//        //设置combiner运行的reducer类【重要】
//        job.setCombinerClass(AvgrageScoreCombiner.AvgScoreCombiner.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(1);


        FileInputFormat.addInputPath(job, new Path("E:\tmp\badou\data\hadoop_test\findFriend\output1\"));
        Path outputPath = new Path("E:\tmp\badou\data\hadoop_test\findFriend\output2");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);//递归删除
            System.out.println("outputpath:【" + outputPath.toString() + "】 delete success!");
        }


        FileOutputFormat.setOutputPath(job, outputPath);
        boolean isSuccess = job.waitForCompletion(true);
        int status = isSuccess ? 0 : 1;
        System.exit(status);

    }

}
View Code

 Spark实现代码:

package com.badou.function


import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.mutable.ListBuffer
/**
 * Spark job that computes the common friends of every pair of users.
 *
 * Input lines have the form `user<TAB>friend1,friend2,...`. The result maps
 * `userA_userB` (userA sorting before userB) to the comma-separated friends
 * both users list.
 */
object Friends {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("feat_eg")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    import sparkSession.implicits._

    // Backslashes must be escaped: a bare "\d" or "\h" is not a valid escape
    // and "\t" / "\b" would silently become tab / backspace characters.
    val inputPath = "E:\\tmp\\badou\\data\\hadoop_test\\findFriend\\test_friend.txt"
    val rawDF: DataFrame = sparkSession.read.format("csv").option("sep", "\t").load(inputPath)
    rawDF.createOrReplaceTempView("user_friend")

    // Step 1: invert "user -> friends" into one (friend, user) row per relation.
    val relationDF = sparkSession.sql("SELECT explode(split(_c1,','))  AS friend,_c0 AS uid FROM user_friend")
    relationDF.createOrReplaceTempView("user_friends")

    // Step 2: gather every user that lists the same friend.
    val usersByFriendDF = sparkSession.sql("SELECT friend,concat_ws(',',collect_list(uid)) AS uid FROM user_friends  GROUP BY friend")

    // Sort the users so a pair is always written smaller-id first.
    val sortedUsersByFriend = usersByFriendDF.rdd.map { row =>
      (row.getString(0), row.getString(1).split(",").sorted)
    }

    // Step 3: every unordered pair of users on a row shares that row's friend.
    // j starts at i + 1 so each pair is produced exactly once; starting j at 1
    // and only skipping i == j also yields reversed pairs (e.g. 3_2 next to
    // 2_3) as soon as a friend is listed by four or more users.
    val value = sortedUsersByFriend.flatMap { case (friend, users) =>
      for {
        i <- 0 until users.length - 1
        j <- i + 1 until users.length
      } yield (users(i) + "_" + users(j), friend)
    }
    value.foreach(f => println(f._1 + "  " + f._2))

    // Step 4: collect the shared friends of each pair.
    value.toDF("friends", "same_friend").createOrReplaceTempView("user_friends")
    val friendDF = sparkSession.sql("SELECT friends,concat_ws(',',collect_list(same_friend)) AS same_friend FROM user_friends  GROUP BY friends")
    friendDF.show(false)
  }
}
View Code

 

5、二度好友:双方有一个以上共同的好友,这时朋友网可以计算出你们有几个共同的好友并且呈现数字给你(好友的好友,但并不是直接好友)。你们的关系是: 你->朋友->陌生人

对于用户对A_B:用A的好友列表排除掉A与B的共同好友,剩下的就是所求的二度好友。

 原始好友处理成如下:

1_3    2,3,4,5,6
1_2    2,3,4,5,6
3_1    1,5,6,7,9
3_2    1,5,6,7,9
2_1    3,5,7,9,11,12
2_3    3,5,7,9,11,12

共同好友第4步的结果处理成:

1_3    5,6       
2_3    7,5,9     
1_2    3,5       
2_1    3,5       
3_2    7,5,9     
3_1    5,6       

这样根据相同的key,排除掉共同好友就是二度好友

1_3    2,3,4
1_2    2,4,6
3_1    1,7,9
3_2    1,6
2_1    7,9,11,12
2_3    3,11,12

  Spark实现共同好友和二度好友的全部代码:

import java.util
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.collection.mutable.ListBuffer
/**
 * Spark job that computes, for every ordered pair of users `A_B`:
 *   1. their common friends, and
 *   2. the "second-degree" list: A's friends minus the friends A shares with B.
 *
 * Input lines have the form `user<TAB>friend1,friend2,...`.
 */
object Friends {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("feat_eg")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    import sparkSession.implicits._

    // Backslashes must be escaped: a bare "\d" or "\h" is not a valid escape
    // and "\t" / "\b" would silently become tab / backspace characters.
    val inputPath = "E:\\tmp\\badou\\data\\hadoop_test\\findFriend\\test_friend.txt"
    val rawDF: DataFrame = sparkSession.read.format("csv").option("sep", "\t").load(inputPath)
    rawDF.createOrReplaceTempView("user_friend")
    // (uid, friend) where friend is still the raw comma-separated list.
    val friendODF: DataFrame = sparkSession.sql("SELECT _c0 AS uid,_c1 AS friend FROM user_friend")
    friendODF.createOrReplaceTempView("user_friend")

    // ---- Common friends ----------------------------------------------------

    // Invert "user -> friends" into one (friend, user) row per relation.
    val relationDF = sparkSession.sql("SELECT explode(split(friend,','))  AS friend,uid AS uid FROM user_friend")
    relationDF.createOrReplaceTempView("user_friends")
    // Gather every user that lists the same friend.
    val usersByFriendDF = sparkSession.sql("SELECT friend,concat_ws(',',collect_list(uid)) AS uid FROM user_friends  GROUP BY friend")

    val sortedUsersByFriend = usersByFriendDF.rdd.map { row =>
      (row.getString(0), row.getString(1).split(",").sorted)
    }

    // Every pair of users on a row shares that row's friend. Both orientations
    // (A_B and B_A) are emitted because the second-degree step is per ordered
    // pair. j starts at i + 1 so each unordered pair is visited once; starting
    // j at 1 and only skipping i == j revisits pairs for rows with four or
    // more users and duplicates their common friends.
    val value = sortedUsersByFriend.flatMap { case (friend, users) =>
      for {
        i <- 0 until users.length - 1
        j <- i + 1 until users.length
        pair <- Seq(users(i) + "_" + users(j), users(j) + "_" + users(i))
      } yield (pair, friend)
    }
    value.foreach(f => println(f._1 + "  " + f._2))

    value.toDF("friends", "same_friend").createOrReplaceTempView("user_friends")
    val commonFriendDF = sparkSession.sql("SELECT friends,concat_ws(',',collect_list(same_friend)) AS same_friend FROM user_friends  GROUP BY friends")
    commonFriendDF.show(false)

    // ---- Second-degree friends --------------------------------------------

    friendODF.show(false)
    // One row holding every uid, cross-joined onto each user so that each user
    // can be paired with all the others.
    val allUidListDF = sparkSession.sql("SELECT concat_ws(',',collect_list(uid)) AS all_uid FROM user_friend")
    allUidListDF.createOrReplaceTempView("all_uid")
    val allUidDF = sparkSession.sql("SELECT t1.*,t2.* FROM user_friend t1 CROSS JOIN all_uid t2 ")
    allUidDF.show(false)

    // (uid_other, uid's full friend list) for every other user.
    val value1 = allUidDF.rdd.flatMap { row =>
      val uid = row.getString(0)
      val friend = row.getString(1)
      row.getString(2).split(",").filter(_ != uid).map(other => (uid + "_" + other, friend))
    }
    value1.foreach(f => println(f._1 + "allUidRdd" + f._2))

    // Broadcast the common friends as a pair -> set lookup table so each record
    // needs a single hash lookup instead of a scan over every pair.
    val commonByPair: Map[String, Set[String]] = commonFriendDF.collect()
      .map(row => row.getString(0) -> row.getString(1).split(",").toSet)
      .toMap
    val commonFriend = sparkSession.sparkContext.broadcast(commonByPair)

    // Remove the shared friends from the user's own list. A pair without any
    // common friend keeps the whole list (instead of producing an empty key),
    // and a pair whose friends are all shared yields an empty string rather
    // than failing on substring(0, -1).
    val value2 = value1.map { case (pair, friends) =>
      val shared = commonFriend.value.getOrElse(pair, Set.empty[String])
      val remaining = friends.split(",").filterNot(shared.contains)
      (pair, remaining.mkString(","))
    }
    value2.foreach(f => println(f._1 + "    " + f._2))
  }
}
View Code

参考:1、MapReduce简单实践:两步实现查找共同好友 

   2、两个MapReduce实现计算出用户间的共同好友 

     3、Spark 计算人员二度关系

原文地址:https://www.cnblogs.com/yangms/p/15410332.html