En los últimos años, Apache Kafka
y Apache Spark se han convertido en herramientas populares en la caja de
herramientas del arquitecto de datos, ya que están equipados para controlar una
amplia variedad de escenarios de ingestión de datos y se han utilizado con
éxito en entornos de misión crítica donde la demanda es alta.
En este tutorial, te guío a
través de algunos de los conceptos básicos del uso de Kafka y Spark para
ingerir datos. Aunque los ejemplos no funcionan a escala empresarial, se pueden
aplicar las mismas técnicas en entornos exigentes.
Requisitos Previos
Este es un tutorial práctico que
cualquier persona con experiencia en programación puede seguir. Si tus
habilidades de programación están oxidadas, o tienes una mentalidad técnica,
pero eres nuevo en programación, he hecho todo lo posible para que este
tutorial sea accesible. Aun así, hay algunos requisitos previos en términos de
conocimiento y herramientas.
Se usarán las siguientes
herramientas:
·
Lenguaje de programación Java
8 (Oracle JDK) y un entorno de ejecución (runtime) utilizado por Maven y
Scala.
·
Algún tipo de editor de código o IDE: en este
caso he utilizado la edición de comunidad de IntelliJ para
crear este tutorial.
·
Scala:
lenguaje de programación que usa el entorno de ejecución de Java. Todos los
ejemplos están escritos usando Scala 2.12. Nota: No necesitas descargar Scala.
Además, necesitarás una cuenta de desarrollador de Twitter.
Si tiene una cuenta normal de Twitter, puedes obtener las claves de la API
verificando tu cuenta a través de SMS.
Una nota sobre convenciones. A lo largo de este tutorial, verás
algunos comandos que comienzan con un indicador (un signo de dólar) y se
escriben en una fuente con un espacio simple. Estos están destinados a ser
comandos que se ejecutan en un terminal. Para hacer esto, simplemente copia el
comando excluyendo el indicador, pégalo en tu terminal, y luego presiona la
tecla de retorno.
Requisito Previo: comprobación de herramientas
Ejecuta los siguientes comandos y compara tu salida contra
lo que se espera.
Si alguno de estos comandos falla
con un error, sigue las instrucciones para instalarlos en tu sistema operativo.
Requisito Previo: crear y configurar una aplicación de Twitter
Para crear una aplicación de
Twitter, navega a https://apps.twitter.com/.
Presiona el botón marcado "Crear nueva aplicación". Estará en la
esquina superior derecha o en el medio de la ventana de tu navegador,
dependiendo de si ya has creado una aplicación de Twitter.
Se te pedirá que completes varios
campos, algunos de los cuales son obligatorios. Aunque el formulario indique
que se requiere un sitio web, puedes usar una dirección localhost.
Una vez que hayas creado la
aplicación, debes ser redirigido a la pantalla de configuración de la
aplicación. Debajo del nombre de la aplicación hay una fila de elementos de
menú. Haz clic en el que dice "Claves y Tokens de Acceso".
En la parte inferior de esta
página hay un botón marcado, "Crear mi token de acceso". Presiónalo.
Ahora debe haber una serie de campos en la ventana del navegador. Solo tienes
que preocuparte por cuatro de ellos:
1.
Clave del Consumidor (en la sección de Configuración
de la Aplicación)
2.
Secreto del Consumidor (en la sección de Configuración
de la Aplicación)
3.
Token de Acceso (en la sección de Tu Token de Acceso)
4.
Secreto del Token de Acceso (en la sección de Tu
Token de Acceso)
Puedes copiarlos en un archivo de
texto para usarlos más tarde o dejar abierta la ventana del navegador hasta más
adelante en el tutorial cuando necesites los valores.
Obtención y compilación del código
El código de este tutorial se puede encontrar en GitHub.
$ git clone
git@github.com:silicon-valley-data-science/ingest-spark-kafka.git
$ cd ingest-spark-kafka
$ mvn clean package
Hay dos archivos que serán
importantes para el resto de este tutorial. El primero se puede encontrar en:
ingest-spark-kafka/src/main/scala/com/svds/blogs/ingest_spark_kafka/TwitterIngestTutorial.scala
Contiene plantillas que se completarán
más adelante. El otro archivo a tener en cuenta es:
ingest-spark-kafka/src/main/scala/com/svds/blogs/ingest_spark_kafka/TwitterIngestFinal.scala
Contiene la versión final del
código con la que debes terminar si trabajas durante todo el tutorial.
Validar la configuración de Twitter
Lo primero que debes hacer es
asegurarte de tener un entorno adecuado que pueda conectarse a la API de
Twitter. Copia los cuatro valores de la configuración de la aplicación de
Twitter en sus respectivos lugares en ingest-spark-kafka/twitter-secrets.properties.
Luego, compila y ejecuta TwitterIngestTutorial.
Puedes ejecutarlo usando tu IDE o con maven. Para ejecutarlo con maven, ejecuta
el siguiente comando (demostración):
$ mvn package exec:java -Dexec.mainClass="com.svds.blogs.ingest_spark_kafka.TwitterIngestTutorial"
La salida debe
contener el texto "Todas las variables de twitter están presentes"
justo antes de la línea que dice "[INFO] BUILD SUCCESS".
Configurar un contenedor Kafka
Ahora que sabes que tu configuración de Twitter es correcta, pongamos en
marcha un contenedor de Kafka. Si has utilizado Docker anteriormente,
probablemente sea una buena idea cerrar todos tus contenedores Docker antes de
continuar, para evitar disputas por recursos.
IMPORTANTE: El cliente
Kafka es exigente con la garantía de que el DNS y las direcciones IP coincidan cuando
se conecta. Para conectarte, debes crear una entrada del archivo host que mapee
un host llamado "kafka" a la dirección IP "127.0.0.1"
(a.k.a. "localhost"). En entornos Linux/Unix, este archivo se
encuentra en /etc/hosts,
mientras que en máquinas Windows estará en %SystemRoot%\System32\drivers\etc\host.
Simplemente añade la siguiente línea:
127.0.0.1 kafka
Utilizaremos
un contenedor Kafka creado por Spotify, ya que viene cuidadosamente incorporado
con Zookeeper. Es una tecnología menos con la que tendrás que familiarizarte. Bájatelo
y arranca el contenedor de esta manera (demostración):
$ docker pull
spotify/kafka
$ docker run -p
2181:2181 -p 9092:9092 --hostname kafka --name test_kafka --env
ADVERTISED_PORT=9092 --env ADVERTISED_HOST=kafka spotify/kafka
Vamos
a analizar estos comandos. El primer comando es simple, simplemente descarga la
imagen del docker llamada "spotify/kafka" que se ha subido al hub de
Docker. El siguiente comando ejecuta esa imagen localmente.
run significa
que la imagen se ejecutará ahora. -p 2181: 2181 -p 9092: 9092 mapea dos puertos locales a dos puertos del contenedor (puerto
local a la izquierda, puerto del contenedor a la derecha). Piensa en esto de la
misma manera que haces un redireccionamiento o tunelado de puertos entre
clientes y servidores SSH para enrutar el tráfico de forma segura. --hostname kafka le dice al contenedor que su nombre de host será
kafka; no significa nada fuera del contenedor. --name test_kafka le da un nombre al contenedor. Esto será útil si arrancas
y detienes el contenedor (como lo harás momentáneamente). --env ADVERTISED_PORT = 9092 --env
ADVERTISED_HOST = kafka pasa las variables de entorno al entorno de
tiempo de ejecución del contenedor. Estos es lo mismo que si hubieras emitido
un comando export
FOO = 'bar' desde un terminal dentro
del contenedor. El parámetro final es el nombre de la imagen desde la que se
origina el contenedor.
Ejecución de algunos comandos Kafka
Luego detenemos el contenedor y
lo reiniciamos en segundo plano. Presiona "CTRL + C" para detener el
contenedor. Debe registrar algo sobre la espera a que mueran ZooKeeper y Kafka
(¡los procesos!). Reinicia el contenedor con este comando:
$ docker start test_kafka
Debería ejecutarse rápidamente.
Ahora podemos conectarnos al contenedor y familiarizarnos con algunos comandos
de Kafka. Inicia sesión en el contenedor de esta manera:
$ docker exec -it test_kafka /bin/bash
Esto invoca al cliente Docker y
le dice que quieres conectar un TTY interactivo al contenedor llamado
test_kafka y arrancar un shell bash. Sabrás que estás dentro del contenedor si
el indicador cambia a algo que se ve así:
root@kafka:/#
Lo primero que haremos será crear
el topic de Kafka. Un topic de Kafka es una forma de agrupar datos en una única
aplicación. Otros sistemas de mensajes llaman a esto una "cola"; es
lo mismo. Desde el terminal del contenedor que acabas de iniciar, ejecuta este
comando (¡recuerda eliminar el indicador!):
root@kafka:/#
/opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh
--zookeeper kafka:2181
--create
--topic test_topic --partitions 3
--replication-factor
1
Están sucediendo muchas cosas
aquí. Vamos a desarrollar este comando. kafka-topics.sh es script
que encapsula un proceso Java que actúa como cliente de un punto final del
cliente Kafka que trata los topics. --zookeeper kafka: 2181 le
dice al cliente dónde encontrar ZooKeeper. Kafka utiliza ZooKeeper como un
servicio de directorio para realizar un seguimiento del estado de los miembros
del clúster Kafka. ZooKeeper también tiene roles en las operaciones de gestión
interna del clúster (elección del líder, sincronización, etc.). El cliente
consulta la información del clúster a ZooKeeper, de modo que pueda contactarse
directamente con los nodos Kafka. --create indica una
operación particular que creará un topic. --topic asigna un nombre
al topic. --partitions
3 indica en cuántas particiones se divide este topic. Las
particiones entran en juego cuando queremos lograr un mayor rendimiento. La
mejor información que he visto sobre cómo elegir el número de particiones es
una publicación
de blog del colaborador de Kafka Jun Rao. Elegimos tres aquí porque es más
de uno. --replication-factor
1 describe la cantidad de copias
redundantes de los datos. En nuestro caso, ese valor es solo "1", por
lo que no hay redundancia en absoluto, aunque es de esperar que esto ocurra con
un clúster que solo tenga un nodo.
Puedes comprobar que se ha creado
el topic cambiando el comando a --list:
root@kafka:/#
/opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh
--zookeeper kafka:2181
--list
Ahora que tenemos un topic, podemos
enviarle algunos mensajes. Eso implica un script diferente de Kafka, el
productor de la consola.
root@kafka:/#
/opt/kafka_2.11-0.10.1.0/bin/kafka-console-producer.sh
--topic test_topic --broker-list kafka:9092
--broker-list kafka:9092 es análogo a especificar los hosts de ZooKeeper, pero
en su lugar especifica un miembro del clúster Kafka al que contactar
directamente. (No tengo idea de por qué kafka-topics.sh
no soporta esto).
Se puede pensar que este comando está suspendido, pero en realidad está en un
bucle esperando a que enviemos algunos mensajes al topic. Para ello, escribe un
mensaje y presiona la tecla de retorno. Continúa enviando algunos mensajes al topic.
Cuando hayas terminado, presiona CTRL-C.
Ahora podemos reproducir estos mensajes utilizando el consumidor de la
consola. Usa este comando:
root@kafka:/#
/opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh
--topic test_topic --bootstrap-server kafka:9092
--from-beginning
Tarda unos segundos en arrancar. Después de eso, deberías ver tantos mensajes
como hayamos producido anteriormente en la salida. CTRL-C nos sacará de esta
aplicación. Si lo ejecutas nuevamente, deberías ver el mismo resultado. Esto se
debe a que –from-beginning le dice a Kafka que quieres comenzar a leer el topic
desde el principio. Si omites ese argumento, el consumidor solo leerá nuevos
mensajes. Por detrás, Kafka hará un seguimiento del offset del topic de los
consumidores en ZooKeeper (si se usan grupos), o puedes hacerlo tú mismo. Puedes
experimentar con esto por tu cuenta ejecutando el consumidor de la consola y el
productor de la consola al mismo tiempo en diferentes terminales.
Si detienes el consumidor, por
favor arráncalo de nuevo. Lo usaremos más adelante para validar que estamos
enviando mensajes de Twitter a Kafka.
Creación de un cliente Kafka
Volvamos a editar TwitterIngestTutorial nuevamente. Contiene una plantilla de clase trozeada
llamada KafkaWriter.
Este paso la completará para que podamos enviar mensajes a Kafka.
Primero, añadimos algunas
propiedades de configuración a la variable config. Añade las siguientes líneas después del
comentario que dice "añadir configuración aquí".
// add configuration
settings here.
put("bootstrap.servers",
brokers)
put("topic",
topic)
put("key.serializer",
classOf[StringSerializer])
put("value.serializer",
classOf[StringSerializer])
Reconocerás bootstrap.servers del comando del consumidor de la consola que acabas
de usar. Es lo mismo, excepto que, en este caso, el valor se suministra a
partir de una cadena del constructor. El topic debe ser
autoexplicativo en este punto. Los últimos dos valores, key.serializer y value.serializer indican
al cliente cómo ordenar los datos que se envían a Kafka. En este caso, hemos
indicado que se esperan cadenas.
Luego modificamos el método write()
para enviar los datos a Kafka. Antes del método write() puedes
ver que se crea una instancia de KafkaProducer. El método write() usa
este productor para enviar datos a Kafka. Primero creamos un ProducerRecord,
y luego usamos el producer para enviarlos con send().
val record = new ProducerRecord[String,
String](this.topic, key, data)
producer.send(record).get(5,
TimeUnit.SECONDS)
Como se puede ver, la instancia record está
parametrizada para que coincida con los tipos esperados por los serializadores
descritos en las configuraciones key.serializer y value.serializer.
Dado que producer.send() devuelve una instancia java.util.concurrent.Future,
llamamos a su método get() y bloqueamos
hasta que vuelva.
Este es un ejemplo de un cliente síncrono.
Los clientes síncronos son más fáciles de escribir, pero a menudo no funcionan
bien en configuraciones altamente concurrentes (multiproceso). Este cliente
podría modificarse para que sea asíncrono
introduciendo una cola y un pool de ejecutores en el KafkaWriter. Esto se deja como un
ejercicio para el lector.
El último paso para el cliente Kafka es finalizar el método close() haciendo que se llame a producer.close().
Inicialización de Spark
Hay dos pasos para inicializar Spark para el streaming. Primero se crea una
instancia SparkConf,
y luego se configura un StreamingContext. Coloca este código después de la
comprobación de la validación de Twitter:
val conf = new SparkConf()
.setMaster("local[4]")
.setAppName("Twitter2Kafka")
val ssc = new StreamingContext(conf, Seconds(5))
En un escenario de producción, muchos de los valores de configuración de spark
provienen del entorno, frente a la especificación aquí en el código. local[4]
le dice a Spark que use cuatro ejecutores para el paralelismo. El parámetro Seconds del
constructor StreamingContext indica que nuestros "microbatches" tendrán
cinco segundos de ancho.
Flujo de entrada
Ahora vamos a crear un flujo de
entrada para procesarlo. El objeto TwitterUtils abstrae
la API de Twitter y nos da una buena interfaz DStream para
los datos. Básicamente, sondea la API de Twitter para detectar nuevos eventos y
realiza un seguimiento de los eventos que ya se han procesado.
Si tu trabajo fuera crear una
interfaz de flujo en una API heredada de tu empresa, la clase TwitterUtils serviría como un buen ejemplo de cómo hacerlo. Una
cosa importante a tener en cuenta con este ejemplo es que la ingestión del
flujo desde Twitter ocurre en un único hilo, lo que podría convertirse en un
cuello de botella y en un único punto de fallo en un escenario de producción.
Al mismo tiempo, consumir un flujo sin particionar es uno de esos problemas
difíciles en la informática.
Las siguientes líneas de código
crean el flujo de entrada, luego lo reparten de tres maneras y aplican una
función de mapeo para que tratemos con cadenas y no con objetos de la API de
Twitter. Como resultado, el flujo se tipeará como DStream[(Long, String)].
val stream = TwitterUtils.createStream(ssc,
twitterAuth = None,
filters = Seq("#nba", "#nfl",
"nba", "nfl"))
.repartition(3)
.map(tweet =>; (tweet.getId,
Converters.tweetToBase64(tweet)))
Los filtros en este caso nos
limitan a los tweets relacionados con algunos términos deportivos. Puedes
sustituir otros términos aquí o pasar un Seq vacío para recibir el flujo de
datos completo.
Operaciones de flujos
Una vez que tengamos una
referencia al flujo, podemos realizar operaciones sobre el mismo. Es importante
hacer la distinción conceptual de lo que está sucediendo ahora en este código:
si bien parece que todo reside dentro de una única clase (de hecho, un único
archivo), estás escribiendo código que potencialmente puede enviarse y
ejecutarse en muchos nodos.
Spark hace un buen trabajo al
mantenerte al tanto de esto. Si alguna vez ves un error en tiempo de ejecución
quejándose de una clase que no es Serializable, suele ser una indicación de que te
has olvidado de marcar una clase intencionada como Serializable
o (lo más probable) que hayas creado una instancia errónea de algo en el
cierre incorrecto; intenta empujarlo hacia abajo un poco más. StackOverflow
tiene una gran
cantidad de información sobre este tema.
Para realizar operaciones
concurrentes en nuestro flujo, lo descompondremos en instancias RDD
constituyentes y procesaremos cada una de ellas individualmente en el método publishTweets().
stream.foreachRDD(publishTweets _)
Finalmente, damos el pistoletazo
de salida iniciando el StreamingContext y
diciéndole que se espere:
ssc.start()
ssc.awaitTermination()
Si ejecutas este código, deberías
ver un mensaje del log que indique que Spark se está arrancando y procesando el
flujo. Lo más importante es que debes verificar que se vea el mensaje del log
de publishTweets() cada cinco segundos más o menos.
Operaciones del RDD
Antes hemos distribuido el flujo
de entrada, de modo que pudiéramos procesar fragmentos en paralelo en este
punto. Añade el siguiente código a publishTweets(), y luego ejecuta el código.
tweets.foreachPartition
{ partition =>
logger.info(s"PARTITION
SIZE=${partition.size}")
}
Querrá notar dos cosas. Primero,
los mensajes PARTITION
SIZE = X aparecen casi simultáneamente. En segundo lugar, y lo que
es más interesante, es que todo se está ejecutando en diferentes subprocesos, lo
que se indica por el preámbulo thread = XXX de los
mensajes del log. Si estuvieras ejecutando esto en un clúster, esos mensajes
probablemente saldrían no solo en diferentes hilos, sino también en máquinas
completamente diferentes.
Ahora sustituye ese código con
este:
tweets.foreachPartition
{ partition =>
val output = KafkaWriter("kafka:9092",
"test_topic")
partition.foreach { record =>
output.write(record._1.toString,
record._2)
}
output.close()
}
¿Te has dado cuenta de cómo hemos
instanciado cada instancia KafkaWriter dentro del cierre que funciona en la
partición? Esto se hace para evitar los problemas de serialización que hemos
mencionado anteriormente.
Si ejecutas este código, deberías
ver una gran cantidad de salidaen el consumidor de la consola Kafka que
dejaste en ejecución.
Conclusión
¡Eso es! Esperemos que, en este
punto, te hayas familiarizado con las operaciones y comandos simples de Kafka e
incluso hayas aprendido un poco sobre cómo los contenedores pueden facilitar el
desarrollo. Luego has aprendido algunas técnicas simples para gestionar datos
de streaming en Spark.
Continuando a partir de aquí, el
siguiente paso sería familiarizarse con el uso de Spark para ingerir y procesar
datos en batch (por ejemplo, desde HDFS) o continuar con Spark Streaming y
aprender a ingerir datos desde Kafka. En otras publicaciones futuras avanzaré más
sobre estos conceptos.
0 comentarios:
Publicar un comentario
Gracias por participar en esta página.