Mapreduce——视频播放数据分类统计

很多视频网站都有电视剧热度排名,一般是依据用户在自己站的行为数据所体现出的受欢迎程度来排名。这里有一份来自优酷、爱奇艺、搜索视频等五大视频网站的一份视频播放数据,我们利用这份数据做些有意义的事情。

金婚第一部 3 9851 0 0 0 0
金婚第一部 3 9851 0 0 0 0
金婚第一部 3 9851 0 0 0 0
金婚第一部 3 9851 0 0 0 0
金婚第一部 3 9851 0 0 0 0
金子,轻松出来吧 1 4715 0 5 0 0
金子,轻松出来吧 1 2685 0 3 0 0
金枝欲孽 1 52307 9 14 0 4
金枝欲孽 3 8174 0 0 0 0
金枝欲孽 1 50709 3 28 0 6
金枝欲孽 3 8710 0 0 0 4
金枝欲孽 1 55621 0 10 0 2

注意:1-5数字和5大视频的关系:1优酷2搜狐3土豆4爱奇艺5迅雷看看

一、项目需求

自定义输入格式 完成统计任务 输出多个文件

输入数据:5个网站的 每天电视剧的 播放量 收藏数 评论数 踩数 赞数

输出数据:按网站类别 统计每个电视剧的每个指标的总量

任务目标:自定义输入格式 完成统计任务 输出多个文件

二、项目实现

第一步:既然要输出电视剧的一系列数据,那我们就需要定义一个电视剧热度数据的类,直接上代码。

package com.hadoop.mapreduce.test;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
* 电视剧数据读写类
* 数据格式参考:继承者们    1    4105447    202    844    48    671
* @author Sparks.Li
* 需要自定义一个 VideoWritable 类实现 WritableComparable 接口,将每天电视剧的 播放量 收藏数 评论数 踩数 赞数 封装起来。
*/

//注意:    Hadoop通过Writable接口实现的序列化机制,不过没有提供比较功能,所以和java中的Comparable接口合并,提供一个接口WritableComparable。(自定义比较)
//Writable接口提供两个方法(write和readFields)。

public class VideoWritable implements WritableComparable< Object > {

    private Long PlayNum;
    private Long FavoriteNum;
    private Long CommentNum;
    private Long DownNum;
    private Long LikeNum;
    
    
//  问:这里我们自己编程时,是一定要创建一个带有参的构造方法,为什么还要显式的写出来一个带无参的构造方法呢?
//  答:构造器其实就是构造对象实例的方法,无参数的构造方法是默认的,但是如果你创造了一个带有参数的构造方法,那么无参的构造方法必须显式的写出来,否则会编译失败。
  
    public VideoWritable(){}

    public VideoWritable(Long PlayNum,Long FavoriteNum,Long CommentNum,Long DownNum,Long LikeNum){//java����вι��캯��,�������ڴ�������ʱ��ʼ������  
        this.PlayNum = PlayNum;
        this.FavoriteNum = FavoriteNum;
        this.CommentNum = CommentNum;
        this.DownNum = DownNum;
        this.LikeNum = LikeNum;
    }
    

    
    public void set(Long PlayNum,Long FavoriteNum,Long CommentNum,Long DownNum,Long LikeNum){
        this.PlayNum = PlayNum;
        this.FavoriteNum = FavoriteNum;
        this.CommentNum = CommentNum;
        this.DownNum = DownNum;
        this.LikeNum = LikeNum;
    }
    
    
    public Long getPlayNum() {
        return PlayNum;
    }
    public void setPlayNum(Long PlayNum){
        this.PlayNum = PlayNum;
    }
    public Long getFavoriteNum() {
        return FavoriteNum;
    }
    public void setFavoriteNum(Long FavoriteNum){
        this.FavoriteNum = FavoriteNum;
    }
    public Long getCommentNum() {
        return CommentNum;
    }
    public void setCommentNum(Long CommentNum){
        this.CommentNum = CommentNum;
    }
    public Long getDownNum() {
        return DownNum;
    }
    public void setDownNum(Long DownNum){
        this.DownNum = DownNum;
    }
    public Long getLikeNum() {
        return LikeNum;
    }
    public void setLikeNum(Long LikeNum){
        this.LikeNum = LikeNum;
    }
    
    // 实现WritableComparable的readFields()方法
//  对象不能传输的,需要转化成字节流!
//  将对象转换为字节流并写入到输出流out中是序列化,write 的过程
//  从输入流in中读取字节流反序列化为对象      是反序列化,readFields的过程
    
    public void readFields(DataInput in) throws IOException {
        PlayNum = in.readLong();
        FavoriteNum = in.readLong();
        CommentNum = in.readLong();
        DownNum = in.readLong();
        LikeNum = in.readLong();
//      in.readByte()
//      in.readChar()
//      in.readDouble()
//      in.readLine() 
//      in.readFloat()
//      in.readLong()
//      in.readShort()

    }
    

    public void write(DataOutput out) throws IOException {
        out.writeLong(PlayNum);
        out.writeLong(FavoriteNum);
        out.writeLong(CommentNum);
        out.writeLong(DownNum);
        out.writeLong(LikeNum);
//      out.writeByte()
//      out.writeChar()
//      out.writeDouble()
//      out.writeFloat()
//      out.writeLong()
//      out.writeShort()
//      out.writeUTF()
    }
    
    public int compareTo(Object o) {
        return 0;
    }
    
    
    
}
VideoWritable

第二步:通过查看数据集可知,hadoop自带的数据输入格式已经不满足我们的需求了,这时候我们就需要来实现自己的InputFormat类啦。

package com.hadoop.mapreduce.test;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;
/**
* 自定义电视剧数据读写类
* 数据格式参考:继承者们    1    4105447    202    844    48    671
* @author Sparks.Li
* 需要自定义一个 VideoWritable 类实现 WritableComparable 接口,将每天电视剧的 播放量 收藏数 评论数 踩数 赞数 封装起来。
*/


//自定义输入格式 VideoInputFormat 类,首先继承 FileInputFormat,然后分别重写 isSplitable() 方法和 createRecordReader() 方法。

public class VideoInputFormat extends FileInputFormat<Text,VideoWritable > {//自定义数据输入格式,其实这都是模仿源码的!可以去看

//    线路是: boolean  isSplitable()   ->   RecordReader<Text,VideoWritable> createRecordReader()   ->   VideoRecordReader extends RecordReader<Text, VideoWritable > 
    
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {//这是InputFormat的isSplitable方法
//      isSplitable(),如果是压缩文件就不切分,整个文件封装到一个InputSplit
//      isSplitable(),如果是非压缩文件就切,切分64MB大小的一块一块,再封装到InputSplit
        return false;    //整个文件封装到一个InputSplit

    }
    
    @Override
    public RecordReader<Text,VideoWritable> createRecordReader(InputSplit inputsplit,TaskAttemptContext context) throws IOException, InterruptedException {
//        RecordReader<k1, v1>是返回类型,返回的RecordReader对象的封装
//        createRecordReader是方法,在这里是,VideoInputFormat.createRecordReader。VideoInputFormat是InputFormat类的实例

        
        //这里默认是系统实现的的RecordReader,按行读取,下面我们自定义这个类VideoRecordReader。

        return new VideoRecordReader();//新建一个VideoRecordReader实例,所有才有了上面RecordReader<Text,VideoWritable>,所以才如下VideoRecordReader,写我们自己的
    }
    
    
    //RecordReader中的两个参数分别填写我们期望返回的key/value类型,我们期望key为Text类型,value为VideoWritable类型
    public static class VideoRecordReader extends RecordReader<Text, VideoWritable > {//RecordReader<k1, v1>是一个整体
        public LineReader in;//行读取器
        public Text line;//每行数据类型
        public Text lineKey;//自定义key类型,即k1
        public VideoWritable lineValue;//自定义value类型,即v1
        
        @Override
        public void close() throws IOException {//关闭输入流
            if(in !=null){
                in.close();
            }
        }
        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {//获取当前的key,即CurrentKey
            return lineKey;//返回类型是Text,即Text lineKey
        }
        @Override
        public VideoWritable getCurrentValue() throws IOException,InterruptedException {//获取当前的Value,即CurrentValue
            return lineValue;//返回类型是VideoWritable,即VideoWritable lineValue
        }
        @Override
        public float getProgress() throws IOException, InterruptedException {//获取进程,即Progress
            return 0;//返回类型是float,即float 0
        }
        @Override
        public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException {//初始化,都是模板
            FileSplit split=(FileSplit)input;
            Configuration job=context.getConfiguration();
            Path file=split.getPath();
            FileSystem fs=file.getFileSystem(job);
            
            FSDataInputStream filein=fs.open(file);
            in=new LineReader(filein,job);//输入流in
            line=new Text();//每行数据类型
            lineKey=new Text();//自定义key类型,即k1。//新建一个Text实例作为自定义格式输入的key
            lineValue = new VideoWritable();//自定义value类型,即v1。//新建一个TVPlayData实例作为自定义格式输入的value
        }
        
        //此方法读取每行数据,完成自定义的key和value
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {//这里面,才是篡改的重点
            int linesize=in.readLine(line);//line是每行数据,我们这里用到的是in.readLine(str)这个构造函数,默认读完读到文件末尾。其实这里有三种。
            
//            是SplitLineReader.readLine  ->  SplitLineReader  extends   LineReader  ->  org.apache.hadoop.util.LineReader
            
//            in.readLine(str)//这个构造方法执行时,会首先将value原来的值清空。默认读完读到文件末尾
//            in.readLine(str, maxLineLength)//只读到maxLineLength行
//            in.readLine(str, maxLineLength, maxBytesToConsume)//这个构造方法来实现不清空,前面读取的行的值

            if(linesize==0) return false;
            
            
            String[] pieces = line.toString().split("\t");//解析每行数据
                    //因为,有些视频名中含有空格,所以这里使用	来切割!!!注意
            
            if(pieces.length != 7){
                throw new IOException("Invalid record received");
            }
            //将学生的每门成绩转换为 float 类型
            Long a,b,c,d,e;
            try{
                a = Long.parseLong(pieces[2].trim());//将String类型,如pieces[2]转换成,float类型,给a
                b = Long.parseLong(pieces[3].trim());
                c = Long.parseLong(pieces[4].trim());
                d = Long.parseLong(pieces[5].trim());
                e = Long.parseLong(pieces[6].trim());
            }catch(NumberFormatException nfe){
                throw new IOException("Error parsing floating poing value in record");
            }
            lineKey.set(pieces[0]+"	"+pieces[1]);//完成自定义key数据
            lineValue.set(a, b, c, d, e);//封装自定义value数据

            
            
            
            return true;
        }        
    }
}
VideoInputFormat

第三步:这时候我们就可以写MapReduce统计程序来得出我们想要的结果啦。

package com.hadoop.mapreduce.test;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* 学生成绩统计Hadoop程序
* 数据格式参考:继承者们    1    4105447    202    844    48    671
* @author sparks.li
*/
public class VideoCount extends Configured implements Tool{
    public static class VideoMapper extends Mapper<Text,VideoWritable,Text,VideoWritable>{
        @Override
        protected void map(Text key, VideoWritable value, Context context)throws IOException, InterruptedException{
            context.write(key, value);    //写入key是k2,value是v2
//            context.write(new Text(key), new ScoreWritable(value));等价           
        }
    }
    
    public static class VideoReducer extends Reducer<Text,VideoWritable,Text,Text>{
        private Text result = new Text();
        private Text reduceKey = new Text();
        private MultipleOutputs< Text, Text> multipleOutputs;

        @Override
        protected void setup(Context context) throws IOException ,InterruptedException{
            multipleOutputs = new MultipleOutputs< Text, Text>(context);
        }
        protected void reduce(Text Key, Iterable< VideoWritable > Values, Context context)throws IOException, InterruptedException{
            Long PlayNum= new Long(0);
            Long FavoriteNum= new Long(0);
            Long CommentNum= new Long(0);
            Long DownNum= new Long(0);
            Long LikeNum= new Long(0);
            for(VideoWritable ss:Values){
                PlayNum += ss.getPlayNum();
                FavoriteNum += ss.getFavoriteNum();
                CommentNum += ss.getCommentNum();
                DownNum += ss.getDownNum();
                LikeNum += ss.getLikeNum();
            }
            result.set(PlayNum + "	" + FavoriteNum + "	" + CommentNum + "	" + DownNum
                     + "	" + LikeNum);
            
            String[] webs = {"nothing","youku","souhu","tudou","aiqiyi","xunlei"};
            
            String[] pieces = Key.toString().split("\t+");//解析 key 数据
            //line是Text类型。pieces是String[],即String数组。
            
            if(pieces.length != 2){
                throw new IOException("Invalid reduce key received");
            }
            // 设置key为视频名字
            reduceKey.set(pieces[0] + "	");
            // 设置输出路径为视频网站名
            String filename = webs[Integer.parseInt(pieces[1])];
            
            multipleOutputs.write(reduceKey, result, filename);
        }
        @Override
        protected void cleanup(Context context) throws IOException ,InterruptedException{
            multipleOutputs.close();
        }
    }
    

    public int run(String[] args) throws Exception{
        Configuration conf = new Configuration();//读取配置文件
        
        Path mypath = new Path(args[1]);
        FileSystem hdfs = mypath.getFileSystem(conf);//创建输出路径
        if (hdfs.isDirectory(mypath)) 
        {
            hdfs.delete(mypath, true);
        }
        
        Job job = new Job(conf, "Video Count");//新建任务
        job.setJarByClass(VideoCount.class);//设置主类
        
        FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径
        
        job.setMapperClass(VideoMapper.class);// Mapper
        job.setReducerClass(VideoReducer.class);// Reducer
        
        job.setMapOutputKeyClass(Text.class);// Mapper key输出类型
        job.setMapOutputValueClass(VideoWritable.class);// Mapper value输出类型
                
        job.setInputFormatClass(VideoInputFormat.class);//设置自定义输入格式
        
        job.waitForCompletion(true);        
        return 0;
    }
    
    
    
    public static void main(String[] args) throws Exception{
        String[] args0 = 
                { 
                "hdfs://sparks:9000/middle/video/tvplay.txt",
                "hdfs://sparks:9000/middle/video/out" 
                };
        
        
        int ec = ToolRunner.run(new Configuration(), new VideoCount(), args0);
        System.exit(ec);
    }
}
VideoCount

第四步:万事俱备只欠数据啦,我们把本地数据上传到HDFS上,并运行程序

hadoop fs -put 数据集文件 hdfs路径

  

三、项目结果

搞定!

原文地址:https://www.cnblogs.com/LiCheng-/p/7349101.html