Profundice en la arquitectura e interior de Spark

Apache Spark es un marco de trabajo de computación en clúster de uso general distribuido de código abierto. Una aplicación Spark es un proceso de JVM que ejecuta un código de usuario utilizando Spark como una biblioteca de terceros.

Como parte de este blog, mostraré la forma en que Spark trabaja en la arquitectura de Yarn con un ejemplo y los diversos procesos subyacentes que están involucrados, tales como:

  • Contexto de Spark
  • Yarn Resource Manager, Application Master y lanzamiento de ejecutores (contenedores).
  • Configuración de variables de entorno, recursos laborales.
  • CoarseGrainedExecutorBackend y RPC basado en Netty.
  • SparkListeners.
  • Ejecución de un trabajo (Plan lógico, Plan físico).
  • Spark-WebUI.

Contexto de Spark

El contexto de Spark es el primer nivel de punto de entrada y el corazón de cualquier aplicación de Spark. Spark-shell no es más que un REPL basado en Scala con binarios de chispa que crearán un objeto sc llamado contexto de chispa.

Podemos lanzar el Spark Shell como se muestra a continuación:

spark-shell --master yarn \ --conf spark.ui.port=12345 \ --num-executors 3 \ --executor-cores 2 \ --executor-memory 500M

Como parte del Spark-Shell, hemos mencionado los ejecutores num. Indican la cantidad de nodos trabajadores que se utilizarán y la cantidad de núcleos para cada uno de estos nodos trabajadores para ejecutar tareas en paralelo.

O puede iniciar Spark Shell usando la configuración predeterminada.

spark-shell --master yarn

Las configuraciones están presentes como parte de spark-env.sh

Nuestro programa Driver se ejecuta en el nodo Gateway, que no es más que una chispa. Creará un contexto de chispa y lanzará una aplicación.

Se puede acceder al objeto de contexto de chispa usando sc.

Una vez creado el contexto Spark, espera los recursos. Una vez que los recursos están disponibles, el contexto de Spark configura los servicios internos y establece una conexión con un entorno de ejecución de Spark.

Yarn Resource Manager, Application Master y lanzamiento de ejecutores (contenedores).

Una vez que se crea el contexto de Spark, comprobará con el Administrador de clústeres y lanzará el Application Master , es decir, lanzará un contenedor y registrará los controladores de señales .

Una vez que se inicia el Application Master, se establece una conexión con el controlador.

A continuación, ApplicationMasterEndPoint activa una aplicación proxy para conectarse al administrador de recursos.

Ahora, el contenedor de hilo realizará las siguientes operaciones como se muestra en el diagrama.

ii) YarnRMClient se registrará con el Application Master.

iii) YarnAllocator: solicitará 3 contenedores ejecutores, cada uno con 2 núcleos y 884 MB de memoria, incluidos 384 MB de sobrecarga

iv) AM inicia el hilo de reportero

Ahora, el Yarn Allocator recibe tokens del Driver para lanzar los nodos Executor e iniciar los contenedores.

Configuración de variables de entorno, recursos de trabajo y lanzamiento de contenedores.

Cada vez que se lanza un contenedor, hace las siguientes 3 cosas en cada uno de ellos.

  • Configurar variables env

Spark Runtime Environment (SparkEnv) es el entorno de ejecución con los servicios de Spark que se utilizan para interactuar entre sí a fin de establecer una plataforma informática distribuida para una aplicación Spark.

  • Configurar recursos laborales
  • Contenedor de lanzamiento

El contexto de lanzamiento del ejecutor de YARN asigna a cada ejecutor un id de ejecutor para identificar al ejecutor correspondiente (a través de Spark WebUI) e inicia un CoarseGrainedExecutorBackend.

CoarseGrainedExecutorBackend y RPC basado en Netty.

Luego de obtener recursos de Resource Manager, veremos al ejecutor iniciando

CoarseGrainedExecutorBackend es un ExecutorBackend que controla el ciclo de vida de un solo ejecutor. Envía el estado del ejecutor al conductor.

Cuando se inicia ExecutorRunnable, CoarseGrainedExecutorBackend registra el extremo de Executor RPC y los manejadores de señales para comunicarse con el controlador (es decir, con el extremo de CoarseGrainedScheduler RPC) e informar que está listo para iniciar tareas.

RPC basado en Netty: se utiliza para comunicarse entre nodos trabajadores, contexto de chispa, ejecutores.

NettyRPCEndPoint se utiliza para rastrear el estado de resultado del nodo trabajador.

RpcEndpointAddress es la dirección lógica para un punto final registrado en un entorno RPC, con RpcAddress y nombre.

Tiene el formato que se muestra a continuación:

Este es el primer momento en que CoarseGrainedExecutorBackend inicia la comunicación con el controlador disponible en driverUrl a través de RpcEnv.

SparkListeners

SparkListener (escucha del programador) es una clase que escucha los eventos de ejecución del DAGScheduler de Spark y registra toda la información del evento de una aplicación, como el ejecutor, los detalles de la asignación del controlador junto con los trabajos, etapas y tareas, y otros cambios de propiedades del entorno.

SparkContext inicia el LiveListenerBus que reside dentro del controlador. Registra JobProgressListener con LiveListenerBus, que recopila todos los datos para mostrar las estadísticas en la interfaz de usuario de Spark.

De forma predeterminada, solo el oyente para WebUI estaría habilitado, pero si queremos agregar otros oyentes, podemos usar spark.extraListeners.

Spark viene con dos oyentes que muestran la mayoría de las actividades.

i) StatsReportListener

ii) EventLoggingListener

EventLoggingListener:Si desea analizar más el rendimiento de sus aplicaciones más allá de lo que está disponible como parte del servidor de historial de Spark, puede procesar los datos del registro de eventos. Spark Event Log registra información sobre trabajos / etapas / tareas procesados. Se puede habilitar como se muestra a continuación ...

El archivo de registro de eventos se puede leer como se muestra a continuación

  • El controlador Spark inicia sesión en la carga de trabajo del trabajo / métricas de rendimiento en el directorio spark.evenLog.dir como archivos JSON.
  • Hay un archivo por aplicación, los nombres de archivo contienen el ID de la aplicación (por lo tanto, incluye una marca de tiempo) application_1540458187951_38909.

Muestra el tipo de eventos y el número de entradas para cada uno.

Ahora, agreguemos StatsReportListener a spark.extraListenersy comprobar el estado del trabajo.

Habilite el nivel de registro INFO para el registrador org.apache.spark.scheduler.StatsReportListener para ver los eventos de Spark.

Para habilitar el oyente, regístrelo en SparkContext. Se puede realizar de dos formas.

i) Usando el método SparkContext.addSparkListener (listener: SparkListener) dentro de su aplicación Spark.

Haga clic en el enlace para implementar oyentes personalizados - CustomListener

ii) Usando la opción de línea de comandos conf

Leamos un archivo de muestra y realicemos una operación de conteo para ver StatsReportListener.

Ejecución de un trabajo (Plan lógico, Plan físico).

En Spark, RDD ( conjunto de datos distribuidos resilientes ) es el primer nivel de la capa de abstracción. Es una colección de elementos divididos en los nodos del clúster que se pueden operar en paralelo. Los RDD se pueden crear de 2 formas.

i) Paralelizar una colección existente en su programa de controladores

ii) Hacer referencia a un conjunto de datos en un sistema de almacenamiento externo

Los RDD se crean utilizando un archivo en el sistema de archivos Hadoop o una colección Scala existente en el programa del controlador y transformándola.

Tomemos un fragmento de muestra como se muestra a continuación

La ejecución del fragmento anterior se lleva a cabo en 2 fases.

6.1 Plan lógico: en esta fase, se crea un RDD utilizando un conjunto de transformaciones. Realiza un seguimiento de esas transformaciones en el programa controlador mediante la construcción de una cadena de computación (una serie de RDD) como un gráfico de transformaciones para producir un RDD llamado Gráfico de linaje .

Las transformaciones se pueden dividir en 2 tipos

  • Transformación estrecha: una canalización de operaciones que se puede ejecutar como una etapa y no requiere que los datos se mezclen en las particiones, por ejemplo, mapa, filtro, etc.

Ahora los datos se leerán en el controlador usando la variable de transmisión.

  • Transformación amplia: aquí cada operación requiere que los datos se mezclen, de ahora en adelante, para cada transformación amplia se creará una nueva etapa, por ejemplo, reduceByKey, etc.

Podemos ver el gráfico de linaje usando toDebugString

6.2 Plano físico:En esta fase, una vez que activamos una acción en el RDD, The DAG Scheduler analiza el linaje de RDD y presenta el mejor plan de ejecución con etapas y tareas junto con TaskSchedulerImpl y ejecuta el trabajo en un conjunto de tareas en paralelo.

Una vez que realizamos una operación de acción, SparkContext desencadena un trabajo y registra el RDD hasta la primera etapa (es decir, antes de cualquier transformación amplia) como parte del DAGScheduler.

Ahora, antes de pasar a la siguiente etapa (transformaciones amplias), verificará si hay datos de partición que se deben barajar y si faltan resultados de operaciones principales de los que dependa, si falta alguna de esas etapas, entonces volverá a ejecuta esa parte de la operación haciendo uso del DAG (Gráfico acíclico dirigido) que lo hace tolerante a fallas.

En caso de faltar tareas, asigna tareas a los ejecutores.

Cada tarea se asigna a CoarseGrainedExecutorBackend del ejecutor.

Obtiene la información del bloque del Namenode.

ahora, realiza el cálculo y devuelve el resultado.

A continuación, el DAGScheduler busca las etapas recién ejecutadas y activa la operación de la siguiente etapa (reduceByKey).

ShuffleBlockFetcherIterator consigue que los bloques se barajen.

Ahora la operación de reducción se divide en 2 tareas y se ejecuta.

Al completar cada tarea, el ejecutor devuelve el resultado al controlador.

Una vez finalizado el trabajo, se muestra el resultado.

Spark-WebUI

Spark-UI ayuda a comprender el flujo de ejecución del código y el tiempo necesario para completar un trabajo en particular. La visualización ayuda a descubrir cualquier problema subyacente que tenga lugar durante la ejecución y a optimizar aún más la aplicación de Spark.

Veremos la visualización Spark-UI como parte del paso 6 anterior .

Una vez que se completa el trabajo, puede ver los detalles del trabajo, como el número de etapas, el número de tareas que se programaron durante la ejecución del trabajo de un trabajo.

Al hacer clic en los trabajos completados, podemos ver la visualización DAG, es decir, las diferentes transformaciones anchas y estrechas como parte de ella.

Puede ver el tiempo de ejecución que lleva cada etapa.

Al hacer clic en una etapa en particular como parte del trabajo, se mostrarán los detalles completos sobre dónde residen los bloques de datos, tamaño de los datos, el ejecutor utilizado, la memoria utilizada y el tiempo necesario para completar una tarea en particular. También muestra el número de cambios que se realizan.

Además, podemos hacer clic en la pestaña Ejecutores para ver el Ejecutor y el controlador utilizados.

Ahora que hemos visto cómo funciona Spark internamente, puede determinar el flujo de ejecución haciendo uso de la interfaz de usuario de Spark, los registros y ajustando Spark EventListeners para determinar la solución óptima en el envío de un trabajo de Spark.

Nota : Los comandos que se ejecutaron relacionados con esta publicación se agregan como parte de mi cuenta GIT.

Del mismo modo, también puede leer más aquí:

  • Arquitectura de Sqoop en profundidad con código.
  • Arquitectura HDFS en profundidad con código .
  • Arquitectura de Hive en profundidad con código .

Si también lo desea, puede conectarse conmigo en LinkedIn: Jayvardhan Reddy.

Si le gustó leerlo, puede hacer clic en el aplauso y dejar que los demás sepan. Si desea que agregue algo más, no dude en dejar una respuesta.