Spark Learning

1. Reading a file and writing to the console

person.json

{"name":"json","age":23,"hobby":"running"}
{"name":"charles","age":32,"hobby":"basketball"}
{"name":"tom","age":28,"hobby":"football"}
{"name":"lili","age":24,"hobby":"running"}
{"name":"bob","age":20,"hobby":"swimming"}

Reading the file. Note that each line of person.json is a standalone JSON object (JSON Lines), which is the format Spark's json source expects by default.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;

StructType structType = new StructType().add("name", "string").add("age", "integer").add("hobby", "string");
Dataset<Row> dr = spark.readStream()
        .schema(structType)
        .json("/Users/huyanxia/Documents/ceshi"); // path to a directory, not a single file

Output

dr.map(new MapFunction<Row, String>() {

        @Override
        public String call(Row row) throws Exception {
            HashMap<Object, Object> hashMap = new HashMap<>();
            // row is the current row; get(i) returns the value of column i
            hashMap.put(row.get(0), row.get(1));
            return hashMap.toString();
        }
    }, Encoders.STRING()) // the function returns a String, so pair it with the String encoder
    .writeStream()
    .outputMode("append")
    .format("console") // write results to the console
    .start()
    .awaitTermination();
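Since the map only carries two columns through, it can also be replaced by a plain column projection. A sketch of an equivalent version using select, where the column names come from the schema defined earlier:

// Project the name and age columns directly; no custom MapFunction needed.
dr.select("name", "age")
    .writeStream()
    .outputMode("append")
    .format("console")
    .start()
    .awaitTermination();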

2. Building a Dataset from a list

List<String> list = new ArrayList<>();
list.add("fawf");
list.add("feaf");
list.add("jfki urer");
Dataset<String> ds = spark.createDataset(list, Encoders.STRING());
// first() returns the first element; substring(1, 2) extracts its second character
System.out.println(ds.first().substring(1, 2));
ds.show();
// Keep only the strings that contain the letter "a"
Dataset<String> f = ds.filter(value -> value.contains("a"));
f.show();
// Append a suffix to every element
ds.map(new MapFunction<String, String>() {

    @Override
    public String call(String venderCode) throws Exception {
        return venderCode + "fedfef";
    }
}, Encoders.STRING()).show();
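createDataset is not limited to strings; a list of Java beans works too, via Encoders.bean. A minimal sketch under that assumption — the Person bean below is a hypothetical illustration, not part of the original post:

import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

// Hypothetical bean; the bean encoder needs a no-arg constructor and getters/setters.
public static class Person implements java.io.Serializable {
    private String name;
    private int age;
    public Person() {}
    public Person(String name, int age) { this.name = name; this.age = age; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
}

List<Person> people = Arrays.asList(new Person("json", 23), new Person("tom", 28));
Dataset<Person> personDs = spark.createDataset(people, Encoders.bean(Person.class));
personDs.show(); // prints a table with typed age and name columns

The bean encoder derives the schema by inspecting the getters and setters, so the resulting Dataset carries typed columns rather than opaque serialized objects.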


Original article (Chinese): https://www.cnblogs.com/zhima-hu/p/13658372.html