// caogao (草稿 — "draft"): blog-post dump containing several standalone Hadoop job sources.

package com.laiwang.algo.antispam.event.job;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;


/**
* Created by gray on 14-8-3.
*/
public class GetVersion extends Configured implements Tool {

public static class GetVersionMap extends Mapper<Object, Text, Text, Text> {

@Override
public void map(Object key, Text value, Context context)
throws IOException ,InterruptedException {
List<String> requests = new ArrayList<String>();
String[] parts = value.toString().split("1",-1);
String url = parts[2] + parts[6];
if(!url.contains("/v2/"))
return;
String[] tmps = url.split("\?",-1);
if(tmps.length == 1)
return;
int len = tmps[1].length();
String temp = "";
boolean falg = true;
for(int i = 0; i < len; i++) {
if(tmps[1].charAt(i) == '=') {
requests.add(temp);
temp = "";
falg = false;
}
if(falg)
temp += tmps[1].charAt(i);
if(tmps[1].charAt(i) == '&') {
falg = true;
}
}
int state = 0;
for(int i = 0; i < requests.size(); i++) {
if(requests.get(i).equals("_s_"))
state ^= 1;
if(requests.get(i).equals("_v_"))
state ^= 2;
if(requests.get(i).equals("_c_"))
state ^= 4;
if(requests.get(i).equals("_t_"))
state ^= 8;
}
if(state == 15) {
int index = parts[16].indexOf("(");

}
}
}

public static class Reduce extends Reducer<Text, Text, Text, NullWritable> {

@Override
public void reduce(Text key,Iterable<Text> values, Context context)
throws IOException, InterruptedException {


}
}

@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = new Job();
job.setJobName("GetVersion");
job.setJarByClass(GetVersion.class);

FileInputFormat.addInputPath(job, new Path(conf.get("")));
FileOutputFormat.setOutputPath(job, new Path(conf.get("")));

job.setMapperClass(GetVersionMap.class);
job.setReducerClass(Reduce.class);

job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

job.waitForCompletion(true);

return 0;
}


public static void main(String[] args) throws Exception {
ToolRunner.run(new AggregateUidBySession(), args);
}

}

import java.io.*;
import java.util.Random;

/**
* Created by gray on 14-8-4.
*/


public class ReadFiles {

    /**
     * Fills gaps in a time-series file. Input lines look like "&lt;timestamp&gt; &lt;value...&gt;"
     * in ascending timestamp order; for every timestamp in [7300000, 7302400) — where the
     * last two digits are seconds and roll over at 59 into the next minute — an entry is
     * appended to the output: the original line if present, otherwise "&lt;timestamp&gt; 0".
     *
     * Usage: ReadFiles [inputPath outputPath] — the optional arguments override the
     * original hard-coded Windows paths (a backward-compatible generalization).
     *
     * NOTE(review): input records whose timestamp is lower than the current fill position
     * (out-of-order input) are silently dropped, as in the original — confirm intended.
     *
     * @throws IOException if either file cannot be opened, read, or written
     */
    public static void main(String[] args) throws IOException {
        // Backslashes must be doubled in Java string literals; "G:\homeG..." does not compile.
        String in = args.length > 0 ? args[0] : "G:\\homeG\\data\\readfile\\in.txt";
        String out = args.length > 1 ? args[1] : "G:\\homeG\\data\\readfile\\out.txt";
        int time = 7300000; // first expected timestamp — units assumed HHMMSS-like, TODO confirm
        int end = 7302400;  // exclusive upper bound of the series
        BufferedReader br = new BufferedReader(
                new InputStreamReader(new FileInputStream(new File(in)), "UTF-8"));
        BufferedWriter pr = new BufferedWriter(
                new OutputStreamWriter(new FileOutputStream(new File(out)), "UTF-8"));
        try {
            String line;
            while ((line = br.readLine()) != null) {
                String[] parts = line.split(" ", -1);
                int now = Integer.parseInt(parts[0]);
                // Emit "t 0 " for every timestamp missing before this record.
                while (now > time) {
                    pr.append(time + " 0 ");
                    time = next(time);
                }
                if (now == time) {
                    time = next(time);
                    pr.append(line + " ");
                }
            }
            // Pad the tail of the series with zeros up to (not including) end.
            while (end > time) {
                pr.append(time + " 0 ");
                time = next(time);
            }
        } finally {
            // Close in a finally block so the streams are not leaked on a parse/IO error.
            br.close();
            pr.close();
        }
    }

    /** Advances a timestamp by one second; a seconds field of 59 rolls into the next minute. */
    private static int next(int time) {
        if (time % 100 == 59) {
            return time - 59 + 100;
        }
        return time + 1;
    }
}

/**
 * Passes every value through under its key, then emits one extra record
 * pairing the key with the number of values that were seen.
 */
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    int count = 0;
    for (Text v : values) {
        context.write(key, v);
        count++;
    }
    context.write(new Text(key), new Text(String.valueOf(count)));
}

  1 import org.apache.hadoop.conf.Configuration;
  2 import org.apache.hadoop.conf.Configured;
  3 import org.apache.hadoop.fs.FSDataInputStream;
  4 import org.apache.hadoop.fs.FileStatus;
  5 import org.apache.hadoop.fs.FileSystem;
  6 import org.apache.hadoop.fs.Path;
  7 import org.apache.hadoop.io.Text;
  8 import org.apache.hadoop.mapreduce.Job;
  9 import org.apache.hadoop.mapreduce.Mapper;
 10 import org.apache.hadoop.mapreduce.Reducer;
 11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 12 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 14 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 15 import org.apache.hadoop.util.Tool;
 16 import org.apache.hadoop.util.ToolRunner;
 17 
 18 import java.io.BufferedReader;
 19 import java.io.IOException;
 20 import java.io.InputStreamReader;
 21 import java.util.HashSet;
 22 import java.util.Set;
 23 
 24 /**
 25  * Created by gray on 14-8-5.
 26  */
 27 public class UnnormalUid  extends Configured implements Tool {
 28 
 29     public static class UnnormalUidMap extends Mapper<Object, Text, Text, Text> {
 30 
 31         private Set<String> uids = new HashSet<String>();
 32 
 33         @Override
 34         public void setup(Context context) {
 35             try {
 36                 Configuration conf = context.getConfiguration();
 37                 FileSystem fs = FileSystem.get(conf);
 38                 Path path = new Path(conf.get(""));
 39                 FileStatus[] status = fs.listStatus(path);
 40                 for (FileStatus file : status) {
 41                     Path filePath = new Path(path.toString()
 42                             + "//" + file.getPath().getName());
 43                     FSDataInputStream fin = fs.open(filePath);
 44                     BufferedReader br = new BufferedReader(
 45                             new InputStreamReader(fin, "utf-8"));
 46                     String line = br.readLine();
 47                     while (line != null) {
 48                         String[] tmps = line.split("	",-1);
 49                         uids.add(tmps[0]);
 50                         line = br.readLine();
 51                     }
 52                     br.close();
 53                     fin.close();
 54                 }
 55             } catch (IOException e) {
 56                 e.printStackTrace();
 57             }
 58         }
 59 
 60         @Override
 61         public void map(Object key, Text value, Context context)
 62                 throws IOException,InterruptedException {
 63             String[] parts = value.toString().split("1",-1);
 64             String uid = parts[26];
 65             if(!uids.contains(uid))
 66                 return;
 67             String url = parts[2] + parts[6];
 68             String httpstate = parts[8];
 69             String restate = parts[5];
 70             String time = parts[4];
 71             String refer = parts[14];
 72             String agent = parts[16];//n-5 n-4
 73             context.write(new Text(uid + "	" + httpstate),new Text(url + "	" + refer + "	" + time + "	" + restate + "	" + agent));
 74         }
 75     }
 76 
 77     public static class Reduce extends Reducer<Text, Text, Text, Text> {
 78 
 79         @Override
 80         public void reduce(Text key,Iterable<Text> values, Context context)
 81                 throws IOException, InterruptedException {
 82             for(Text value : values) {
 83                 context.write(key,value);
 84             }
 85         }
 86     }
 87 
 88     @Override
 89     public int run(String[] args) throws Exception {
 90         Configuration conf = getConf();
 91         Job job = new Job();
 92         job.setJobName("UnnormalUid");
 93         job.setJarByClass(UnnormalUid.class);
 94 
 95         FileInputFormat.addInputPath(job, new Path(conf.get("")));
 96         FileOutputFormat.setOutputPath(job, new Path(conf.get("")));
 97 
 98         job.setMapperClass(UnnormalUidMap.class);
 99         job.setReducerClass(Reduce.class);
100 
101         job.setInputFormatClass(SequenceFileInputFormat.class);
102         job.setOutputFormatClass(TextOutputFormat.class);
103 
104         job.setMapOutputKeyClass(Text.class);
105         job.setMapOutputValueClass(Text.class);
106         job.setOutputKeyClass(Text.class);
107         job.setOutputValueClass(Text.class);
108 
109         job.waitForCompletion(true);
110 
111         return 0;
112     }
113 
114 
115     public static void main(String[] args) throws Exception {
116         ToolRunner.run(new UnnormalUid(), args);
117     }
118 
119 }
// ---- "View Code" (blog snippet marker): end of UnnormalUid.java ----
  1 import org.apache.hadoop.conf.Configuration;
  2 import org.apache.hadoop.conf.Configured;
  3 import org.apache.hadoop.fs.Path;
  4 import org.apache.hadoop.io.Text;
  5 import org.apache.hadoop.mapreduce.Job;
  6 import org.apache.hadoop.mapreduce.Mapper;
  7 import org.apache.hadoop.mapreduce.Reducer;
  8 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  9 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 11 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 12 import org.apache.hadoop.util.Tool;
 13 import org.apache.hadoop.util.ToolRunner;
 14 
 15 import java.io.IOException;
 16 import java.util.ArrayList;
 17 import java.util.List;
 18 
 19 
 20 /**
 21  * Created by gray on 14-8-3.
 22  */
 23 public class GetVersion extends Configured implements Tool {
 24 
 25     public static class GetVersionMap extends Mapper<Object, Text, Text, Text> {
 26 
 27         @Override
 28         public void map(Object key, Text value, Context context)
 29                 throws IOException ,InterruptedException {
 30             List<String> requests = new ArrayList<String>();
 31             String[] parts = value.toString().split("1",-1);
 32             String url = parts[2] + parts[6];
 33             String httpstate = parts[4];
 34             String uid = parts[26];
 35             if(!url.contains("/v2/") || httpstate.equals("POST")) {
 36                 //can't do with post
 37                 return;
 38             }
 39             String[] tmps = url.split("\?",-1);
 40             if(tmps.length == 1) {
 41                 context.write(new Text(uid), new Text(url));
 42                 return;
 43             }
 44             int len = tmps[1].length();
 45             String temp = "";
 46             boolean falg = true;
 47             for(int i = 0; i < len; i++) {
 48                 if(tmps[1].charAt(i) == '=') {
 49                     requests.add(temp);
 50                     temp = "";
 51                     falg = false;
 52                 }
 53                 if(falg)
 54                     temp += tmps[1].charAt(i);
 55                 if(tmps[1].charAt(i) == '&') {
 56                     falg = true;
 57                 }
 58             }
 59             int state = 0;
 60             for(int i = 0; i < requests.size(); i++) {
 61                 if(requests.get(i).equals("_s_"))
 62                     state ^= 1;
 63                 if(requests.get(i).equals("_v_"))
 64                     state ^= 2;
 65                 if(requests.get(i).equals("_c_"))
 66                     state ^= 4;
 67                 if(requests.get(i).equals("_t_"))
 68                     state ^= 8;
 69             }
 70             if((state & 8) != 0) {
 71                 context.write(new Text(uid), new Text(tmps[0] + requests.toString()));
 72             } else if(state < 7) {
 73                 context.write(new Text(uid), new Text(tmps[0] + requests.toString()));
 74             }
 75         }
 76     }
 77 
 78     public static class Reduce extends Reducer<Text, Text, Text, Text> {
 79 
 80         @Override
 81         public void reduce(Text key,Iterable<Text> values, Context context)
 82                 throws IOException, InterruptedException {
 83             String out = "";
 84             for(Text value : values) {
 85                 //context.write(key,value);
 86                 out += value.toString();
 87             }
 88             context.write(key,new Text(out));
 89         }
 90     }
 91 
 92     @Override
 93     public int run(String[] args) throws Exception {
 94         Configuration conf = getConf();
 95         Job job = new Job();
 96         job.setJobName("GetVersion");
 97         job.setJarByClass(GetVersion.class);
 98 
 99         FileInputFormat.addInputPath(job, new Path(conf.get("")));
100         FileOutputFormat.setOutputPath(job, new Path(conf.get("")));
101 
102         job.setMapperClass(GetVersionMap.class);
103         job.setReducerClass(Reduce.class);
104 
105         job.setInputFormatClass(SequenceFileInputFormat.class);
106         job.setOutputFormatClass(TextOutputFormat.class);
107 
108         job.setMapOutputKeyClass(Text.class);
109         job.setMapOutputValueClass(Text.class);
110         job.setOutputKeyClass(Text.class);
111         job.setOutputValueClass(Text.class);
112 
113         job.waitForCompletion(true);
114 
115         return 0;
116     }
117 
118 
119     public static void main(String[] args) throws Exception {
120         ToolRunner.run(new GetVersion(), args);
121     }
122 
123 }
// ---- "View Code" (blog snippet marker): end of GetVersion.java ----
Original article (原文地址): https://www.cnblogs.com/gray035/p/3889091.html