spark基础(1)

将相同国家进行分组,然后将count相加sum(count), 对sum(count)进行排序,输出top5

    val path="/Volumes/Data/BigData_code/data/flight-data/csv/2015-summary.csv"
    val data = spark.read.option("inferSchema", "true").option("header", "true").csv(path)
    //查询前5个count max 的国家
    data.groupBy("DEST_COUNTRY_NAME").sum("count")
      .withColumnRenamed("sum(count)", "destination_total")
      .sort(desc("destination_total"))
      .limit(5).show()

代码的执行如图:

查看用户在一天内进行采集所用费用最多的日期:
下面是表格的格式:

    //添加一个列用于统计总费用,并查看用户话费最多的是哪个日期
    val selectData = staticData.selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate")
    selectData.show()
    //进行分组:分组的标准:客户的ID 和 购买的时间(一天内为相同标准), 并对分组的内容进行统计
    val groupData = selectData.groupBy(
      col("CustomerId"), window(col("InvoiceDate"), "1 day")
    ).sum("total_cost")
    groupData.show(5)

window函数:https://blog.csdn.net/weixin_38653290/article/details/83962789

使用流处理实现相同功能

    //进行分组:分组的标准:客户的ID 和 购买的时间(一天内为相同标准), 并对分组的内容进行统计
    val streamData = spark.readStream.schema(staticSchema) //设置分区
      .option("maxFilesPerTrigger", 1) //设置一次读入的文件个数
      .format("csv")
      .option("header", "true")
      .load(path)
    //执行相同的逻辑操作
    val streamGroupData = streamData.selectExpr(
      "CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate"
    ).groupBy(
      $"CustomerId", window($"InvoiceDate", "1 day")
    ).sum("total_cost")

注意由于流处理和静态处理不一样,所以无法使用静态处理中的动作操作。流处理是将流处理的结果放入内存的一个表中。每一次处理完,不断的更新这个表即可

    //将结果存入内存中
    streamGroupData.writeStream.format("memory")    //表示存入内存中
      .queryName("streamGroupData")     //表示存入内存的表的名字
      .outputMode("complete")     //complete表示表中所有记录
      .start()

然后查询

    //对流处理后的结果进行查询
    spark.sql(
      """
        |select *
        |from streamGroupData
        |order by 'sum(total_cost)' desc
        |""".stripMargin).show(5)
原文地址:https://www.cnblogs.com/ALINGMAOMAO/p/14432200.html