scala对复杂json的处理

本次代码主要侧重为flink stream流解析canal-json,经过多次实验,发现还是阿里的fastjson较为好用,故在此做记录

将依赖引入

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.33</version>
</dependency>

案例数据,json数据:
{
    "data": [
        {
            "name": "张三", 
            "age": "22", 
            "xb": ""
        }
    ], 
    "database": "test", 
    "es": 1623029629000, 
    "id": 1, 
    "isDdl": false, 
    "mysqlType": {
        "name": "varchar(500)", 
        "age": "int(11)", 
        "xb": "varchar(20)"
    }, 
    "old": null, 
    "pkNames": null, 
    "sql": "", 
    "sqlType": {
        "name": 12, 
        "age": 4, 
        "xb": 12
    }, 
    "table": "mysql_result1", 
    "ts": 1623029681690, 
    "type": "INSERT"
}
案例:本次将通过flink stream读取kafka中json数据,然后过滤出插入的数据,并且拿到数据,代码如下
package it.bigdata.flink.study.test

import java.util.Properties

import com.alibaba.fastjson.JSON
import it.bigdata.flink.study.test.entity.MysqlResult
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer


/**
 * Flink streaming job that reads canal-json change records from Kafka, keeps
 * only the INSERT events and turns every inserted row into a `MysqlResult`.
 */
object SteamBinLog {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read the raw canal-json strings from Kafka, starting at the earliest offset
    val props = new Properties()
    props.setProperty("bootstrap.servers","10.18.35.155:9092,10.18.35.156:9092,10.18.35.157:9092")
    //    props.setProperty("group.id","consumer-group")
    val stream3 = env.addSource(new FlinkKafkaConsumer[String]("test_mysql_result1", new SimpleStringSchema(), props)
    .setStartFromEarliest())

    val stream4 = stream3
      .map(data => JSON.parseObject(data))
      // Constant-first comparison so a record without a "type" member (or an
      // empty message, for which parseObject yields null) is dropped instead
      // of raising a NullPointerException.
      .filter(json => json != null && "INSERT".equals(json.getString("type")))
      // "data" is an array: emit one MysqlResult per row rather than only the
      // first element, and read each row directly instead of re-parsing its
      // string form.
      .flatMap(json => {
        val rows = json.getJSONArray("data")
        val rowCount = if (rows == null) 0 else rows.size()
        (0 until rowCount).map { i =>
          val row = rows.getJSONObject(i)
          val my = new MysqlResult()
          my.setName(row.getString("name"))
          my.setAge(row.getIntValue("age"))
          my.setXb(row.getString("xb"))
          my
        }
      })

    stream4.print()

    env.execute("stream binlog")
  }

}

下边是报错json的相关操作,复制的原文内容:https://segmentfault.com/a/1190000039415392

1.将数据转为json

import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization._
import org.json4s.jackson.Serialization
import org.json4s.NoTypeHints

// Value serialised below: a column name plus a String -> String mapping.
case class WOE(col: String, woe: Map[String, String])
implicit val formats = Serialization.formats(NoTypeHints)
// The mutable map is spelled with its full name so that the plain `Map` in
// WOE keeps meaning the default immutable Map; `toMap` takes the immutable
// snapshot WOE expects.
val testMap = scala.collection.mutable.Map[String, String]()
testMap += ("1" -> "1.1")
val a = WOE("1", testMap.toMap)
println(write(a))

输出{"col":"1","woe":{"1":"1.1"}}

2、解析json

// Parse the JSON text into the json4s AST, extract a WOE from it, then print
// the extracted value serialised again.
implicit val formats = Serialization.formats(NoTypeHints)
val rawJson =
"""
{"col":"1","woe":{"1":"1.1"}}
"""
val parsedAst = parse(rawJson)
val decoded = parsedAst.extract[WOE]
println(write(decoded))

如果是List也可以

import org.json4s.NoTypeHints

implicit val formats = Serialization.formats(NoTypeHints)

// Serialise a buffer of WOE values. The mutable collections are fully
// qualified so the snippet does not depend on imports that are not shown.
val testMap = scala.collection.mutable.Map[String, String]()
testMap += ("1" -> "1.1")
val b = new scala.collection.mutable.ListBuffer[WOE]
b += WOE("1", testMap.toMap)
b += WOE("3", testMap.toMap)
println(write(b))

// Parse a JSON array back into a List[WOE].
val js =
      """
 [{"col":"1","woe":{"1":"1.1"}},{"col":"3","woe":{"1":"1.1"}}]
      """
val ab = parse(js).extract[List[WOE]]
println(ab.toString)

1、scala自带的Json解析
scala 2.10(以上,其他版本不清楚)自带Json解析,scala.util.parsing.json.JSON
object转json

// Build the entries in a mutable map (fully qualified, so no hidden import is
// needed), then hand an immutable snapshot to JSONObject and print it.
val testMap = scala.collection.mutable.Map[String, String]()
testMap += ("1" -> "2.034")
testMap += ("2" -> "2.0134")
println(scala.util.parsing.json.JSONObject(testMap.toMap))

但好像只能处理map,且map要转成immutable

2、fastjson

解析json

import com.alibaba.fastjson.JSON
/** Minimal fastjson example: parse a JSON object and print two of its members. */
object JsonDemo {
  def main(args: Array[String]): Unit = {
    // Triple-quoted literal: the double quotes inside the JSON need no escaping.
    val text = """{"name":"name1", "age":55}"""
    val json = JSON.parseObject(text)
    println(json.get("name"))
    println(json.get("age"))
  }
}

再例如

import com.alibaba.fastjson.JSON

/** fastjson example: read plain, typed and nested members of a parsed object. */
object Json {
  def main(args: Array[String]): Unit = {
    // Triple-quoted literal: the double quotes inside the JSON need no escaping.
    val str2 = """{"et":"kanqiu_client_join","vtm":1435898329434,"body":{"client":"866963024862254","client_type":"android","room":"NBA_HOME","gid":"","type":"","roomid":""},"time":1435898329}"""
    val json = JSON.parseObject(str2)
    // Read a member as an untyped value
    val fet = json.get("et")
    // Read a member as a String
    val etString = json.getString("et")
    // Read an integral member; 1435898329434 does not fit in an Int, so it is
    // read as a Long.
    val vtm = json.getLong("vtm")
    println(vtm)
    // Read a member of a nested object
    val client = json.getJSONObject("body").get("client")
    println(client)
  }
}

在spark-streaming中,使用fast-json更加稳定,json-lib经常出现莫名问题,而且fastjson的解析速度更快.

object转json,首先必须要显式的定义参数,否则会报错

ambiguous reference to overloaded definition
例如:

// Serialise a Scala map with fastjson. `testMap` is mutated with `+=`, so
// `Map` is expected to be a mutable Map here (assumes an import not shown).
val testMap = Map[String, String]()
testMap += ("1" -> "2.034")
testMap += ("2" -> "2.0134")
// The second argument is passed explicitly; per the surrounding text, leaving
// it out fails with "ambiguous reference to overloaded definition".
// NOTE(review): the output quoted after this snippet lists bean-like
// properties (empty, sizeMapDefined, traversableAgain) rather than the two
// entries, so fastjson presumably does not treat a Scala Map as a map;
// converting to a java.util.Map first is the likely fix -- TODO confirm.
val a = JSON.toJSONString(testMap, true)
println(a) 

不会报错,但是输出结果是奇怪的

{
    "empty":false,
    "sizeMapDefined":false,
    "traversableAgain":true
} 

3、json4s

object转json

// Render a map through the json4s DSL and print the compact JSON text.
// `testMap` is mutated with `+=`, so `Map` is expected to be a mutable Map
// here (assumes an import not shown).
val testMap = Map[String, String]()
testMap += ("1" -> "2.034")
testMap += ("2" -> "2.0134")
// NOTE(review): the output quoted after this snippet is an array of
// one-entry objects rather than a single object; presumably the DSL
// conversion picked for this map type treats it as a sequence of pairs --
// TODO confirm.
val jj = compact(render(testMap))
println(jj) 

输出

[{"2":"2.0134"},{"1":"2.034"}]

如果都是String,复杂的Map结构也可以解析

import scala.collection.mutable

// Serialise a map whose values are themselves maps. The local import makes
// the mutable Map explicit, so the snippet does not depend on imports that
// are not shown.
val subMap = mutable.Map[String, String]()
subMap += ("1" -> "1.1")
val testMap = mutable.Map[String, mutable.Map[String, String]]()
testMap += ("1" -> subMap)
println(write(testMap))

输出{"1":{"1":"1.1"}}
但这样的形式不利于解析

再例如

implicit val formats = Serialization.formats(NoTypeHints)
// A map with mixed value types, including nested lists of maps.
val m = Map(
      "name" -> "john doe",
      "age" -> 18,
      "hasChild" -> true,
      "childs" -> List(
        Map("name" -> "dorothy", "age" -> 5, "hasChild" -> false),
        Map("name" -> "bill", "age" -> 8, "hasChild" -> false)))
// A map of String-only maps.
val mm = Map(
      "1" -> Map ("1"->"1.2")
    )
// Print the two maps defined above; the original printed `a`, which this
// snippet never defines.
println(write(m))
println(write(mm))

TEST

package com.dfssi.dataplatform.analysis.exhaust.alarm

import java.sql.Timestamp
import java.util

import com.alibaba.fastjson.serializer.SerializerFeature
import org.apache.spark.Logging
import org.json4s.NoTypeHints

// The record to be parsed: one sample for a vehicle (vin) carrying a value,
// a collection time, a position and a list of failure codes.
case class NeedEntity(
    vin: String,
    downoutput: Double,
    collectTime: Long,
    lon: Double,
    lat: Double,
    failureList: java.util.List[Integer] = new java.util.ArrayList[Integer]()
) extends Serializable

// State management.
// One instance tracks one event: where and when it started, where and when it
// was last updated, and the value range observed so far.
class OverLimitEvent(var vin: String,
                     var startTime: Long,
                     var startLon: Double,
                     var startLat: Double,
                     var eventType:String="overlimit",
                      var endTime: Long = 0,
                      var endLon: Double = 0.0,
                      var endLat: Double = 0.0,
                      var minValue: Double = 0.0,
                      var maxValue: Double = 0.0
                         ) extends Serializable with Logging{

  /** Column -> value map describing the start of the event. */
  def getInsertMap(): Map[String, Any] = {
    Map(
      "vin" -> vin,
      "startTime" -> new Timestamp(startTime),
      "startLon" -> startLon,
      "startLat" -> startLat
    )
  }

  /**
   * Column -> value map describing the current end state of the event.
   * The end* entries are taken from the end* fields; the original filled
   * them from the start* fields, so the end state was never reported.
   */
  def getUpdateMap(): Map[String, Any] = {
    Map(
      "vin" -> vin,
      "startTime" -> new Timestamp(startTime),
      "endTime" -> new Timestamp(endTime),
      "endLon" -> endLon,
      "endLat" -> endLat,
      "maxValue" -> maxValue,
      "minValue" -> minValue
    )
  }

  /**
   * Moves the end of the event to the given sample and widens the observed
   * value range with the sample's `downoutput`.
   */
  def updateByEntity(entity: NeedEntity): Unit = {
    this.endTime = entity.collectTime
    this.endLat = entity.lat
    this.endLon = entity.lon
    // The `!= null` guards of the original were dropped: both fields are
    // primitive Doubles, so the comparison was always true.
    if (this.maxValue < entity.downoutput) {
      this.maxValue = entity.downoutput
    }
    // NOTE(review): minValue defaults to 0.0, so it only changes for values
    // below 0.0 unless the event was created with an explicit minValue --
    // confirm that this is intended.
    if (this.minValue > entity.downoutput) {
      this.minValue = entity.downoutput
    }
  }

  /** JSON text of this event, produced with json4s and no type hints. */
  override  def toString(): String ={
    import org.json4s.jackson.Serialization._
    import org.json4s.jackson.Serialization
    implicit val formats = Serialization.formats(NoTypeHints)
    write(this)
  }

}

/** Factory methods and constants for `OverLimitEvent`. */
object OverLimitEvent {
  // Names of the fields listed as the identifier of an event.
  val ID_FIELD = Array("vin", "startTime")

  /** Creates an event with both its start and end state; eventType keeps its default. */
  def apply(
             vin: String,
             startTime: Long,
             startLon: Double,
             startLat: Double,
             endTime: Long,
             endLon: Double,
             endLat: Double,
             minValue: Double,
             maxValue: Double
           ): OverLimitEvent =
    new OverLimitEvent(
      vin,
      startTime,
      startLon,
      startLat,
      endTime = endTime,
      endLon = endLon,
      endLat = endLat,
      minValue = minValue,
      maxValue = maxValue
    )

  /** Starts a new event at the time and position of the given sample. */
  def buildByEntity(entity: NeedEntity): OverLimitEvent =
    new OverLimitEvent(entity.vin, entity.collectTime, entity.lon, entity.lat)

  /** Parses an event from JSON text with fastjson. */
  def buildByJson(json: String): OverLimitEvent =
    com.alibaba.fastjson.JSON.parseObject(json, classOf[OverLimitEvent])

  /** Serialises this companion object itself with json4s, no type hints. */
  override def toString(): String = {
    import org.json4s.jackson.Serialization
    Serialization.write(this)(Serialization.formats(NoTypeHints))
  }

}

/**
 * Alarm state kept for one vehicle (vin): an optional over-limit event, a map
 * of fault events and the time of the last update.
 */
case class ExhaustAlarmStatus(
    vin: String,
    var overLimitEvent: OverLimitEvent = null,
    var faultEvent: Map[String, OverLimitEvent] = null,
    var lastTime: Long
) {

  /** JSON text of this state, produced with json4s and no type hints. */
  override def toString(): String = {
    import org.json4s.jackson.Serialization
    Serialization.write(this)(Serialization.formats(NoTypeHints))
  }
}

/** JSON helpers for `ExhaustAlarmStatus`, plus a small manual round-trip check. */
object ExhaustAlarmStatus {

  /** Parses a state from JSON text with fastjson; a null input yields null. */
  def buildByJson(json: String): ExhaustAlarmStatus = {
    if(json!=null){
        com.alibaba.fastjson.JSON.parseObject(json,
        classOf[ExhaustAlarmStatus])
    }else{
      null
    }
  }

  /** Pretty-printed JSON text of the given state, produced with fastjson. */
  def toJSON(state: ExhaustAlarmStatus): String = com.alibaba.fastjson.JSON.toJSONString(state, SerializerFeature.PrettyFormat)

  /** Parses a sample document with fastjson and prints it again via json4s. */
  def main(args: Array[String]): Unit = {
    // Triple-quoted literal: the double quotes inside the JSON need no
    // escaping (the original literal did not compile).
    val json = """{"vin":"222", "OverLimitEvent":{ "vin":"222",  "startTime":123456789, "startLon":1.0, "startLat":1.0, "endTime":123456789, "endLon":1.0, "endLat":1.0, "minValue":1.0, "maxValue":1.0 },"lastTime":1556441242000}"""
    val state  = com.alibaba.fastjson.JSON.parseObject(json,
      classOf[ExhaustAlarmStatus])
    println(state.overLimitEvent)
    // Only the Serialization imports are needed for `write`; the unused
    // JsonDSL and JsonMethods imports were dropped.
    import org.json4s.jackson.Serialization._
    import org.json4s.jackson.Serialization
    implicit val formats = Serialization.formats(NoTypeHints)
    val jsonstr = write(state)
    println(jsonstr)
  }

}
 
author@nohert
原文地址:https://www.cnblogs.com/gzgBlog/p/14859689.html