每日一题 为了工作 2020 0508 第六十六题

package spark.action.factory;


import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;


import java.util.*;

/**
 * Generates mock traffic-monitoring data and registers it as Spark SQL temporary views.
 *
 * <p>Two views are created:
 * <ul>
 *   <li>{@code monitor_flow_action}: one row per simulated camera capture of a car, with columns
 *       date, monitor_id, camera_id, car, action_time, speed, road_id, area_id (all strings);</li>
 *   <li>{@code monitor_camera_info}: the monitor (junction) to camera mapping derived from those
 *       captures, plus an occasional extra random camera per monitor.</li>
 * </ul>
 *
 * @author 雪瞳
 * @Slogan The clock keeps moving forward; how could a person stop here!
 * @Function Mock data and create DataFrames from it
 */
public class MockData {

    /** Number of cars to simulate. */
    private static final int CAR_COUNT = 3000;
    /** Inclusive upper bound on the number of captures generated for a single car. */
    private static final int MAX_CAPTURES_PER_CAR = 300;
    /** After every this-many captures of a car, its simulated hour advances by one. */
    private static final int CAPTURES_PER_HOUR = 30;
    /** Last valid hour of the day; the simulated hour is never advanced past it. */
    private static final int LAST_HOUR = 23;
    /** Every N-th flow row adds one extra random camera id to its monitor's camera set. */
    private static final int EXTRA_CAMERA_INTERVAL = 1000;
    /** Plate province prefixes; "京" is listed twice, so it is drawn twice as often. */
    private static final String[] LOCATIONS =
            {"鲁", "京", "冀", "鄂", "粤", "沪", "京", "深", "蒙", "川"};

    public static void main(String[] args) {
        String master = "local";
        String appname = "dataFrame";
        SparkSession session = SparkSession.builder().master(master).appName(appname).getOrCreate();
        try {
            JavaSparkContext jsc = JavaSparkContext.fromSparkContext(session.sparkContext());
            Random random = new Random();

            // Each DataFrame gets its own list: parallelize() may keep a live view of the Java
            // list, so clearing and refilling one shared list could corrupt the first RDD.
            List<Row> flowRows = mockFlowRows(random);
            Dataset<Row> dataFrame =
                    session.createDataFrame(jsc.parallelize(flowRows), flowSchema());

            System.err.println("车辆信息数据");
            dataFrame.show(50);
            dataFrame.createOrReplaceTempView("monitor_flow_action");

            List<Row> cameraRows = mockMonitorCameraRows(flowRows, random);
            Dataset<Row> dataFrameTwo =
                    session.createDataFrame(jsc.parallelize(cameraRows), monitorCameraSchema());
            dataFrameTwo.createOrReplaceTempView("monitor_camera_info");
            System.err.println("路口与摄像头");
            dataFrameTwo.show(50);
        } finally {
            session.stop();
        }
    }

    /**
     * Builds the simulated capture rows, matching {@link #flowSchema()} column order.
     *
     * <p>For each car a capture count in [1, MAX_CAPTURES_PER_CAR] is drawn once. Its start hour
     * advances by one every CAPTURES_PER_HOUR captures, capped at LAST_HOUR.
     *
     * @param random source of randomness
     * @return mutable list of 8-field rows
     */
    private static List<Row> mockFlowRows(Random random) {
        // NOTE(review): format of the date string depends on DateUtils (not visible here).
        String date = DateUtils.getTodayDate();
        List<Row> rows = new ArrayList<>();

        for (int i = 0; i < CAR_COUNT; i++) {
            // Plate: province prefix + one letter A-Z + 5 digits.
            String car = LOCATIONS[random.nextInt(LOCATIONS.length)]
                    + (char) ('A' + random.nextInt(26))
                    + StringUtils.fullFillNumBites(5, String.valueOf(random.nextInt(100000)));

            int hour = random.nextInt(24);
            // Draw the capture count once; re-drawing it in the loop condition skews the count.
            int captureCount = random.nextInt(MAX_CAPTURES_PER_CAR) + 1;

            for (int j = 0; j < captureCount; j++) {
                if (j != 0 && j % CAPTURES_PER_HOUR == 0) {
                    // Cap instead of overflowing into invalid hours such as "24".
                    hour = Math.min(hour + 1, LAST_HOUR);
                }
                // Area id 1-8, padded to 2 characters.
                String areaId = StringUtils.fullFillNumBites(2,
                        String.valueOf(random.nextInt(8) + 1));
                // Road id 1-50.
                String roadId = String.valueOf(random.nextInt(50) + 1);
                // Monitor (junction) id 1-9, padded to 4 characters.
                String monitorId = StringUtils.fullFillNumBites(4,
                        String.valueOf(random.nextInt(9) + 1));
                // Camera id 0-99999, so it always fits the 5-character padding.
                String cameraId = StringUtils.fullFillNumBites(5,
                        String.valueOf(random.nextInt(100000)));
                // Capture time: "<date> HH:mm:ss", e.g. 2018-01-01 20:09:10.
                String actionTime = date + " "
                        + StringUtils.fullFillTwoBites(String.valueOf(hour)) + ":"
                        + StringUtils.fullFillTwoBites(String.valueOf(random.nextInt(60))) + ":"
                        + StringUtils.fullFillTwoBites(String.valueOf(random.nextInt(60)));
                // Speed 1-260.
                String speed = String.valueOf(random.nextInt(260) + 1);

                rows.add(RowFactory.create(
                        date, monitorId, cameraId, car, actionTime, speed, roadId, areaId));
            }
        }
        return rows;
    }

    /**
     * Derives the monitor -> camera mapping from the capture rows.
     *
     * <p>Every EXTRA_CAMERA_INTERVAL-th capture row also adds one random camera id to that row's
     * monitor, so some mapped cameras never appear in the flow data.
     *
     * @param flowRows rows produced by {@link #mockFlowRows(Random)}; field 1 is monitor_id and
     *                 field 2 is camera_id
     * @param random   source of randomness for the extra camera ids
     * @return new list of 2-field rows (monitor_id, camera_id), one per distinct pair
     */
    private static List<Row> mockMonitorCameraRows(List<Row> flowRows, Random random) {
        Map<String, Set<String>> monitorAndCameras = new HashMap<>();
        int index = 0;
        for (Row row : flowRows) {
            Set<String> cameras =
                    monitorAndCameras.computeIfAbsent(row.getString(1), k -> new HashSet<>());
            index++;
            if (index % EXTRA_CAMERA_INTERVAL == 0) {
                cameras.add(StringUtils.fullFillNumBites(5,
                        String.valueOf(random.nextInt(100000))));
            }
            cameras.add(row.getString(2));
        }

        List<Row> rows = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : monitorAndCameras.entrySet()) {
            for (String cameraId : entry.getValue()) {
                rows.add(RowFactory.create(entry.getKey(), cameraId));
            }
        }
        return rows;
    }

    /** Schema of the {@code monitor_flow_action} view; all columns are nullable strings. */
    private static StructType flowSchema() {
        return DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("date", DataTypes.StringType, true),
                DataTypes.createStructField("monitor_id", DataTypes.StringType, true),
                DataTypes.createStructField("camera_id", DataTypes.StringType, true),
                DataTypes.createStructField("car", DataTypes.StringType, true),
                DataTypes.createStructField("action_time", DataTypes.StringType, true),
                DataTypes.createStructField("speed", DataTypes.StringType, true),
                DataTypes.createStructField("road_id", DataTypes.StringType, true),
                DataTypes.createStructField("area_id", DataTypes.StringType, true)
        ));
    }

    /** Schema of the {@code monitor_camera_info} view; both columns are nullable strings. */
    private static StructType monitorCameraSchema() {
        return DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("monitor_id", DataTypes.StringType, true),
                DataTypes.createStructField("camera_id", DataTypes.StringType, true)
        ));
    }
}

  

原文地址:https://www.cnblogs.com/walxt/p/12853054.html