MapReduce实例-基于内容的推荐(一)

环境:
  Hadoop1.x,CentOS6.5,三台虚拟机搭建的模拟分布式环境

  数据:下载的amazon产品共同采购网络元数据(需翻墙下载)http://snap.stanford.edu/data/amazon-meta.html

方案目标:

  从数据中提取出每个用户买过哪些商品,再根据用户买过的商品以及商品之间的相关性向用户推荐商品

   下载的数据以如下所示的一条商品记录为单位,记录之间用空行分隔:

Id: 1
ASIN: 0827229534
title: Patterns of Preaching: A Sermon Sampler
group: Book
salesrank: 396585
similar: 5 0804215715 156101074X 0687023955 0687074231 082721619X
categories: 2
|Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Christianity[12290]
|Clergy[12360]|Preaching[12368]
|Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Christianity[12290]
|Clergy[12360]|Sermons[12370]
reviews: total: 2 downloaded: 2 avg rating: 5
2000-7-28 cutomer: A2JW67OY8U6HHK rating: 5 votes: 10 helpful: 9
2003-12-14 cutomer: A2VE83MZF98ITY rating: 5 votes: 6 helpful: 5

 

思路:

  整套程序需要分解为两个步骤。1.提取每个用户买过哪些商品。2.根据第一步产生的数据,结合用户的感兴趣度与商品之间的关联生成推荐商品

本篇文章主要做第一步。

这一步骤的主要难点是对自定义输入格式的编写。

1.自定义格式化输入数据

  如上所示的数据, 需要自定义输入数据的格式来提取数据。

  job.setInputFormatClass(TestAmazonDataFormat.class);

  那怎么做自定义输入格式呢?

  这里我们需要了解文件在HDFS中的处理方式。文件放入HDFS时会被切分成块(block)存储,MapReduce读取输入时再按输入分片(InputSplit)把数据分配给各个Map任务。因此我们要对数据进行操作的时候,需要获取分片对应的文件信息(文件名、path、开始位置、长度、位于哪个节点等)。

传入文件信息:

//获取文件信息
public class TestAmazonDataFormat extends FileInputFormat<Text, Text> {

     TestAmazonDataReader datareader;
    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext attempt)
            throws IOException, InterruptedException {
        datareader = new TestAmazonDataReader();
        datareader.initialize(inputSplit, attempt);    //传入文件信息
        // TODO Auto-generated method stub
        return datareader;
    }
    

}
View Code

读取文件:

package ren.snail;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Record reader for the Amazon product metadata dump.
 *
 * <p>The input consists of product records separated by blank lines. Each call to
 * {@link #nextKeyValue()} consumes one record and flattens it into a single value of
 * {@code '#'}-terminated fields, in the order the lines appear in the input:
 * <ul>
 *   <li>{@code Id=..#}, {@code ASIN=..#}, {@code title=..#}, {@code group=..#},
 *       {@code salesrank=..#} (commas removed from the field value),</li>
 *   <li>{@code similar=a|b|c|#} (the leading count token is dropped),</li>
 *   <li>{@code review=customer|rating|votes|helpful#} for every review line.</li>
 * </ul>
 * The key is the record's ASIN, or an empty {@code Text} when the record has none.
 *
 * @author Srinath Perera (hemapani@apache.org)
 */
public class TestAmazonDataReader extends RecordReader<Text, Text> {
    /** One review line: date, customer id, rating, votes and helpful count. */
    private static final Pattern REVIEW_LINE = Pattern.compile(
            "\\s+([^\\s]+)\\s+cutomer:\\s+([^\\s]+)\\s+rating:\\s+([^\\s]+)\\s+votes:\\s+([^\\s]+)\\s+helpful:\\s+([^\\s]+).*");
    /** Separator between the tokens of a "similar:" line. */
    private static final Pattern WHITESPACE = Pattern.compile("\\s+");

    private BufferedReader reader;
    /** Underlying stream, kept so that progress can be derived from its position. */
    private FSDataInputStream fsStream;
    private long fileLength;
    /** First "Id:" line found by initialize(); it belongs to the first record. */
    private String pendingLine;
    /** Set once the end of the input has been reached. */
    private boolean exhausted;
    private Text key;
    private Text value;
    /** Most recently read input line. */
    String line = null;

    public TestAmazonDataReader() {
    }

    /**
     * Opens the file behind the split and skips everything before the first "Id:" line.
     *
     * @param inputSplit must be a {@link FileSplit}; only its path is used
     * @param attempt    supplies the configuration used to resolve the file system
     */
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext attempt) throws IOException, InterruptedException {
        // Release a stream left over from an earlier initialize() call.
        close();
        Path path = ((FileSplit) inputSplit).getPath();
        // Resolve the file system from the path itself, so that an hdfs:// path is not opened
        // against the default (possibly local file:///) file system.
        FileSystem fs = path.getFileSystem(attempt.getConfiguration());
        fileLength = fs.getFileStatus(path).getLen();
        fsStream = fs.open(path);
        reader = new BufferedReader(new InputStreamReader(fsStream), 1024 * 100);
        pendingLine = null;
        exhausted = false;
        while ((line = reader.readLine()) != null) {
            if (line.startsWith("Id:")) {
                // Keep the line: it is the first field of the first record.
                pendingLine = line;
                break;
            }
        }
    }

    /**
     * Reads the next product record.
     *
     * @return {@code true} if a record was read and is available through
     *         {@link #getCurrentKey()} / {@link #getCurrentValue()}, {@code false} at end of input
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        StringBuilder record = new StringBuilder();
        boolean readingReviews = false;
        // Never carry the ASIN of the previous record over to a record that has none.
        key = new Text();

        String current = pendingLine != null ? pendingLine : reader.readLine();
        pendingLine = null;
        while (current != null) {
            line = current;
            if (current.trim().isEmpty()) {
                if (record.length() > 0) {
                    value = new Text(record.toString());
                    return true;
                }
                // Stray blank line before any usable field: start over with a clean state.
                readingReviews = false;
            } else if (readingReviews) {
                appendReview(record, current);
            } else {
                readingReviews = appendField(record, current);
            }
            current = reader.readLine();
        }

        line = null;
        exhausted = true;
        // The last record is not necessarily followed by a blank line.
        if (record.length() > 0) {
            value = new Text(record.toString());
            return true;
        }
        return false;
    }

    /**
     * Appends one "name: value" line to the record if it is a field of interest.
     *
     * @return {@code true} if the line is the "reviews:" header, i.e. review lines follow
     */
    private boolean appendField(StringBuilder record, String rawLine) {
        int colon = rawLine.indexOf(':');
        if (colon <= 0) {
            return false;
        }
        String name = rawLine.substring(0, colon).trim();
        String content = rawLine.substring(colon + 1).trim();
        if (content.isEmpty()) {
            return false;
        }
        // '#' terminates fields in the flattened record, so it must not occur inside a value.
        content = content.replace('#', '_');

        if (name.equals("ASIN") || name.equals("Id") || name.equals("title") || name.equals("group")
                || name.equals("salesrank")) {
            if (name.equals("ASIN")) {
                this.key = new Text(content);
            }
            record.append(name).append("=").append(content.replace(",", "")).append("#");
        } else if (name.equals("similar")) {
            String[] tokens = WHITESPACE.split(content);
            // The first token is the number of similar items, not an ASIN.
            if (tokens.length >= 2) {
                record.append(name).append("=");
                for (int i = 1; i < tokens.length; i++) {
                    record.append(tokens[i].trim()).append("|");
                }
                record.append("#");
            }
        } else if (name.equals("reviews")) {
            return true;
        }
        return false;
    }

    /** Appends one review line to the record, or reports it on stdout if it is malformed. */
    private void appendReview(StringBuilder record, String rawLine) {
        Matcher matcher = REVIEW_LINE.matcher(rawLine);
        if (matcher.matches()) {
            record.append("review=").append(matcher.group(2)).append("|")
                    .append(matcher.group(3)).append("|")
                    .append(matcher.group(4)).append("|")
                    .append(matcher.group(5)).append("#");
        } else {
            System.out.println("review "+ rawLine + "does not match");
        }
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    /**
     * Reports how much of the file has been consumed.
     *
     * @return a value between 0.0 and 1.0, based on the position of the underlying stream;
     *         because of read-ahead buffering it can run slightly ahead of the records returned
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (exhausted) {
            return 1.0f;
        }
        if (fsStream == null || fileLength <= 0) {
            return 0.0f;
        }
        return Math.min(1.0f, (float) fsStream.getPos() / fileLength);
    }

    /** Closes the input; safe to call when the reader was never (successfully) initialized. */
    @Override
    public void close() throws IOException {
        if (reader != null) {
            try {
                reader.close();
            } finally {
                reader = null;
                fsStream = null;
            }
        }
    }
}
View Code

Map和Reduce

Map中用到了解析Amazon元数据的辅助类(AmazonCustomer),这里就不给出它的代码了,它的作用就是对输入格式传入的value数据进行解析。

package ren.snail;



import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.tools.GetConf;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import ren.snail.AmazonCustomer.ItemData;

 

/**
 * Job driver that collects, for every customer, the items found in that customer's reviews.
 *
 * <p>The mapper parses each flattened product record into {@code AmazonCustomer} entries and
 * emits them keyed by customer id; the reducer merges all entries of one customer and writes
 * the number of items together with the merged customer.
 *
 * @author Srinath Perera (hemapani@apache.org)
 */
public class Main extends Configured implements Tool {
    // Not referenced anywhere in this class; left in place because it is public.
    // NOTE(review): SimpleDateFormat is not thread-safe, so sharing this instance is risky.
    public static SimpleDateFormat dateFormatter = new SimpleDateFormat("EEEE dd MMM yyyy hh:mm:ss z");

    /** Emits one (customer id, customer) pair per customer parsed from a product record. */
    public static class AMapper extends Mapper<Object, Text, Text, Text> {

        /**
         * Parses one product record and writes its customers.
         *
         * <p>Records or customers that cannot be parsed or converted are skipped (best effort)
         * and counted; failures of {@code context.write} are propagated so the task fails
         * instead of silently losing output.
         */
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            List<AmazonCustomer> customerList;
            try {
                customerList = AmazonCustomer.parseAItemLine(value.toString());
            } catch (Exception e) {
                reportSkipped(context, e);
                return;
            }
            for (AmazonCustomer customer : customerList) {
                Text outKey;
                Text outValue;
                try {
                    outKey = new Text(customer.customerID);
                    outValue = new Text(customer.toString());
                } catch (RuntimeException e) {
                    reportSkipped(context, e);
                    continue;
                }
                context.write(outKey, outValue);
            }
        }

        /** Logs a skipped record and makes the skip visible in the job counters. */
        private void reportSkipped(Context context, Exception e) {
            e.printStackTrace();
            System.out.println("Error:" + e.getMessage());
            context.getCounter("AMapper", "skipped").increment(1);
        }
    }

    /** Merges all entries of one customer and emits (number of items, merged customer). */
    public static class AReducer extends Reducer<Text, Text, IntWritable, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            AmazonCustomer customer = new AmazonCustomer();
            customer.customerID = key.toString();

            for (Text value : values) {
                customer.itemsBrought.addAll(new AmazonCustomer(value.toString()).itemsBrought);
            }
            context.write(new IntWritable(customer.itemsBrought.size()), new Text(customer.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        int result = ToolRunner.run(new Configuration(), new Main(), args);
        System.exit(result);
    }

    /**
     * Configures and runs the job.
     *
     * @param arg0 {@code arg0[0]} is the input path, {@code arg0[1]} the output path
     * @return 0 if the job succeeded, 1 if it failed, 2 if the arguments are missing
     */
    @Override
    public int run(String[] arg0) throws Exception {
        if (arg0 == null || arg0.length < 2) {
            System.err.println("Usage: Main <input path> <output path>");
            return 2;
        }

        Configuration configuration = getConf();
        Job job = new Job(configuration, "MostFrequentUserFinder");
        job.setJarByClass(Main.class);
        job.setMapperClass(AMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setReducerClass(AReducer.class);
        job.setInputFormatClass(TestAmazonDataFormat.class);
        FileInputFormat.addInputPath(job, new Path(arg0[0]));
        FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
        // Return the status instead of exiting here: main() already exits with the value
        // that ToolRunner passes back.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
View Code

最终的输出如下:

customerID=A11NCO6YTE4BTJ,review=ASIN=0738700797#title=Candlemas: Feast of Flames#salesrank=168596#group=Book#rating=5#similar=0738700827|1567184960|1567182836|0738700525|0738700940|,

原文地址:https://www.cnblogs.com/ren-jie/p/5427056.html