July 29

A problem suddenly came up today. At first only the Xshell connection to hadoop102 failed, while 103 and 104 were fine. I clicked "open" in Xshell a few times, and then VMware froze and stopped responding. After it hung for a long time I had no choice but to power it off. After restarting, the VM still couldn't be reached and soon stopped responding again, so I powered off once more. After several rounds of this, none of the three machines could be reached.

After several hours of debugging all kinds of configuration, it turned out none of my settings were wrong; the VM itself was simply lagging, and after a while everything recovered on its own.

Afterwards I studied serialization and followed along with a simple example.

When map and reduce run on two different servers, passing a bean between them requires serialization: the in-memory object is written out as bytes (to disk or the network), and the other server deserializes that data back into an in-memory object.

 

You have to write the bean class yourself.

FlowBean

* 1 Define a class that implements the Writable interface
* 2 Override the serialization and deserialization methods
* 3 Provide a no-arg constructor
* 4 Override toString
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {
    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic

    // no-arg constructor, needed so the framework can instantiate the bean by reflection
    public FlowBean() {
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
    public void setSumFlow() {
        this.sumFlow = this.downFlow+this.upFlow;
    }
    // serialization: write the fields to the output stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // deserialization: read the fields back in exactly the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow +" " + downFlow + " " + sumFlow ;
    }
}
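To see what write/readFields actually do, here is a minimal round-trip sketch using only the JDK's DataOutputStream/DataInputStream, with no Hadoop dependency (the class and values below are hypothetical stand-ins, not the FlowBean above): three longs are serialized to a byte array, then read back in the same field order.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class RoundTripDemo {
    public static void main(String[] args) throws IOException {
        long upFlow = 1116, downFlow = 954;
        long sumFlow = upFlow + downFlow;

        // "write": serialize the fields to bytes, as Writable.write does
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);

        // "readFields": deserialize in exactly the same field order
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(in.readLong() + " " + in.readLong() + " " + in.readLong());
    }
}
```

If the read order ever differs from the write order, the fields come back scrambled, which is why the two methods must mirror each other.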

FlowMapper class

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    private Text outK = new Text();
    private FlowBean outV = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 get one line
        String line = value.toString();
        // 2 split it into fields
        String[] split = line.split(" ");
        // 3 grab the fields we need
        String phone = split[1];
        String up = split[split.length - 3];
        String down = split[split.length - 2];
        // 4 package into the output key/value
        outK.set(phone);
        outV.setUpFlow(Long.parseLong(up));
        outV.setDownFlow(Long.parseLong(down));
        outV.setSumFlow();
        // write out
        context.write(outK, outV);
    }
}
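The field extraction in the mapper can be checked in plain Java without Hadoop. The sample line below is hypothetical phone-traffic data (not the course's real dataset): the phone number is field 1, and the upstream/downstream counts are the 3rd- and 2nd-to-last fields, which is why the indices count from `split.length`.

```java
public class SplitDemo {
    public static void main(String[] args) {
        // hypothetical log line: id, phone, ip, host, up bytes, down bytes, status
        String line = "1 13736230513 192.168.100.1 www.example.com 2481 24681 200";
        String[] split = line.split(" ");
        String phone = split[1];                    // phone number is always field 1
        String up = split[split.length - 3];        // count from the end: lines may
        String down = split[split.length - 2];      // have a variable number of fields
        System.out.println(phone + " " + up + " " + down);
    }
}
```

Counting from the end matters because some records omit the host field, so the up/down columns are only stable relative to the tail of the line.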

FlowReducer class

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    private FlowBean outV = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        // 1 iterate over the grouped values and accumulate the totals for this key
        long allUp = 0;
        long allDown = 0;
        for (FlowBean value : values) {
            allUp += value.getUpFlow();
            allDown += value.getDownFlow();
        }
        // 2 package the result
        outV.setUpFlow(allUp);
        outV.setDownFlow(allDown);
        outV.setSumFlow();

        // 3 write out
        context.write(key, outV);
    }
}
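The shuffle phase groups all values with the same key before reduce runs; the accumulation above can be sketched in plain Java with a Map standing in for the grouping (the phone number and traffic figures here are hypothetical):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class GroupSumDemo {
    public static void main(String[] args) {
        // hypothetical (phone -> up, down) records, as the mapper would emit them
        String[] phones = {"13736230513", "13736230513"};
        long[][] records = {{1116, 954}, {300, 200}};

        // group by key and accumulate, mimicking shuffle + reduce
        Map<String, long[]> sums = new LinkedHashMap<>();
        for (int i = 0; i < phones.length; i++) {
            long[] acc = sums.computeIfAbsent(phones[i], k -> new long[2]);
            acc[0] += records[i][0]; // upstream total
            acc[1] += records[i][1]; // downstream total
        }
        for (Map.Entry<String, long[]> e : sums.entrySet()) {
            long up = e.getValue()[0], down = e.getValue()[1];
            System.out.println(e.getKey() + " " + up + " " + down + " " + (up + down));
        }
    }
}
```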

FlowDriver class

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1 get the job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 set the jar
        job.setJarByClass(FlowDriver.class);

        // 3 hook up the mapper and reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 4 set the mapper's output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5 set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6 set the input and output paths (backslashes must be escaped in Java string literals)
        FileInputFormat.setInputPaths(job, new Path("E:\\hadoop\\Flow"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\hadoop\\Flowout1"));

        // 7 submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

 

Study time: 12:25 to 15:50

Original post: https://www.cnblogs.com/buyaoya-pingdao/p/15075486.html