Con las herramientas de monitorización y los problemas comunes descritos en mi anterior post, deberías poder asegurarte de que tus jobs Spark se ejecuten de manera confiable. Sin embargo, a veces también necesitarás que se ejecuten más rápido o de manera más eficiente por una variedad de razones. En este artículo voy a tratar de algunas de las opciones de rendimiento que tienes disponibles para que tus jobs se ejecuten más rápido.
Al igual que con la monitorización, hay varios niveles diferentes en los que puedes tunear jobs. Por ejemplo, si tuvieras una red súper rápida, eso haría que muchos de tus jobs Spark fueran más rápidos porque el shuffle suele ser uno de los pasos más costosos en un job Spark. Lo más probable es que no tengas mucha capacidad para controlar este tipo de cosas; por lo tanto, aquí voy a hablar de cosas que puedes controlar mediante código o configuración.
Hay diversas partes diferentes de los jobs Spark que es posible que quieras optimizar, y es útil ser específico. Algunas de las áreas pueden ser las siguientes:
- Opciones de diseño a nivel de código (por ejemplo, RDDs versus DataFrames)
- Datos en reposo
- Joins
- Agregaciones
- Datos en vuelo
- Propiedades individuales de la aplicación
- Dentro de la máquina virtual Java (JVM) de un ejecutor
- Nodos worker
- Propiedades de despliegue y a nivel de clúster
Esta lista no es de ninguna manera exhaustiva, pero al menos fundamenta lo que se cubre en este artículo. Además, hay dos formas de intentar lograr las características de ejecución que nos gustaría en los jobs Spark. Podemos hacerlo indirectamente estableciendo valores de configuración o cambiando el entorno de ejecución. Esto debería mejorar las cosas en todas las aplicaciones o jobs de Spark. De forma alternativa, podemos intentar cambiar directamente las características de ejecución o las opciones de diseño a nivel de tarea, etapa o job individual de Spark. Este tipo de correcciones son muy específicas para esa área de nuestra aplicación y, por lo tanto, tienen un impacto general limitado. Hay muchas cosas que se encuentran a ambos lados de la división directa frente a la indirecta, y vamos a establecer límites en consecuencia. En este post solo voy a hablar de las optimizaciones indirectas.
Una de las mejores cosas que puedes hacer para descubrir cómo mejorar el rendimiento es implementar una buena monitorización y seguimiento del historial de jobs. Sin esta información, puede resultar difícil saber si realmente estás mejorando el rendimiento del job.
Elecciones de diseño
Aunque las buenas elecciones de diseño parecen una forma algo obvia de optimizar el rendimiento, muchas veces no damos prioridad a este paso del proceso. Cuando diseñas tus aplicaciones, tomar buenas decisiones de diseño es muy importante porque no solo te ayuda a escribir mejores aplicaciones Spark, sino también a que se ejecuten de una manera más estable y consistente a lo largo del tiempo y frente a cambios o variaciones externas.
Scala, Java, Python o R
Esta pregunta es casi imposible de responder en el sentido general porque dependerá mucho de tu caso de uso. Por ejemplo, si quieres ejecutar un aprendizaje automático en un único nodo después de realizar un job ETL de larga duración, podríamos recomendar ejecutar tu código ETL como código SparkR y luego usar el ecosistema de aprendizaje automático masivo de R para ejecutar tus algoritmos de aprendizaje automático en un único nodo. Esto te ofrece lo mejor de ambos mundos y aprovecha la potencia de R y de Spark sin sacrificios. Las APIs estructuradas de Spark son consistentes en todos los lenguajes en términos de velocidad y estabilidad. Eso significa que debes programar con el lenguaje con el que te sientas más cómodo o que sea más adecuado para tu caso de uso.
Las cosas se complican un poco más cuando necesites incluir transformaciones personalizadas que no se pueden crear con las APIS estructuradas. Estas pueden manifestarse como transformaciones RDD o funciones definidas por el usuario (UDFs). Si vas a hacer esto, R y Python no son necesariamente la mejor opción simplemente por cómo se ejecuta realmente. También es más difícil proporcionar garantías más estrictas de tipos y manipulaciones cuando estás definiendo funciones que saltan de un lenguaje a otro. Resulta que usar Python para la mayor parte de la aplicación y migrar parte a Scala o escribir UDFs específicas en Scala a medida que la aplicación evoluciona es una técnica potente que permite un buen equilibrio entre usabilidad, mantenibilidad y rendimiento general.
Dataframes, SQL, Datasets y RDDs
Esta pregunta también surge con frecuencia. La respuesta es simple. En todos los lenguajes, los DataFrames, Datasets y SQL son equivalentes en velocidad. Esto significa que si usas DataFrames en cualquiera de estos lenguajes, el rendimiento es el mismo. Sin embargo, si vas a definir UDFs, tendrás un impacto de rendimiento al escribirlos en Python o R y, hasta cierto punto, un impacto de rendimiento menor en Java y Scala. Si quieres optimizar para obtener un rendimiento puro, te conviene intentar volver a los DataFrames y SQL lo antes posible. Aunque todo el código de DataFrame, SQL y Dataset se compile en RDDs, el motor de optimización de Spark escribirá un código RDD "más óptimo" que el que puedas hacer manualmente y, ciertamente, lo hará con mucho menos esfuerzo. Además, perderás las nuevas optimizaciones que se añadan al motor SQL de Spark en cada versión.
Por último, si quieres utilizar RDDs, definitivamente se recomienda utilizar Scala o Java. Si eso no es posible, te recomiendo que restrinjas el "área de superficie" de los RDDs de tu aplicación al mínimo. Eso es porque cuando Python ejecuta código RDD, serializa una gran cantidad de datos hacia y desde el proceso Python. Esto es muy costoso cuando se ejecuta sobre Big Data y también puede disminuir la estabilidad. Aunque no es exactamente relevante para el ajuste del rendimiento, es importante tener en cuenta que también hay algunas lagunas en cuanto a la funcionalidad que viene soportada en cada uno de los lenguajes de Spark (trataré sobre esto en otro artículo).
Serialización de objetos en los RDDs
Cuando trabajes con tipos de datos personalizados, querrás serializarlos usando Kryo porque es más compacto y mucho más eficiente que la serialización de Java. Sin embargo, esto tiene el inconveniente de registrar las clases que utilizarás en tu aplicación.
Puedes usar la serialización Kryo configurando spark.serializer en org.apache.spark.serializer.KryoSerializer. También deberás registrar explícitamente las clases a usar con el serializador Kryo mediante la configuración spark.kryo.classesToRegister. También hay una serie de parámetros avanzados para controlar esto con mayor detalle que se describen en la documentación de Kryo.
Para registrar tus clases, usa el SparkConf que acabas de crear y pasa los nombres de tus clases:
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
Configuraciones del Cluster
Esta área tiene enormes beneficios potenciales, pero probablemente sea una de las más difíciles de prescribir debido a la variación entre el hardware y los casos de uso. En general, monitorizar el rendimiento de las propias máquinas sea la técnica más útil para optimizar las configuraciones del clúster, especialmente cuando se trata de ejecutar múltiples aplicaciones en un único clúster (sean Spark o no).
Tamaño y uso compartido del clúster / aplicación
De alguna manera, esto se reduce a un problema de planificación y distribución de recursos; sin embargo, existen muchas opciones sobre cómo quieres compartir los recursos a nivel de clúster o de aplicación. Échale un vistazo a las propiedades disponibles para la configuración de aplicaciones Spark en la documentación oficial. Dedicaré un artículo posterior para hablar específicamente sobre el tema de la configuración Spark.
Asignación dinámica
Spark proporciona un mecanismo para ajustar dinámicamente los recursos que ocupa tu aplicación en función de la carga de trabajo. Esto significa que tu aplicación puede devolver recursos al clúster si ya no se utilizan y volver a solicitarlos más tarde cuando haya demanda. Esta funcionalidad es particularmente útil si múltiples aplicaciones comparten recursos en el clúster Spark y está desactivada de forma predeterminada, estando disponible en todos los administradores de clústeres generales; es decir, en modo standalone, modo YARN y modo Mesos. Si quieres habilitar esta funcionalidad, debes establecer spark.dynamicAllocation.enabled en true. La documentación de Spark presenta una serie de parámetros individuales que puedes ajustar.
Planificación
Existen una serie de optimizaciones potenciales diferentes que se pueden aprovechar para ayudar a que los jobs de Spark se ejecuten en paralelo con los pools de planificadores o ayudar a que las aplicaciones Spark se ejecuten en paralelo con algo como la asignación dinámica o la configuración de max-executor-cores. Las optimizaciones de planificación conllevan algo de investigación y experimentación, y desafortunadamente no hay soluciones súper rápidas más allá de configurar spark.scheduler.mode en FAIR para permitir una mejor compartición de recursos entre múltiples usuarios, o configurar --max-executor-cores, que especifica el número máximo de núcleos de ejecutores que necesitará tu aplicación. Especificar este valor puede garantizar que tu aplicación no consuma todos los recursos del clúster. También puedes cambiar el valor predeterminado, dependiendo de tu administrador de clúster, estableciendo la configuración spark.cores.max en un valor predeterminado de tu elección. Los administradores de clústeres también proporcionan algunas primitivas de planificación que pueden ser útiles cuando optimizas múltiples aplicaciones Spark, como se explica en la documentación de Spark.
Datos en reposo
La mayoría de las veces, cuando estás guardando datos, se leerán muchas veces cuando otras personas de tu organización accedan a los mismos conjuntos de datos para ejecutar diferentes análisis. Asegurarte de que estés almacenando tus datos para lecturas eficientes más adelante es absolutamente esencial para proyectos exitosos de big data. Esto implica elegir tu sistema de almacenamiento, elegir el formato de datos y aprovechar características como el particionamiento de los datos en algunos formatos de almacenamiento.
Almacenamiento de datos a largo plazo basado en archivos
Hay una serie de formatos de archivo diferentes disponibles, desde archivos simples de valores separados por comas (CSV) y blobs binarios, hasta formatos más sofisticados como Apache Parquet. Una de las formas más fáciles de optimizar tus jobs Spark es seguir las mejores prácticas cuando almacenas los datos y elegir el formato de almacenamiento más eficiente posible.
En general, siempre debes favorecer los tipos binarios estructurados para almacenar tus datos, especialmente cuando se va a acceder a ellos con frecuencia. Aunque los archivos como "CSV" parecen estar bien estructurados, son muy lentos de parsear y muchas veces también están llenos de casos límite y puntos débiles. Por ejemplo, los caracteres de nueva línea que se hayan escapado incorrectamente a menudo pueden causar muchos problemas cuando se lee una gran cantidad de archivos. El formato de archivo más eficiente que puedes elegir generalmente es Apache Parquet. Parquet almacena datos en archivos binarios con almacenamiento orientado a columnas y también registra algunas estadísticas sobre cada archivo que hacen posible omitir rápidamente los datos que no son necesarios para una consulta. Está bien integrado con Spark mediante la fuente de datos de Parquet incorporada.
Tipos de archivos particionables y compresión
Sea cual sea el formato de archivo que elijas, debes asegurarte de que sea "particionable", lo que significa que diferentes tareas puedan leer diferentes partes del archivo en paralelo. Cuando leamos el archivo, todos los núcleos pueden hacer parte del trabajo. Eso es porque el archivo se puede particionar. Si no usamos un tipo de archivo particionable, por ejemplo, algo como un archivo JSON mal formado, tendremos que leer el archivo completo en una única máquina, lo que reduce en gran medida el paralelismo.
El lugar principal en el que interviene el particionamiento es en los formatos de compresión. Un archivo ZIP o un archivo TAR no se puede particionar, lo que significa que incluso si tenemos 10 archivos en un archivo ZIP y 10 núcleos, solo un núcleo puede leer esos datos porque no podemos paralelizar el acceso al archivo ZIP. Este es un mal uso de recursos. Por el contrario, los archivos comprimidos con gzip, bzip2 o lz4 generalmente se pueden particionar si fueron escritos por un framework de procesamiento paralelo como Hadoop o Spark. Para tus propios datos de entrada, la forma más sencilla de convertirlos en particionables es subirlos como archivos separados, idealmente cada uno no mayor que unos pocos cientos de megabytes.
Particionamiento de tablas
El particionamiento de tablas se refiere al almacenamiento de archivos en directorios separados según una clave, como el campo de fecha de los datos. Los administradores de almacenamiento como Apache Hive apoyan este concepto, al igual que muchas de las fuentes de datos integradas en Spark. Particionar tus datos correctamente le permite a Spark omitir muchos archivos irrelevantes cuando solo requiere datos con un rango específico de claves. Por ejemplo, si los usuarios filtran con frecuencia por "fecha" o "ID de cliente" en sus consultas, particiona tus datos por esas columnas. Esto reducirá en gran medida la cantidad de datos que los usuarios finales deben leer para la mayoría de las consultas y, por lo tanto, aumentará drásticamente la velocidad.
La única desventaja del particionamiento, sin embargo, es que si particionas con una granularidad demasiado fina, puede resultar en muchos archivos pequeños y una gran sobrecarga al intentar listar todos los archivos en el sistema de almacenamiento.
Agrupamiento (o bucketing)
La esencia es que agrupar tus datos le permite a Spark “particionar previamente” los datos de acuerdo a cómo es probable que los lectores ejecuten los joins o las agregaciones. Esto puede optimizar el rendimiento y la estabilidad porque los datos se pueden distribuir consistentemente entre particiones en lugar de sesgarlos en solo una o dos. Por ejemplo, si se ejecutan joins con frecuencia por una columna inmediatamente después de una lectura, puedes usar el bucketing para asegurarte de que los datos estén bien particionados de acuerdo con esos valores. Esto puede ayudar a prevenir un shuffle antes de un join y, por lo tanto, ayudar a acelerar el acceso a los datos. El bucketing generalmente funciona de la mano con el particionamiento como una segunda forma de dividir físicamente los datos.
El número de ficheros
Además de organizar tus datos en buckets y particiones, también querrás considerar la cantidad de archivos y el tamaño de los archivos que estás almacenando. Si hay muchos archivos pequeños, pagarás un precio por listar y obtener cada uno de esos archivos individuales. Por ejemplo, si estás leyendo datos del Sistema de archivos distribuido de Hadoop (HDFS), estos datos se administran en bloques que tienen un tamaño de hasta 128 MB (de forma predeterminada). Esto significa que si tienes 30 archivos, de 5 MB cada uno, tendrás que solicitar potencialmente 30 bloques, aunque los mismos datos podrían haber cabido en 2 bloques (150 MB en total).
Tener muchos archivos pequeños hará que el planificador trabaje mucho más para localizar los datos y lanzar todas las tareas de lectura. Esto puede aumentar sobrecarga de la red y de la planificación del job. Tener menos archivos grandes alivia la tarea del planificador, pero también hará que las tareas se ejecuten durante más tiempo. En este caso, sin embargo, siempre puedes lanzar más tareas que archivos de entrada si quieres más paralelismo: Spark dividirá cada archivo entre múltiples tareas suponiendo que estés usando un formato particionable. En general, se recomienda dimensionar tus archivos para que cada uno contenga al menos algunas decenas de megabytes de datos.
Una forma de controlar el particionamiento de datos cuando escribas los datos es mediante una opción de escritura introducida en Spark 2.2. Para controlar cuántos registros entran en cada archivo, puedes especificar la opción maxRecordsPerFile en la operación de escritura.
Localidad de datos
Otro aspecto que puede ser importante en entornos de clústeres compartidos es la localidad de los datos. La localidad de datos básicamente especifica una preferencia por que ciertos nodos contengan ciertos datos, en lugar de tener que intercambiar estos bloques de datos a través de la red. Si ejecutas el sistema de almacenamiento en los mismos nodos que Spark, y el sistema soporta sugerencias de localidad, Spark intentará planificar tareas cerca de cada bloque de datos de entrada. Por ejemplo, el almacenamiento HDFS ofrece esta opción. Hay varias configuraciones que afectan a la localidad, pero generalmente se usará de forma predeterminada si Spark detecta que está usando un sistema de almacenamiento local. También observarás las tareas de lectura de datos marcadas como "local" en la interfaz de usuario web de Spark.
Recopilación de estadísticas
Spark incluye un optimizador de consultas basado en costes que planifica las consultas en función de las propiedades de los datos de entrada cuando se utilizan las APIs estructuradas. Sin embargo, para permitir que el optimizador basado en costes tome este tipo de decisiones, debes recopilar (y mantener) estadísticas sobre tus tablas. Hay dos tipos de estadísticas: estadísticas a nivel de tabla y a nivel de columna. La recopilación de estadísticas solo está disponible en tablas con nombre, no en DataFrames o RDDs arbitrarios.
Para recopilar estadísticas a nivel de tabla, puedes ejecutar el siguiente comando:
ANALYZE TABLE table_name COMPUTE STATISTICS
Para recopilar estadísticas a nivel de columna, puedes nombrar las columnas específicas:
ANALYZE TABLE table_name COMPUTE STATISTICS FOR COLUMNS column_name1, column_name2, ...
Las estadísticas a nivel de columna son más lentas de recopilar, pero ofrecen más información para que el optimizador basado en costes pueda utilizar sobre esas columnas de datos. Ambos tipos de estadísticas pueden ayudar en joins, agregaciones, filtros y una serie de otras cosas potenciales (por ejemplo, elegir automáticamente cuándo realizar un broadcast join). Puedes consultar la guía oficial de Spark SQL para ver diferentes optimizaciones basadas en estadísticas.
Configuraciones de Shuffle
La configuración del servicio de shuffle externo de Spark muchas veces puede aumentar el rendimiento porque permite a los nodos leer datos de shuffle de máquinas remotas incluso cuando los ejecutores de esas máquinas están ocupados (por ejemplo, con la recolección de basura). Sin embargo, esto tiene un coste en complejidad y mantenimiento, por lo que es posible que no valga la pena en tu implementación. Más allá de configurar este servicio externo, también hay una serie de configuraciones para los shuffles, como la cantidad de conexiones concurrentes por ejecutor, aunque estas suelen tener buenos valores predeterminados.
Además, para jobs basados en RDD, el formato de serialización tiene un gran impacto en el rendimiento del shuffle; siempre es preferible la serialización Kryo sobre la serialización de Java, como se ha descrito anteriormente. Además, para todos los jobs, el número de particiones de un shuffle es importante. Si tienes muy pocas particiones, entonces muy pocos nodos trabajaran y puede haber sesgo, pero si tienes demasiadas particiones, hay una sobrecarga para lanzar cada uno que puede comenzar a dominar. Intenta enfocarte a tener al menos unas pocas decenas de megabytes de datos por partición de salida en tu shuffle.
Presión de memoria y recolección de basura
Durante la ejecución de jobs Spark, las máquinas del ejecutor o del driver pueden tener dificultades para completar sus tareas debido a la falta de memoria suficiente o "presión de memoria". Esto puede ocurrir cuando la ejecución de una aplicación ocupa demasiada memoria o cuando se ejecuta la recolección de basura con demasiada frecuencia o tarda en ejecutarse, puesto que se crean grandes cantidades de objetos en la JVM y, posteriormente, se recolectan elementos no utilizados porque ya no se utilizan. Una estrategia para aliviar este problema es asegurarse de utilizar las APIs estructuradas tanto como sea posible. De esta manera no solo aumentará la eficiencia con la que se ejecutarán tus jobs Spark, sino que también se reducirá en gran medida la presión de la memoria porque los objetos JVM nunca se realizan y Spark SQL sólo ejecuta el procesamiento con su formato interno.
La documentación de Spark incluye algunos consejos excelentes sobre cómo ajustar la recolección de basura para aplicaciones basadas en RDD y UDF.
Medición del impacto de la recolección de basura
El primer paso en la optimización de la recolección de basura es recopilar estadísticas sobre la frecuencia con la que se realiza la recolección de basura y la cantidad de tiempo que lleva. Se puede hacer esto añadiendo -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps a las opciones de la JVM de Spark usando el parámetro de configuración spark.executor.extraJavaOptions. La próxima vez que ejecutes tu job Spark observarás mensajes impresos en los logs del worker cada vez que ocurra una recolección de basura. Estos logs estarán en los nodos worker de tu clúster (en los archivos stdout de sus directorios de trabajo), no en el driver.
Tuneo de la recolección de basura
Para seguir ajustando la recolección de basura, primero debes comprender información básica sobre la administración de memoria en la JVM:
- El espacio de almacenamiento dinámico de Java se divide en dos regiones: joven y antigua. La generación joven está destinada a contener objetos de corta duración, mientras que la generación antigua (Old) está destinada a objetos con una vida útil más larga.
- La generación joven se divide en tres regiones: Eden, Survivor1 y Survivor2.
A continuación, se muestra una descripción simplificada del procedimiento de recolección basura:
- Cuando Eden está lleno, se ejecuta una recolección de basura menor en Eden y los objetos que están vivos de Eden y Survivor1 se copian en Survivor2.
- Las regiones de Survivor se intercambian.
- Si un objeto tiene la edad suficiente o si Survivor2 está lleno, ese objeto se mueve a Old.
- Finalmente, cuando Old está casi lleno, se invoca una recolección de basura completa. Esto implica rastrear todos los objetos en la memoria dinámica, eliminar los no referenciados y mover los demás para llenar el espacio no utilizado, por lo que generalmente es la operación de recolección de basura más lenta.
El objetivo del ajuste de la recolección de basura en Spark es garantizar que solo los conjuntos de datos almacenados en caché de larga duración se almacenen en la generación Old y que la generación joven tenga el tamaño suficiente para almacenar todos los objetos de corta duración. Esto ayudará a evitar que recolecciones de basura completas recolecten objetos temporales creados durante la ejecución de la tarea. A continuación, se muestran algunos pasos que pueden resultar útiles.
Recopila estadísticas de recolección de basura para determinar si se está ejecutando con demasiada frecuencia. Si se invoca una recolección de basura completa múltiples veces antes de que se complete una tarea, significa que no hay suficiente memoria disponible para ejecutar tareas, por lo que debes disminuir la cantidad de memoria que Spark usa para el almacenamiento en caché (spark.memory.fraction).
Si hay demasiadas recolecciones menores pero no muchas recolecciones de basura importantes, sería útil asignar más memoria para Eden. Se puede establecer el tamaño de Edén para que sea una sobreestimación de la cantidad de memoria que necesitará cada tarea. Si se determina que el tamaño de Eden es E, se puede establecer el tamaño de la generación Joven usando la opción -Xmn=4/3*E. (La ampliación de 4/3 es para tener en cuenta también el espacio utilizado por las regiones Survivor).
Por ejemplo, si tu tarea es leer datos de HDFS, la cantidad de memoria utilizada por la tarea se puede estimar utilizando el tamaño del bloque de datos leído de HDFS. Ten en cuenta que el tamaño de un bloque descomprimido suele ser dos o tres veces el tamaño del bloque, por lo que si quieres tener un espacio de trabajo con tres o cuatro tareas, y el tamaño del bloque HDFS es de 128 MB, podemos estimar que el tamaño de Eden es de 43,128 MB.
Prueba el recolector de basura G1GC con -XX:+UseG1GC. Se puede mejorar el rendimiento en algunas situaciones en las que la recolección de basura es un cuello de botella y no hay forma de reducirla más dimensionando las generaciones. Ten en cuenta que con tamaños de memoria de ejecutores grandes, puede ser importante aumentar el tamaño de la región G1 con -XX:G1HeapRegionSize.
Monitoriza cómo cambia la frecuencia y el tiempo que tarda la recolección de basura con la nueva configuración.
La experiencia sugiere que el efecto del tuning de la recolección de basura depende de tu aplicación y de la cantidad de memoria disponible. Hay muchas más opciones de ajuste descritas en Internet, pero a un alto nivel, administrar la frecuencia con la que se realiza la recolección de basura completa puede ayudar a reducir la sobrecarga. Puedes especificar indicadores de ajuste de recolección de basura para ejecutores configurando spark.executor.extraJavaOptions en la configuración de un job.
Referencias:
Video: Tuning y optimización del rendimiento de jobs de Apache Spark.
SPARK+AI SUMMIT 2020. Organizado por Databricks.
En un próximo artículo trataré sobre un tipo de tuning Spark aplicado a etapas o jobs específicos.
Buen aprendizaje !!! :-)
0 comentarios:
Publicar un comentario
Gracias por participar en esta página.