Los UDFs son una característica
emocionante de Spark que ha evolucionado enormemente en las distintas releases.
Voy a intentar cubrir diferentes aspectos de la programación con UDFs en Spark.
Una UDF es bastante sencilla y
fácil de crear en Spark. Supongamos que tenemos los siguientes datos de prueba
de clientes. Si se observan los datos del país, hay muchas discrepancias, pero
sabemos que es el país correcto, es solo que la forma en que se introduce el dato
no es la habitual. Supongamos que necesitamos normalizar el nombre del país al
valor “USA” con la ayuda de un diccionario conocido, de modo
que sea siempre el mismo valor.
Es
fácil limpiar estos datos si userData fuera una colección Scala y
tuviéramos una lista de todas las combinaciones posibles de EE. UU. que podrían
ser datos mal escritos.
Sería muy elegante y fácil si
tuviéramos que usar esta función cleanCountry dentro de spark,
¿no es así? Ahí es donde los UDF entran en escena.
Podemos seguir adelante y
registrar nuestra función en la sesión Spark.
Ahora podemos usar la función de
escala normaliseCountry como si se tratara de una función SQL.
En realidad, existen múltiples
formas de registrar una UDF.
Cuando se usa en Spark SQL, la
única diferencia es que no necesitamos la función UserDefinedFunction val
normaliseCountry como lo hemos hecho en el ejemplo anterior.
También podemos usar el método udf
de sql.functions.
Para listar todas
las UDFs:
Hay que tener en cuenta algunas advertencias
a la hora de programar UDFs en Spark:
1)
Cuando usamos una UDF terminamos perdiendo toda
la optimización que Spark hace en nuestro Dataframe / Dataset. Cuando usamos un
UDF, es como una caja negra para el optimizador de Spark. Consideremos un
ejemplo de optimización general cuando se leen datos de una base de datos o de archivos
en formato de columnas como Parquet. Lo que hace es minimizar la cantidad de
datos que se leen desde la fuente misma.
Vamos a persistir el dataframe userData
anterior como archivo parquet y a leer de nuevo el dataframe como df1
para simular datos que se leen de un archivo parquet.
Supongamos que queremos filtrar
solo los datos con el nombre Joey, ya que todos le queremos. Si verificamos la
consulta que se ejecuta:
No te preocupes por la confusión
en torno al plan de ejecución, pero esto es lo que se ejecuta cuando intentamos
filtrar los datos. Esto es equivalente a tener un select * from table where
name = "Joey" en lugar de hacer un select * from table
y luego filtrar dentro del cluster Spark.
Pero supongamos que hacemos la
misma operación usando un UDF en su lugar.
La moraleja de la historia es que
hay muchas optimizaciones geniales que Spark hace por nosotros de las cuales
nos alejamos si terminamos usando UDFs. Por lo tanto, siempre se recomienda
evitar las UDFs siempre que sea posible.
2)
Otra advertencia es la forma en que se gestionan
los valores nulos. Es responsabilidad del programador asegurarse de que las UDFs
se tratan con cuidado. Supongamos que tenemos una UDF que convierte cadenas a
formato camelCase.
Esta UDF generará un error:
Por lo tanto, debemos asegurarnos
de gestionar los valores nulos con cuidado.
Vamos con otro ejemplo.
Supongamos que tenemos un dataframe como el siguiente:
Ahora queremos escribir un UDF
que convierta los valores categóricos yes, no, present, normal
en valores binarios 0s y 1s.
Para aplicar esta UDF a todas las
variables categóricas podemos utilizar la función foldLeft, de
forma que se respete la inmutabilidad de la programación funcional:
Vamos ahora a un ejemplo un poco
más complejo. Supongamos que tenemos un dataframe de entrada de la siguiente
forma:
Entre P1, P2, P3, P4 queremos
encontrar los 2 valores máximos para cada CustomerID y obtener el nombre de
columna equivalente para introducirlo en el dataframe, de forma que el
dataframe resultante sea:
Aquí para la primera fila, 8 y 7 son
los valores máximos. Los nombre de cada columna equivalente son P4 y P3,
respectivamente, de modo que para ese CustomerID en particular, debe contener
los valores P4 y P3.
Se puede usar una UDF para
obtener el resultado deseado. En la UDF, se pueden comprimir (zip) todos los
nombres de columna con su valor respectivo y luego ordenar el Array según
el valor, devolviendo al final los dos nombres de columna principales.
Ahora se puede usar withColumn
y pasar los valores de las columnas como un Array a la UDF previamente definida.
Vamos a ver ahora un buen ejemplo
de cómo tener una udf con un número arbitrario de parámetros de función que devuelva
una estructura. En este ejemplo hay tres componentes de interés: la case class
+ el esquema, la función definida por el usuario y la aplicación de la udf al dataframe.
En este ejemplo, TemperatureNote
es la clase que devolveremos desde nuestra UDF. El esquema proporciona los
nombres de las columnas creadas y sus tipos asociados. Por ejemplo, aunque el
nombre del campo para la marca de tiempo sea "timeStamp",
el nombre de la columna será "time_stamp". El esquema
debe coincidir exactamente con los tipos de la case class.
La función definida por el
usuario:
Crear la udf es muy sencillo,
simplemente pasamos una función que devuelva una instancia de la case class que
hemos creado junto con el esquema asociado. La entrada puede ser un número
arbitrario de parámetros con tipos asociados de sql spark.
Luego aplicamos la UDF al
dataframe de entrada:
Un caso posible de uso de las UDFs
puede ser para analizar el texto mal estructurado de los logs siguiendo un
método similar a este. El valor de retorno de la UDF se convierte en un objeto
anidado dentro de la columna especificada, y simplemente a partir de aquí
podemos proyectar haciendo una SELECT.
El código completo de este caso
de uso quedaría de la siguiente manera:
0 comentarios:
Publicar un comentario
Gracias por participar en esta página.