// Spark job: read files and write to MySQL (Java version)

package org.langtong.sparkdemo;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import java.util.Map.Entry;

/**
 * Spark job that reads EMR / order data files (JSON lines or parquet) and
 * loads them into MySQL tables or writes them back to HDFS.
 * Main class: org.langtong.sparkdemo.SparkReadFile2MysqlFull
 *
 * @author Administrator
 */
public class SparkReadFile2MysqlFull implements Serializable {
    private static final long serialVersionUID = 1L;
    private static Properties connectionProperties;
    private static JavaSparkContext jsc;
    private static SparkSession lalala;
    private static SQLContext sqlContext;
    private static String url = "jdbc:mysql://192.168.2.258:3306/diagbot?Unicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false";
    private static String mysqldriver = "com.mysql.jdbc.Driver";
    private static String user = "root";
    private static String password = "diagbot@db";
    private static String[] serials = new String[]{"13", "14", "15", "16", "17",
            "18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "28", "29", "30", "31", "32"};
    private static String[] tables = new String[]{"emrdata_1", "emrdata_2",
            "emrdata_3", "emrdata_4", "emrdata_5", "emrdata_6", "emrdata_7",
            "emrdata_8", "emrdata_9"};
    private static Map<String, Object> map_t = new HashMap<String, Object>();
    private static Map<String, String> map_s = new HashMap<String, String>();
    private static Map<String, String> map_r = new HashMap<String, String>();
    private static Map<String, String> map_k = new HashMap<String, String>();

    /**
     * Entry point: sequentially processes parquet part files part0..part57,
     * deriving a lab-result trend column for each and writing the result as
     * ORC (see {@link #ordDetail_full(String, int)}).
     *
     * NOTE(review): host 192.168.2.258 has an octet greater than 255 and is
     * not a valid IPv4 address — confirm the intended HDFS namenode host.
     */
    public static void main(String[] args) throws IOException {
        for (int i = 0; i <= 57; i++) {
            // Plain concatenation replaces the pointless per-iteration StringBuffer.
            String path = "hdfs://192.168.2.258:9000/datas/parquetFile/ord_detail_0718/part" + i;
            ordDetail_full(path, i);
        }
//        ordDetail_full("/opt/diagbot/datas/srrsh/ord_detail/part0");
    }


    /**
     * Reads EMR-index records (one JSON object per line) from {@code filePath},
     * converts every record into a string-valued Row, and appends the result to
     * the MySQL table {@code emrdataindex_1}.
     *
     * Fixes vs. original:
     * - uses per-record local maps instead of the shared static
     *   {@code map_t}/{@code map_s} (static mutable state is unsafe when this
     *   function runs on executor threads, and {@code map_s} kept accumulating
     *   keys from previous records);
     * - a malformed line is now skipped instead of returning {@code null} from
     *   the partition function, which aborted the whole partition and caused a
     *   downstream NullPointerException;
     * - removed the unused {@code hashMap2} local.
     *
     * @param filePath   input path of the JSON-lines file
     * @param jsc        Spark context to read with
     * @param sqlContext SQL context used to build and write the DataFrame
     */
    @SuppressWarnings("unused")
    private static void emrdataIndedx(String filePath, JavaSparkContext jsc,
                                      SQLContext sqlContext) {
        JavaRDD<String> javaRDD = jsc.textFile(filePath);
        String[] fields = {"pk_dcemr", "pk_dcpv", "empi", "code_pati",
                "code_pvtype", "code_ref_emr", "code_emr_type",
                "name_emr_type", "prlog_rdn", "code_dept_emr", "name_dept_emr",
                "code_psn_edit", "data_source", "source_pk", "create_time"};

        JavaRDD<Row> mapPartitions = javaRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Row>() {
            private static final long serialVersionUID = 1L;
            ObjectMapper mapper = new ObjectMapper();

            @SuppressWarnings("unchecked")
            public Iterator<Row> call(Iterator<String> iterator)
                    throws Exception {
                ArrayList<Row> rows = new ArrayList<Row>();
                while (iterator.hasNext()) {
                    String line = iterator.next();
                    try {
                        // One fresh map per record; no shared static state.
                        Map<String, Object> parsed = mapper.readValue(line, Map.class);
                        Map<String, String> record = new HashMap<String, String>();
                        for (Entry<String, Object> entry : parsed.entrySet()) {
                            record.put(entry.getKey(),
                                    String.valueOf(entry.getValue()));
                        }
                        rows.add(createEmerIndexRow(record));
                    } catch (Exception e) {
                        // Malformed line: drop it rather than poisoning the partition.
                    }
                }
                return rows.iterator();
            }
        });
        StructType schema = createStructType(fields);
        Dataset<Row> dataFrame = sqlContext.createDataFrame(mapPartitions, schema);
        dataFrame.write().mode(SaveMode.Append)
                .jdbc(url, "emrdataindex_1", connectionProperties);
    }

    /**
     * Converts a JSON-lines RDD into Rows via {@code createEmrdataRow}.
     * Unparseable lines are turned into a sentinel record that carries the raw
     * text in the {@code remark} field (pvcode 99999999, pcode "3100|H02").
     *
     * Fixes vs. original:
     * - uses a per-record local map instead of the shared static
     *   {@code map_t}/{@code map_s} (unsafe across executor threads and
     *   accumulated stale keys between records);
     * - the Jackson {@code ObjectMapper} is created once per function instance
     *   instead of once per input line.
     *
     * @param javaRDD RDD of JSON lines
     * @return RDD of Rows, one per input line
     */
    public static JavaRDD<Row> emrdata(JavaRDD<String> javaRDD) {
        JavaRDD<Row> rowRDD = javaRDD.map(new Function<String, Row>() {
            private static final long serialVersionUID = 1L;
            private final ObjectMapper mapper = new ObjectMapper();

            public Row call(String line) throws Exception {
                Map<String, String> record = new HashMap<String, String>();
                try {
                    Map<String, Object> parsed = mapper.readValue(line, Map.class);
                    for (Entry<String, Object> entry : parsed.entrySet()) {
                        record.put(entry.getKey(),
                                String.valueOf(entry.getValue()));
                    }
                } catch (Exception e) {
                    // Keep the bad line: emit a sentinel record with the raw text.
                    record.put("pvcode", "99999999");
                    record.put("remark", line);
                    record.put("pcode", "3100|H02");
                }
                return createEmrdataRow(record);
            }
        });

        return rowRDD;
    }

    /**
     * Builds a Spark schema consisting of one nullable string column per
     * supplied field name, preserving the given order.
     */
    private static StructType createStructType(String[] fields) {
        StructField[] columns = new StructField[fields.length];
        for (int i = 0; i < fields.length; i++) {
            columns[i] = DataTypes.createStructField(fields[i], DataTypes.StringType, true);
        }
        return DataTypes.createStructType(columns);
    }

    /**
     * Creates a JavaSparkContext for the given master URL with the FAIR
     * scheduler enabled and the app name "SparkReadkwz".
     */
    private static JavaSparkContext getContext(String master) {
        SparkConf conf = new SparkConf()
                .setAppName("SparkReadkwz")
                .setMaster(master);
        conf.set("spark.scheduler.mode", "FAIR");
        return new JavaSparkContext(conf);
    }

    /**
     * Loads order-record JSON lines from {@code filePath}, converts them to
     * Rows, de-duplicates and filters them, then appends each serial-prefixed
     * slice to the MySQL table {@code ord_rec<serial>}.
     *
     * Fixes vs. original:
     * - per-record local maps instead of the shared static
     *   {@code map_t}/{@code map_s} (unsafe on concurrent executor threads);
     * - the SparkContext is closed in a finally block so a failed write does
     *   not leak it;
     * - reuses the {@code mysqldriver} constant instead of duplicating the
     *   driver class name;
     * - added the missing {@code serialVersionUID} on the filter function.
     */
    private static void ordRec(String filePath) {
        String master = "spark://192.168.2.258:7077";
        jsc = getContext(master);
        sqlContext = new SQLContext(jsc);
        connectionProperties = new Properties();
        connectionProperties.put("user", user);
        connectionProperties.put("password", password);
        connectionProperties.put("driver", mysqldriver);
        try {
            JavaRDD<String> javaRDD = jsc.textFile(filePath);
            String[] ordrecFields = {"pk_ord_record", "pk_dcord", "pk_dcpv", "code_pvtype", "name_pvtype", "pvcode"
                    , "code_ord", "empi", "code_pati", "code_sex", "name_sex", "age", "code_dept", "name_dept", "bed"
                    , "pk_dcordrisreq", "code_req", "code_rep", "code_rep_type", "name_rep_type", "code_eu_type"
                    , "name_eu_type", "code_eu_item", "name_eu_item", "create_time"};
            StructType schema = createStructType(ordrecFields);
            JavaRDD<Row> rowRDD = javaRDD.map(new Function<String, Row>() {
                private static final long serialVersionUID = 1L;
                ObjectMapper mapper = new ObjectMapper();

                public Row call(String line) throws Exception {
                    try {
                        Map<String, Object> parsed = mapper.readValue(line, Map.class);
                        Map<String, String> record = new HashMap<String, String>();
                        for (Entry<String, Object> entry : parsed.entrySet()) {
                            record.put(entry.getKey(), String.valueOf(entry.getValue()));
                        }
                        return createOrdRecRow(record);
                    } catch (Exception e) {
                        // Malformed line: emit null; dropped by the filter below.
                        return null;
                    }
                }
            });

            // De-duplicate, then drop nulls, "P"/"null" visit types and pk_dcpv
            // values that carry no underscore-separated suffix.
            JavaRDD<Row> filterRDD = rowRDD.distinct().filter(new Function<Row, Boolean>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Boolean call(Row row) throws Exception {
                    if (null == row) return false;
                    if ("P".equals(row.getString(3)) || "null".equals(row.getString(3))) return false;
                    if (row.getString(2).split("_").length == 1) return false;
                    return true;
                }
            });

            // Cache: the filtered RDD is re-scanned once per serial prefix.
            JavaRDD<Row> persistRDD = filterRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
            for (final String se : serials) {
                JavaRDD<Row> slice = persistRDD.filter(new Function<Row, Boolean>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Boolean call(Row row) throws Exception {
                        return row.getString(0).startsWith(se);
                    }
                });
                if (!slice.isEmpty()) {
                    sqlContext.createDataFrame(slice, schema)
                            .write().mode(SaveMode.Append)
                            .jdbc(url, "ord_rec" + se, connectionProperties);
                }
            }
        } finally {
            jsc.close();
        }
    }

    /**
     * Loads order JSON lines from {@code filePath}, cleans them (distinct,
     * trimmed {@code code_orditem_type}, no "P" visit types), and shards the
     * result into the MySQL tables {@code ord_1} .. {@code ord_9} keyed by the
     * prefix of the first column (pvcode).
     *
     * Fixes vs. original:
     * - per-record local maps instead of the shared static
     *   {@code map_t}/{@code map_s}/{@code map_k} (unsafe on concurrent
     *   executor threads);
     * - the nine near-identical filter/createDataFrame/write stanzas are
     *   collapsed into {@link #writeOrdSlice}, keeping the exact same
     *   prefix-to-table routing;
     * - the SparkContext is closed in a finally block;
     * - reuses the {@code mysqldriver} constant.
     */
    private static void ord(String filePath) {
        String master = "spark://192.168.2.258:7077";
        jsc = getContext(master);
        sqlContext = new SQLContext(jsc);
        connectionProperties = new Properties();
        connectionProperties.put("user", user);
        connectionProperties.put("password", password);
        connectionProperties.put("driver", mysqldriver);
        final String[] ordPart2Fields = {"pvcode", "pk_dcpv", "empi",
                "code_pvtype", "name_pvtype", "code_sex", "name_sex",
                "birthday", "code_dept", "name_dept", "code_ord",
                "code_orditem_type", "name_orditem_type", "code_orditem",
                "name_orditem", "date_create", "date_end", "note_ord",
                "code_pres", "parent_code", "create_time"};
        StructType schema = createStructType(ordPart2Fields);
        try {
            JavaRDD<String> javaRDD = jsc.textFile(filePath);
            JavaRDD<Row> parsedRDD = javaRDD.map(new Function<String, Row>() {
                private static final long serialVersionUID = 1L;
                ObjectMapper mapper = new ObjectMapper();

                public Row call(String line) throws Exception {
                    try {
                        Map<String, Object> parsed = mapper.readValue(line, Map.class);
                        Map<String, String> record = new HashMap<String, String>();
                        for (Map.Entry<String, Object> entry : parsed.entrySet()) {
                            record.put(entry.getKey(),
                                    String.valueOf(entry.getValue()));
                        }
                        return createOrdRow(record);
                    } catch (Exception e) {
                        // Malformed line: emit null; dropped by the filter below.
                        return null;
                    }
                }
            });
            // Normalize code_orditem_type (trim); keep the row unchanged when any
            // column access fails (same fallback as the original).
            JavaRDD<Row> trimmedRDD = parsedRDD.distinct().map(new Function<Row, Row>() {
                private static final long serialVersionUID = 1L;

                public Row call(Row row) throws Exception {
                    try {
                        String trimCodeType = row.getString(11).trim();
                        Map<String, String> record = new HashMap<String, String>();
                        for (int i = 0; i < ordPart2Fields.length; i++) {
                            record.put(ordPart2Fields[i], row.getString(i));
                        }
                        record.put("code_orditem_type", trimCodeType);
                        return createOrdRow(record);
                    } catch (Exception e) {
                        return row;
                    }
                }
            });
            // Drop nulls and "P" visit types.
            JavaRDD<Row> filterRDD = trimmedRDD.filter(new Function<Row, Boolean>() {
                private static final long serialVersionUID = 1L;

                public Boolean call(Row row) throws Exception {
                    return row != null && !"P".equals(row.getString(3));
                }
            });
            // Cache: the cleaned RDD is re-scanned once per target table.
            JavaRDD<Row> persistRDD = filterRDD.persist(StorageLevel
                    .MEMORY_AND_DISK_SER());
            // Same prefix routing as the original nine hand-written filters.
            writeOrdSlice(persistRDD, schema, new String[]{"1"}, "ord_1");
            writeOrdSlice(persistRDD, schema, new String[]{"2"}, "ord_2");
            writeOrdSlice(persistRDD, schema, new String[]{"31", "32", "33"}, "ord_3");
            writeOrdSlice(persistRDD, schema, new String[]{"34", "4"}, "ord_4");
            writeOrdSlice(persistRDD, schema, new String[]{"35", "5"}, "ord_5");
            writeOrdSlice(persistRDD, schema, new String[]{"36", "6"}, "ord_6");
            writeOrdSlice(persistRDD, schema, new String[]{"37", "7"}, "ord_7");
            writeOrdSlice(persistRDD, schema, new String[]{"8"}, "ord_8");
            writeOrdSlice(persistRDD, schema, new String[]{"38", "39", "30", "9"}, "ord_9");
        } finally {
            jsc.close();
        }
    }

    /**
     * Filters the rows whose first column starts with any of {@code prefixes}
     * and appends them to the given MySQL table.
     */
    private static void writeOrdSlice(JavaRDD<Row> rows, StructType schema,
                                      final String[] prefixes, String table) {
        JavaRDD<Row> slice = rows.filter(new Function<Row, Boolean>() {
            private static final long serialVersionUID = 1L;

            public Boolean call(Row row) throws Exception {
                String key = row.getString(0);
                for (String prefix : prefixes) {
                    if (key.startsWith(prefix)) {
                        return true;
                    }
                }
                return false;
            }
        });
        sqlContext.createDataFrame(slice, schema)
                .write().mode(SaveMode.Append)
                .jdbc(url, table, connectionProperties);
    }


    private static void ordDetail_full(String filePath, int i) {
        String master = "spark://192.168.2.145:7077";
        lalala = SparkSession.builder().config(new SparkConf().setAppName("lalala")).getOrCreate();
        // String master="local[*]";
        SparkContext sparkContext = lalala.sparkContext();
        SQLContext sqlContext = lalala.sqlContext();
//        SparkReadFile2MysqlFull.sqlContext = new SQLContext(jsc);
        connectionProperties = new Properties();
        connectionProperties.put("user", user);
        connectionProperties.put("password", password);
        connectionProperties.put("driver", "com.mysql.jdbc.Driver");
//        JavaRDD<String> javaRDD = jsc.textFile(filePath);
        final String[] ordDetailFields = {"pk_rep_lis", "name_index_lis", "value_lis", "name_quanti_unit",
                "limit_high", "limit_low", "desc_rrs", "value_flag_name", "create_time", "lis_trend"};
        StructType schema = createStructType(ordDetailFields);

        Dataset<Row> parquet = lalala.read().parquet(filePath);

        Dataset<Row> distinctRDD = parquet.distinct();
        JavaRDD<Row> rowJavaRDD = distinctRDD.toJavaRDD();
//
        // 先映射一下
        JavaRDD<Row> mapRDD = rowJavaRDD.map(new Function<Row, Row>() {
            public Row call(Row row) throws Exception {
                for (int i = 0; i < ordDetailFields.length; i++) {
                    map_k.put(ordDetailFields[i], row.getString(i));
                }
                String value_lis = row.getString(2);//检验结果
                String limit_high = row.getString(4);//最高值
                String limit_low = row.getString(5);//最低值
                String desc_rrs = row.getString(6);//正常结果区间
                String value_flag_name = row.getString(7);//箭头
                String lis_tread = row.getString(9);//检验结果趋势
                try {
                    if (value_lis.replaceAll("[^\d]+", "").isEmpty() || value_lis.equals("")) {
                        lis_tread = value_lis;
                    } else if (value_lis.replaceAll("[\d]+", "").contains(desc_rrs)) {
                        lis_tread = "正常";
                    } else if (("null".equals(limit_high) || "null".equals(limit_low)) && "↑".equals(value_flag_name)) {
                        lis_tread = "偏高";
                    } else if (("null".equals(limit_high) || "null".equals(limit_low)) && "↓".equals(value_flag_name)) {
                        lis_tread = "偏低";
                    } else if (desc_rrs.contains(":")) {
                        lis_tread = desc_rrs;
                    } else if (!value_lis.replaceAll("[^.\d]+", "").isEmpty()
                            && !limit_low.replaceAll("[^.\d]+", "").isEmpty()
                            && !limit_high.replaceAll("[^.\d]+", "").isEmpty()
                            && getCount(value_lis.replaceAll("[^.\d]+", "")) <= 1
                            && getCount(limit_low.replaceAll("[^.\d]+", "")) <= 1
                            && getCount(limit_high.replaceAll("[^.\d]+", "")) <= 1
                            && !".".equals(value_lis.replaceAll("[^.\d]+", ""))
                            && !".".equals(limit_low.replaceAll("[^.\d]+", ""))
                            && !".".equals(limit_high.replaceAll("[^.\d]+", ""))) {
                        if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) >= Float
                                .parseFloat(limit_low.replaceAll("[^.\d]+", ""))
                                && Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) <= Float
                                .parseFloat(limit_high.replaceAll("[^.\d]+", ""))) {
                            lis_tread = "正常";
                        }
                        if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) >= Float
                                .parseFloat(limit_low.replaceAll("[^.\d]+", "")) * 0.8
                                && Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) < Float
                                .parseFloat(limit_low.replaceAll("[^.\d]+", ""))) {
                            lis_tread = "偏低";
                        }
                        if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) >= Float
                                .parseFloat(limit_low.replaceAll("[^.\d]+", "")) * 0.5
                                && Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) < Float
                                .parseFloat(limit_low.replaceAll("[^.\d]+", "")) * 0.8) {
                            lis_tread = "很低";
                        }
                        if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) < Float
                                .parseFloat(limit_low.replaceAll("[^.\d]+", "")) * 0.5) {
                            lis_tread = "非常低";
                        }
                        if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) > Float
                                .parseFloat(limit_high.replaceAll("[^.\d]+", ""))
                                && Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) <= Float
                                .parseFloat(limit_high.replaceAll("[^.\d]+", "")) * 1.2) {
                            lis_tread = "偏高";
                        }
                        if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) > Float
                                .parseFloat(limit_high.replaceAll("[^.\d]+", "")) * 1.2
                                && Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) <= Float
                                .parseFloat(limit_high.replaceAll("[^.\d]+", "")) * 1.5) {
                            lis_tread = "很高";
                        }
                        if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) > Float
                                .parseFloat(limit_high.replaceAll("[^.\d]+", "")) * 1.5) {
                            lis_tread = "非常高";
                        }
                    } else {
                        lis_tread = "null";
                    }
                } catch (Exception e) {
                    lis_tread = "null";
                }

                map_k.put("lis_trend", lis_tread);
                return createOrdDetailFullRow(map_k);
            }
        });
        Dataset<Row> dataFrame = sqlContext.createDataFrame(mapRDD, schema);
        dataFrame.repartition(1).write().orc("hdfs://192.168.2.232:9000/datas/parquetFile/ord_detail_second/part" + i);
        sparkContext.stop();
        lalala.stop();
    }

    private static void ordDetail_split(String filePath) {
        String master = "spark://192.168.2.258:7077";
        // String master="local[*]";
        jsc = getContext(master);
        sqlContext = new SQLContext(jsc);
        connectionProperties = new Properties();
        connectionProperties.put("user", user);
        connectionProperties.put("password", password);
        connectionProperties.put("driver", "com.mysql.jdbc.Driver");
        JavaRDD<String> javaRDD = jsc.textFile(filePath);
        final String[] ordDetailFields = {"pk_rep_lis", "name_index_lis", "value_lis", "name_quanti_unit",
                "limit_high", "limit_low", "desc_rrs", "value_flag_name", "create_time", "lis_all", "lis_result"};
        StructType schema = createStructType(ordDetailFields);
        JavaRDD<Row> mapPartitions = javaRDD.map(new Function<String, Row>() {
            private static final long serialVersionUID = 1L;
            ObjectMapper mapper = new ObjectMapper();

            public Row call(String line) throws Exception {
                try {
                    map_t = mapper.readValue(line, Map.class);
                    for (Entry<String, Object> entry : map_t.entrySet()) {
                        map_s.put(entry.getKey(),
                                String.valueOf(entry.getValue()));
                    }
                } catch (Exception e) {
                    return null;
                }
                return createOrdDetailSpiltRow(map_s);
            }
        });

        JavaRDD<Row> filterRDD = mapPartitions
                .filter(new Function<Row, Boolean>() {
                    private static final long serialVersionUID = 1L;

                    public Boolean call(Row row) throws Exception {
                        // TODO Auto-generated method stub
                        if (null == row) {
                            return false;
                        }
                        return true;
                    }
                });
        JavaRDD<Row> distinctRDD = filterRDD.distinct();

        // 先映射一下
        JavaRDD<Row> mapRDD = distinctRDD.map(new Function<Row, Row>() {

            public Row call(Row row) throws Exception {
                for (int i = 0; i < ordDetailFields.length; i++) {
                    map_k.put(ordDetailFields[i], row.getString(i));
                }
                String name_index_lis = row.getString(1);//检验项目
                String value_lis = row.getString(2);//检验结果
                String name_quanti_unit = row.getString(3);//结果单位
                String limit_high = row.getString(4);//最高值
                String limit_low = row.getString(5);//最低值
                String desc_rrs = row.getString(6);//正常结果区间
                String value_flag_name = row.getString(7);//箭头
                String result = row.getString(10);//检验结果趋势result
                if (value_lis.replaceAll("[^\d]+", "").isEmpty() && "null".equals(desc_rrs)) {
                    value_flag_name = value_lis;
                } else if (value_lis.replaceAll("[\d]+", "").contains(desc_rrs)) {
                    value_flag_name = "正常";
                } else if (("null".equals(limit_high) || "null".equals(limit_low)) && "↑".equals(value_flag_name)) {
                    value_flag_name = "偏高";
                } else if (("null".equals(limit_high) || "null".equals(limit_low)) && "↓".equals(value_flag_name)) {
                    value_flag_name = "偏低";
                } else if (desc_rrs.contains(":")) {
                    value_flag_name = desc_rrs;
                } else if (!value_lis.replaceAll("[^.\d]+", "").isEmpty()
                        && !limit_low.replaceAll("[^.\d]+", "").isEmpty()
                        && !limit_high.replaceAll("[^.\d]+", "").isEmpty()
                        && getCount(value_lis.replaceAll("[^.\d]+", "")) <= 1
                        && getCount(limit_low.replaceAll("[^.\d]+", "")) <= 1
                        && getCount(limit_high.replaceAll("[^.\d]+", "")) <= 1
                        && !".".equals(value_lis.replaceAll("[^.\d]+", ""))
                        && !".".equals(limit_low.replaceAll("[^.\d]+", ""))
                        && !".".equals(limit_high.replaceAll("[^.\d]+", ""))) {
                    if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) >= Float
                            .parseFloat(limit_low.replaceAll("[^.\d]+", ""))
                            && Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) <= Float
                            .parseFloat(limit_high.replaceAll("[^.\d]+", ""))) {
                        value_flag_name = "正常";
                    }
                    if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) >= Float
                            .parseFloat(limit_low.replaceAll("[^.\d]+", "")) * 0.8
                            && Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) < Float
                            .parseFloat(limit_low.replaceAll("[^.\d]+", ""))) {
                        value_flag_name = "偏低";
                    }
                    if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) >= Float
                            .parseFloat(limit_low.replaceAll("[^.\d]+", "")) * 0.5
                            && Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) < Float
                            .parseFloat(limit_low.replaceAll("[^.\d]+", "")) * 0.8) {
                        value_flag_name = "很低";
                    }
                    if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) < Float
                            .parseFloat(limit_low.replaceAll("[^.\d]+", "")) * 0.5) {
                        value_flag_name = "太低了";
                    }
                    if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) > Float
                            .parseFloat(limit_high.replaceAll("[^.\d]+", ""))
                            && Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) <= Float
                            .parseFloat(limit_high.replaceAll("[^.\d]+", "")) * 1.2) {
                        value_flag_name = "偏高";
                    }
                    if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) > Float
                            .parseFloat(limit_high.replaceAll("[^.\d]+", "")) * 1.2
                            && Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) <= Float
                            .parseFloat(limit_high.replaceAll("[^.\d]+", "")) * 1.5) {
                        value_flag_name = "很高";
                    }
                    if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) > Float
                            .parseFloat(limit_high.replaceAll("[^.\d]+", "")) * 1.5) {
                        value_flag_name = "太高了";
                    }
                } else {
                    value_flag_name = "null";
                }
                StringBuffer bf = new StringBuffer();
                bf.append("[ ");
                bf.append(name_index_lis);
                bf.append(" :");
                bf.append(value_lis);
                bf.append(name_quanti_unit);
                bf.append("	");
                bf.append("区间 :");
                bf.append(desc_rrs);
                bf.append("	");
                bf.append("趋势 :");
                bf.append(value_flag_name);
                bf.append(" ]");
                result = bf.toString();
//                map_k.put("lis_all",name_index_lis);
                map_k.put("lis_result", result);
                return createOrdDetailSpiltRow(map_k);
            }
        });

        JavaPairRDD<String, Row> mapToPairRDD = mapRDD
                .mapToPair(new PairFunction<Row, String, Row>() {
                    private static final long serialVersionUID = 1L;

                    public Tuple2<String, Row> call(Row row) throws Exception {
                        // TODO Auto-generated method stub
                        return new Tuple2<String, Row>(row.getString(0), row);
                    }
                });
        // 分组进行计算
        JavaPairRDD<String, Row> reduceByKeyRDD = mapToPairRDD
                .reduceByKey(new Function2<Row, Row, Row>() {
                    private static final long serialVersionUID = 1L;

                    public Row call(Row v1, Row v2) throws Exception {
                        // TODO Auto-generated method stub
                        String lis_all1 = v1.getString(9);
                        String lis_all2 = v1.getString(9);
                        String result1 = v1.getString(10);
                        String result2 = v2.getString(10);
                        for (int i = 0; i < ordDetailFields.length; i++) {
                            map_r.put(ordDetailFields[i], v1.getString(i));
                        }
//                        map_r.put("lis_all",lis_all1+"
"+lis_all2);
                        map_r.put("lis_result", result1 + "
" + result2);
                        return createOrdDetailSpiltRow(map_r);
                    }
                });
        JavaRDD<Row> map = reduceByKeyRDD.map(new Function<Tuple2<String, Row>, Row>() {
                    private static final long serialVersionUID = 1L;
                    public Row call(Tuple2<String, Row> v1) throws Exception {
                        // TODO Auto-generated method stub
                        return v1._2();
                    }
                });
        JavaRDD<Row> persistRDD = map.persist(StorageLevel.MEMORY_AND_DISK_SER());
        JavaRDD<Row> filter = null;
        Dataset<Row> createDataFrame = null;
        for (final String se : serials) {
            filter = persistRDD.filter(new Function<Row, Boolean>() {
                private static final long serialVersionUID = 1L;

                public Boolean call(Row v1) throws Exception {
                    // TODO Auto-generated method stub
                    return v1.getString(0).startsWith(se);
                }
            });
            if (!filter.isEmpty()) {
                createDataFrame = sqlContext.createDataFrame(filter, schema);
                createDataFrame.write().mode(SaveMode.Append)
                        .jdbc(url, "ord_detail" + se, connectionProperties);
            }
        }
        jsc.close();

    }

    /**
     * Builds a Row for the split ord_detail table from the given column map.
     * Missing keys yield null cells (Map.get returns null).
     * NOTE(review): column order must match the ord_detail StructType built
     * elsewhere in this file — confirm before reordering.
     *
     * @param map column-name -> value pairs for one lis record
     * @return a Row with the ord_detail split-column layout
     */
    public static Row createOrdDetailSpiltRow(Map<String, String> map) {
        final String[] columns = {
                "pk_rep_lis", "name_index_lis", "value_lis",
                "name_quanti_unit", "limit_high", "limit_low", "desc_rrs",
                "value_flag_name", "create_time", "lis_all", "lis_result"
        };
        final Object[] cells = new Object[columns.length];
        for (int i = 0; i < columns.length; i++) {
            cells[i] = map.get(columns[i]);
        }
        return RowFactory.create(cells);
    }

    /**
     * Builds a Row for the full ord_detail table from the given column map.
     * Missing keys yield null cells (Map.get returns null).
     * NOTE(review): column order must match the corresponding StructType
     * defined elsewhere in this file — confirm before reordering.
     *
     * @param map column-name -> value pairs for one lis record
     * @return a Row with the ord_detail full-column layout (ends in lis_trend)
     */
    public static Row createOrdDetailFullRow(Map<String, String> map) {
        final String[] columns = {
                "pk_rep_lis", "name_index_lis", "value_lis",
                "name_quanti_unit", "limit_high", "limit_low", "desc_rrs",
                "value_flag_name", "create_time", "lis_trend"
        };
        final Object[] cells = new Object[columns.length];
        for (int i = 0; i < columns.length; i++) {
            cells[i] = map.get(columns[i]);
        }
        return RowFactory.create(cells);
    }

    /**
     * Builds a Row for an emrdata table from the given column map.
     * Missing keys yield null cells (Map.get returns null).
     * NOTE(review): column order must match the emrdata StructType defined
     * elsewhere in this file — confirm before reordering.
     *
     * @param map column-name -> value pairs for one emrdata record
     * @return a Row with the emrdata column layout
     */
    public static Row createEmrdataRow(Map<String, String> map) {
        final String[] columns = {
                "code_group", "code_org", "code_pati", "create_time",
                "data_source", "degree", "edit_time", "empi", "flag_del",
                "inputfield", "pcode", "phrcd", "pk_dcemr", "pk_rec_data",
                "pname", "prtype", "punit", "pvcode", "rdn", "relardn",
                "remark", "setno", "source_pk", "stay", "valuetype"
        };
        final Object[] cells = new Object[columns.length];
        for (int i = 0; i < columns.length; i++) {
            cells[i] = map.get(columns[i]);
        }
        return RowFactory.create(cells);
    }

    /**
     * Builds a Row for the emr-index table from the given column map.
     * Missing keys yield null cells (Map.get returns null).
     * NOTE(review): column order must match the emr-index StructType defined
     * elsewhere in this file — confirm before reordering.
     *
     * @param map column-name -> value pairs for one emr-index record
     * @return a Row with the emr-index column layout
     */
    public static Row createEmerIndexRow(Map<String, String> map) {
        final String[] columns = {
                "pk_dcemr", "pk_dcpv", "empi", "code_pati", "code_pvtype",
                "code_ref_emr", "code_emr_type", "name_emr_type", "prlog_rdn",
                "code_dept_emr", "name_dept_emr", "code_psn_edit",
                "data_source", "source_pk", "create_time"
        };
        final Object[] cells = new Object[columns.length];
        for (int i = 0; i < columns.length; i++) {
            cells[i] = map.get(columns[i]);
        }
        return RowFactory.create(cells);
    }

    /**
     * Builds a Row for the ord table from the given column map.
     * Missing keys yield null cells (Map.get returns null).
     * NOTE(review): column order must match the ord StructType defined
     * elsewhere in this file — confirm before reordering.
     *
     * @param map column-name -> value pairs for one ord record
     * @return a Row with the ord column layout
     */
    public static Row createOrdRow(Map<String, String> map) {
        final String[] columns = {
                "pvcode", "pk_dcpv", "empi", "code_pvtype", "name_pvtype",
                "code_sex", "name_sex", "birthday", "code_dept", "name_dept",
                "code_ord", "code_orditem_type", "name_orditem_type",
                "code_orditem", "name_orditem", "date_create", "date_end",
                "note_ord", "code_pres", "parent_code", "create_time"
        };
        final Object[] cells = new Object[columns.length];
        for (int i = 0; i < columns.length; i++) {
            cells[i] = map.get(columns[i]);
        }
        return RowFactory.create(cells);
    }

    /**
     * Builds a Row for the ord-record table from the given column map.
     * Missing keys yield null cells (Map.get returns null).
     * NOTE(review): column order must match the ord-record StructType defined
     * elsewhere in this file — confirm before reordering.
     *
     * @param map column-name -> value pairs for one ord record entry
     * @return a Row with the ord-record column layout
     */
    public static Row createOrdRecRow(Map<String, String> map) {
        final String[] columns = {
                "pk_ord_record", "pk_dcord", "pk_dcpv", "code_pvtype",
                "name_pvtype", "pvcode", "code_ord", "empi", "code_pati",
                "code_sex", "name_sex", "age", "code_dept", "name_dept",
                "bed", "pk_dcordrisreq", "code_req", "code_rep",
                "code_rep_type", "name_rep_type", "code_eu_type",
                "name_eu_type", "code_eu_item", "name_eu_item", "create_time"
        };
        final Object[] cells = new Object[columns.length];
        for (int i = 0; i < columns.length; i++) {
            cells[i] = map.get(columns[i]);
        }
        return RowFactory.create(cells);
    }

    /**
     * Counts the occurrences of the '.' character in {@code source}.
     *
     * <p>Replaces the previous length-difference trick (which allocated a
     * throwaway intermediate string via {@code String.replace}) with a
     * single pass over the characters — same result, no extra allocation.
     * Widened from private to package-private (backward compatible).
     *
     * @param source the string to scan; must not be null
     * @return the number of '.' characters in {@code source}
     */
    static Integer getCount(String source) {
        int dots = 0;
        for (int i = 0; i < source.length(); i++) {
            if (source.charAt(i) == '.') {
                dots++;
            }
        }
        return dots;
    }


}
// Original article: https://www.cnblogs.com/kwzblog/p/10180436.html