//**************************************
// created by Divyanshu Shekhar Singh
//**************************************
import com.thoughtworks.xstream.XStream
import com.thoughtworks.xstream.io.xml.DomDriver
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
/** Nested name structure; mirrors the `name` struct column in the DataFrame schema. */
final case class Name(firstName: String, middleName: String, lastName: String)

/** One person record; every field is a String to match the string-typed schema used below. */
final case class Person(id: String, name: Name, ssn: String, gender: String, salary: String)
object WriteXML {

  /**
   * Entry point. Builds an in-memory people dataset, serializes each record to
   * an XML string with XStream, writes those strings out as a text file, reads
   * the file back with the spark-xml data source, and finally prints the data
   * converted to JSON.
   *
   * NOTE(review): several lines of the original were fused into comments by a
   * copy/paste (e.g. `val spark = ...` lived inside a `//` line), leaving the
   * file uncompilable; this body reconstructs the intended code.
   */
  def main(args: Array[String]): Unit = {

    // Spark driver session, local mode.
    val spark: SparkSession = SparkSession.builder()
      .master("local")
      .appName("Spark")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    // Flat sample data; the nested Row mirrors the `name` struct in the schema.
    val data = Seq(
      Row("1", Row("James ", "", "Smith"), "36636", "M", "3000"),
      Row("2", Row("Michael ", "Rose", ""), "40288", "M", "4000"),
      Row("3", Row("Robert ", "", "Williams"), "42114", "M", "4000"),
      Row("4", Row("Maria ", "Anne", "Jones"), "39192", "F", "4000"),
      Row("5", Row("Jen", "Mary", "Brown"), "", "F", "-1")
    )

    // Schema matching the Person case class (all leaf fields kept as strings).
    val schema = new StructType()
      .add("id", StringType)
      .add("name", new StructType()
        .add("firstName", StringType)
        .add("middleName", StringType)
        .add("lastName", StringType))
      .add("ssn", StringType)
      .add("gender", StringType)
      .add("salary", StringType)

    val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
    // df.show()

    import spark.implicits._

    // Convert DataFrame[Row] to the strongly-typed Dataset[Person] so XStream
    // can serialize real case-class instances.
    val dsPerson = df.as[Person]
    // dsPerson.show()

    // Serialize each Person to XML. One XStream instance per partition: it is
    // created on the executors inside mapPartitions rather than on the driver,
    // avoiding per-record construction cost and driver-side serialization.
    val dsString = dsPerson.mapPartitions { partition =>
      val xstream = new XStream(new DomDriver)
      partition.map(person => xstream.toXML(person))
    }

    // Write each serialized person as a line of text.
    dsString.write.mode(SaveMode.Overwrite)
      .text("C:/Users/admin/Desktop/JSON_FOLDER/xstream.xml")

    // Read the XML back via the spark-xml data source, one row per <Person>.
    val df1 = spark.read
      .format("xml")
      .option("rowTag", "Person")
      .load("C:/Users/admin/Desktop/JSON_FOLDER/xstream.xml")
    df1.show(false)

    // Convert to JSON and print. (The original bound `.show(false)` — a
    // Unit-returning side effect — to an unused val `JsonDF`; dropped.)
    df1.toJSON.show(false)

    // Release the session's resources before exiting.
    spark.stop()
  }
}