栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Scala解析和组装json(使用orj.json/json4s)————附带详细代码和示例

Scala解析和组装json(使用orj.json/json4s)————附带详细代码和示例

文章目录

0 背景1 数据准备(读入数据)2 解析json文件3 组装json4 json写入到文件

4.1 直接写入4.2 格式化json字符串后写入 4 使用json4s

0 背景

因为项目项目需要实现修改json数据后重新写入,原本解析json使用的json4s(Scala3貌似可以去使用circe-json),但是要实现替换json中的值然后重新写回到文件中实现起来方便(没有找到相对应的借口),于是去寻找了一个相对易用的包。找到了org.json,于是就用此包实现了相对应的功能。

1 数据准备(读入数据)

包:

import java.io._
import org.json._
import org.apache.commons.io.FileUtils
import java.io.File

//写入的包
import java.io.FileOutputStream
import java.io.OutputStreamWriter

读入文件并转为jsonObject对象:

  val filePath = "/user/kafka.json"
  val file = new File(filePath)
  val content= FileUtils.readFileToString(file,"UTF-8")
  val jsonObject:JSonObject =new JSONObject(content)
2 解析json文件

例子如下:

{
  "description": "This is json File",
  "stu.info": [
    {
    "name":"xiaoming",
    "sex":"boy",
    "grade":98,
    "isOverseasStudent":false
    },
    {
    "name":"xiaohua",
    "sex":"gril",
    "grade":89,
    "isOverseasStudent":true
    }],
    "school":["USTC", "ZJU", "PK", "QH"]
}

解析代码如下:

  //得到“description”信息【objct对象】
  jsonObject.getString("description")

  //解析info【json Array(含json object)】
  val studInfoJsonArray = jsonObject.getJSONArray("stu.info")

  var i: Int = 0
  while ( i < studInfoJsonArray.length) {
    val tmpJsonOject = studInfoJsonArray.getJSonObject (i)
    println(tmpJsonOject.getString("name"))
    println(tmpJsonOject.getBoolean("isOverseasStudent"))
    i += 1
  }
  
  //【json Array】
  val schoolJsonArray = jsonObject.getJSONArray("school")

  var i: Int = 0
  while ( i < schoolJsonArray.length) {
    val school = schoolJsonArray.getString (i)
    println(school)
    i += 1
  }
  


3 组装json

这里通过实现替换xiaoming的成绩为100分和替换学校中的PK为SJ,来展示组装功能.

  var writeJsonObject =new JSONObject(content)
  var tmpStuInfoJsonArray = new JSONArray()
  var tmpSchoolJsonArray = new JSONArray()
  writeJsonObject.put("description", jsonObject.getString("description"))

  val studInfoJsonArray = jsonObject.getJSONArray("stu.info")
  var i: Int = 0
  while ( i < studInfoJsonArray.length) {
    val tmpJsonOject = studInfoJsonArray.getJSonObject (i) //临时替换的对象
    if(tmpJsonOject.getString("name") == "xiaoming"){
      tmpJsonOject.put("grade", 80)
    }else{}
    tmpStuInfoJsonArray.put(tmpJsonOject)
    i += 1
  }
  writeJsonObject.put("stu.info", tmpStuInfoJsonArray)

  val schoolJsonArray = jsonObject.getJSONArray("school")
  var i: Int = 0
  while ( i < schoolJsonArray.length) {
    val school = schoolJsonArray.getString (i)
    if(school == "PK"){
      tmpSchoolJsonArray.put("SJ")
    }else{
      tmpSchoolJsonArray.put(school)
    }
    i += 1
  }
  writeJsonObject.put("school", tmpSchoolJsonArray)

4 json写入到文件 4.1 直接写入
  val osw:OutputStreamWriter = new OutputStreamWriter(new FileOutputStream(orderFilePath, false), "UTF-8")
  osw.write(writeJsonObject.toString)
  osw.flush //清空缓冲区,强制输出数据

写入后的结果如下:

{"description":"This is json File","stu.info":[{"name":"xiaoming","sex":"boy","grade":80,"isOverseasStudent":false},{"name":"xiaohua","sex":"gril","grade":89,"isOverseasStudent":true}],"school":["USTC","ZJU","SJ","QH"]}
4.2 格式化json字符串后写入

格式化json的类(可以根据自己的需求,增改对字符的判断):

import scala.util.control.Breaks.{break, breakable}

class JsonFormatTool {
  //是否使用制表符(默认:四个空格)
  private val isTab = false

  def stringToJSON(strJson: String): String = {
    var isString = false
    var tabNum = 0
    val jsonFormat = new StringBuffer
    val length = strJson.length
    var i: Int = 0
    while ( {
      i < length
    }) {
      breakable {
        val c = strJson.charAt(i)
        if(c == '"'){
          if(isString){
            isString = false
          }else{
            isString = true
          }
          jsonFormat.append(c)
        }else if (c == '{' || c == '[') {
          if(isString == false){
            tabNum += 1
            jsonFormat.append(c + "n")
            jsonFormat.append(getSpaceOrTab(tabNum))
          }else{
            jsonFormat.append(c)
          }
        }
        else if (c == '}' || c == ']') {
          if(isString == false){
            tabNum -= 1
            jsonFormat.append("n")
            jsonFormat.append(getSpaceOrTab(tabNum))
          }
            jsonFormat.append(c)
        }
        else if (c == ',') {
          if (isString == false) {
            jsonFormat.append(c + "n")
            jsonFormat.append(getSpaceOrTab(tabNum))
          } else {
            jsonFormat.append(c)
          }
        }
        else if(c == '\'){
          if(strJson.charAt(i + 1) == 'n' || strJson.charAt(i + 1) == 't'){
            i += 2
            break
          }else{
            jsonFormat.append(c)
          }
        }
        else{
          jsonFormat.append(c)
        }
        i += 1
      }
    }
    jsonFormat.toString
  }

  def getSpaceOrTab(tabNum: Int): String = {
    val sbTab = new StringBuffer
    for (i <- 0 until tabNum) {
      if (isTab) sbTab.append('t')
      else sbTab.append("  ")
    }
    sbTab.toString
  }
}


使用json格式化工具类,写入文件:

//为了测试对字符串中,的处理,为json额外添加字段
val tableSchema = "{"id": "int", "trans_code": "string", "account_id": "string", "pay_channel_code": "string", "total_trans_price": "double", "total_trans_cost": "double", "create_time": "string", "update_time": "string", "_hoodie_is_deleted": "boolean", "hudi_delta_streamer_update_time": "string", "hudi_delta_streamer_ingest_date": "string"}"

writeJsonObject.put("table.schema",  tableSchema)


  val orderFilePath = "/user/kafka2.json"
  var tool = new JsonFormatTool()
  val jsonString = tool.stringToJSON(writeJsonObject.toString)
  val osw:OutputStreamWriter = new OutputStreamWriter(new FileOutputStream(orderFilePath, false), "UTF-8")
  osw.write(jsonString)
  osw.flush //清空缓冲区,强制输出数据

写入后的结果为:

{
  "description":"This is json File",
  "stu.info":[
    {
      "name":"xiaoming",
      "sex":"boy",
      "grade":80,
      "isOverseasStudent":false
    },
    {
      "name":"xiaohua",
      "sex":"gril",
      "grade":89,
      "isOverseasStudent":true
    }
  ],
  "school":[
    "USTC",
    "ZJU",
    "SJ",
    "QH"
  ],
  "table.schema":"{"id": "int", "trans_code": "string", "account_id": "string", "pay_channel_code": "string", "total_trans_price": "double", "total_trans_cost": "double", "create_time": "string", "update_time": "string", "_hoodie_is_deleted": "boolean", "hudi_delta_streamer_update_time": "string", "hudi_delta_streamer_ingest_date": "string"}"
4 使用json4s

包:

import org.json4s._
import scala.io.Source

import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

读取文件并转换为json对象:

    val paramKafakaHudiFilePath = "/software/member/config/kafkaHudi.json"

    val kafakaHudiFileStr = Source.fromFile(paramKafakaHudiFilePath).getLines.toList.mkString("n")

    val jsonData = parse(kafakaHudiFileStr)

解析json对象:

val JString(sparkSessionMaster) = (jsonData  "description")

解析jsonArray中的对象:

    val kafkaHudiList = for{
      JObject(hoodieParam) <- jsonData
      JField("hoodie.deltastreamer.source.kafka.topic", JString(topic)) <- hoodieParam
      JField("hoodie.deltastreamer.write.file.path", JString(savePath)) <- hoodieParam
      JField("hoodie.datasource.write.table.name", JString(tableName)) <- hoodieParam
      JField("hoodie.datasource.write.recordkey.field", JString(recordkey)) <- hoodieParam
      JField("hoodie.datasource.write.precombine.field", JString(precombine)) <- hoodieParam
      JField("hoodie.datasource.write.partitionpath.field", JString(partitionpath)) <- hoodieParam
      JField("hoodie.datasource.write.table.type", JString(writeTableType)) <- hoodieParam
      JField("hoodie.datasource.write.operation", JString(writeOperation)) <- hoodieParam
      JField("table.schema", JString(tableSchema)) <- hoodieParam
    } yield (topic, savePath, tableName, recordkey, precombine,  partitionpath, writeTableType, writeOperation,tableSchema)

解析的json字符串:

{
  "description": "Write Kafka multi topic data to Hudi table",
  "hoodie.param": [
    {
      "hoodie.deltastreamer.source.kafka.topic": "jk_test_model5.jk_test2.jk34.output",
      "hoodie.deltastreamer.write.file.path": "/test_hudi/jk_test/sub_trans17",
      "hoodie.datasource.write.table.name": "tb_sub_trans_detail_mor",
      "hoodie.datasource.write.recordkey.field": "id",
      "hoodie.datasource.write.precombine.field": "hudi_delta_streamer_update_time",
      "hoodie.datasource.write.partitionpath.field": "hudi_delta_streamer_ingest_date",
      "table.schema": "{"id": "int", "trans_code": "string", "account_id": "string", "client_trans_code": "string", "com_name": "string", "trans_number": "int", "trans_price": "double", "trans_cost": "double", "total_trans_price": "double", "total_trans_cost": "double", "create_time": "string", "update_time": "string", "_hoodie_is_deleted": "boolean", "hudi_delta_streamer_update_time": "string", "hudi_delta_streamer_ingest_date": "string"}",
      "hoodie.datasource.write.table.type": "MERGE_ON_READ",
      "hoodie.datasource.write.operation": "upsert"
    }]
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/757759.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号