Map过程中使用静态变量和静态方法得到数据库中数据

package org.apache.hadoop.examples;

import java.io.FileOutputStream;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import com.mysql.jdbc.Statement;
import com.yangbo.util.Methods;
import com.yangbo.util.SqlHelper;

import java.sql.*;

public class GridsTzDay  {

    public static class Map extends Mapper<LongWritable,Text,Text,IntWritable>{    
        private Text k = new Text();
        public final static IntWritable v = new IntWritable();
        
        public static ArrayList<ArrayList> getArrayList(){
            
            Connection conn = null;
            String sql = "select id,minLon,minLat,maxLon,maxLat from grids_information";
            String url = "jdbc:mysql://192.168.1.164:3306/mqs?user=root&password=654321&useUnicode=true&amp;characterEncoding=UTF-8";
            
            ArrayList<ArrayList> arrayList = new ArrayList();
            ArrayList<String> temp = new ArrayList();
            
            try{
                Class.forName("com.mysql.jdbc.Driver");
                conn = DriverManager.getConnection(url);
                java.sql.Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery(sql);
                while(rs.next()){
                    temp.clear();
                    temp.add(rs.getString(1));
                    temp.add(rs.getString(2));
                    temp.add(rs.getString(3));
                    temp.add(rs.getString(4));
                    temp.add(rs.getString(5));
                    arrayList.add((ArrayList) temp.clone());
                }
                
            }catch (SQLException e) {
                System.out.println("MySQL操作错误");
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    conn.close();
                } catch (SQLException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            
            return arrayList;
        }
        
        public static ArrayList<ArrayList> arrayList = Map.getArrayList();
        
        public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
            String[] s = value.toString().split(",");
            String yyyymmdd = s[0].trim();
             //String str = arrayList.get(0).get(1).toString();
             
            double lng;
            double lat;
            if(s[12].trim().equals("null")){
                lng = 720;
            }else{
                lng = Double.parseDouble(s[12].trim());
            }
             
             if(s[13].trim().equals("null")){
                 lat = 720;
             }else{
                 lat = Double.parseDouble(s[13].trim());
             }
             
             String gid = null;
            
             for(int i=0;i<arrayList.size();i++){
                 double minLng = Double.parseDouble(arrayList.get(i).get(1).toString());
                 double minLat = Double.parseDouble(arrayList.get(i).get(2).toString());
                 double maxLng = Double.parseDouble(arrayList.get(i).get(3).toString());
                 double maxLat = Double.parseDouble(arrayList.get(i).get(4).toString());
                 
                 if(lng>=minLng && lng<maxLng && lat>=minLat && lat<maxLat){
                     gid = arrayList.get(i).get(0).toString();
                     int weakFlag;
                     if(Integer.parseInt(s[10].trim())>-85){
                         weakFlag = 1;
                     }else{
                         weakFlag = 0;
                     }
                     
                     k.set(yyyymmdd+"	"+gid);
                     v.set(weakFlag);
                     
                     context.write(k, v);    
                 
                 }
                 
             }
             
                    
        }
    }
    
    public static class Reduce extends Reducer<Text,IntWritable,Text,FloatWritable>{    
        private FloatWritable result = new FloatWritable();
        public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{            
            float weakNum=0;
            float totalNum=0;
            
            for(IntWritable val : values){
                if(val.get()==0){
                    weakNum++;
                    totalNum++;
                }else{
                    totalNum++;
                }
                
            }
            
            float rate = (weakNum/totalNum)*100;
            
            result.set(rate);
            context.write(key, result);    
        }
    }
    
    
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException,SQLException {            
        

        
        Configuration conf1 = new Configuration();
        conf1.set("mapred.job.tracker", "192.168.1.164:9001");
        
        FileSystem fs=FileSystem.get(conf1);
        Path delef=new Path("grid_weak_rate");
        boolean isDeleted=fs.delete(delef,true);
        System.out.println("Delete?"+isDeleted);
        
        
        String[] ars=new String[]{"device_information","grid_weak_rate"};
        String[] otherArgs = new GenericOptionsParser(conf1, ars).getRemainingArgs();
        if (otherArgs.length != 2) {
          System.err.println("Usage: GridT2G  ");
          System.exit(2);
        }
        Job job1 = new Job(conf1, "Job1");
        job1.setJarByClass(GridWeakRssiRate.class);
        job1.setMapperClass(Map.class);
        job1.setReducerClass(Reduce.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job1, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job1, new Path(otherArgs[1]));
        job1.waitForCompletion(true);
        //sub Mapreduce
        



      }
}
package org.apache.hadoop.examples;

import java.io.FileOutputStream;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import com.mysql.jdbc.Statement;
import com.yangbo.util.Methods;
import com.yangbo.util.SqlHelper;

import java.sql.*;

public class GridsTzDay  {

    public static class Map extends Mapper<LongWritable,Text,Text,IntWritable>{    
        private Text k = new Text();
        public final static IntWritable v = new IntWritable();
        
        public static ArrayList<ArrayList> getArrayList(){
            
            Connection conn = null;
            String sql = "select id,minLon,minLat,maxLon,maxLat from grids_information";
            String url = "jdbc:mysql://192.168.1.164:3306/mqs?user=root&password=654321&useUnicode=true&amp;characterEncoding=UTF-8";
            
            ArrayList<ArrayList> arrayList = new ArrayList();
            ArrayList<String> temp = new ArrayList();
            
            try{
                Class.forName("com.mysql.jdbc.Driver");
                conn = DriverManager.getConnection(url);
                java.sql.Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery(sql);
                while(rs.next()){
                    temp.clear();
                    temp.add(rs.getString(1));
                    temp.add(rs.getString(2));
                    temp.add(rs.getString(3));
                    temp.add(rs.getString(4));
                    temp.add(rs.getString(5));
                    arrayList.add((ArrayList) temp.clone());
                }
                
            }catch (SQLException e) {
                System.out.println("MySQL操作错误");
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    conn.close();
                } catch (SQLException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            
            return arrayList;
        }
        
        public static ArrayList<ArrayList> arrayList = Map.getArrayList();
        
        public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
            String[] s = value.toString().split(",");
            String yyyymmdd = s[0].trim();
             //String str = arrayList.get(0).get(1).toString();
             
            double lng;
            double lat;
            if(s[12].trim().equals("null")){
                lng = 720;
            }else{
                lng = Double.parseDouble(s[12].trim());
            }
             
             if(s[13].trim().equals("null")){
                 lat = 720;
             }else{
                 lat = Double.parseDouble(s[13].trim());
             }
             
             String gid = null;
            
             for(int i=0;i<arrayList.size();i++){
                 double minLng = Double.parseDouble(arrayList.get(i).get(1).toString());
                 double minLat = Double.parseDouble(arrayList.get(i).get(2).toString());
                 double maxLng = Double.parseDouble(arrayList.get(i).get(3).toString());
                 double maxLat = Double.parseDouble(arrayList.get(i).get(4).toString());
                 
                 if(lng>=minLng && lng<maxLng && lat>=minLat && lat<maxLat){
                     gid = arrayList.get(i).get(0).toString();
                     int weakFlag;
                     if(Integer.parseInt(s[10].trim())>-85){
                         weakFlag = 1;
                     }else{
                         weakFlag = 0;
                     }
                     
                     k.set(yyyymmdd+"	"+gid);
                     v.set(weakFlag);
                     
                     context.write(k, v);    
                 
                 }
                 
             }
             
                    
        }
    }
    
    public static class Reduce extends Reducer<Text,IntWritable,Text,FloatWritable>{    
        private FloatWritable result = new FloatWritable();
        public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{            
            float weakNum=0;
            float totalNum=0;
            
            for(IntWritable val : values){
                if(val.get()==0){
                    weakNum++;
                    totalNum++;
                }else{
                    totalNum++;
                }
                
            }
            
            float rate = (weakNum/totalNum)*100;
            
            result.set(rate);
            context.write(key, result);    
        }
    }
    
    
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException,SQLException {            
        

        
        Configuration conf1 = new Configuration();
        conf1.set("mapred.job.tracker", "192.168.1.164:9001");
        
        FileSystem fs=FileSystem.get(conf1);
        Path delef=new Path("grid_weak_rate");
        boolean isDeleted=fs.delete(delef,true);
        System.out.println("Delete?"+isDeleted);
        
        
        String[] ars=new String[]{"device_information","grid_weak_rate"};
        String[] otherArgs = new GenericOptionsParser(conf1, ars).getRemainingArgs();
        if (otherArgs.length != 2) {
          System.err.println("Usage: GridT2G  ");
          System.exit(2);
        }
        Job job1 = new Job(conf1, "Job1");
        job1.setJarByClass(GridWeakRssiRate.class);
        job1.setMapperClass(Map.class);
        job1.setReducerClass(Reduce.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job1, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job1, new Path(otherArgs[1]));
        job1.waitForCompletion(true);
        //sub Mapreduce
        



      }
}
原文地址:https://www.cnblogs.com/jingyunyb/p/3423184.html