In-Class Test: Data Cleaning

Description of the fields in the Result file:

Ip: 106.39.41.166 (city)

Date: 10/Nov/2016:00:01:02 +0800 (date)

Day: 10 (day)

Traffic: 54 (traffic)

Type: video (type: video or article)

Id: 8701 (id of the video or article)

Test requirements:

1. Data cleaning: clean the data as described below and load the cleaned data into a Hive database.

Data cleaning in two stages:

1) Stage one: extract the required fields from the raw log:

ip:    199.30.25.88

time:  10/Nov/2016:00:01:03 +0800

traffic:  62

article: article/11325

video: video/3235

2) Stage two: refine the extracted fields:

ip ---> city (cityIP)

date--> time:2016-11-10 00:01:03

day: 10

traffic:62

type:article/video

id:11325
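
Putting the sample values above together, one fully cleaned record would look roughly like this (one line per record, the six fields separated by a delimiter such as a tab, in the order the Mapper below emits them):

199.30.25.88	2016-11-10 00:01:03	10	62	article	11325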

3) Hive table structure:

create table data(ip string, time string, day string, traffic bigint, type string, id string)

2. Data processing (a rough HiveQL sketch follows this list):

· Top 10 most popular videos/articles by number of visits (video/article)

· Top 10 most popular courses by city (ip)

· Top 10 most popular courses by traffic (traffic)

3. Data visualization: export the statistics into a MySQL database and present them as charts.
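
A rough HiveQL sketch of the step-2 statistics, assuming the cleaned data has already been loaded into the data table defined above; the type/id pair stands in for a "course", the ip column stands in for the city (the IP-to-city mapping is not implemented yet), and the alias names are arbitrary:

-- Top 10 most visited videos/articles
select type, id, count(*) as visits from data group by type, id order by visits desc limit 10;

-- Top 10 courses by city (ip used in place of the city)
select ip, type, id, count(*) as visits from data group by ip, type, id order by visits desc limit 10;

-- Top 10 courses by traffic
select type, id, sum(traffic) as total_traffic from data group by type, id order by total_traffic desc limit 10;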

So far only the first step has been completed: cleaning the data and converting its format.

package mapreduce;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Filter {
	
	public static class Map extends Mapper<Object, Text, Text, NullWritable> {
		private static Text newKey = new Text();
		private final LogParser parser = new LogParser();

		@Override
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			String line = value.toString();
			// Split the raw record into ip, time, day, traffic, type and id
			final String[] array = parser.parse(line);

			String ip = array[0];
			String time = array[1];
			String day = array[2];
			String traffic = array[3];
			String type = array[4];
			String id = array[5];

			// Join the fields with tabs: the formatted time contains a space,
			// so a tab keeps the six columns unambiguous when loaded into Hive
			String str = ip + "\t" + time + "\t" + day + "\t" + traffic + "\t" + type + "\t" + id;

			newKey.set(str);
			context.write(newKey, NullWritable.get());
		}
	}

	public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable> {
		public void reduce(Text key, Iterable<NullWritable> values, Context context)
				throws IOException, InterruptedException {
			context.write(key, NullWritable.get());
		}
	}

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "filter");
		job.setJarByClass(Filter.class);
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		Path in = new Path("hdfs://localhost:9000/test/in/result");
		Path out = new Path("hdfs://localhost:9000/test/out");
		FileInputFormat.addInputPath(job, in);
		FileOutputFormat.setOutputPath(job, out);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
	// Note: MyMapper and MyReducer below are not wired into the job above; they
	// target a request-style log (e.g. "GET /path HTTP/1.1") and are kept only
	// as a reference.
	static class MyMapper extends
			Mapper<LongWritable, Text, LongWritable, Text> {
		LogParser logParser = new LogParser();
		Text outputValue = new Text();

		protected void map(
				LongWritable key,
				Text value,
				org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, Text>.Context context)
				throws java.io.IOException, InterruptedException {
			final String[] parsed = logParser.parse(value.toString());

			// step 1: drop requests for static resources
			if (parsed[2].startsWith("GET /static/")
					|| parsed[2].startsWith("GET /uc_server")) {
				return;
			}
			// step 2: strip the leading request-method prefix
			if (parsed[2].startsWith("GET /")) {
				parsed[2] = parsed[2].substring("GET /".length());
			} else if (parsed[2].startsWith("POST /")) {
				parsed[2] = parsed[2].substring("POST /".length());
			}
			// step 3: strip the trailing protocol suffix
			if (parsed[2].endsWith(" HTTP/1.1")) {
				parsed[2] = parsed[2].substring(0, parsed[2].length()
						- " HTTP/1.1".length());
			}
			// step 4: emit only the first three fields
			outputValue.set(parsed[0] + "\t" + parsed[1] + "\t" + parsed[2]);
			context.write(key, outputValue);
		}
	}

	static class MyReducer extends
			Reducer<LongWritable, Text, Text, NullWritable> {
		protected void reduce(
				LongWritable k2,
				java.lang.Iterable<Text> v2s,
				org.apache.hadoop.mapreduce.Reducer<LongWritable, Text, Text, NullWritable>.Context context)
				throws java.io.IOException, InterruptedException {
			for (Text v2 : v2s) {
				context.write(v2, NullWritable.get());
			}
		}
	}

	/*
	 * Log line parser
	 */
	static class LogParser {
		public static final SimpleDateFormat FORMAT = new SimpleDateFormat(
				"d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
		public static final SimpleDateFormat dateformat1 = new SimpleDateFormat(
				"yyyy-MM-dd HH:mm:ss");

		/**
		 * Parse the English-format time string (e.g. 10/Nov/2016:00:01:02).
		 */
		private Date parseDateFormat(String string) {
			Date parse = null;
			try {
				parse = FORMAT.parse(string);
			} catch (ParseException e) {
				e.printStackTrace();
			}
			return parse;
		}

		/**
		 * Parse one log line.
		 * 
		 * @return an array of six elements: ip, time, day, traffic, type, id
		 */
		public String[] parse(String line) {
			String ip = parseIP(line);
			String time = parseTime(line);
			String day = parseday(line);
			String traffic = parseTraffic(line);
			String type = parsertype(line);
			String id = parseid(line);

			return new String[] { ip, time, day, traffic, type, id };
		}

		// Field 0: the IP address
		private String parseIP(String line) {
			return line.split(",")[0].trim();
		}

		// Field 1: the access time, converted from
		// "10/Nov/2016:00:01:02 +0800" to "2016-11-10 00:01:02"
		private String parseTime(String line) {
			final int first = line.indexOf(",");
			final int last = line.indexOf(" +0800,");
			String time = line.substring(first + 1, last).trim();
			Date date = parseDateFormat(time);
			return dateformat1.format(date);
		}

		// Field 2: the day
		private String parseday(String line) {
			return line.split(",")[2].trim();
		}

		// Field 3: the traffic
		private String parseTraffic(String line) {
			return line.split(",")[3].trim();
		}

		// Field 4: the type (video or article)
		private String parsertype(String line) {
			return line.split(",")[4].trim();
		}

		// Field 5: the id of the video or article
		private String parseid(String line) {
			final String trim = line.substring(line.lastIndexOf(",") + 1).trim();
			return trim.split(" ")[0];
		}
	}
}

  

Table creation statement: create table data(ip string, time string, day string, traffic bigint, type string, id string);

Load the data: load data inpath '/test/out/part-r-00000' overwrite into table data;
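
Note: the Mapper above joins the six fields with tab characters, while a Hive text table created without a ROW FORMAT clause splits columns on '\001', so the statement above would load every line into the ip column alone. A sketch of a matching table definition, assuming the tab-separated output is kept:

create table data(ip string, time string, day string, traffic bigint, type string, id string) row format delimited fields terminated by '\t';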

Original post: https://www.cnblogs.com/ljm-zsy/p/11853773.html