Thursday, June 4, 2020

Use case: converting flat data to XML, and then to JSON

//**************************************
// created by Divyanshu Shekhar Singh
//**************************************
import com.thoughtworks.xstream.XStream
import com.thoughtworks.xstream.io.xml.DomDriver
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

/** A person's name split into its three parts; every part is kept as a plain string. */
case class Name(
  firstName: String,
  middleName: String,
  lastName: String
)

/** One flat record: an id, a nested [[Name]], and ssn/gender/salary carried as strings. */
case class Person(
  id: String,
  name: Name,
  ssn: String,
  gender: String,
  salary: String
)

/** Demo pipeline: flat rows -> Dataset[Person] -> XML text (via XStream) -> DataFrame -> JSON.
  *
  * Steps performed by [[main]]:
  *  1. build a small in-memory DataFrame with a nested `name` struct,
  *  2. view it as `Dataset[Person]` and serialise every record to an XML string,
  *  3. write those strings out as text,
  *  4. read the output back with the "xml" data source using row tag `Person`,
  *  5. print the re-read rows and their JSON representation.
  *
  * NOTE(review): the "xml" format is not imported anywhere in this file; it presumably comes
  * from the spark-xml package on the classpath -- confirm against the build definition.
  */
object WriteXML {

  /** Location used for both the XML write and the read-back when no argument is supplied. */
  private val DefaultXmlPath = "C:/Users/admin/Desktop/JSON_FOLDER/xstream.xml"

  /** Runs the flat-data -> XML -> JSON demo.
    *
    * @param args optional; if present, `args(0)` replaces the default XML output path
    */
  def main(args: Array[String]): Unit = {
    val xmlPath = args.headOption.getOrElse(DefaultXmlPath)

    // Spark driver
    val spark: SparkSession = SparkSession.builder()
      .master("local")
      .appName("Spark")
      .getOrCreate()

    try {
      spark.sparkContext.setLogLevel("ERROR")

      // Creating flat data from code (it could equally be read from a file)
      val data = Seq(
        Row("1", Row("James ", "", "Smith"), "36636", "M", "3000"),
        Row("2", Row("Michael ", "Rose", ""), "40288", "M", "4000"),
        Row("3", Row("Robert ", "", "Williams"), "42114", "M", "4000"),
        Row("4", Row("Maria ", "Anne", "Jones"), "39192", "F", "4000"),
        Row("5", Row("Jen", "Mary", "Brown"), "", "F", "-1")
      )

      // Create schema; field names mirror the Person/Name case classes so `.as[Person]` resolves
      val schema = new StructType()
        .add("id", StringType)
        .add("name", new StructType()
          .add("firstName", StringType)
          .add("middleName", StringType)
          .add("lastName", StringType))
        .add("ssn", StringType)
        .add("gender", StringType)
        .add("salary", StringType)

      val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
      // df.show()

      import spark.implicits._

      // Convert the DataFrame[Row] to Dataset[Person]
      val dsPerson = df.as[Person]
      // dsPerson.show()

      // Convert each record to XML. One XStream instance is built per partition and reused
      // for every record in it, instead of constructing a new one per record.
      val dsString = dsPerson.mapPartitions { partition =>
        val xstream = new XStream(new DomDriver)
        partition.map(person => xstream.toXML(person))
      }

      // Writing data into the XML file
      dsString.write.mode(SaveMode.Overwrite).text(xmlPath)

      // Reading the XML data written above
      val df1 = spark.read
        .format("xml")
        .option("rowTag", "Person")
        .load(xmlPath)
      df1.show(false)

      // Converting to JSON: keep the Dataset[String] in a val, then display it
      val jsonDs = df1.toJSON
      jsonDs.show(false)
    } finally {
      // Release the session even if any step above fails
      spark.stop()
    }
  }
}


No comments:

Post a Comment