Searching...
lunes, 22 de junio de 2020

Programación con UDFs con Spark Scala

Los UDFs son una característica emocionante de Spark que ha evolucionado enormemente en las distintas releases. Voy a intentar cubrir diferentes aspectos de la programación con UDFs en Spark.

Una UDF es bastante sencilla y fácil de crear en Spark. Supongamos que tenemos los siguientes datos de prueba de clientes. Si se observan los datos del país, hay muchas discrepancias, pero sabemos que es el país correcto, es solo que la forma en que se introduce el dato no es la habitual. Supongamos que necesitamos normalizar el nombre del país al valor “USA con la ayuda de un diccionario conocido, de modo que sea siempre el mismo valor.


Es fácil limpiar estos datos si userData fuera una colección Scala y tuviéramos una lista de todas las combinaciones posibles de EE. UU. que podrían ser datos mal escritos.


Sería muy elegante y fácil si tuviéramos que usar esta función cleanCountry dentro de spark, ¿no es así? Ahí es donde los UDF entran en escena.

Podemos seguir adelante y registrar nuestra función en la sesión Spark.


Ahora podemos usar la función de escala normaliseCountry como si se tratara de una función SQL.


En realidad, existen múltiples formas de registrar una UDF.

Cuando se usa en Spark SQL, la única diferencia es que no necesitamos la función UserDefinedFunction val normaliseCountry como lo hemos hecho en el ejemplo anterior.


También podemos usar el método udf de sql.functions.


Para listar todas las UDFs:


Hay que tener en cuenta algunas advertencias a la hora de programar UDFs en Spark:

1)     Cuando usamos una UDF terminamos perdiendo toda la optimización que Spark hace en nuestro Dataframe / Dataset. Cuando usamos un UDF, es como una caja negra para el optimizador de Spark. Consideremos un ejemplo de optimización general cuando se leen datos de una base de datos o de archivos en formato de columnas como Parquet. Lo que hace es minimizar la cantidad de datos que se leen desde la fuente misma.

Vamos a persistir el dataframe userData anterior como archivo parquet y a leer de nuevo el dataframe como df1 para simular datos que se leen de un archivo parquet.


Supongamos que queremos filtrar solo los datos con el nombre Joey, ya que todos le queremos. Si verificamos la consulta que se ejecuta:


No te preocupes por la confusión en torno al plan de ejecución, pero esto es lo que se ejecuta cuando intentamos filtrar los datos. Esto es equivalente a tener un select * from table where name = "Joey" en lugar de hacer un select * from table y luego filtrar dentro del cluster Spark.

Pero supongamos que hacemos la misma operación usando un UDF en su lugar.


La moraleja de la historia es que hay muchas optimizaciones geniales que Spark hace por nosotros de las cuales nos alejamos si terminamos usando UDFs. Por lo tanto, siempre se recomienda evitar las UDFs siempre que sea posible.

2)     Otra advertencia es la forma en que se gestionan los valores nulos. Es responsabilidad del programador asegurarse de que las UDFs se tratan con cuidado. Supongamos que tenemos una UDF que convierte cadenas a formato camelCase.


Esta UDF generará un error:


Por lo tanto, debemos asegurarnos de gestionar los valores nulos con cuidado.

Vamos con otro ejemplo. Supongamos que tenemos un dataframe como el siguiente:


Ahora queremos escribir un UDF que convierta los valores categóricos yes, no, present, normal en valores binarios 0s y 1s.


Para aplicar esta UDF a todas las variables categóricas podemos utilizar la función foldLeft, de forma que se respete la inmutabilidad de la programación funcional:


Vamos ahora a un ejemplo un poco más complejo. Supongamos que tenemos un dataframe de entrada de la siguiente forma:


Entre P1, P2, P3, P4 queremos encontrar los 2 valores máximos para cada CustomerID y obtener el nombre de columna equivalente para introducirlo en el dataframe, de forma que el dataframe resultante sea:


Aquí para la primera fila, 8 y 7 son los valores máximos. Los nombre de cada columna equivalente son P4 y P3, respectivamente, de modo que para ese CustomerID en particular, debe contener los valores P4 y P3.

Se puede usar una UDF para obtener el resultado deseado. En la UDF, se pueden comprimir (zip) todos los nombres de columna con su valor respectivo y luego ordenar el Array según el valor, devolviendo al final los dos nombres de columna principales.


Ahora se puede usar withColumn y pasar los valores de las columnas como un Array a la UDF previamente definida.


Vamos a ver ahora un buen ejemplo de cómo tener una udf con un número arbitrario de parámetros de función que devuelva una estructura. En este ejemplo hay tres componentes de interés: la case class + el esquema, la función definida por el usuario y la aplicación de la udf al dataframe.


En este ejemplo, TemperatureNote es la clase que devolveremos desde nuestra UDF. El esquema proporciona los nombres de las columnas creadas y sus tipos asociados. Por ejemplo, aunque el nombre del campo para la marca de tiempo sea "timeStamp", el nombre de la columna será "time_stamp". El esquema debe coincidir exactamente con los tipos de la case class.

La función definida por el usuario:


Crear la udf es muy sencillo, simplemente pasamos una función que devuelva una instancia de la case class que hemos creado junto con el esquema asociado. La entrada puede ser un número arbitrario de parámetros con tipos asociados de sql spark.

Luego aplicamos la UDF al dataframe de entrada:


Un caso posible de uso de las UDFs puede ser para analizar el texto mal estructurado de los logs siguiendo un método similar a este. El valor de retorno de la UDF se convierte en un objeto anidado dentro de la columna especificada, y simplemente a partir de aquí podemos proyectar haciendo una SELECT.

El código completo de este caso de uso quedaría de la siguiente manera:



0 comentarios:

Publicar un comentario

Gracias por participar en esta página.

 
Back to top!