Searching...
lunes, 14 de agosto de 2023

Impresión de esquemas complejos de un Dataframe Spark

La función printSchema para que sea más general. Ahora acepta cualquier DataType y un prefijo. Si se encuentra con un StructType, recorre sus campos y llama a printSchema de nuevo con un nuevo prefijo. Si se encuentra con un ArrayType, llama a printSchema con el tipo de elemento del array y un nuevo prefijo con []. Si no es ninguno de esos dos casos, simplemente imprime el prefijo y el tipo.

Con este enfoque, el programa puede manejar estructuras anidadas complejas, ya sean campos STRUCT anidados dentro de otros campos STRUCT o ARRAY, o campos ARRAY anidados dentro de campos STRUCT.

Para generar un archivo CSV se puede usar la API de Spark para escribir DataFrames en formato CSV. Primero, la función printSchema devuelve una lista de tuplas (campo, tipo), y luego convertimos esa lista en un DataFrame que finalmente escribimos en formato CSV.

import org.apache.spark.sql.{SparkSession, DataFrame, Row} import org.apache.spark.sql.types._ object SchemaExplorer { def main(args: Array[String]): Unit = { val spark = SparkSession.builder .appName("SchemaExplorer") .master("local[*]") .getOrCreate() val df = spark.read.json("path_a_tu_esquema_json.json") val schemaList = extractSchema(df.schema) val outputDf = spark.createDataFrame(schemaList.map(Row.fromTuple), StructType(Seq( StructField("Campo", StringType, false), StructField("Tipo", StringType, false) ))) outputDf.write.csv("path_donde_guardar.csv") spark.stop() } def extractSchema(dataType: DataType, prefix: String = ""): List[(String, String)] = { dataType match { case structType: StructType => structType.fields.flatMap { field => val newPrefix = if (prefix.isEmpty) field.name else s"$prefix.${field.name}" extractSchema(field.dataType, newPrefix) }.toList case arrayType: ArrayType => val innerType = arrayType.elementType val newPrefix = s"$prefix[]" extractSchema(innerType, newPrefix) case _ => List((prefix, dataType.simpleString)) } } }


Vamos a suponer que tienes un DataFrame con un esquema que tiene un campo users que es un ARRAY de STRUCT. Cada estructura tiene campos name y address, donde address es otro STRUCT con campos street y zip.

val data = Seq( Row(Seq(Row("John", Row("123 Main St", "12345")), Row("Jane", Row("456 Elm St", "67890")))) ) val addressSchema = StructType(Seq( StructField("street", StringType, false), StructField("zip", StringType, false) )) val userSchema = StructType(Seq( StructField("name", StringType, false), StructField("address", addressSchema, false) )) val schema = StructType(Seq( StructField("users", ArrayType(userSchema), false) )) val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

Si ejecutamos el programa SchemaExplorer con este DataFrame, la salida sería:

Campo Tipo ------------------------- users[] StructType users[].name StringType users[].address StructType users[].address.street StringType users[].address.zip StringType

El programa se da cuenta de que users es un ARRAY y, por lo tanto, añade [] a su nombre. A continuación, como el tipo de elemento del ARRAY es un STRUCT, profundiza en sus campos y sigue haciendo lo mismo hasta que haya recorrido todo el esquema.

0 comentarios:

Publicar un comentario

Gracias por participar en esta página.

 
Back to top!