En este artículo voy a desarrollar una aplicación simple de streaming estructurado con el que espero fomentar tu curiosidad sobre este tipo de procesos. Esto te puede servir como base para continuar con la ruta de aprendizaje del procesamiento de streaming en Spark. Comenzando con un análisis batch del proceso, puedes empezar a conocer los datos y luego crear una versión en streaming del job. En el proceso, vas a apreciar lo cerca que están las APIs de batch y streaming estructurado, y también observar que algunas operaciones batch habituales ahora se aplican en un contexto de streaming.
Recursos online
Empezando el ejercicio
La secuencia de pasos que generalmente se requieren para crear un job de streaming estructurado:
- Inicializar Spark
- Fuentes (sources): adquisición de datos de streaming
- Declarar las operaciones que queremos aplicar a los datos de streaming.
- Receptores (sinks): salida de los datos resultantes
Para nuestro ejercicio de streaming, voy a usar un servidor TCP para simular un sistema web que entrega sus logs en tiempo real. El simulador utiliza un conjunto de datos alimentado mediante una conexión de socket TCP que incorpora el flujo de datos que vamos a analizar.
Conectarse a un stream
Streaming estructurado define los conceptos de fuentes (sources) y receptores (sinks) como las abstracciones clave para consumir un flujo y producir un resultado. Vamos a utilizar la implementación de TextSocketSource para conectarnos al servidor a través de un socket TCP. Las conexiones de socket están definidas por el host del servidor y el puerto donde está escuchando las conexiones. Estos dos elementos de configuración son necesarios para crear la fuente socket:
Observa cómo la creación de un stream es bastante similar a la declaración de una fuente de datos estática en el caso batch. En lugar de usar el generador read, usamos la construcción readStream y le pasamos los parámetros requeridos por la fuente de streaming. La API es básicamente la misma API de DataFrame y Dataset para datos estáticos pero con algunas modificaciones y limitaciones.
Preparar los datos del Stream
La fuente socket genera un DataFrame de streaming con una columna, value, que contiene los datos recibidos del flujo. Consulta la fuente Socket en la documentación de Spark.
Para transformar nuestros datos de texto plano en registros WebLog, primero necesitamos un esquema. El esquema proporciona la información estructurada necesaria para parsear el texto en un objeto JSON. Después de definir un esquema para nuestros datos, procedemos a crear un Dataset.
(1) Obtenemos un esquema a partir de la definición de la case class.
(2) Transformamos el valor textual a formato JSON utilizando el soporte JSON integrado en Spark SQL.
(3) Utilizamos la API Dataset para transformar los registros JSON en objetos WebLog.
Como resultado de este proceso, obtenemos un Dataset de streaming de registros WebLog.
Operaciones sobre el Dataset de Streaming
El webLogStream que acabamos de obtener es un Dataset de streaming de tipo Dataset[WebLog]. La diferencia entre esta instancia y la versión batch es que webLogStream es un conjunto de datos de streaming. Podemos observar esto consultando el objeto:
Vamos a crear la primera consulta sobre nuestros datos: ¿Cuántos registros hay en nuestro conjunto de datos? Esta es una pregunta que podemos responder fácilmente cuando tenemos acceso a todos los datos. Sin embargo, ¿Cómo contamos registros que llegan continuamente? La respuesta es que algunas operaciones que consideramos habituales en un conjunto de datos estático, como contar los registros, no tienen un significado definido en un conjunto de datos de streaming. Esto significa que las consultas directas que usamos en un Dataset o DataFrame estático ahora necesitan dos niveles de interacción. Primero, necesitamos declarar las transformaciones de nuestro flujo, y luego necesitamos iniciar el proceso de streaming.
Creación de una consulta
¿Cuáles son las URL populares? ¿En que ventana temporal? Ahora que tenemos acceso inmediato al flujo de logs web, no necesitamos esperar un día o un mes para tener un ranking de las URL populares. Podemos tener esa información a medida que se descubren las tendencias en períodos de tiempo mucho más cortos.
Primero, para definir el período de tiempo de nuestro interés, creamos una ventana sobre un timestamp. Una característica interesante del streaming estructurado es que podemos definir ese intervalo de tiempo en el timestamp en el que se generaron los datos, también conocido como tiempo del evento, en contraposición al momento en que se procesan los datos (tiempo de procesamiento).
Nuestra definición de ventana será de cinco minutos de datos de eventos. Dado que nuestra línea de tiempo es una simulación, los cinco minutos pueden suceder mucho más rápido o más lento que el tiempo del reloj. De esta manera, podemos apreciar claramente cómo el streaming estructurado usa la información del timestamp de los eventos para llevar un control de la línea de tiempo del evento.
Tenemos que extraer las URLs y seleccionar solo páginas de contenido, como .html, .htm o directorios.
Hemos convertido la petición para que contenga solo la URL visitada y hemos filtrado todas las páginas sin contenido. Ahora definimos la consulta en ventana para calcular las URLs que son tendencia principal:
Comenzar el procesamiento del flujo
Para iniciar un job de streaming estructurado, necesitamos especificar un receptor y un modo de salida. Estos son dos conceptos nuevos introducidos por el streaming estructurado:
- Un receptor (sink) define dónde queremos materializar los datos resultantes; por ejemplo, a un archivo del sistema de archivos, a una tabla en memoria o a otro sistema de streaming como Kafka.
- El modo de salida (output) define cómo queremos que se entreguen los resultados: ¿queremos ver todos los datos cada vez, solo las actualizaciones o solo los nuevos registros?
Estas opciones se dan a una operación writeStream. Crea la consulta de streaming que inicia el consumo del flujo, materializa los cálculos declarados en la consulta y genera el resultado en el receptor (sink) de salida.
Para nuestra consulta utilizamos el receptor de memoria (memory sink) y el modo de salida complete para tener una tabla totalmente actualizada cada vez que se añadan nuevos registros al resultado de llevar un control del ranking de URLs.
El receptor de memoria envía los datos a una tabla temporal con el mismo nombre dado en la opción queryName. Podemos observar esto consultando las tablas registradas en Spark SQL:
Exploración de datos
Dado que estamos acelerando la línea de tiempo del log en el lado del productor, después de unos segundos, podemos ejecutar el siguiente comando para ver el resultado de las primeras ventanas, como se ilustra en la siguiente figura.
Observa cómo el tiempo de procesamiento (unos segundos) está desacoplado del tiempo del evento (cientos de minutos de logs):
Referencias:
Guía de documentación oficial de programación Spark con Streaming Estructurado
Modelo de Programación de Streaming Estructurado (Oficial)
[Activity] Analyzing Apache Log files with Structured Streaming
How to analyze log data with Python and Apache Spark
How to wrangle log data with Python and Apache Spark
Spark: Web Server Logs Analysis with Scala
Buen aprendizaje !!! 😀😀
0 comentarios:
Publicar un comentario
Gracias por participar en esta página.