Spark 的广播变量（Broadcast Variables）

直接上代码：
示例中用到了 map、filter、persist、mapPartitions 等函数，并演示了广播变量的用法。

 // Spark standalone master URL (spark:// scheme).
 // NOTE(review): "279" is not a valid IPv4 octet (max 255), so this address cannot resolve as
 // written -- presumably a typo or an anonymized host; confirm the real master address.
 String master = "spark://192.168.2.279:7077";
 // Alternative for local testing: run Spark in-process with 2 worker threads.
//         jsc = getContext("local[2]");
        // getContext is defined elsewhere; presumably it builds the JavaSparkContext for `master`.
        jsc = getContext(master);
        sqlContext = new SQLContext(jsc);
        // JDBC connection properties for MySQL. They are not used in the visible code;
        // presumably they are consumed by a later JDBC write -- TODO confirm.
        connectionProperties = new Properties();
        connectionProperties.put("user", user);
        connectionProperties.put("password", password);
        connectionProperties.put("driver", "com.mysql.jdbc.Driver");
//        emrdataIndedx(filePath, jsc, sqlContext);// load emrdataindex data into MySQL
        // Lazily read the input file as an RDD with one String element per line.
        JavaRDD<String> javaRDD = jsc.textFile(filePath);
        // Output column names. Positions matter: the filters read index 0 (pk_dcpv),
        // 1 (code_pvtype) and 15 (code_rep_type). The order presumably must match the values
        // produced by createOrdPart3Row -- TODO confirm.
        String[] fields = {"pk_dcpv", "code_pvtype", "name_pvtype", "code_ord", "empi", "code_sex"
                , "name_sex", "birthday", "age", "code_dept", "name_dept", "bed", "pk_dcordrisreq"
                , "code_req", "code_rep", "code_rep_type", "name_rep_type", "code_state", "name_state"
                , "code_eu_type", "name_eu_type", "code_eu_item", "name_eu_item", "code_part"
                , "name_part", "create_time", "code_pres", "parent_code"};
        // Whitelist of report-type codes; rows whose code_rep_type is not listed are filtered out.
        // Several entries carry a trailing space ("X ", "MR ", "B ", "CT ", ...) -- presumably to
        // match fixed-width CHAR values in the source data; do not trim without checking the data.
        String[] old_type = {"D", "GYN", "X ", "MR ", "L05", "L04",
                "L12", "B ", "OTHC", "DOS", "ECG", "CT ", "UIS", "L02",
                "RIS", "SY ", "CB ", "L01", "ENT", "L03", "EYE", "NSC",
                "L07", "EMG", "NEU", "PTH", "DC", "INF", "GC", "L08",
                "L09", "BD", "L26", "ECT", "GM", "GP", "L10", "EDO",
                "L11", "DER", "EEG", "URO", "PFT", "L25", "RF", "OTH",
                "PIS", "PMR", "PSY", "MPL", "BM", "Z", "EIS", "BED", "BLD",
                "L27", "FOD", "R", "GYP", "CTD", "BDT", "L99", "EUS", "HNS",
                "L91", "SED", "L28", "F", "IED", "FOW", "L31", "OO", "P01", "L13"};
        // Broadcast variable: ship the code list to each executor once instead of
        // serializing it into every task closure.
        final Broadcast<String[]> broadcast = jsc.broadcast(old_type);
        // createStructType is defined elsewhere; presumably one StructField per name in `fields`.
        StructType schema = createStructType(fields);
        // Parse each input line (a JSON object) into a Row, one partition at a time, so the
        // ObjectMapper is reused across all records of the partition.
        // Malformed lines are skipped rather than failing or emptying the partition.
        JavaRDD<Row> mapPartitions1 = javaRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Row>() {
            private static final long serialVersionUID = 1L;
            ObjectMapper mapper = new ObjectMapper();

            /**
             * Converts every parseable JSON line of the partition into a Row.
             *
             * @param iterator the lines of one partition
             * @return an iterator over the rows built from the parseable lines (never null)
             */
            @Override
            @SuppressWarnings("unchecked")
            public Iterator<Row> call(Iterator<String> iterator) throws Exception {
                ArrayList<Row> rows = new ArrayList<Row>();
                while (iterator.hasNext()) {
                    String line = iterator.next();
                    Map<String, Object> parsed;
                    try {
                        parsed = mapper.readValue(line, Map.class);
                    } catch (Exception e) {
                        // Skip the bad line. Returning null from a FlatMapFunction makes Spark
                        // throw a NullPointerException and loses every row of this partition.
                        continue;
                    }
                    if (parsed == null) {
                        // The line was the JSON literal `null`; there is nothing to convert.
                        continue;
                    }
                    // Build a fresh map per record. A map shared across records would carry
                    // stale keys from earlier lines into later rows. It would also be shared
                    // mutable state across concurrently running tasks.
                    Map<String, String> fieldValues = new java.util.HashMap<String, String>();
                    for (Entry<String, Object> entry : parsed.entrySet()) {
                        fieldValues.put(entry.getKey(), String.valueOf(entry.getValue()));
                    }
                    rows.add(createOrdPart3Row(fieldValues));
                }
                return rows.iterator();
            }
        });
        // Keep only rows where:
        //  - pk_dcpv is non-null and has exactly two "_"-separated parts,
        //  - code_pvtype is present (non-null, non-empty) and not "P",
        //  - code_rep_type is one of the broadcast whitelist codes.
        JavaRDD<Row> mapPartitions2 = mapPartitions1.filter(new Function<Row, Boolean>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Boolean call(Row row) throws Exception {
                String pk_dcpv1 = row.getString(0);
                String code_pvtype1 = row.getString(1);
                String code_rep_type1 = row.getString(15);
                // All three conditions must hold. Joining them with || would make this clause
                // true for every value, so null, empty and "P" types would never be dropped.
                boolean validPvType = code_pvtype1 != null
                        && !code_pvtype1.isEmpty()
                        && !"P".equals(code_pvtype1);
                return pk_dcpv1 != null
                        && pk_dcpv1.split("_").length == 2
                        && validPvType
                        && Arrays.asList(broadcast.value()).contains(code_rep_type1);
            }
        });
        //broadcast不用就销毁
        broadcast.destroy();
        JavaRDD<Row> mapPartitions = mapPartitions2.repartition(100);
        JavaRDD<Row> persist = mapPartitions.persist(StorageLevel.MEMORY_AND_DISK_SER());
        JavaRDD<Row> filter1 = persist.filter(new Function<Row, Boolean>() {
            private static final long serialVersionUID = 1L;

            public Boolean call(Row row) throws Exception {
                // TODO Auto-generated method stub
                return row.getString(0).startsWith("1");
            }
        });
原文地址:https://www.cnblogs.com/kwzblog/p/10180281.html