En este post trato sobre los detalles principales que necesitamos para monitorizar una aplicación Spark, básicamente logs, interfaz de usuario web, API REST y métricas. También existen distintas herramientas visuales de terceros como Grafana.
Depuración de aplicaciones Spark mediante logs
Una de las formas más detalladas de monitorizar Spark es a través de sus archivos de log. Naturalmente, los eventos extraños en los logs de Spark, o en los logs que añadamos a nuestra aplicación Spark, pueden ayudarnos a detectar exactamente dónde fallan los jobs o qué está causando ese error. Si usas la plantilla de aplicación proporcionada en https://github.com/databricks/Spark-The-Definitive-Guide, el framework de logging que configuramos en la plantilla permitirá que los logs de nuestra aplicación se muestren junto con los propios logs de Spark, haciéndolos muy fáciles de correlacionar. Sin embargo, un problema es que Python no podrá integrarse directamente con la librería de logging basada en Java de Spark. Sin embargo, mediante el uso del módulo de logging de Python o incluso sentencias print simples se podrán imprimir los resultados al error estándar y los hará fáciles de encontrar.
Para cambiar el nivel de log de Spark, simplemente ejecuta el siguiente comando:
spark.sparkContext.setLogLevel("INFO")
Esto nos permite leer los logs y, si utilizas la plantilla de aplicación, podrás logar tu propia información relevante junto con estos logs, lo que te permitirá inspeccionar tanto tu propia aplicación como Spark. Los logs en sí se imprimirán en el error estándar cuando se ejecute una aplicación en modo local, o el administrador del clúster los guardará en archivos cuando se ejecute Spark en un clúster. Consulta la documentación de cada administrador de clúster sobre cómo encontrarlos; por lo general, están disponibles a través de la interfaz de usuario web del administrador del clúster.
No siempre encontrarás la respuesta que necesitas simplemente buscando en los logs, pero puede ayudarte a identificar el problema concreto que estás encontrando y posiblemente añadir nuevas sentencias de log en tu aplicación para comprenderlo mejor. También es conveniente recopilar logs a lo largo del tiempo para poder usarlos más adelante. Por ejemplo, si la aplicación falla, querrás depurar por qué, sin tener acceso ya a la aplicación caída. Es posible que también quieras enviar los logs de la máquina en la que fueron escritos para conservarlos si una máquina falla o se desconecta (por ejemplo, si se ejecuta en la nube).
Ver la información sobre todas las aplicaciones Spark en ejecución depende del administrador de clúster que estés utilizando. Debes seguir estas instrucciones cuando depuras tu aplicación Spark:
- Spark Standalone: Dirígete al interfaz del nodo maestro de Spark en http://master:18080. El maestro y cada worker muestran el clúster y las estadísticas del job relacionadas. Además, también se escribe una salida de log detallada para cada job en el directorio de trabajo de cada worker. Discutiremos cómo habilitar los logs manualmente usando log4j con Spark.
- YARN: Si tu administrador de clúster es YARN, y supongamos que estés ejecutando tus jobs Spark en Cloudera (u otra plataforma basada en YARN), dirígetea la página de aplicaciones YARN en la Consola de administración de Cloudera Manager. Ahora, para depurar aplicaciones Spark que se ejecutan en YARN, consulta los logs del rol de Node Manager. Para que esto suceda, abre el visor de eventos de logs y luego filtra el flujo de eventos para elegir una ventana de tiempo y nivel de log y mostrar la fuente del Node Manager. También puedes acceder a los logs por comando. El formato del comando es el siguiente:
yarn logs -applicationId <application ID> [OPTIONS]
Por ejemplo, lo siguientes son comandos válidos para estos IDs:
yarn logs -applicationId application_561453090098_0005
yarn logs -applicationId application_561453090098_0005 userid
Ten en cuenta que estos IDs de usuario son diferentes. Sin embargo, esto es solo cierto si yarn.log-aggregation-enable es true en yarn-site.xml y ya ha terminado de ejecutarse la aplicación.
Spark usa log4j para su logging propio. Todas las operaciones que ocurren en el backend se logan en la consola del Spark Shell (que ya está configurado para el almacenamiento subyacente). Spark proporciona una plantilla de log4j como un archivo de propiedades, y podemos extender y modificar ese archivo para el logging en Spark. Dirígete al directorio SPARK_HOME/conf y deberías ver el archivo log4j.properties.template. Esto podría ayudarnos como punto de partida para nuestro propio sistema de logging.
Una vez modificado el archivo de la plantilla, lo renombramos como log4j.properties y lo colocamos en el mismo directorio (es decir, en el árbol del proyecto). Luego este fichero es recogido por Spark cuando se inicia la aplicación.
Un inconveniente de la clase org.apache.log4j.Logger es que no es serializable, lo que implica que no podemos usarla dentro de un cierre donde hagamos operaciones con la API de Spark. Para solucionar este problema tenemos que declarar el objeto Scala como extends Serializable.
El cierre no se puede distribuir ordenadamente a todas las particiones ya que no se puede cerrar en el logger; por lo tanto, toda la instancia del tipo MyMapper se distribuye a todas las particiones; una vez hecho esto, cada partición crea un nuevo logger y lo usa para el logging.
Ejemplo de sistema de logging usando Datasets:
Interfaz de Usuario Web
Cada SparkContext lanza una interfaz de usuario web, de forma predeterminada en el puerto 4040, que muestra información útil sobre la aplicación. Esta incluye:
- Una lista de las etapas y tareas del planificador.
- Un resumen de los tamaños de RDD y el uso de memoria.
- Información del entorno.
- Información sobre los ejecutores en ejecución
Puede acceder a esta interfaz simplemente abriendo http://<driver-node>:4040 en un navegador web. Si se ejecutan varios SparkContexts en el mismo host, se vincularán a puertos sucesivos que comiencen con 4040 (4041, 4042, etc.).
Ten en cuenta que esta información solo está disponible durante la duración de la aplicación de forma predeterminada. Para ver la interfaz de usuario web a posteriori, establece spark.eventLog.enabled a true antes de iniciar la aplicación. Esto configura Spark para logar eventos Spark que codifican la información que se muestra en la interfaz de usuario en el almacenamiento persistente.
Sigue siendo posible construir la interfaz de usuario de una aplicación a través del History Server de Spark, siempre que existan los logs de eventos de la aplicación. Puedes iniciar el servidor de historial ejecutando:
Esto crea una interfaz web en http://<server-url>:18080 de forma predeterminada, que lista las aplicaciones e intentos incompletos y completados.
Cuando se utiliza la clase proveedora del sistema de archivos (consulta la opción de configuración spark.history.provider), debe proporcionarse el directorio base de logging en la opción de configuración spark.history.fs.logDirectory y debe contener subdirectorios que representen los logs de eventos de una aplicación.
Los jobs Spark deben configurarse para logar eventos y para logarlos en el mismo directorio de escritura compartido. Por ejemplo, si el servidor se ha configurado con un directorio de log en hdfs://namenode/shared/spark-logs, las opciones del lado del cliente serían:
Una aplicación de larga duración (por ejemplo, de streaming) puede generar un único archivo enorme de log de eventos que puede costar mucho mantener y también requiere una gran cantidad de recursos a reproducir por cada actualización en el History Server de Spark.
Habilitar spark.eventLog.rolling.enabled y spark.eventLog.rolling.maxFileSize te permitiría tener archivos renovables de log de eventos en lugar de un archivo de log de eventos de gran tamaño que puede ayudar en algunos escenarios por sí solo, pero aún así no te ayuda a reducir el tamaño total de los logs.
El History Server de Spark puede aplicar compactación sobre los archivos renovables de log de eventos para reducir el tamaño general de los logs, mediante la configuración de spark.history.fs.eventLog.rolling.maxFilesToRetain en el History Server de Spark.
Los detalles se describen a continuación, pero ten en cuenta que la compactación es una operación CON PERDIDA. La compactación descartará algunos eventos que ya no se verán en la interfaz de usuario; es posible que quieras verificar qué eventos se descartarán antes de habilitar la opción.
Cuando ocurre la compactación, el History Server enumera todos los archivos de log de eventos disponibles para la aplicación y considera que los archivos de log de eventos tienen menos índice que el archivo con el índice más pequeño que se conservará como objetivo de la compactación. Por ejemplo, si la aplicación A tiene 5 archivos de log de eventos y spark.history.fs.eventLog.rolling.maxFilesToRetain se establece en 2, entonces se seleccionarán los primeros 3 archivos de log para compactarlos.
Una vez que selecciona el objetivo, los analiza para averiguar qué eventos se pueden excluir y los reescribe en un archivo compacto con los eventos descartados que se deciden excluir.
La compactación intenta excluir los eventos que apuntan a los datos desactualizados. A partir de ahora, a continuación se describen los candidatos de los eventos a excluir:
- Eventos para el job que está finalizado y eventos relacionados con etapas / tareas.
- Eventos para el ejecutor que se finaliza.
- Eventos para la ejecución de SQL que haya finalizado y eventos relacionados con jobs/etapas/tareas.
Una vez finalizada la reescritura, los archivos de log originales se eliminarán mediante el mejor esfuerzo posible. Es posible que el History Server no pueda eliminar los archivos de log originales, pero no afectará el funcionamiento del History Server.
Ten en cuenta que el History Server de Spark puede no compactar los archivos de log de eventos antiguos si se da cuenta de que no se reduciría mucho espacio durante la compactación. Para las consultas de streaming, normalmente esperamos que la compactación se ejecute ya que cada micro-batch disparará uno o más jobs que se terminarán en breve, pero la compactación no se ejecutará en muchos casos para las consultas batch.
Ten en cuenta también que esta es una nueva característica introducida en Spark 3.0 y puede que no sea completamente estable. En algunas circunstancias, la compactación puede excluir más eventos de los esperados, lo que genera algunos problemas de interfaz de usuario en el History Server para la aplicación. Úsalo con precaución.
Existen otras muchas opciones de configuración para el History Server. Por limitación de espacio no puedo incluir todas las opciones de configuración en este artículo. Consulta la tabla relevante a este efecto para las opciones de configuración del History Server de Spark.
Observa que en todas estas interfaces de usuario, las tablas se pueden ordenar haciendo clic en sus cabeceras, lo que facilita la identificación de tareas lentas, datos sesgados, etc.
Notas:
- El servidor de historial muestra los jobs Spark finalizados e incompletos. Si una aplicación realiza varios intentos después de fallos, se mostrarán los intentos fallidos, así como cualquier intento incompleto en curso o el último intento con éxito.
- Las aplicaciones incompletas solo se actualizan de forma intermitente. El tiempo entre actualizaciones se define por el intervalo entre comprobaciones de archivos modificados (spark.history.fs.update.interval). En clústeres más grandes, el intervalo de actualización se puede establecer en valores grandes. La forma de ver una aplicación en ejecución es en realidad ver su propia interfaz de usuario web.
- Las aplicaciones que salen sin registrarse como finalizadas aparecerán como incompletas, aunque ya no se estén ejecutando. Esto puede suceder si una aplicación falla.
- Una forma de señalar la finalización de un job de Spark es detener el SparkContext explícitamente (sc.stop()), o en Python usando el constructo with SparkContext() as sc: para manejar la configuración y desmontaje del SparkContext.
API REST
Además de la interfaz de usuario de Spark, también puedes acceder al estado y las métricas de Spark a través de una API REST disponible en http://localhost:4040/api/v1. Esta es una forma de crear visualizaciones y herramientas de monitorización sobre Spark. En su mayor parte, esta API expone la misma información presentada en la interfaz de usuario web, excepto que no incluye ninguna información relacionada con SQL. Esta puede ser una herramienta útil si quieres crear tu propia solución de reporting basada en la información disponible en la interfaz de usuario de Spark. Puedes consultar la tabla correspondiente con los endpoints de la API REST aquí.
Métricas
Spark tiene un sistema de métricas configurable basado en la librería de métricas Dropwizard. Esto permite a los usuarios reportar métricas de Spark a una variedad de receptores, incluyendo archivos HTTP, JMX y CSV. Las métricas son generadas por fuentes incrustadas en la base de código Spark. Proporcionan instrumentación para actividades específicas y componentes de Spark. El sistema de métricas se configura a través de un archivo de configuración que Spark espera que esté presente en $SPARK_HOME/conf/metrics.properties. Se puede especificar una ubicación de archivo personalizada a través de la propiedad de configuración spark.metrics.conf. En lugar de utilizar el archivo de configuración, se puede usar un conjunto de parámetros de configuración con el prefijo spark.metrics.conf. Más información aquí.
Referencias
Video: Understanding Spark Web UI
En un próximo artículo trataré sobre el tuning de aplicaciones Spark.
Buen aprendizaje !!
0 comentarios:
Publicar un comentario
Gracias por participar en esta página.