One-Stop Analysis Model for the Operations System

Technical design of the operations system analysis platform:

  1. The project is scoped to data analysis of the operations system's key metrics
  2. Key code walkthrough:
    1. HiveWriter is the abstraction for writing Hive tables; it covers adding partitions, writing into the Hive table, and writing the success file:
      import org.apache.hadoop.fs.{FileSystem, Path}
      import org.apache.spark.sql.SparkSession
      
      class HiveWriter(tableName: String, outputDir: String, currentDate: String) {
      
        // currentDate is expected in yyyyMMdd format
        val currentYear = currentDate.substring(0, 4)
        val currentMonth = currentDate.substring(4, 6)
        val currentDay = currentDate.substring(6, 8)
      
        // add a partition for the given day (and hour, when provided)
        def addPartition(ss: SparkSession, hour: String): Unit = {
          var query: String = ""
          if(hour == ""){
            query =
              s"""ALTER TABLE $tableName ADD IF NOT EXISTS
                 |PARTITION(year='$currentYear',month='$currentMonth',day='$currentDay')
                 |LOCATION '$outputDir/$currentYear/$currentMonth/$currentDay'
                """.stripMargin
          }else{
            query =
              s"""ALTER TABLE $tableName ADD IF NOT EXISTS
                 |PARTITION(year='$currentYear',month='$currentMonth',day='$currentDay',hour='$hour')
                 |LOCATION '$outputDir/$currentYear/$currentMonth/$currentDay/$hour'
                """.stripMargin
          }
          ss.sql(query)
        }
      
        // insert the contents of a temp view into the Hive table partition
        def save(ss:SparkSession, name: String, hour: String): Unit = {
          var query: String = ""
          if(hour == ""){
            query =
              s"""
                 |INSERT OVERWRITE TABLE ${tableName} PARTITION(year='${currentYear}',month='${currentMonth}',day='${currentDay}')
                 |SELECT *
                 |FROM ${name}
             """.stripMargin
          }else{
            query =
              s"""
                 |INSERT OVERWRITE TABLE ${tableName} PARTITION(year='${currentYear}',month='${currentMonth}',day='${currentDay}',hour='${hour}')
                 |SELECT *
                 |FROM ${name}
             """.stripMargin
      
          }
          ss.sql(query)
        }
      
        // write the _SUCCESS marker file into the partition's output directory
        def writeSuccFile(ss: SparkSession, hour: String): Unit = {
          val conf = ss.sparkContext.hadoopConfiguration
          val hdfs = FileSystem.get(conf)
          var path: String = ""
          if(hour == ""){
            path = s"${outputDir}/${currentYear}/${currentMonth}/${currentDay}/_SUCCESS"
          }else{
            path = s"${outputDir}/${currentYear}/${currentMonth}/${currentDay}/${hour}/_SUCCESS"
          }
          val success = new Path(path)
          hdfs.create(success).close()  // create an empty _SUCCESS file and release the stream
        }
      
      }
    2. QueryConf builds the SQL needed by each ETL step from the table names, dates, and other information passed in:
      import java.util.Calendar

      class QueryConf(tableName: String, currentDate: String, outputDir: String, callTableName: String) {
      
        // CommonUtils.calTime shifts a yyyyMMdd date string; a sketch of it follows this class
        val oneWeekAgo = CommonUtils.calTime(currentDate, Calendar.DATE, -6)
      
        // columns kept in the final result
        val resFields = Seq("pope_act_id","group_id","pid","order_id","is_complete","pay_suc_time")
      
        val payQuery =
          s"""
             |SELECT DISTINCT cast(order_id AS string) order_id,
             |                pay_suc_time
             |FROM table
             |WHERE concat(YEAR,MONTH,DAY) = ${currentDate}
             |AND pay_suc_time != ''
             |AND pay_suc_time IS NOT NULL
             |AND order_id IS NOT NULL""".stripMargin
      
        val callQuery =
          s"""
             |SELECT DISTINCT pope_act_id,
             |                group_id,
             |                pid,
             |                order_id,
             |                is_complete
             |FROM ${callTableName}
             |WHERE concat(YEAR,MONTH,DAY) BETWEEN ${oneWeekAgo} AND ${currentDate}
             |AND pope_act_id != ''
             |AND pid != 0""".stripMargin
      
      }
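      CommonUtils is not shown in the post; a minimal sketch of what calTime might look like, assuming currentDate is a yyyyMMdd string and the helper shifts it by the given Calendar field:
      import java.text.SimpleDateFormat
      import java.util.Calendar

      object CommonUtils {
        // shift a yyyyMMdd date string by `amount` units of the given Calendar field,
        // e.g. calTime("20171123", Calendar.DATE, -6) returns "20171117" (illustrative helper, not from the post)
        def calTime(date: String, field: Int, amount: Int): String = {
          val fmt = new SimpleDateFormat("yyyyMMdd")
          val cal = Calendar.getInstance()
          cal.setTime(fmt.parse(date))
          cal.add(field, amount)
          fmt.format(cal.getTime)
        }
      }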
    3. Process: the executor class that wraps the actual execution:
      import org.apache.spark.sql.SparkSession
      
      class Process(ss: SparkSession, queryConf: QueryConf, writer: HiveWriter) {
      
        def processPay(): Unit = {
      
          // run both source queries and keep only the call records that have a successful payment
          val payRawData = ss.sql(queryConf.payQuery)
          val callRawData = ss.sql(queryConf.callQuery)
          val payData = payRawData.join(callRawData, Seq("order_id"), "inner")
            .selectExpr(queryConf.resFields: _*)

          // register the joined result as a temp view, then add the partition,
          // insert the data and drop the _SUCCESS marker
          val name = "pay_tbl"
          payData.createOrReplaceTempView(name)

          writer.addPartition(ss, "")
          writer.save(ss, name, "")
          writer.writeSuccFile(ss, "")
      
        }
      
      }
      object Process {
        def apply(ss: SparkSession, queryConf: QueryConf, writer: HiveWriter): Process = {
          new Process(ss, queryConf, writer)
        }
      }
    4. Wiring the pieces together:
      // parse the job arguments, build the query config, the writer and the processor, then run
      val Array(appName, tableName, currentDate, outputDir, callTableName) = args
      val ss = SparkSession.builder().appName(appName).enableHiveSupport().getOrCreate()

      val queryConf = new QueryConf(tableName, currentDate, outputDir, callTableName)
      val writer = new HiveWriter(tableName, outputDir, currentDate)

      val processor = Process(ss, queryConf, writer)
      processor.processPay()

      ss.stop()
  3. Key technical details:
    1. Write SQL string expressions as s"""sql""".stripMargin; it keeps complex SQL cleaner and easier to maintain
    2. For time arithmetic, convert to a timestamp up front in the SQL with unix_timestamp(observe_end_time), so that the map operator only has to do the simplest comparisons (sketch below)
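      A sketch of the pattern (some_table and deadlineTs are placeholders, not names from the post): the date parsing happens once in SQL, and the per-row logic afterwards is a plain numeric comparison.
      // convert once in SQL; table and column names below are placeholders
      val observed = ss.sql(
        s"""
           |SELECT order_id,
           |       unix_timestamp(observe_end_time) AS observe_end_ts
           |FROM some_table
           |WHERE concat(YEAR,MONTH,DAY) = ${currentDate}
         """.stripMargin)

      // downstream only compares precomputed longs instead of re-parsing time strings per row
      val finished = observed.filter(observed("observe_end_ts") < deadlineTs)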
    3. Scala enumerations (a usage note follows the definition):
      object ActivityCallTBL extends Enumeration {
        type ActivityCallTBL = Value
        val callStatus = Value("gulfstream_ods.ods_binlog_d_order_status_increment")
        val callBase = Value("gulfstream_ods.ods_binlog_d_order_base_increment")
      }
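      Usage note: a member created as Value("name") returns that name from toString, so the enum members can be dropped straight into query strings:
      // ActivityCallTBL.callStatus.toString == "gulfstream_ods.ods_binlog_d_order_status_increment"
      val callTable = ActivityCallTBL.callStatus.toString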
    4. Defining a value with a block: complex code can be written inside {} to produce the result, which is handy when the return value cannot be computed in a single expression (sketch below)
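      A small sketch (reusing currentDate and outputDir from the code above): the whole {...} block is an expression whose last line becomes the value.
      // the block evaluates to its last expression, so multi-step logic can still initialise a val
      val partitionPath: String = {
        val year = currentDate.substring(0, 4)
        val month = currentDate.substring(4, 6)
        val day = currentDate.substring(6, 8)
        s"$outputDir/$year/$month/$day"
      }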
    5. The ROW_NUMBER() function; common use cases include
      1. Taking the two records with the highest score within each rate
      2. A SQL example: the query below picks the order record with the earliest a_birth_time (a DataFrame equivalent follows the SQL):
        select order_id, passenger_id pid, product_id, a_birth_time, unix_timestamp(a_birth_time) tim,
               area, starting_lng, starting_lat, dest_lng, dest_lat, type order_type, combo_type,
               require_level, extra_info
        from ( select *, row_number() over (partition by order_id order by a_birth_time) as tid
               from gulfstream_ods.ods_binlog_d_order_base_increment
               where concat(year,month,day,hour) = '2017112309' ) a
        where tid = 1
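        The same "earliest a_birth_time per order_id" logic written against a DataFrame with Spark's Window functions; a sketch assuming the source table has already been loaded as baseDF:
        import org.apache.spark.sql.expressions.Window
        import org.apache.spark.sql.functions.row_number

        // rank the rows inside each order_id by a_birth_time and keep only the earliest one
        val w = Window.partitionBy("order_id").orderBy("a_birth_time")
        val earliest = baseDF
          .withColumn("tid", row_number().over(w))
          .filter("tid = 1")
          .drop("tid")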
    6. SQL cast: cast(order_id AS string) order_id; the spark.sql equivalent is df.select(df("colA").cast("int"))
    7. SQL nvl: nvl(t1.a_create_time,'1971-01-01 00:00:00') AS create_time

    8. Spark select from a Seq of expressions: selectExpr(Seq(): _*), i.e. a Seq expanded into varargs (see resFields in QueryConf above)
    9. org.apache.spark.sql.functions defines the common SQL operators implemented in Scala for Spark
    10. withColumn("is_complete", functions.lit(0)) builds a new column from a constant; similarly withColumn("testkey", concat($"apollo_exp_id", lit(":0:"), $"apollo_group_id"))
    11. df("colA") =!= df("colB") is the column "not equal" operator
    12. Registering a UDF: val genCarpool = spark.udf.register("genCarpool", func), then use it with ds.withColumn("ds_carpool", genCarpool(ds("raw_ct"))) (see the sketch below)
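      A self-contained sketch of the same pattern; the parsing function here is made up and only stands in for func:
      // hypothetical func: split a raw comma-separated field into a Seq[String]
      val func = (raw: String) => if (raw == null) Seq.empty[String] else raw.split(",").toSeq

      // register returns a UserDefinedFunction that works in both SQL and the DataFrame API
      val genCarpool = spark.udf.register("genCarpool", func)
      val withCarpool = ds.withColumn("ds_carpool", genCarpool(ds("raw_ct")))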
    13. After import org.apache.spark.sql.functions._ you can call from_unixtime(column) and friends directly.
    14. import spark.implicits._ brings in the powerful implicit conversions from Scala types to DataFrame types, including the df.select($"sss") column syntax (sketch below)
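      A short sketch of what the implicits enable (the local Seq is just demo data):
      import spark.implicits._

      // toDF on a local Seq and the $"col" column syntax both come from spark.implicits._
      val demo = Seq(("a", 1), ("b", 2)).toDF("key", "value")
      demo.select($"key").show()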
    15. functions.explode: Spark's flattening function for collection columns; it turns the elements of one collection into multiple Rows (sketch below)
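      A sketch of the flattening (df and its array column ds_carpool are placeholders):
      import org.apache.spark.sql.functions.explode

      // every element of the ds_carpool array becomes its own row in the result
      val flattened = df.withColumn("carpool_id", explode(df("ds_carpool")))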
    16. df.drop removes the specified column
    17. functions.when: functions.when(resPre("new_carpool").contains(resPre("ds_carpool")) and ???,1).otherwise(0); the key trick is to first turn the set into a string such as "1,2,3,4" and then test contains against the single value (cleaned-up sketch below)
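      A cleaned-up sketch of the contains trick (the ??? in the snippet above stands for an extra condition that is not shown, so it is left out here):
      import org.apache.spark.sql.functions.when

      // new_carpool is assumed to already be a string like "1,2,3,4", so contains()
      // can test membership of a single value; any extra condition can be chained with &&
      val flagged = resPre.withColumn("hit",
        when(resPre("new_carpool").contains(resPre("ds_carpool")), 1).otherwise(0))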
    18. coalesce(10) on the result DataFrame controls the number of output parts when writing the Hive table
    19. A package object (package object activity) is a better fit for shared constants (sketch below)
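      A sketch of a constant-holding package object (the constants are illustrative):
      // values defined here are visible to everything under the activity package
      package object activity {
        val DatePattern = "yyyyMMdd"
        val SuccessFile = "_SUCCESS"
      }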
    20. <: declares a subtype bound, e.g. [T <: Conf] (sketch below)
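      A sketch of an upper type bound; Conf and Runner are hypothetical, the post does not show a common base trait:
      import org.apache.spark.sql.SparkSession

      // T must be Conf or a subtype, so run() can rely on Conf's members for any conf passed in
      trait Conf { def payQuery: String }

      class Runner[T <: Conf](conf: T, ss: SparkSession) {
        def run(): Unit = ss.sql(conf.payQuery).show()
      }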
    21. DataFrame aliases: activeMarketingActDF.alias("a").join(odsGCouponRuleInfoDF.alias("b"), $"a.info_id" === $"b.id").selectExpr("a.id")

    22. flatMap: when the function passed in returns a LIST, the results are flattened (sketch below)
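      A quick sketch of the flattening behaviour on a plain Scala collection:
      // map keeps the nesting, flatMap flattens the returned lists into one collection
      val nested = Seq("1,2", "3,4").map(_.split(",").toList)      // Seq(List("1","2"), List("3","4"))
      val flat   = Seq("1,2", "3,4").flatMap(_.split(",").toList)  // Seq("1","2","3","4")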
    23. selectExpr expressions: ds.selectExpr("colA", "colB as newName", "abs(colC)")
    24. Max/min across several columns: greatest(exprs: Column*) and least(exprs: Column*), both of which skip nulls (sketch below)
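      A sketch with made-up columns t1, t2, t3 on a placeholder df:
      import org.apache.spark.sql.functions.{greatest, least}

      // per row: the largest / smallest non-null value across the three columns
      val ranged = df
        .withColumn("max_t", greatest(df("t1"), df("t2"), df("t3")))
        .withColumn("min_t", least(df("t1"), df("t2"), df("t3")))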
    25. Other useful functions (a combined sketch follows this list)

      1. between
      2. isNaN
      3. isNull
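      A combined sketch (df, score and pid are placeholder names):
        // keep rows whose score is within [0, 100], is a real number, and whose pid is present
        val clean = df.filter(
          df("score").between(0, 100) && !df("score").isNaN && !df("pid").isNull)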
    26. When Spark spends too much time in GC, the main tuning options are

      1. Increase the number of tasks, so that each GC is quicker and long GC pauses do not drag down the whole job. Suggested parameters:
        1. --conf spark.dynamicAllocation.maxExecutors=1000
          --conf spark.executor.cores=4

      2. Increase the executors' young-generation size: --conf "spark.executor.extraJavaOptions=-XX:NewSize=2048m"
    27. YarnScheduler: Lost executor 13 on bigdata-hdp-apache1005.xg01.diditaxi.com: Container killed by YARN for exceeding memory limits. 12.5 GB of 12 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. Ways to deal with it:
      1.  --conf spark.executor.memory=12g
      2. Use a larger value for repartition
      3. --conf spark.shuffle.spill.numElementsForceSpillThreshold=1500000: once the number of rows held in memory exceeds this threshold, they are spilled straight to disk instead of being kept in memory
Original article: https://www.cnblogs.com/yyystar/p/7892862.html