Searching...
domingo, 25 de abril de 2021

Resolución de problemas comunes en Aplicaciones Spark

Antes de tratar sobre la optimización de Aplicaciones Spark (tuning), voy a dedicar este post a revisar algunos signos y síntomas de problemas en tus jobs de Spark, incluyendo signos que podríamos observar (p. e., tareas lentas), así como síntomas del propio Spark (p.e., OutOfMemoryError).  Hay muchos problemas que pueden afectar a los jobs de Spark, por lo que es imposible cubrirlos todos.  Pero voy a tratar algunos de los problemas más comunes que se pueden encontrar en Spark.  Además de los signos y síntomas, también voy a repasar algunos tratamientos posibles para estos problemas.

Los Jobs de Spark no se inician

Este problema puede surgir con frecuencia, especialmente cuando recién estás comenzando con un despliegue o un entorno nuevos.
  • La interfaz de usuario de Spark no muestra ningún nodo del clúster excepto el driver.
  • La interfaz de usuario se Spark parece estar reportando información incorrecta.
Esto ocurre principalmente cuando las demandas de recursos de tu aplicación o de tu clúster o no están configuradas correctamente.  Spark, en un entorno distribuido, hace algunas suposiciones sobre redes, sistemas de archivos y otros recursos.  Durante el proceso de configuración del clúster, probablemente has configurado  algo incorrectamente y ahora el nodo que ejecuta el driver no puede hablar con los ejecutores. Esto puede deberse a que no has especificado qué IP y puerto están abiertos o no abriste el correcto.  Lo más probable es que se trate de un problema a nivel de cluster, máquina o configuración. Otra opción es que tu aplicación solicite más recursos por ejecutor de los que tu administrador de clúster tiene actualmente libres, en cuyo caso el driver estará esperando eternamente a que se inicien los ejecutores.
  • Asegúrate de que las máquinas puedan comunicarse entre sí por los puertos esperados. Idealmente, deberías abrir todos los puertos entre los nodos worker a menos que tengas restricciones de seguridad más estrictas.
  • Asegúrate de que las configuraciones de recursos de Spark sean correctas y de que tu administrador de cluster esté configurado correctamente para Spark.  Intenta ejecutar una aplicación simple primero para ver si funciona.  Un problema común puede ser que hayas solicitado más memoria por ejecutor de la que el administrador del clúster tiene libre para asignar, así que verifica cuánto se reporta como libre (en su interfaz de usuario) y tu configuración de memoria en el spark-submit.

Errores previos a la ejecución 

Esto puede suceder cuando está desarrollando una nueva aplicación y anteriormente has ejecutado código en este clúster, pero ahora algún código nuevo no funciona.
  • Los comandos no se ejecutan en absoluto y generan grandes mensajes de error.
  • Compruebas la interfaz de usuario de Spark y no parece que se ejecuten jobs, etapas o tareas.
Después de comprobar y confirmar que la pestaña de entorno de la interfaz de usuario de Spark muestra la información correcta para tu aplicación, vale la pena volver a chequear tu código.  Muchas veces, puede haber un simple error tipográfico o un nombre de columna incorrecto que impide que el job de Spark se compile en su plan Spark subyacente (cuando se usa la API de DataFrame).
  • Deberías echar un vistazo al error devuelto por Spark para confirmar que no hay un problema en tu código, como proporcionar la ruta del fichero de entrada o un nombre de campo incorrectos.
  • Volver a comprobar que el clúster tiene la conectividad de red esperada entre tu driver, tus workers y el sistema de almacenamiento que estás utilizando.
  • Es posible que haya problemas con las librerías o classpaths que provoquen que se cargue la versión incorrecta de una librería para acceder al almacenamiento.  Intenta simplificar tu aplicación hasta que obtengas una versión más pequeña que reproduzca el problema (por ejemplo, simplemente leyendo un conjunto de datos).

Errores durante la ejecución 

Este tipo de problema ocurre cuando ya estás trabajando en un clúster o en partes de tu aplicación Spark antes de encontrar un error.  Esta puede ser parte de un job programado que se ejecuta en algún intervalo o parte de alguna exploración interactiva que parece fallar después de algún tiempo.
  • Un job de Spark se ejecuta correctamente en todo el clúster, pero el siguiente falla.
  • Falla un paso en una consulta de múltiples pasos.
  • Un job planificado que se ejecutó ayer está fallando hoy.
  • Mensaje de error difícil de analizar.
Los posibles tratamientos son los siguientes:
  • Comprueba si tus datos existen o están en el formato esperado.  Esto puede cambiar con el tiempo o algún cambio en el flujo puede haber tenido consecuencias no deseadas en tu aplicación.
  • Si aparece un error rápidamente cuando ejecutas una consulta (es decir, antes de que las tareas se hayan lanzado), lo más probable es que se trate de un error de análisis al planificar la consulta.  Esto significa que probablemente hayas escrito mal un nombre de columna a la que se hace referencia en la consulta o que una columna, vista o tabla a la que hizo referencia no existe.
  • Lee la traza de la pila para tratar de encontrar pistas sobre qué componentes están involucrados (por ejemplo, en qué operador y etapa se estaba ejecutando).
  • Intenta aislar el problema comprobando progresivamente los datos de entrada y asegurándote de que los datos se ajustan a lo esperado.  También intenta eliminar la lógica hasta que puedas aislar el problema en una versión más pequeña de tu aplicación.
  • Si un job ejecuta tareas durante algún tiempo y luego falla, podría deberse a un problema con los mismos datos de entrada, en el que el esquema podría especificarse incorrectamente o una fila en particular no se ajusta al esquema esperado. Por ejemplo, a veces tu esquema puede especificar que los datos no contienen nulos, pero tus datos en realidad contienen nulos, lo que puede hacer que fallen ciertas transformaciones.
  • También es posible que tu propio código para procesar los datos falle, en cuyo caso Spark te mostrará la excepción lanzada por tu código. En este caso, verás una tarea marcada como "fallida" en la interfaz de usuario de Spark, y también puedes ver los logs de esa máquina para comprender qué estaba haciendo cuando falló.  Intenta añadir más logs dentro de tu código para averiguar qué registro de datos se estaba procesando.

Tareas lentas o rezagadas

Este problema es bastante común al optimizar aplicaciones y puede ocurrir debido a que el trabajo no se distribuye uniformemente en tus máquinas ("sesgo") o debido a que una de tus máquinas es más lenta que las otras (por ejemplo, debido a un problema de hardware).

Cualquiera de los siguientes son síntomas apropiados del problema:
  • Las etapas de Spark parecen ejecutarse hasta que solo quedan un puñado de tareas.  Luego, esas tareas llevan mucho tiempo.
  • Estas tareas lentas aparecen en la interfaz de usuario de Spark y ocurren de manera consistente en los mismos conjuntos de datos.
  • Ocurren en las etapas, una tras otra.
  • Aumentar la cantidad de máquinas asignadas a la aplicación Spark no ayuda realmente; algunas tareas siguen tardando mucho más tiempo que otras.
  • En las métricas de Spark, ciertos ejecutores leen y escriben muchos más datos que otros.
Las tareas lentas a menudo se denominan "rezagadas". Hay muchas razones por las que pueden ocurrir, pero la causa más frecuente de este problema es que tus datos están divididos de manera desigual en particiones de DataFrame o RDD. Cuando esto sucede, es posible que algunos ejecutores deban trabajar en cantidades de trabajo mucho mayores que otros. Un caso particularmente común es cuando se utiliza una operación group-by-key y una de las claves tiene más datos que otras. En este caso, cuando observas la interfaz de usuario de Spark, es posible que veas que los datos de shuffle para algunos nodos sean mucho más grandes que para otros.
  • Intenta aumentar el número de particiones para tener menos datos por partición.
  • Intenta volver a particionar con otra combinación de columnas. Por ejemplo, los rezagados pueden aparecer cuando particionas por una columna sesgada o una columna donde muchos valores son nulos. En el último caso, podría tener sentido filtrar primero los valores nulos.
  • Intenta aumentar la memoria asignada a tus ejecutores si es posible.
  • Monitoriza el ejecutor que tiene problemas y fíjate si es la misma máquina en todos los jobs; también puedes tener un ejecutor o una máquina en mal estado en tu clúster, por ejemplo, una cuyo disco está casi lleno.
  • Si este problema está asociado con un join o una agregación que está tardando demasiado, consulta más adelante.
  • Comprueba si tus funciones definidas por usuario (UDFs) son un desperdicio en su asignación de objetos o lógica de negocio. Intenta convertirlas a código DataFrame si es posible.
  • Asegúrate de que tus UDFs o funciones agregadas definidas por usuario (UDAFs) se estén ejecutando en un batch de datos lo suficientemente pequeño. Muchas veces, una agregación puede extraer una gran cantidad de datos en memoria para una clave común, lo que hace que ese ejecutor tenga que hacer mucho más trabajo que los demás.
  • Activar la especulación, que tratamos más adelante, hará que Spark ejecute una segunda copia de las tareas que son extremadamente lentas. Esto puede ser útil si el problema se debe a un nodo defectuoso porque la tarea se ejecutará en uno más rápido. Sin embargo, la especulación tiene un coste porque consume recursos adicionales. Además, para algunos sistemas de almacenamiento que utilizan la consistencia eventual, podrías terminar con datos de salida duplicados si tus escrituras no son idempotentes. 
  • Puede surgir otro problema común cuando trabajas con conjuntos de datos. Puesto que los conjuntos de datos ejecutan una gran cantidad de instanciación de objetos para convertir registros en objetos Java para las UDFs, pueden causar mucha recolección de basura. Si usas conjuntos de datos, observa las métricas de recolección de basura en la interfaz de usuario de Spark para ver si son consistentes con las tareas lentas.
Los rezagados pueden ser uno de los problemas más difíciles de depurar, simplemente porque hay muchas causas posibles. Sin embargo, con toda probabilidad, la causa será algún tipo de sesgo en los datos, por lo que definitivamente comienza por chequear la interfaz de usuario de Spark en busca de cantidades desequilibradas de datos entre las tareas.

Agregaciones lentas

Si tienes una agregación lenta, comienza por revisar los problemas de la sección "Tareas lentas" antes de continuar. Después de haberlos intentado, es posible que continúes viendo el mismo problema.
  • Tareas lentas durante una llamada groupBy.
  • Los trabajos posteriores a la agregación también son lentos.
Desafortunadamente, este problema no siempre se puede resolver. A veces, los datos de tu job tienen algunas claves sesgadas y la operación que quieres ejecutar sobre ellos debe ser lenta.
  • Aumentar el número de particiones, antes de una agregación, puede ayudar a reducir el número de claves diferentes procesadas en cada tarea.
  • Aumentar la memoria del ejecutor también puede ayudar a aliviar este problema. Si una sola clave tiene muchos datos, esto permitirá que tu ejecutor escriba a disco con menos frecuencia y termine más rápido, aunque aún puede ser mucho más lento que los ejecutores que procesan el resto de claves.
  • Si descubres que las tareas posteriores a la agregación también van lento, esto significa que tu conjunto de datos puede haberse desequilibrado después de la agregación. Intenta insertar una llamada a repartition para particionarlo aleatoriamente.
  • Asegúrate de que todos los filtros y sentencias SELECT que puedan estar por encima de la agregación pueden ayudar a garantizar que estés trabajando solo en los datos con los que necesitas trabajar y nada más. El optimizador de consultas de Spark lo hará automáticamente para las APIs estructuradas.
  • Asegúrate de que los valores nulos estén representados correctamente (utilizando el concepto de nulo de Spark) y no como un valor predeterminado como "" o "VACÍO". Spark suele optimizar para omitir nulos al principio del job cuando es posible, pero no puede hacerlo para tus propios valores marcados.
  • Algunas funciones de agregación también son inherentemente más lentas que otras. Por ejemplo, collect_list y collect_set son funciones de agregación muy lentas porque deben devolver todos los objetos encontrados al driver y deben evitarse en un código donde el rendimiento es crítico.

Joins lentos

Los joins y las agregaciones son ambos shuffles, por lo que comparten algunos de los mismos síntomas y tratamientos generales.
  • Parece que una etapa de join está tardando mucho. Puede ser una o múltiples tareas.
  • Las etapas anterior y posterior al join parecen funcionar con normalidad.
Los tratamientos posibles pueden ser los siguientes:
  • Muchos joins se pueden optimizar (manual o automáticamente) a otros tipos de joins.
  • Experimentar con diferentes órdenes de joins puede ayudar realmente a acelerar los jobs, especialmente si algunas de esos joins filtran una gran cantidad de datos; haz esos primero.
  • Particionar un conjunto de datos antes del join puede ser muy útil para reducir el movimiento de datos en el clúster, especialmente si el mismo conjunto de datos se utilizará en múltiples operaciones de join. Vale la pena experimentar con diferentes particionamientos pre-join. Ten en cuenta, nuevamente, que esto no es "gratis" y tiene el coste de un shuffle.
  • Los joins lentos también pueden deberse a un sesgo en los datos. No siempre hay mucho que se pueda hacer aquí, pero dimensionar la aplicación Spark y/o aumentar el tamaño de los ejecutores puede ayudar.
  • Asegúrate de que todos los filtros y sentencia de selección que pueden estar por encima del join puede ayudar a garantizar que estés trabajando solo en los datos que necesitas para el join.
  • Asegúrate de que los valores nulos se gestionan correctamente (que estés usando nulos) y no algún valor predeterminado como "" o "VACÍO", como ocurre con las agregaciones.
  • A veces, Spark no puede planificar correctamente un join de broadcast si no conoce ninguna estadística sobre el DataFrame o la tabla de entrada. Si sabes que una de las tablas que intervienen en el join es pequeña, puedes intentar forzar un broadcast o usar los comandos de recopilación de estadísticas de Spark para permitirle analizar la tabla.

Lecturas y Escrituras Lentas

La E/S lenta puede ser difícil de diagnosticar, especialmente con sistemas de archivos en red.
  • Lectura lenta de datos de un sistema de archivos distribuido o un sistema externo.
  • Escrituras lentas desde sistemas de archivos de red o almacenamiento de blobs.
Los tratamientos posibles pueden ser los siguientes:
  • Activar la especulación (estableciendo spark.speculation a true) puede ayudar con las lecturas y escrituras lentas. Esto lanzará tareas adicionales con la misma operación en un intento de ver si es solo un problema transitorio en la primera tarea. La especulación es una herramienta potente y funciona bien con sistemas de archivos consistentes. Sin embargo, puede causar escrituras de datos duplicados con algunos servicios en la nube eventualmente consistentes, como Amazon S3, así que comprueba si viene soportado por el conector del sistema de almacenamiento que estés utilizando.
  • Garantizar una conectividad de red suficiente puede ser importante: es posible que tu clúster Spark simplemente no tenga suficiente ancho de banda total de red para acceder a tu sistema de almacenamiento.
  • Para sistemas de archivos distribuidos como HDFS que se ejecutan en los mismos nodos que Spark, asegúrate de que Spark vea los mismos nombres de host para los nodos que el sistema de archivos. Esto permitirá que Spark realice una planificación basada en la localidad, que podrás ver en la columna "localidad" en la interfaz de usuario de Spark.

Driver OutOfMemoryError o Driver Unresponsive

Este suele ser un problema bastante serio porque fallará tu aplicación Spark. A menudo sucede debido a la recopilación de demasiados datos para el driver, lo que hace que se quede sin memoria.
  • La aplicación Spark no responde o falla.
  • OutOfMemoryErrors o mensajes de recolección de basura en los logs del driver.
  • Los comandos tardan mucho en ejecutarse o no se ejecutan en absoluto.
  • La interactividad es muy baja o nula.
  • El uso de memoria es alto para la JVM del driver.
Hay una variedad de posibles razones para que esto suceda, y el diagnóstico no siempre es sencillo.
  • Es posible que su código haya intentado recopilar un conjunto de datos demasiado grande para el nodo del controlador mediante operaciones como collect.
  • Es posible que estés utilizando un join de broadcast en la que los datos que se transmitirán son demasiado grandes. Utiliza la configuración de join de broadcast máxima de Spark para controlar mejor el tamaño que se transmitirá.
  • Una aplicación de larga duración ha generado una gran cantidad de objetos en el driver y no puede liberarlos. La herramienta jmap de Java puede ser útil para ver qué objetos están llenando la mayor parte de la memoria de la JVM de tu driver, imprimiendo un histograma de la pila. Sin embargo, ten en cuenta que jmap pausará esa JVM mientras se ejecuta.
  • Aumenta la asignación de memoria del driver si es posible para que funcione con más datos.
  • Pueden ocurrir problemas con las JVMs que se quedan sin memoria si estás utilizando otro enlace de lenguaje, como Python, debido a que la conversión de datos entre las dos requiere demasiada memoria en la JVM. Intenta ver si tu problema es específico del lenguaje elegido y devuelve menos datos al nodo del driver, o escríbelos en un archivo en lugar de traerlos como objetos a memoria.
  • Si estás compartiendo un SparkContext con otros usuarios (por ejemplo, a través de un servidor SQL JDBC y algunos entornos de notebook), asegúrate de que la gente no estén intentando hacer algo que pueda estar causando una gran cantidad de asignación de memoria en el driver (como trabajar con arrays demasiado grandes en su código o recopilar grandes conjuntos de datos).

Executor OutOfMemoryError o Executor Unresponsive

Las aplicaciones Spark a veces pueden recuperarse automáticamente de esto, según el verdadero problema subyacente.
  • OutOfMemoryErrors o mensajes de recolección de basura en los logs del ejecutor. Puedes encontrarlos en la interfaz de usuario de Spark.
  • Ejecutores que fallan o dejan de responder.
  • Tareas lentas en ciertos nodos que nunca parecen recuperarse.
Los tratamientos posibles pueden ser los siguientes:
  • Intenta aumentar la memoria disponible para los ejecutores y el número de ejecutores.
  • Intenta aumentar el tamaño del worker de PySpark a través de las configuraciones relevantes de Python.
  • Busca mensajes de error de recolección de basura en los logs del ejecutor. Algunas de las tareas que se están ejecutando, especialmente si usas UDFs, pueden crear muchos objetos que deben recolectarse como basura. Reparticiona tus datos para aumentar el paralelismo, reduce la cantidad de registros por tarea y asegurate de que todos los ejecutores obtengan la misma cantidad de trabajo.
  • Asegúrate de que los valores nulos se gestionen correctamente (que estés usando nulos) y no algún valor predeterminado como "" o "VACÍO", como discutimos anteriormente.
  • Es más probable que esto suceda con RDDs o con Datasets debido a las instanciaciones de objetos. Intenta usar menos UDFs y más operaciones estructuradas de Spark cuando sea posible.
  • Usa herramientas de monitorización de Java como jmap para obtener un histograma del uso de la memoria de la pila en tus ejecutores y fíjate qué clases están ocupando más espacio.
  • Si los ejecutores se colocan en nodos que también tienen otras cargas de trabajo ejecutándose, como un almacén de clave-valor, intenta aislar tus jobs de Spark del resto de trabajos.

Nulos inesperados en los resultados

  • Valores nulos inesperados después de transformaciones.
  • Los jobs planificados en producción que solían funcionar ya no funcionan o ya no generan resultados correctos.
Los tratamientos posibles pueden ser los siguientes:
  • Es posible que el formato de los datos haya cambiado sin ajustar tu lógica de negocio. Esto significa que el código que funcionaba antes ya no es válido.
  • Utiliza un acumulador para intentar contar registros o ciertos tipos, así como parsear o procesar errores en los que se omita un registro. Esto puede ser útil porque podrías pensar que estás parseando datos de un formato determinado, pero algunos de los datos no llegan en ese formato. La mayoría de las veces, los usuarios colocarán el acumulador en una UDF cuando estén parseando sus datos en bruto en un formato más controlado y realizarán los recuentos ahí. Esto te permite contar registros válidos e inválidos y luego operar en consecuencia a posteriori.
  • Asegúrate de que tus transformaciones realmente den como resultado planes de consulta válidos. Spark SQL a veces realiza coacciones de tipo implícitas que pueden generar resultados confusos. Por ejemplo, la expresión SQL SELECT 5*"23" da como resultado 115 porque la cadena "23" se convierte en un valor entero 23, pero la expresión SELECT 5 * "" da como resultado un valor nulo porque la cadena vacía se convierte en un número entero que da nulo. Asegúrate de que tus conjuntos de datos intermedios tengan el esquema esperado (intenta usar printSchema) y busca cualquier operación CAST en el plan de consulta final.

Errores de espacio en disco

  • Salen errores de "no space left on fisk" y los jobs fallan.
Los tratamientos posibles pueden ser los siguientes:
  • La forma más fácil de solucionar esto, por supuesto, es añadir más espacio en disco. Puedes hacer esto dimensionando los nodos en los que estás trabajando o conectando almacenamiento externo en un entorno cloud.
  • Si tienes un clúster con espacio de almacenamiento limitado, es posible que algunos nodos se agoten primero debido al sesgo. Reparticionar los datos como se describió anteriormente puede ayudar aquí.
  • También hay una serie de configuraciones de almacenamiento con las que puedes experimentar. Algunas de estos determinan cuánto tiempo se deben mantener los logs en la máquina antes de ser retirados. Para obtener más información, consulta las configuraciones continuas de logs de ejecutores de Spark del tipo spark.executor.logs.rolling.*.
  • Intenta eliminar manualmente algunos archivos de logs antiguos o archivos de shuffle antiguos de las máquinas en cuestión. Esto puede ayudar a aliviar parte del problema, aunque obviamente no es una solución permanente.

Errores de Serialización

Salen errores de serialización y los jobs fallan.
  • Esto es muy poco habitual cuando se trabaja con las APIs estructuradas, pero es posible que estés intentando ejecutar alguna lógica personalizada sobre los ejecutores con UDFs o RDDs y la tarea que estás tratando de serializar a estos ejecutores o los datos que estás tratando de compartir no pueden serializarse. Esto sucede a menudo cuando estás trabajando con algún código o datos que no se pueden serializar en una UDF o función, o si estás trabajando con tipos de datos extraños que no se pueden serializar. Si estás usando (o tienes la intención de usar la serialización Kryo), comprueba que estés registrando tus clases para que se serialicen realmente.
  • Intenta no hacer referencia a ningún campo del objeto envolvente en tus UDFs cuando crees UDFs dentro de una clase Java o Scala. Esto puede hacer que Spark intente serializar todo el objeto envolvente, lo que puede no ser posible. En su lugar, copia los campos relevantes a variables locales en el mismo alcance que el cierre y utilízalos.

Conclusiones

En este post y en uno anterior he cubierto algunas de las herramientas principales que se pueden usar para monitorizar y depurar jobs y aplicaciones Spark, así como los problemas más comunes con que nos encontramos y sus soluciones. Al igual que con la depuración de cualquier software complejo, recomiendo adoptar un enfoque paso a paso basado en principios para depurar problemas. Añade sentencias de logging para averiguar dónde está fallando tu job y qué tipo de datos llegan en cada etapa, intenta aislar el problema en la menor parte de código posible y trabaja a partir de ahí. Para problemas de sesgo en los datos, que son exclusivos de la computación paralela, usa la interfaz de usuario de Spark para obtener una visión general rápida de cuánto trabajo está realizando cada tarea. 

Referencias




Video



En otro post voy a tratar sobre el ajuste del rendimiento en particular y las diversas herramientas que se pueden utilizar para ello.

Feliz aprendizaje !!! :)


0 comentarios:

Publicar un comentario

Gracias por participar en esta página.

 
Back to top!