Searching...
miércoles, 4 de abril de 2018

Ingestión de datos con Spark y Kafka

Un componente de arquitectura importante de cualquier plataforma de datos son aquellas piezas que gestionan la ingestión de datos. En muchos de los entornos actuales de "big data", los datos involucrados son de tal escala en términos de rendimiento (piense en la API  "firehose" de Twitter) o volumen (por ejemplo, el proyecto 1000 Genomas) que deben considerarse cuidadosamente distintas técnicas y herramientas.


En los últimos años, Apache Kafka y Apache Spark se han convertido en herramientas populares en la caja de herramientas del arquitecto de datos, ya que están equipados para controlar una amplia variedad de escenarios de ingestión de datos y se han utilizado con éxito en entornos de misión crítica donde la demanda es alta.

En este tutorial, te guío a través de algunos de los conceptos básicos del uso de Kafka y Spark para ingerir datos. Aunque los ejemplos no funcionan a escala empresarial, se pueden aplicar las mismas técnicas en entornos exigentes.

Requisitos Previos

Este es un tutorial práctico que cualquier persona con experiencia en programación puede seguir. Si tus habilidades de programación están oxidadas, o tienes una mentalidad técnica, pero eres nuevo en programación, he hecho todo lo posible para que este tutorial sea accesible. Aun así, hay algunos requisitos previos en términos de conocimiento y herramientas.

Se usarán las siguientes herramientas:

·       Git: para administrar y clonar el código fuente.
·       Docker: para ejecutar algunos servicios en contenedores.
·       Lenguaje de programación Java 8 (Oracle JDK) y un entorno de ejecución (runtime) utilizado por Maven y Scala.
·       Maven 3: para compilar el código que escribimos.
·       Algún tipo de editor de código o IDE: en este caso he utilizado la edición de comunidad de IntelliJ para crear este tutorial.
·       Scala: lenguaje de programación que usa el entorno de ejecución de Java. Todos los ejemplos están escritos usando Scala 2.12. Nota: No necesitas descargar Scala.

Además, necesitarás una cuenta de desarrollador de Twitter. Si tiene una cuenta normal de Twitter, puedes obtener las claves de la API verificando tu cuenta a través de SMS.

Una nota sobre convenciones. A lo largo de este tutorial, verás algunos comandos que comienzan con un indicador (un signo de dólar) y se escriben en una fuente con un espacio simple. Estos están destinados a ser comandos que se ejecutan en un terminal. Para hacer esto, simplemente copia el comando excluyendo el indicador, pégalo en tu terminal, y luego presiona la tecla de retorno.

Requisito Previo: comprobación de herramientas


Ejecuta los siguientes comandos y compara tu salida contra lo que se espera.


Si alguno de estos comandos falla con un error, sigue las instrucciones para instalarlos en tu sistema operativo.

Requisito Previo: crear y configurar una aplicación de Twitter

Para crear una aplicación de Twitter, navega a https://apps.twitter.com/. Presiona el botón marcado "Crear nueva aplicación". Estará en la esquina superior derecha o en el medio de la ventana de tu navegador, dependiendo de si ya has creado una aplicación de Twitter.

Se te pedirá que completes varios campos, algunos de los cuales son obligatorios. Aunque el formulario indique que se requiere un sitio web, puedes usar una dirección localhost.

Una vez que hayas creado la aplicación, debes ser redirigido a la pantalla de configuración de la aplicación. Debajo del nombre de la aplicación hay una fila de elementos de menú. Haz clic en el que dice "Claves y Tokens de Acceso".

En la parte inferior de esta página hay un botón marcado, "Crear mi token de acceso". Presiónalo. Ahora debe haber una serie de campos en la ventana del navegador. Solo tienes que preocuparte por cuatro de ellos:

1.       Clave del Consumidor (en la sección de Configuración de la Aplicación)
2.       Secreto del Consumidor (en la sección de Configuración de la Aplicación)
3.       Token de Acceso (en la sección de Tu Token de Acceso)
4.       Secreto del Token de Acceso (en la sección de Tu Token de Acceso)

Puedes copiarlos en un archivo de texto para usarlos más tarde o dejar abierta la ventana del navegador hasta más adelante en el tutorial cuando necesites los valores.

Obtención y compilación del código


El código de este tutorial se puede encontrar en GitHub.

$ git clone git@github.com:silicon-valley-data-science/ingest-spark-kafka.git
$ cd ingest-spark-kafka
$ mvn clean package

Hay dos archivos que serán importantes para el resto de este tutorial. El primero se puede encontrar en:

ingest-spark-kafka/src/main/scala/com/svds/blogs/ingest_spark_kafka/TwitterIngestTutorial.scala

Contiene plantillas que se completarán más adelante. El otro archivo a tener en cuenta es:

ingest-spark-kafka/src/main/scala/com/svds/blogs/ingest_spark_kafka/TwitterIngestFinal.scala

Contiene la versión final del código con la que debes terminar si trabajas durante todo el tutorial.

Validar la configuración de Twitter

Lo primero que debes hacer es asegurarte de tener un entorno adecuado que pueda conectarse a la API de Twitter. Copia los cuatro valores de la configuración de la aplicación de Twitter en sus respectivos lugares en ingest-spark-kafka/twitter-secrets.properties.

Luego, compila y ejecuta TwitterIngestTutorial. Puedes ejecutarlo usando tu IDE o con maven. Para ejecutarlo con maven, ejecuta el siguiente comando (demostración):

$ mvn package exec:java -Dexec.mainClass="com.svds.blogs.ingest_spark_kafka.TwitterIngestTutorial"

La salida debe contener el texto "Todas las variables de twitter están presentes" justo antes de la línea que dice "[INFO] BUILD SUCCESS".

Configurar un contenedor Kafka

Ahora que sabes que tu configuración de Twitter es correcta, pongamos en marcha un contenedor de Kafka. Si has utilizado Docker anteriormente, probablemente sea una buena idea cerrar todos tus contenedores Docker antes de continuar, para evitar disputas por recursos.

IMPORTANTE: El cliente Kafka es exigente con la garantía de que el DNS y las direcciones IP coincidan cuando se conecta. Para conectarte, debes crear una entrada del archivo host que mapee un host llamado "kafka" a la dirección IP "127.0.0.1" (a.k.a. "localhost"). En entornos Linux/Unix, este archivo se encuentra en /etc/hosts, mientras que en máquinas Windows estará en %SystemRoot%\System32\drivers\etc\host. Simplemente añade la siguiente línea:

127.0.0.1 kafka

Utilizaremos un contenedor Kafka creado por Spotify, ya que viene cuidadosamente incorporado con Zookeeper. Es una tecnología menos con la que tendrás que familiarizarte. Bájatelo y arranca el contenedor de esta manera (demostración):

$ docker pull spotify/kafka

$ docker run -p 2181:2181 -p 9092:9092 --hostname kafka --name test_kafka --env ADVERTISED_PORT=9092 --env ADVERTISED_HOST=kafka spotify/kafka

Vamos a analizar estos comandos. El primer comando es simple, simplemente descarga la imagen del docker llamada "spotify/kafka" que se ha subido al hub de Docker. El siguiente comando ejecuta esa imagen localmente.

run significa que la imagen se ejecutará ahora. -p 2181: 2181 -p 9092: 9092 mapea dos puertos locales a dos puertos del contenedor (puerto local a la izquierda, puerto del contenedor a la derecha). Piensa en esto de la misma manera que haces un redireccionamiento o tunelado de puertos entre clientes y servidores SSH para enrutar el tráfico de forma segura. --hostname kafka le dice al contenedor que su nombre de host será kafka; no significa nada fuera del contenedor. --name test_kafka le da un nombre al contenedor. Esto será útil si arrancas y detienes el contenedor (como lo harás momentáneamente). --env ADVERTISED_PORT = 9092 --env ADVERTISED_HOST = kafka pasa las variables de entorno al entorno de tiempo de ejecución del contenedor. Estos es lo mismo que si hubieras emitido un comando export FOO = 'bar' desde un terminal dentro del contenedor. El parámetro final es el nombre de la imagen desde la que se origina el contenedor.

Ejecución de algunos comandos Kafka

Luego detenemos el contenedor y lo reiniciamos en segundo plano. Presiona "CTRL + C" para detener el contenedor. Debe registrar algo sobre la espera a que mueran ZooKeeper y Kafka (¡los procesos!). Reinicia el contenedor con este comando:

$ docker start test_kafka

Debería ejecutarse rápidamente. Ahora podemos conectarnos al contenedor y familiarizarnos con algunos comandos de Kafka. Inicia sesión en el contenedor de esta manera:

$ docker exec -it test_kafka /bin/bash

Esto invoca al cliente Docker y le dice que quieres conectar un TTY interactivo al contenedor llamado test_kafka y arrancar un shell bash. Sabrás que estás dentro del contenedor si el indicador cambia a algo que se ve así:

root@kafka:/#

Lo primero que haremos será crear el topic de Kafka. Un topic de Kafka es una forma de agrupar datos en una única aplicación. Otros sistemas de mensajes llaman a esto una "cola"; es lo mismo. Desde el terminal del contenedor que acabas de iniciar, ejecuta este comando (¡recuerda eliminar el indicador!):

root@kafka:/# /opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh --zookeeper kafka:2181 --create --topic test_topic --partitions 3 --replication-factor 1

Están sucediendo muchas cosas aquí. Vamos a desarrollar este comando. kafka-topics.sh es script que encapsula un proceso Java que actúa como cliente de un punto final del cliente Kafka que trata los topics. --zookeeper kafka: 2181 le dice al cliente dónde encontrar ZooKeeper. Kafka utiliza ZooKeeper como un servicio de directorio para realizar un seguimiento del estado de los miembros del clúster Kafka. ZooKeeper también tiene roles en las operaciones de gestión interna del clúster (elección del líder, sincronización, etc.). El cliente consulta la información del clúster a ZooKeeper, de modo que pueda contactarse directamente con los nodos Kafka. --create indica una operación particular que creará un topic. --topic asigna un nombre al topic. --partitions 3 indica en cuántas particiones se divide este topic. Las particiones entran en juego cuando queremos lograr un mayor rendimiento. La mejor información que he visto sobre cómo elegir el número de particiones es una publicación de blog del colaborador de Kafka Jun Rao. Elegimos tres aquí porque es más de uno. --replication-factor 1 describe la cantidad de copias redundantes de los datos. En nuestro caso, ese valor es solo "1", por lo que no hay redundancia en absoluto, aunque es de esperar que esto ocurra con un clúster que solo tenga un nodo.

Puedes comprobar que se ha creado el topic cambiando el comando a --list:

root@kafka:/# /opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh --zookeeper kafka:2181 --list

Ahora que tenemos un topic, podemos enviarle algunos mensajes. Eso implica un script diferente de Kafka, el productor de la consola.

root@kafka:/# /opt/kafka_2.11-0.10.1.0/bin/kafka-console-producer.sh --topic test_topic --broker-list kafka:9092

--broker-list kafka:9092 es análogo a especificar los hosts de ZooKeeper, pero en su lugar especifica un miembro del clúster Kafka al que contactar directamente. (No tengo idea de por qué kafka-topics.sh no soporta esto).

Se puede pensar que este comando está suspendido, pero en realidad está en un bucle esperando a que enviemos algunos mensajes al topic. Para ello, escribe un mensaje y presiona la tecla de retorno. Continúa enviando algunos mensajes al topic. Cuando hayas terminado, presiona CTRL-C.

Ahora podemos reproducir estos mensajes utilizando el consumidor de la consola. Usa este comando:

root@kafka:/# /opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh --topic test_topic --bootstrap-server kafka:9092 --from-beginning

Tarda unos segundos en arrancar. Después de eso, deberías ver tantos mensajes como hayamos producido anteriormente en la salida. CTRL-C nos sacará de esta aplicación. Si lo ejecutas nuevamente, deberías ver el mismo resultado. Esto se debe a que –from-beginning le dice a Kafka que quieres comenzar a leer el topic desde el principio. Si omites ese argumento, el consumidor solo leerá nuevos mensajes. Por detrás, Kafka hará un seguimiento del offset del topic de los consumidores en ZooKeeper (si se usan grupos), o puedes hacerlo tú mismo. Puedes experimentar con esto por tu cuenta ejecutando el consumidor de la consola y el productor de la consola al mismo tiempo en diferentes terminales.

Si detienes el consumidor, por favor arráncalo de nuevo. Lo usaremos más adelante para validar que estamos enviando mensajes de Twitter a Kafka.

Creación de un cliente Kafka

Volvamos a editar TwitterIngestTutorial nuevamente. Contiene una plantilla de clase trozeada llamada KafkaWriter. Este paso la completará para que podamos enviar mensajes a Kafka.

Primero, añadimos algunas propiedades de configuración a la variable config. Añade las siguientes líneas después del comentario que dice "añadir configuración aquí".

// add configuration settings here.
put("bootstrap.servers", brokers)
put("topic", topic)
put("key.serializer", classOf[StringSerializer])
put("value.serializer", classOf[StringSerializer])

Reconocerás bootstrap.servers del comando del consumidor de la consola que acabas de usar. Es lo mismo, excepto que, en este caso, el valor se suministra a partir de una cadena del constructor. El topic debe ser autoexplicativo en este punto. Los últimos dos valores, key.serializer y value.serializer indican al cliente cómo ordenar los datos que se envían a Kafka. En este caso, hemos indicado que se esperan cadenas.

Luego modificamos el método write() para enviar los datos a Kafka. Antes del método write() puedes ver que se crea una instancia de KafkaProducer. El método write() usa este productor para enviar datos a Kafka. Primero creamos un ProducerRecord, y luego usamos el producer para enviarlos con send().

val record = new ProducerRecord[String, String](this.topic, key, data)
producer.send(record).get(5, TimeUnit.SECONDS)

Como se puede ver, la instancia record está parametrizada para que coincida con los tipos esperados por los serializadores descritos en las configuraciones key.serializer y value.serializer. Dado que producer.send() devuelve una instancia java.util.concurrent.Future, llamamos a su método get() y bloqueamos hasta que vuelva.

Este es un ejemplo de un cliente síncrono. Los clientes síncronos son más fáciles de escribir, pero a menudo no funcionan bien en configuraciones altamente concurrentes (multiproceso). Este cliente podría modificarse para que sea asíncrono introduciendo una cola y un pool de ejecutores en el KafkaWriter. Esto se deja como un ejercicio para el lector.

El último paso para el cliente Kafka es finalizar el método close() haciendo que se llame a producer.close().

Inicialización de Spark

Hay dos pasos para inicializar Spark para el streaming. Primero se crea una instancia SparkConf, y luego se configura un StreamingContext. Coloca este código después de la comprobación de la validación de Twitter:

val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("Twitter2Kafka")
val ssc = new StreamingContext(conf, Seconds(5))

En un escenario de producción, muchos de los valores de configuración de spark provienen del entorno, frente a la especificación aquí en el código. local[4] le dice a Spark que use cuatro ejecutores para el paralelismo. El parámetro Seconds del constructor StreamingContext indica que nuestros "microbatches" tendrán cinco segundos de ancho.

Flujo de entrada

Ahora vamos a crear un flujo de entrada para procesarlo. El objeto TwitterUtils abstrae la API de Twitter y nos da una buena interfaz DStream para los datos. Básicamente, sondea la API de Twitter para detectar nuevos eventos y realiza un seguimiento de los eventos que ya se han procesado.

Si tu trabajo fuera crear una interfaz de flujo en una API heredada de tu empresa, la clase TwitterUtils serviría como un buen ejemplo de cómo hacerlo. Una cosa importante a tener en cuenta con este ejemplo es que la ingestión del flujo desde Twitter ocurre en un único hilo, lo que podría convertirse en un cuello de botella y en un único punto de fallo en un escenario de producción. Al mismo tiempo, consumir un flujo sin particionar es uno de esos problemas difíciles en la informática.

Las siguientes líneas de código crean el flujo de entrada, luego lo reparten de tres maneras y aplican una función de mapeo para que tratemos con cadenas y no con objetos de la API de Twitter. Como resultado, el flujo se tipeará como DStream[(Long, String)].

val stream = TwitterUtils.createStream(ssc, twitterAuth = None,
filters = Seq("#nba", "#nfl", "nba", "nfl"))
      .repartition(3)
      .map(tweet =>; (tweet.getId,
Converters.tweetToBase64(tweet)))

Los filtros en este caso nos limitan a los tweets relacionados con algunos términos deportivos. Puedes sustituir otros términos aquí o pasar un Seq vacío para recibir el flujo de datos completo.

Operaciones de flujos

Una vez que tengamos una referencia al flujo, podemos realizar operaciones sobre el mismo. Es importante hacer la distinción conceptual de lo que está sucediendo ahora en este código: si bien parece que todo reside dentro de una única clase (de hecho, un único archivo), estás escribiendo código que potencialmente puede enviarse y ejecutarse en muchos nodos.

Spark hace un buen trabajo al mantenerte al tanto de esto. Si alguna vez ves un error en tiempo de ejecución quejándose de una clase que no es Serializable, suele ser una indicación de que te has olvidado de marcar una clase intencionada como Serializable o (lo más probable) que hayas creado una instancia errónea de algo en el cierre incorrecto; intenta empujarlo hacia abajo un poco más. StackOverflow tiene una gran cantidad de información sobre este tema.

Para realizar operaciones concurrentes en nuestro flujo, lo descompondremos en instancias RDD constituyentes y procesaremos cada una de ellas individualmente en el método publishTweets().

stream.foreachRDD(publishTweets _)

Finalmente, damos el pistoletazo de salida iniciando el StreamingContext y diciéndole que se espere:

ssc.start()
ssc.awaitTermination()

Si ejecutas este código, deberías ver un mensaje del log que indique que Spark se está arrancando y procesando el flujo. Lo más importante es que debes verificar que se vea el mensaje del log de publishTweets() cada cinco segundos más o menos.

Operaciones del RDD

Antes hemos distribuido el flujo de entrada, de modo que pudiéramos procesar fragmentos en paralelo en este punto. Añade el siguiente código a publishTweets(), y luego ejecuta el código.

tweets.foreachPartition { partition =>
      logger.info(s"PARTITION SIZE=${partition.size}")
}

Querrá notar dos cosas. Primero, los mensajes PARTITION SIZE = X aparecen casi simultáneamente. En segundo lugar, y lo que es más interesante, es que todo se está ejecutando en diferentes subprocesos, lo que se indica por el preámbulo thread = XXX de los mensajes del log. Si estuvieras ejecutando esto en un clúster, esos mensajes probablemente saldrían no solo en diferentes hilos, sino también en máquinas completamente diferentes.

Ahora sustituye ese código con este:

tweets.foreachPartition { partition =>
      val output = KafkaWriter("kafka:9092", "test_topic")
      partition.foreach { record =>
           output.write(record._1.toString, record._2)
      }
      output.close()
}

¿Te has dado cuenta de cómo hemos instanciado cada instancia KafkaWriter dentro del cierre que funciona en la partición? Esto se hace para evitar los problemas de serialización que hemos mencionado anteriormente.

Si ejecutas este código, deberías ver una gran cantidad de salidaen el consumidor de la consola Kafka que dejaste en ejecución.

Conclusión

¡Eso es! Esperemos que, en este punto, te hayas familiarizado con las operaciones y comandos simples de Kafka e incluso hayas aprendido un poco sobre cómo los contenedores pueden facilitar el desarrollo. Luego has aprendido algunas técnicas simples para gestionar datos de streaming en Spark.

Continuando a partir de aquí, el siguiente paso sería familiarizarse con el uso de Spark para ingerir y procesar datos en batch (por ejemplo, desde HDFS) o continuar con Spark Streaming y aprender a ingerir datos desde Kafka. En otras publicaciones futuras avanzaré más sobre estos conceptos.

0 comentarios:

Publicar un comentario

Gracias por participar en esta página.

 
Back to top!