Cómo hacer una aplicación sencilla con Akka Cluster

Si lees mi historia anterior sobre Scalachain, probablemente hayas notado que está lejos de ser un sistema distribuido. Carece de todas las características para funcionar correctamente con otros nodos. Agregue a esto que una cadena de bloques compuesta por un solo nodo es inútil. Por eso decidí que era hora de trabajar en el tema.

Dado que Scalachain funciona con Akka, ¿por qué no aprovechar la oportunidad de jugar con Akka Cluster? Creé un proyecto simple para jugar un poco con Akka Cluster, y en esta historia voy a compartir mis aprendizajes. Vamos a crear un clúster de tres nodos, utilizando Cluster Aware Routers para equilibrar la carga entre ellos. Todo se ejecutará en un contenedor Docker y usaremos docker-compose para una implementación sencilla.

Ok, ¡vamos a rodar! ?

Introducción rápida a Akka Cluster

Akka Cluster proporciona un gran soporte para la creación de aplicaciones distribuidas. El mejor caso de uso es cuando tiene un nodo que desea replicar N veces en un entorno distribuido. Esto significa que todos los N nodos son pares que ejecutan el mismo código. Akka Cluster le ofrece de inmediato el descubrimiento de miembros en el mismo clúster. Utilizando Cluster Aware Routers es posible equilibrar los mensajes entre actores en diferentes nodos. También es posible elegir la política de equilibrio, ¡haciendo que el equilibrio de carga sea pan comido!

En realidad, puede elegir entre dos tipos de enrutadores:

Enrutador de grupo : los actores a los que se enviarán los mensajes, llamados rutas, se especifican utilizando su ruta de actor. Los enrutadores comparten las rutas creadas en el clúster. Usaremos un enrutador de grupo en este ejemplo.

Enrutador de grupo : el enrutador crea e implementa las rutas, por lo que son sus hijos en la jerarquía de actores. Las rutas no se comparten entre enrutadores. Esto es ideal para un escenario de réplica principal, donde cada enrutador es el principal y sus rutas enrutan las réplicas.

Esto es solo la punta del iceberg, por lo que los invito a leer la documentación oficial para obtener más información.

Un clúster para cálculos matemáticos

Imaginemos un escenario de caso de uso. Suponga diseñar un sistema para ejecutar cálculos matemáticos a pedido. El sistema se implementa en línea, por lo que necesita una API REST para recibir las solicitudes de cálculo. Un procesador interno maneja estas solicitudes, ejecuta el cálculo y devuelve el resultado.

En este momento, el procesador solo puede calcular el número de Fibonacci. Decidimos utilizar un clúster de nodos para distribuir la carga entre los nodos y mejorar el rendimiento. Akka Cluster se encargará de la dinámica del clúster y el equilibrio de carga entre los nodos. ¡OK suena bien!

Jerarquía de actores

Lo primero es lo primero: necesitamos definir nuestra jerarquía de actores. El sistema se puede dividir en tres partes funcionales: la lógica empresarial , la gestión del clúster y el nodo en sí. También está el servidor, pero no es un actor, y trabajaremos en eso más adelante.

Lógica de negocios

La aplicación debe realizar cálculos matemáticos. Podemos definir un Processoractor simple para administrar todas las tareas computacionales. Cada cálculo que apoyamos se puede implementar en un actor específico, que será un hijo de Processoruno. De esta forma la aplicación es modular y más fácil de ampliar y mantener. Ahora mismo, el único hijo de ProcessorWill será el ProcessorFibonacciactor. Supongo que puedes adivinar cuál es su tarea. Esto debería ser suficiente para empezar.

Gestión de clústeres

Para administrar el clúster necesitamos un ClusterManager. Suena simple, ¿verdad? Este actor maneja todo lo relacionado con el clúster, como devolver a sus miembros cuando se le pregunta. Sería útil registrar lo que sucede dentro del clúster, por lo que definimos un ClusterListeneractor. Este es un elemento secundario de ClusterManagery se suscribe a los eventos del clúster que los registra.

Nodo

El Nodeactor es la raíz de nuestra jerarquía. Es el punto de entrada de nuestro sistema que se comunica con la API. El Processory el ClusterManagerson sus hijos, junto con el ProcessorRouteractor. Este es el equilibrador de carga del sistema, distribuyendo la carga entre Processors. Lo configuraremos como un enrutador compatible con clústeres, para que todos ProcessorRouterpuedan enviar mensajes a Processorlos correos electrónicos de todos los nodos.

Implementación del actor

¡Es hora de implementar a nuestros actores! Primero implementamos los actores relacionados con la lógica empresarial del sistema. Luego pasamos a los actores para la gestión del clúster y al actor raíz ( Node) al final.

Procesador Fibonacci

Este actor ejecuta el cálculo del número de Fibonacci. Recibe un Computemensaje que contiene el número a calcular y la referencia del actor al que responder. La referencia es importante, ya que puede haber diferentes actores solicitantes. ¡Recuerde que estamos trabajando en un entorno distribuido!

Una vez Computeque se recibe el mensaje, la fibonaccifunción calcula el resultado. Lo envolvemos en un ProcessorResponseobjeto para proporcionar información sobre el nodo que ejecutó el cálculo. Esto será útil más adelante para ver la política de turnos en acción.

A continuación, el resultado se envía al actor al que debemos responder. Pan comido.

object ProcessorFibonacci { sealed trait ProcessorFibonacciMessage case class Compute(n: Int, replyTo: ActorRef) extends ProcessorFibonacciMessage def props(nodeId: String) = Props(new ProcessorFibonacci(nodeId)) def fibonacci(x: Int): BigInt = { @tailrec def fibHelper(x: Int, prev: BigInt = 0, next: BigInt = 1): BigInt = x match { case 0 => prev case 1 => next case _ => fibHelper(x - 1, next, next + prev) } fibHelper(x) } } class ProcessorFibonacci(nodeId: String) extends Actor { import ProcessorFibonacci._ override def receive: Receive = { case Compute(value, replyTo) => { replyTo ! ProcessorResponse(nodeId, fibonacci(value)) } } }

Procesador

El Processoractor gestiona los subprocesadores específicos, como el de Fibonacci. Debería crear una instancia de los subprocesadores y enviarles las solicitudes. En este momento sólo tenemos un sub-procesador, por lo que el Processorrecibe un tipo de mensaje: ComputeFibonacci. Este mensaje contiene el número de Fibonacci para calcular. Una vez recibido, el número a calcular se envía a a FibonacciProcessor, junto con la referencia del sender().

object Processor { sealed trait ProcessorMessage case class ComputeFibonacci(n: Int) extends ProcessorMessage def props(nodeId: String) = Props(new Processor(nodeId)) } class Processor(nodeId: String) extends Actor { import Processor._ val fibonacciProcessor: ActorRef = context.actorOf(ProcessorFibonacci.props(nodeId), "fibonacci") override def receive: Receive = { case ComputeFibonacci(value) => { val replyTo = sender() fibonacciProcessor ! Compute(value, replyTo) } } }

ClusterListener

Nos gustaría registrar información útil sobre lo que sucede en el clúster. Esto podría ayudarnos a depurar el sistema si es necesario. Este es el propósito del ClusterListeneractor. Antes de comenzar, se suscribe a los mensajes de eventos del clúster. Los reacciona actor para mensajes como MemberUp, UnreachableMember, o MemberRemoved, registrar el evento correspondiente. Cuando ClusterListenerse detiene, se da de baja de los eventos del clúster.

object ClusterListener { def props(nodeId: String, cluster: Cluster) = Props(new ClusterListener(nodeId, cluster)) } class ClusterListener(nodeId: String, cluster: Cluster) extends Actor with ActorLogging { override def preStart(): Unit = { cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember]) } override def postStop(): Unit = cluster.unsubscribe(self) def receive = { case MemberUp(member) => log.info("Node {} - Member is Up: {}", nodeId, member.address) case UnreachableMember(member) => log.info(s"Node {} - Member detected as unreachable: {}", nodeId, member) case MemberRemoved(member, previousStatus) => log.info(s"Node {} - Member is Removed: {} after {}", nodeId, member.address, previousStatus) case _: MemberEvent => // ignore } }

Cluster Manager

El actor responsable de la gestión del clúster es ClusterManager. Crea el ClusterListeneractor y proporciona la lista de miembros del clúster a pedido. Podría ampliarse para añadir más funcionalidades, pero ahora mismo es suficiente.

object ClusterManager { sealed trait ClusterMessage case object GetMembers extends ClusterMessage def props(nodeId: String) = Props(new ClusterManager(nodeId)) } class ClusterManager(nodeId: String) extends Actor with ActorLogging { val cluster: Cluster = Cluster(context.system) val listener: ActorRef = context.actorOf(ClusterListener.props(nodeId, cluster), "clusterListener") override def receive: Receive = { case GetMembers => { sender() ! cluster.state.members.filter(_.status == MemberStatus.up) .map(_.address.toString) .toList } } }

Procesador Enrutador

The load-balancing among processors is handled by the ProcessorRouter. It is created by the Node actor, but this time all the required information are provided in the configuration of the system.

class Node(nodeId: String) extends Actor { //... val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter") //... }

Let’s analyse the relevant part in the application.conf file.

akka { actor { ... deployment { /node/processorRouter { router = round-robin-group routees.paths = ["/user/node/processor"] cluster { enabled = on allow-local-routees = on } } } } ... }

The first thing is to specify the path to the router actor, that is /node/processorRouter. Inside that property we can configure the behaviour of the router:

  • router: this is the policy for the load balancing of messages. I chose the round-robin-group, but there are many others.
  • routees.paths: these are the paths to the actors that will receive the messages handled by the router. We are saying: “When you receive a message, look for the actors corresponding to these paths. Choose one according to the policy and forward the message to it.” Since we are using Cluster Aware Routers, the routees can be on any node of the cluster.
  • cluster.enabled: are we operating in a cluster? The answer is on, of course!
  • cluster.allow-local-routees: here we are allowing the router to choose a routee in its node.

Using this configuration we can create a router to load balance the work among our processors.

Node

The root of our actor hierarchy is the Node. It creates the children actors — ClusterManager, Processor, and ProcessorRouter — and forwards the messages to the right one. Nothing complex here.

object Node { sealed trait NodeMessage case class GetFibonacci(n: Int) case object GetClusterMembers def props(nodeId: String) = Props(new Node(nodeId)) } class Node(nodeId: String) extends Actor { val processor: ActorRef = context.actorOf(Processor.props(nodeId), "processor") val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter") val clusterManager: ActorRef = context.actorOf(ClusterManager.props(nodeId), "clusterManager") override def receive: Receive = { case GetClusterMembers => clusterManager forward GetMembers case GetFibonacci(value) => processorRouter forward ComputeFibonacci(value) } }

Server and API

Every node of our cluster runs a server able to receive requests. The Server creates our actor system and is configured through the application.conf file.

object Server extends App with NodeRoutes { implicit val system: ActorSystem = ActorSystem("cluster-playground") implicit val materializer: ActorMaterializer = ActorMaterializer() val config: Config = ConfigFactory.load() val address = config.getString("http.ip") val port = config.getInt("http.port") val nodeId = config.getString("clustering.ip") val node: ActorRef = system.actorOf(Node.props(nodeId), "node") lazy val routes: Route = healthRoute ~ statusRoutes ~ processRoutes Http().bindAndHandle(routes, address, port) println(s"Node $nodeId is listening at //$address:$port") Await.result(system.whenTerminated, Duration.Inf) }

Akka HTTP powers the server itself and the REST API, exposing three simple endpoints. These endpoints are defined in the NodeRoutes trait.

The first one is /health, to check the health of a node. It responds with a 200 OK if the node is up and running

lazy val healthRoute: Route = pathPrefix("health") { concat( pathEnd { concat( get { complete(StatusCodes.OK) } ) } ) }

The /status/members endpoint responds with the current active members of the cluster.

lazy val statusRoutes: Route = pathPrefix("status") { concat( pathPrefix("members") { concat( pathEnd { concat( get { val membersFuture: Future[List[String]] = (node ? GetClusterMembers).mapTo[List[String]] onSuccess(membersFuture) { members => complete(StatusCodes.OK, members) } } ) } ) } ) }

The last (but not the least) is the /process/fibonacci/n endpoint, used to request the Fibonacci number of n.

lazy val processRoutes: Route = pathPrefix("process") { concat( pathPrefix("fibonacci") { concat( path(IntNumber) { n => pathEnd { concat( get { val processFuture: Future[ProcessorResponse] = (node ? GetFibonacci(n)).mapTo[ProcessorResponse] onSuccess(processFuture) { response => complete(StatusCodes.OK, response) } } ) } } ) } ) }

It responds with a ProcessorResponse containing the result, along with the id of the node where the computation took place.

Cluster Configuration

Once we have all our actors, we need to configure the system to run as a cluster! The application.conf file is where the magic takes place. I’m going to split it in pieces to present it better, but you can find the complete file here.

Let’s start defining some useful variables.

clustering { ip = "127.0.0.1" ip = ${?CLUSTER_IP} port = 2552 port = ${?CLUSTER_PORT} seed-ip = "127.0.0.1" seed-ip = ${?CLUSTER_SEED_IP} seed-port = 2552 seed-port = ${?CLUSTER_SEED_PORT} cluster.name = "cluster-playground" }

Here we are simply defining the ip and port of the nodes and the seed, as well as the cluster name. We set a default value, then we override it if a new one is specified. The configuration of the cluster is the following.

akka { actor { provider = "cluster" ... /* router configuration */ ... } remote { log-remote-lifecycle-events = on netty.tcp { hostname = ${clustering.ip} port = ${clustering.port} } } cluster { seed-nodes = [ "akka.tcp://"${clustering.cluster.name}"@"${clustering.seed-ip}":"${clustering.seed-port} ] auto-down-unreachable-after = 10s } } ... /* server vars */ ... /* cluster vars */ }

Akka Cluster is build on top of Akka Remoting, so we need to configure it properly. First of all, we specify that we are going to use Akka Cluster saying that provider = "cluster". Then we bind cluster.ip and cluster.port to the hostname and port of the netty web framework.

The cluster requires some seed nodes as its entry points. We set them in the seed-nodes array, in the format akka.tcp://"{clustering.cluster.name}"@"{clustering.seed-ip}":”${clustering.seed-port}”. Right now we have one seed node, but we may add more later.

The auto-down-unreachable-after property sets a member as down after it is unreachable for a period of time. This should be used only during development, as explained in the official documentation.

Ok, the cluster is configured, we can move to the next step: Dockerization and deployment!

Dockerization and deployment

To create the Docker container of our node we can use sbt-native-packager. Its installation is easy: add addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.15") to the plugin.sbt file in the project/ folder. This amazing tool has a plugin for the creation of Docker containers. it allows us to configure the properties of our Dockerfile in the build.sbt file.

// other build.sbt properties enablePlugins(JavaAppPackaging) enablePlugins(DockerPlugin) enablePlugins(AshScriptPlugin) mainClass in Compile := Some("com.elleflorio.cluster.playground.Server") dockerBaseImage := "java:8-jre-alpine" version in Docker := "latest" dockerExposedPorts := Seq(8000) dockerRepository := Some("elleflorio")

Once we have setup the plugin, we can create the docker image running the command sbt docker:publishLocal. Run the command and taste the magic… ?

We have the Docker image of our node, now we need to deploy it and check that everything works fine. The easiest way is to create a docker-compose file that will spawn a seed and a couple of other nodes.

version: '3.5' networks: cluster-network: services: seed: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '2552:2552' - '8000:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: seed CLUSTER_SEED_IP: seed node1: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '8001:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: node1 CLUSTER_PORT: 1600 CLUSTER_SEED_IP: seed CLUSTER_SEED_PORT: 2552 node2: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '8002:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: node2 CLUSTER_PORT: 1600 CLUSTER_SEED_IP: seed CLUSTER_SEED_PORT: 2552

I won’t spend time going through it, since it is quite simple.

Let’s run it!

Time to test our work! Once we run the docker-compose up command, we will have a cluster of three nodes up and running. The seed will respond to requests at port :8000, while node1 and node2 at port :8001 and :8002. Play a bit with the various endpoints. You will see that the requests for a Fibonacci number will be computed by a different node each time, following a round-robin policy. That’s good, we are proud of our work and can get out for a beer to celebrate! ?

Conclusion

We are done here! We learned a lot of things in these ten minutes:

  • What Akka Cluster is and what can do for us.
  • How to create a distributed application with it.
  • How to configure a Group Router for load-balancing in the cluster.
  • How to Dockerize everything and deploy it using docker-compose.

You can find the complete application in my GitHub repo. Feel free to contribute or play with it as you like! ?

See you! ?