很多视频网站都有电视剧热度排名,一般是依据用户在自己站的行为数据所体现出的受欢迎程度来排名。这里有一份来自优酷、爱奇艺、搜索视频等五大视频网站的一份视频播放数据,我们利用这份数据做些有意义的事情。
金婚第一部 3 9851 0 0 0 0
金婚第一部 3 9851 0 0 0 0
金婚第一部 3 9851 0 0 0 0
金婚第一部 3 9851 0 0 0 0
金婚第一部 3 9851 0 0 0 0
金子,轻松出来吧 1 4715 0 5 0 0
金子,轻松出来吧 1 2685 0 3 0 0
金枝欲孽 1 52307 9 14 0 4
金枝欲孽 3 8174 0 0 0 0
金枝欲孽 1 50709 3 28 0 6
金枝欲孽 3 8710 0 0 0 4
金枝欲孽 1 55621 0 10 0 2
注意:1-5数字和5大视频的关系:1优酷2搜狐3土豆4爱奇艺5迅雷看看
一、项目需求
自定义输入格式 完成统计任务 输出多个文件
输入数据:5个网站的 每天电视剧的 播放量 收藏数 评论数 踩数 赞数
输出数据:按网站类别 统计每个电视剧的每个指标的总量
任务目标:自定义输入格式 完成统计任务 输出多个文件
二、项目实现
第一步:既然要输出电视剧的一系列数据,那我们就需要定义一个电视剧热度数据的类,直接上代码。
package com.hadoop.mapreduce.test; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; /** * 电视剧数据读写类 * 数据格式参考:继承者们 1 4105447 202 844 48 671 * @author Sparks.Li * 需要自定义一个 VideoWritable 类实现 WritableComparable 接口,将每天电视剧的 播放量 收藏数 评论数 踩数 赞数 封装起来。 */ //注意: Hadoop通过Writable接口实现的序列化机制,不过没有提供比较功能,所以和java中的Comparable接口合并,提供一个接口WritableComparable。(自定义比较) //Writable接口提供两个方法(write和readFields)。 public class VideoWritable implements WritableComparable< Object > { private Long PlayNum; private Long FavoriteNum; private Long CommentNum; private Long DownNum; private Long LikeNum; // 问:这里我们自己编程时,是一定要创建一个带有参的构造方法,为什么还要显式的写出来一个带无参的构造方法呢? // 答:构造器其实就是构造对象实例的方法,无参数的构造方法是默认的,但是如果你创造了一个带有参数的构造方法,那么无参的构造方法必须显式的写出来,否则会编译失败。 public VideoWritable(){} public VideoWritable(Long PlayNum,Long FavoriteNum,Long CommentNum,Long DownNum,Long LikeNum){//java����вι��캯��,�������ڴ�������ʱ��ʼ������ this.PlayNum = PlayNum; this.FavoriteNum = FavoriteNum; this.CommentNum = CommentNum; this.DownNum = DownNum; this.LikeNum = LikeNum; } public void set(Long PlayNum,Long FavoriteNum,Long CommentNum,Long DownNum,Long LikeNum){ this.PlayNum = PlayNum; this.FavoriteNum = FavoriteNum; this.CommentNum = CommentNum; this.DownNum = DownNum; this.LikeNum = LikeNum; } public Long getPlayNum() { return PlayNum; } public void setPlayNum(Long PlayNum){ this.PlayNum = PlayNum; } public Long getFavoriteNum() { return FavoriteNum; } public void setFavoriteNum(Long FavoriteNum){ this.FavoriteNum = FavoriteNum; } public Long getCommentNum() { return CommentNum; } public void setCommentNum(Long CommentNum){ this.CommentNum = CommentNum; } public Long getDownNum() { return DownNum; } public void setDownNum(Long DownNum){ this.DownNum = DownNum; } public Long getLikeNum() { return LikeNum; } public void setLikeNum(Long LikeNum){ this.LikeNum = LikeNum; } // 实现WritableComparable的readFields()方法 // 对象不能传输的,需要转化成字节流! // 将对象转换为字节流并写入到输出流out中是序列化,write 的过程 // 从输入流in中读取字节流反序列化为对象 是反序列化,readFields的过程 public void readFields(DataInput in) throws IOException { PlayNum = in.readLong(); FavoriteNum = in.readLong(); CommentNum = in.readLong(); DownNum = in.readLong(); LikeNum = in.readLong(); // in.readByte() // in.readChar() // in.readDouble() // in.readLine() // in.readFloat() // in.readLong() // in.readShort() } public void write(DataOutput out) throws IOException { out.writeLong(PlayNum); out.writeLong(FavoriteNum); out.writeLong(CommentNum); out.writeLong(DownNum); out.writeLong(LikeNum); // out.writeByte() // out.writeChar() // out.writeDouble() // out.writeFloat() // out.writeLong() // out.writeShort() // out.writeUTF() } public int compareTo(Object o) { return 0; } }
第二步:通过查看数据集可知,hadoop自带的数据输入格式已经不满足我们的需求了,这时候我们就需要来实现自己的InputFormat类啦。
package com.hadoop.mapreduce.test; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.util.LineReader; /** * 自定义电视剧数据读写类 * 数据格式参考:继承者们 1 4105447 202 844 48 671 * @author Sparks.Li * 需要自定义一个 VideoWritable 类实现 WritableComparable 接口,将每天电视剧的 播放量 收藏数 评论数 踩数 赞数 封装起来。 */ //自定义输入格式 VideoInputFormat 类,首先继承 FileInputFormat,然后分别重写 isSplitable() 方法和 createRecordReader() 方法。 public class VideoInputFormat extends FileInputFormat<Text,VideoWritable > {//自定义数据输入格式,其实这都是模仿源码的!可以去看 // 线路是: boolean isSplitable() -> RecordReader<Text,VideoWritable> createRecordReader() -> VideoRecordReader extends RecordReader<Text, VideoWritable > @Override protected boolean isSplitable(JobContext context, Path filename) {//这是InputFormat的isSplitable方法 // isSplitable(),如果是压缩文件就不切分,整个文件封装到一个InputSplit // isSplitable(),如果是非压缩文件就切,切分64MB大小的一块一块,再封装到InputSplit return false; //整个文件封装到一个InputSplit } @Override public RecordReader<Text,VideoWritable> createRecordReader(InputSplit inputsplit,TaskAttemptContext context) throws IOException, InterruptedException { // RecordReader<k1, v1>是返回类型,返回的RecordReader对象的封装 // createRecordReader是方法,在这里是,VideoInputFormat.createRecordReader。VideoInputFormat是InputFormat类的实例 //这里默认是系统实现的的RecordReader,按行读取,下面我们自定义这个类VideoRecordReader。 return new VideoRecordReader();//新建一个VideoRecordReader实例,所有才有了上面RecordReader<Text,VideoWritable>,所以才如下VideoRecordReader,写我们自己的 } //RecordReader中的两个参数分别填写我们期望返回的key/value类型,我们期望key为Text类型,value为VideoWritable类型 public static class VideoRecordReader extends RecordReader<Text, VideoWritable > {//RecordReader<k1, v1>是一个整体 public LineReader in;//行读取器 public Text line;//每行数据类型 public Text lineKey;//自定义key类型,即k1 public VideoWritable lineValue;//自定义value类型,即v1 @Override public void close() throws IOException {//关闭输入流 if(in !=null){ in.close(); } } @Override public Text getCurrentKey() throws IOException, InterruptedException {//获取当前的key,即CurrentKey return lineKey;//返回类型是Text,即Text lineKey } @Override public VideoWritable getCurrentValue() throws IOException,InterruptedException {//获取当前的Value,即CurrentValue return lineValue;//返回类型是VideoWritable,即VideoWritable lineValue } @Override public float getProgress() throws IOException, InterruptedException {//获取进程,即Progress return 0;//返回类型是float,即float 0 } @Override public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException {//初始化,都是模板 FileSplit split=(FileSplit)input; Configuration job=context.getConfiguration(); Path file=split.getPath(); FileSystem fs=file.getFileSystem(job); FSDataInputStream filein=fs.open(file); in=new LineReader(filein,job);//输入流in line=new Text();//每行数据类型 lineKey=new Text();//自定义key类型,即k1。//新建一个Text实例作为自定义格式输入的key lineValue = new VideoWritable();//自定义value类型,即v1。//新建一个TVPlayData实例作为自定义格式输入的value } //此方法读取每行数据,完成自定义的key和value @Override public boolean nextKeyValue() throws IOException, InterruptedException {//这里面,才是篡改的重点 int linesize=in.readLine(line);//line是每行数据,我们这里用到的是in.readLine(str)这个构造函数,默认读完读到文件末尾。其实这里有三种。 // 是SplitLineReader.readLine -> SplitLineReader extends LineReader -> org.apache.hadoop.util.LineReader // in.readLine(str)//这个构造方法执行时,会首先将value原来的值清空。默认读完读到文件末尾 // in.readLine(str, maxLineLength)//只读到maxLineLength行 // in.readLine(str, maxLineLength, maxBytesToConsume)//这个构造方法来实现不清空,前面读取的行的值 if(linesize==0) return false; String[] pieces = line.toString().split("\t");//解析每行数据 //因为,有些视频名中含有空格,所以这里使用 来切割!!!注意 if(pieces.length != 7){ throw new IOException("Invalid record received"); } //将学生的每门成绩转换为 float 类型 Long a,b,c,d,e; try{ a = Long.parseLong(pieces[2].trim());//将String类型,如pieces[2]转换成,float类型,给a b = Long.parseLong(pieces[3].trim()); c = Long.parseLong(pieces[4].trim()); d = Long.parseLong(pieces[5].trim()); e = Long.parseLong(pieces[6].trim()); }catch(NumberFormatException nfe){ throw new IOException("Error parsing floating poing value in record"); } lineKey.set(pieces[0]+" "+pieces[1]);//完成自定义key数据 lineValue.set(a, b, c, d, e);//封装自定义value数据 return true; } } }
第三步:这时候我们就可以写MapReduce统计程序来得出我们想要的结果啦。
package com.hadoop.mapreduce.test; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Reducer.Context; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * 学生成绩统计Hadoop程序 * 数据格式参考:继承者们 1 4105447 202 844 48 671 * @author sparks.li */ public class VideoCount extends Configured implements Tool{ public static class VideoMapper extends Mapper<Text,VideoWritable,Text,VideoWritable>{ @Override protected void map(Text key, VideoWritable value, Context context)throws IOException, InterruptedException{ context.write(key, value); //写入key是k2,value是v2 // context.write(new Text(key), new ScoreWritable(value));等价 } } public static class VideoReducer extends Reducer<Text,VideoWritable,Text,Text>{ private Text result = new Text(); private Text reduceKey = new Text(); private MultipleOutputs< Text, Text> multipleOutputs; @Override protected void setup(Context context) throws IOException ,InterruptedException{ multipleOutputs = new MultipleOutputs< Text, Text>(context); } protected void reduce(Text Key, Iterable< VideoWritable > Values, Context context)throws IOException, InterruptedException{ Long PlayNum= new Long(0); Long FavoriteNum= new Long(0); Long CommentNum= new Long(0); Long DownNum= new Long(0); Long LikeNum= new Long(0); for(VideoWritable ss:Values){ PlayNum += ss.getPlayNum(); FavoriteNum += ss.getFavoriteNum(); CommentNum += ss.getCommentNum(); DownNum += ss.getDownNum(); LikeNum += ss.getLikeNum(); } result.set(PlayNum + " " + FavoriteNum + " " + CommentNum + " " + DownNum + " " + LikeNum); String[] webs = {"nothing","youku","souhu","tudou","aiqiyi","xunlei"}; String[] pieces = Key.toString().split("\t+");//解析 key 数据 //line是Text类型。pieces是String[],即String数组。 if(pieces.length != 2){ throw new IOException("Invalid reduce key received"); } // 设置key为视频名字 reduceKey.set(pieces[0] + " "); // 设置输出路径为视频网站名 String filename = webs[Integer.parseInt(pieces[1])]; multipleOutputs.write(reduceKey, result, filename); } @Override protected void cleanup(Context context) throws IOException ,InterruptedException{ multipleOutputs.close(); } } public int run(String[] args) throws Exception{ Configuration conf = new Configuration();//读取配置文件 Path mypath = new Path(args[1]); FileSystem hdfs = mypath.getFileSystem(conf);//创建输出路径 if (hdfs.isDirectory(mypath)) { hdfs.delete(mypath, true); } Job job = new Job(conf, "Video Count");//新建任务 job.setJarByClass(VideoCount.class);//设置主类 FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径 FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径 job.setMapperClass(VideoMapper.class);// Mapper job.setReducerClass(VideoReducer.class);// Reducer job.setMapOutputKeyClass(Text.class);// Mapper key输出类型 job.setMapOutputValueClass(VideoWritable.class);// Mapper value输出类型 job.setInputFormatClass(VideoInputFormat.class);//设置自定义输入格式 job.waitForCompletion(true); return 0; } public static void main(String[] args) throws Exception{ String[] args0 = { "hdfs://sparks:9000/middle/video/tvplay.txt", "hdfs://sparks:9000/middle/video/out" }; int ec = ToolRunner.run(new Configuration(), new VideoCount(), args0); System.exit(ec); } }
第四步:万事俱备只欠数据啦,我们把本地数据上传到HDFS上,并运行程序
hadoop fs -put 数据集文件 hdfs路径
三、项目结果
搞定!