2020112401

I honestly didn't expect that controlled experiments would pin down the problem in the MapReduce lab: the LinkedHashMap built from a 300+ MB file is simply too large for the Map phase.
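
(The abandoned approach was essentially a map-side join: read the whole line-item table into an in-memory LinkedHashMap in the mapper's setup() and probe it while mapping the other table. The sketch below is only a reconstruction of that idea, not the original code; the file handling and field layout are illustrative. The point is that 300+ MB of text balloons to several times that on the heap once every row becomes a String entry plus hash-table and object-header overhead, which is more than a single map task's default heap.)

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative only: the in-memory join style that ran out of memory.
public class InMemoryJoinMapper extends Mapper<Object, Text, Text, Text> {

    private final Map<String, String> details = new LinkedHashMap<String, String>();

    @Override
    protected void setup(Context context) throws IOException {
        // "zzsfp_hwmx.csv" is a placeholder for the locally available detail file.
        BufferedReader reader = new BufferedReader(new FileReader("zzsfp_hwmx.csv"));
        String line;
        while ((line = reader.readLine()) != null) {
            String[] str = line.split(",");
            // key: invoice id, value: the whole row -- the entire file ends up on the heap
            details.put(str[0], line);
        }
        reader.close();
    }

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] str = value.toString().split(",");
        String detail = details.get(str[0]); // join on invoice id
        if (detail != null) {
            context.write(new Text(str[0]), new Text(detail));
        }
    }
}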

I found this out by testing with two trimmed-down table files and then with the full tables. After the real run I also got a feel for MapReduce's advantage on large volumes of data: for a 1 GB data file, just importing it into a relational database such as MySQL can take hours, while joining the two tables takes MapReduce only one to a few minutes.

Of course, doing the computation with SQL on a relational database has its own advantages: no programming is needed, all kinds of logical conditions are supported, a single statement can accomplish very complex work (though doing so is generally not recommended), and when the data volume is small the result comes back quickly. MapReduce, on the other hand, spends quite a bit of time on setup before the Map phase even starts, and more complex operations have to be split across many MapReduce jobs. For example, the merge-and-aggregate step in the first part of this experiment was split into two MapReduce jobs: the first joins the two files (keyed on the invoice ID), and the second aggregates purchases and sales of the same goods (keyed on product name + specification + unit). With SQL, one statement with a JOIN, a SUM and a GROUP BY would do it, and the same goes for summing and sorting.
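
For comparison, the one-statement SQL version could look roughly like the sketch below. The schema is hypothetical (table and column names such as invoices, invoice_items, goods_name and the voided flag are made up, since the real schema is not reproduced in this post), and the JDBC wiring is only there to keep the example self-contained.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Hypothetical schema: invoices(invoice_id, buyer_id, seller_id, voided)
// and invoice_items(invoice_id, goods_name, spec, unit, amount).
// One JOIN + SUM + GROUP BY replaces the two MapReduce passes.
public class SqlJoinSum {
    public static void main(String[] args) throws Exception {
        String sql =
            "SELECT i.buyer_id, t.goods_name, t.spec, t.unit, SUM(t.amount) AS total " +
            "FROM invoices i JOIN invoice_items t ON i.invoice_id = t.invoice_id " +
            "WHERE i.voided <> 'Y' " +
            "GROUP BY i.buyer_id, t.goods_name, t.spec, t.unit " +
            "ORDER BY total DESC";

        // Connection details are placeholders.
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://localhost:3306/test", "user", "password");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            while (rs.next()) {
                System.out.println(rs.getString(1) + "\t" + rs.getString(2) + "\t"
                        + rs.getString(3) + "\t" + rs.getString(4) + "\t"
                        + rs.getDouble(5));
            }
        }
    }
}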

However, since this experiment stuck to pure MapReduce, I had no real way to make the final judgment; all I could do was compare the raw inbound and outbound quantities. The unit column was kept to make those comparisons easier to read, but I couldn't find a good way to turn it into a single numeric measure. So in the end I simply summed the values and sorted the result.

To analyze this properly, one would need to bring in the third table and build a decision tree or train a neural network to analyze the results.

Merging the two tables:

package konoha.pkg.pkg01;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Vector;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

class DealInfo {
    public String buyerId = null;
    public String sellerId = null;

    public DealInfo(String buyer, String seller) {
        buyerId = buyer;
        sellerId = seller;
    }
}

public class MapRedStep1 {

    public static class Type0Mapper extends Mapper<Object, Text, Text, Text> {

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String filePath = ((FileSplit) context.getInputSplit()).getPath().toString();
            String line = value.toString();
            line = line.substring(1, line.length() - 1);
            // System.out.println("[DEBUG]Processing Line:" + line);
            String str[] = line.split(",");
            if (filePath.contains("hwmx")) {
                if (str.length == 10) {
                    if (!str[9].equals("Y"))
                        context.write(new Text(str[0]),
                                new Text("nya" + str[2] + "	" + str[3] + "	" + str[4] + "," + str[5]));
                }
            } else {
                if (str.length == 9) {
                    context.write(new Text(str[0]), new Text("kon" + str[1] + "	" + str[2]));
                }

            }

        }
    }

    public static class Type0Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String buyer = null;
            String seller = null;
            Vector<String> nya = new Vector<String>();
            String tmp = null;
            String str[] = null;
            for (Text val : values) {
                tmp = val.toString();
                if (tmp.startsWith("kon")) {
                    str = tmp.substring(3).split("\t");
                    buyer = str[0];
                    seller = str[1];
                } else if (tmp.startsWith("nya")) {
                    nya.add(tmp.substring(3));
                }
            }
            if (buyer != null && seller != null) {
                for (int j = 0; j < nya.size(); j++) {
                    str = nya.get(j).split(",");
                    if (!str[1].equals("null")) {
                        context.write(new Text(seller), new Text(str[0] + "\t" + (-Double.parseDouble(str[1]))));
                        context.write(new Text(buyer), new Text(str[0] + "\t" + Double.parseDouble(str[1])));
                    }
                }
            }
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        System.out.println("[INFO]Starting Job...");
        Job job = new Job(conf, "MyAverageStep1");
        System.out.print("[INFO]Setting Jar By Class...");
        job.setJarByClass(MapRedStep1.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Mapper Class...");
        job.setMapperClass(Type0Mapper.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Reducer Class...");
        job.setReducerClass(Type0Reducer.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Key Class...");
        job.setOutputKeyClass(Text.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Value Class...");
        job.setOutputValueClass(Text.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Input Format Class...");
        job.setInputFormatClass(TextInputFormat.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting OutputFormatClass...");
        job.setOutputFormatClass(TextOutputFormat.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Creating Path Varable(s)...");
        Path left = new Path("hdfs://localhost:9000/user/test/zzsfp");
        Path right = new Path("hdfs://localhost:9000/user/test/zzsfp_hwmx");
        Path out = new Path("hdfs://localhost:9000/user/test/temp");
        System.out.println("[DONE]");
        System.out.print("[INFO]Adding Left Input Path...");
        FileInputFormat.addInputPath(job, left);
        System.out.println("[DONE]");
        System.out.print("[INFO]Adding Right Input Path...");
        FileInputFormat.addInputPath(job, right);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Path...");
        FileOutputFormat.setOutputPath(job, out);
        System.out.println("[DONE]");
        System.out.println("[INFO]Process Now Running...");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
MapRedStep1.java

Summing by item:

package konoha.pkg.pkg01;

import java.io.IOException;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MapRedStep2 {
    public static class Type0Mapper extends Mapper<Object, Text, Text, DoubleWritable> {

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String str[] = line.split("\t");
            if (str.length == 5) {
                context.write(new Text(str[0] + "	" + str[1] + "	" + str[2] + "	" + str[3]),
                        new DoubleWritable(Double.parseDouble(str[4])));
            }

        }
    }

    public static class Type0Reducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        @Override
        public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0.0;
            for (DoubleWritable val : values) {
                sum = sum + val.get();
            }
            context.write(new Text(key), new DoubleWritable(sum));
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        System.out.println("[INFO]Starting Job...");
        Job job = new Job(conf, "MyAverageStep2");
        System.out.print("[INFO]Setting Jar By Class...");
        job.setJarByClass(MapRedStep2.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Mapper Class...");
        job.setMapperClass(Type0Mapper.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Reducer Class...");
        job.setReducerClass(Type0Reducer.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Key Class...");
        job.setOutputKeyClass(Text.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Value Class...");
        job.setOutputValueClass(DoubleWritable.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Input Format Class...");
        job.setInputFormatClass(TextInputFormat.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting OutputFormatClass...");
        job.setOutputFormatClass(TextOutputFormat.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Creating Path Varable(s)...");
        Path in = new Path("hdfs://localhost:9000/user/test/temp/part-r-00000");
        Path out = new Path("hdfs://localhost:9000/user/test/result");
        System.out.println("[DONE]");
        System.out.print("[INFO]Adding Left Input Path...");
        FileInputFormat.addInputPath(job, in);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Path...");
        FileOutputFormat.setOutputPath(job, out);
        System.out.println("[DONE]");
        System.out.println("[INFO]Process Now Running...");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
MapRedStep2.java
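
A small tweak worth mentioning, not part of the original code: because the per-key sum is associative and the reducer's input and output types are identical, the same Type0Reducer could also be registered as a combiner in MapRedStep2's main(), pre-aggregating on the map side and shrinking the shuffle.

        // inside MapRedStep2.main(), after job.setReducerClass(Type0Reducer.class):
        job.setCombinerClass(Type0Reducer.class); // partial sums on the map side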

The MapReduce jobs that simply tally the purchase/sale difference and sort by it (three steps in total: sum the absolute values of the differences (the largest values are already extreme even without resorting to variance); sort by that value (since MapReduce sorts keys in ascending order, every value is first negated and then negated back on output; an alternative using a custom sort comparator is sketched after the MapRedExtraStep1_2 listing); and finally join against the company table and write out the result):

package konoha.pkg.pkg01;

import java.io.IOException;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MapRedExtraStep1_1 {
    public static class Type0Mapper extends Mapper<Object, Text, Text, DoubleWritable> {

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String str[] = line.split("\t");
            if (str.length == 5) {
                context.write(new Text(str[0]), new DoubleWritable(Double.parseDouble(str[4])));
            }
        }
    }

    public static class Type0Reducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        @Override
        public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0.0;
            for (DoubleWritable val : values) {
                sum = sum + Math.abs(val.get());
            }
            context.write(new Text(key), new DoubleWritable(sum));
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        System.out.println("[INFO]Starting Job...");
        Job job = new Job(conf, "MyAverageExtraStep1_1");
        System.out.print("[INFO]Setting Jar By Class...");
        job.setJarByClass(MapRedExtraStep1_1.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Mapper Class...");
        job.setMapperClass(Type0Mapper.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Reducer Class...");
        job.setReducerClass(Type0Reducer.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Key Class...");
        job.setOutputKeyClass(Text.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Value Class...");
        job.setOutputValueClass(DoubleWritable.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Input Format Class...");
        job.setInputFormatClass(TextInputFormat.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting OutputFormatClass...");
        job.setOutputFormatClass(TextOutputFormat.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Creating Path Variable(s)...");
        Path in = new Path("hdfs://localhost:9000/user/test/result/part-r-00000");
        Path out = new Path("hdfs://localhost:9000/user/test/none_output");
        System.out.println("[DONE]");
        System.out.print("[INFO]Adding Left Input Path...");
        FileInputFormat.addInputPath(job, in);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Path...");
        FileOutputFormat.setOutputPath(job, out);
        System.out.println("[DONE]");
        System.out.println("[INFO]Process Now Running...");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
MapRedExtraStep1_1
package konoha.pkg.pkg01;

import java.io.IOException;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MapRedExtraStep1_2 {
    public static class Type0Mapper extends Mapper<Object, Text, DoubleWritable, Text> {

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String str[] = line.split("\t");
            if (str.length == 2) {
                context.write(new DoubleWritable(-Double.parseDouble(str[1])), new Text(str[0]));
            }
        }
    }

    public static class Type0Reducer extends Reducer<DoubleWritable, Text, Text, DoubleWritable> {
        @Override
        public void reduce(DoubleWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                context.write(val, new DoubleWritable(-key.get()));
            }
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        System.out.println("[INFO]Starting Job...");
        Job job = new Job(conf, "MyAverageExtraStep1_2");
        System.out.print("[INFO]Setting Jar By Class...");
        job.setJarByClass(MapRedExtraStep1_2.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Mapper Class...");
        job.setMapperClass(Type0Mapper.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Reducer Class...");
        job.setReducerClass(Type0Reducer.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Key Class...");
        job.setOutputKeyClass(Text.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Value Class...");
        job.setOutputValueClass(DoubleWritable.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Input Format Class...");
        job.setInputFormatClass(TextInputFormat.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Format Class...");
        job.setOutputFormatClass(TextOutputFormat.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Map Output Key Class...");
        job.setMapOutputKeyClass(DoubleWritable.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Map Output Value Class...");
        job.setMapOutputValueClass(Text.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Creating Path Variable(s)...");
        Path in = new Path("hdfs://localhost:9000/user/test/none_output/part-r-00000");
        Path out = new Path("hdfs://localhost:9000/user/test/none_output1");
        System.out.println("[DONE]");
        System.out.print("[INFO]Adding Left Input Path...");
        FileInputFormat.addInputPath(job, in);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Path...");
        FileOutputFormat.setOutputPath(job, out);
        System.out.println("[DONE]");
        System.out.println("[INFO]Process Now Running...");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
MapRedExtraStep1_2
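
As an aside, the negate-before-sorting trick used above is one way to get descending order out of MapReduce's ascending key sort. An alternative, not used in the original code, is a custom sort comparator registered on the job; a minimal sketch:

package konoha.pkg.pkg01;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Sorts DoubleWritable keys in descending order, so the mapper could emit the
// positive totals directly instead of negating them and negating back on output.
public class DescendingDoubleComparator extends WritableComparator {
    public DescendingDoubleComparator() {
        super(DoubleWritable.class, true); // true: instantiate keys for compare()
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return -super.compare(a, b); // flip the natural ascending order
    }
}

It would be wired in with job.setSortComparatorClass(DescendingDoubleComparator.class); in MapRedExtraStep1_2's main(), with the mapper and reducer left to pass the value through unchanged.
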
package konoha.pkg.pkg01;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MapRedExtraStep1_3 {
    public static double limit = 5000;

    public static class Type0Mapper extends Mapper<Object, Text, Text, Text> {
        public HashMap<String, String> corp = new HashMap<String, String>();

        @Override
        public void setup(Context context) throws IOException {
            String fileName = context.getLocalCacheFiles()[0].getName();
            BufferedReader reader = new BufferedReader(new FileReader(fileName));
            String line = null;
            String str[] = null;
            for (int i = 0; i < 300 && null != (line = reader.readLine()); i++) {
                str = line.split("\t");
                if (str.length == 2) {
                    corp.put(str[0], "");
                }
            }
            reader.close();
        }

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String str[] = line.split("\t");
            if (str.length == 5 && corp.containsKey(str[0])) {
                context.write(new Text(str[0]), new Text(str[1] + "\t" + str[2] + "\t" + str[3] + "," + str[4]));
            }
        }
    }

    public static class Type0Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String str[] = null;
            for (Text val : values) {
                str = val.toString().split(",");
                if (str.length == 2 && Math.abs(Double.parseDouble(str[1])) >= limit) {
                    String res = "虚开发票(商品缺失)"; // fraudulent invoice (goods missing)
                    if (Double.parseDouble(str[1]) > 0) {
                        res = "漏开发票(商品溢出)"; // missing invoice (goods surplus)
                    }
                    context.write(key, new Text(str[0] + "\t" + str[1] + "\t" + res));
                }
            }
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        System.out.println("[INFO]Starting Job...");
        Job job = new Job(conf, "MyAverageExtraStep1_3");
        System.out.print("[INFO]Setting Jar By Class...");
        job.setJarByClass(MapRedExtraStep1_3.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Mapper Class...");
        job.setMapperClass(Type0Mapper.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Reducer Class...");
        job.setReducerClass(Type0Reducer.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Key Class...");
        job.setOutputKeyClass(Text.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Value Class...");
        job.setOutputValueClass(Text.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Input Format Class...");
        job.setInputFormatClass(TextInputFormat.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Format Class...");
        job.setOutputFormatClass(TextOutputFormat.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Map Output Key Class...");
        job.setMapOutputKeyClass(Text.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Map Output Value Class...");
        job.setMapOutputValueClass(Text.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Creating Path Variable(s)...");
        Path in = new Path("hdfs://localhost:9000/user/test/result/part-r-00000");
        URI uri = new URI("hdfs://localhost:9000/user/test/none_output1/part-r-00000");
        Path out = new Path("hdfs://localhost:9000/user/test/none_output2");
        System.out.println("[DONE]");
        System.out.print("[INFO]Adding Input Path...");
        FileInputFormat.addInputPath(job, in);
        System.out.println("[DONE]");
        System.out.print("[INFO]Adding Input URI...");
        job.addCacheFile(uri);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Path...");
        FileOutputFormat.setOutputPath(job, out);
        System.out.println("[DONE]");
        System.out.println("[INFO]Process Now Running...");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
MapRedExtraStep1_3

-----------------------------------------------------------------------------------Updated 26.11.2020-----------------------------------------------------------------------------------

I noticed that the voided-invoice check in the first step's table merge was apparently reversed. The fixed MapRedStep1, which now also carries over most of the fields from both tables during the merge, is shown below.

package konoha.pkg.pkg01;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Vector;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

class DealInfo {
    public String buyerId = null;
    public String sellerId = null;

    public DealInfo(String buyer, String seller) {
        buyerId = buyer;
        sellerId = seller;
    }
}

public class MapRedStep1 {

    public static class Type0Mapper extends Mapper<Object, Text, Text, Text> {

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String filePath = ((FileSplit) context.getInputSplit()).getPath().toString();
            String line = value.toString();
            line = line.substring(1, line.length() - 1);
            // System.out.println("[DEBUG]Processing Line:" + line);
            String str[] = line.split(",");
            if (filePath.contains("hwmx")) {
                if (str.length == 10) {
                    context.write(new Text(str[0]), new Text("nya" + str[1] + "\t" + str[2] + "\t" + str[3] + "\t"
                            + str[4] + "\t" + str[6] + "\t" + str[7] + "\t" + str[8] + "\t" + str[9] + "," + str[5]));
                }
            } else {
                if (str.length == 9) {
                    if (!str[8].equals("Y"))
                        context.write(new Text(str[0]), new Text("kon" + str[1] + "\t" + str[2]));
                }

            }

        }
    }

    public static class Type0Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String buyer = null;
            String seller = null;
            Vector<String> nya = new Vector<String>();
            String tmp = null;
            String str[] = null;
            for (Text val : values) {
                tmp = val.toString();
                if (tmp.startsWith("kon")) {
                    str = tmp.substring(3).split("\t");
                    buyer = str[0];
                    seller = str[1];
                } else if (tmp.startsWith("nya")) {
                    nya.add(tmp.substring(3));
                }
            }
            if (buyer != null && seller != null) {
                for (int j = 0; j < nya.size(); j++) {
                    str = nya.get(j).split(",");
                    if (!str[1].equals("null")) {
                        context.write(new Text(seller), new Text(str[0] + "\t" + (-Double.parseDouble(str[1]))));
                        context.write(new Text(buyer), new Text(str[0] + "\t" + Double.parseDouble(str[1])));
                    }
                }
            }
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        System.out.println("[INFO]Starting Job...");
        Job job = new Job(conf, "MyAverageStep1");
        System.out.print("[INFO]Setting Jar By Class...");
        job.setJarByClass(MapRedStep1.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Mapper Class...");
        job.setMapperClass(Type0Mapper.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Reducer Class...");
        job.setReducerClass(Type0Reducer.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Key Class...");
        job.setOutputKeyClass(Text.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Value Class...");
        job.setOutputValueClass(Text.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Input Format Class...");
        job.setInputFormatClass(TextInputFormat.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting OutputFormatClass...");
        job.setOutputFormatClass(TextOutputFormat.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Creating Path Variable(s)...");
        Path left = new Path("hdfs://localhost:9000/user/test/zzsfp");
        Path right = new Path("hdfs://localhost:9000/user/test/zzsfp_hwmx");
        Path out = new Path("hdfs://localhost:9000/user/test/temp");
        System.out.println("[DONE]");
        System.out.print("[INFO]Adding Left Input Path...");
        FileInputFormat.addInputPath(job, left);
        System.out.println("[DONE]");
        System.out.print("[INFO]Adding Right Input Path...");
        FileInputFormat.addInputPath(job, right);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Path...");
        FileOutputFormat.setOutputPath(job, out);
        System.out.println("[DONE]");
        System.out.println("[INFO]Process Now Running...");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
MapRedStep1.java (fixed, with the merged output structure revised)
Original article: https://www.cnblogs.com/minadukirinno/p/14033211.html