Una biblioteca de streaming con un superpoder: FS2 y programación funcional

Scala tiene una biblioteca de transmisión muy especial llamada FS2 (Functional Streams for Scala). Esta biblioteca incorpora todas las ventajas de la programación funcional (FP). Al comprender sus objetivos de diseño, se expondrá a las ideas centrales que hacen que FP sea tan atractivo.

FS2 tiene un tipo central: Stream[Effect,Output]

Puede obtener de este tipo que es un Streamy que emite valores de tipo Output.

La pregunta obvia aquí es ¿qué es Effect? ¿Cuál es el vínculo entre Effecty Output? ¿Y qué ventajas tiene FS2 sobre otras bibliotecas de transmisión?

Visión general

Comenzaré revisando qué problemas resuelve FS2. Luego comparo Listy Streamcon varios ejemplos de código. Después de eso, me enfocaré en cómo usarlo Streamcon una base de datos o cualquier otro IO. Ahí es donde brilla FS2 y donde Effectse usa el tipo. Una vez que comprenda lo que Effectes, las ventajas de la programación funcional deberían ser evidentes para usted.

Al final de esta publicación, obtendrá las respuestas a las siguientes preguntas:

  • ¿Qué problemas puedo solucionar con FS2?
  • ¿Qué puedo hacer con Streameso que Listno puedo?
  • ¿Cómo puedo alimentar datos desde una API / archivo / base de datos a Stream?
  • ¿Qué es este Effecttipo y cómo se relaciona con la programación funcional?

Nota: El código está en Scala y debería ser comprensible incluso sin conocimiento previo de la sintaxis.

¿Qué problemas puedo solucionar con FS2?

  1. E / S de transmisión: carga incremental de conjuntos de big data que no caben en la memoria y opera con ellos sin gastar mucho.
  2. Flujo de control (no cubierto): mover datos de uno / varios DB / archivos / API a otros de una manera declarativa agradable.
  3. Simultaneidad (no cubierto): ejecute diferentes transmisiones en paralelo y haga que se comuniquen entre sí. Por ejemplo, cargar datos de varios archivos y procesarlos simultáneamente en lugar de secuencialmente. Puedes hacer algunas cosas avanzadas aquí. Los flujos pueden comunicarse entre sí durante la etapa de procesamiento y no solo al final.

List vs Stream

Listes la estructura de datos más conocida y utilizada. Para tener una idea de cómo se diferencia de un FS2 Stream, analizaremos algunos casos de uso. Veremos cómo se Streampueden resolver problemas que Listno pueden.

Tus datos son demasiado grandes y no caben en la memoria

Digamos que tiene un archivo muy grande (40 GB) fahrenheit.txt. El archivo tiene una temperatura en cada línea y desea convertirlo a celsius.txt.

Cargando un archivo grande usando List

import scala.io.Source val list = Source.fromFile("testdata/fahrenheit.txt").getLines.toList java.lang.OutOfMemoryError: Java heap space java.util.Arrays.copyOfRange(Arrays.java:3664) java.lang.String.(String.java:207) java.io.BufferedReader.readLine(BufferedReader.java:356) java.io.BufferedReader.readLine(BufferedReader.java:389)

Listfalla miserablemente porque, por supuesto, el archivo es demasiado grande para caber en la memoria. Si tiene curiosidad, puede verificar la solución completa usando Streamaquí, pero hágalo más tarde, siga leyendo :)

Cuando List no sirve ... ¡Transmítete al rescate!

Digamos que logré leer mi archivo y quiero volver a escribirlo. Me gustaría conservar la estructura de la línea. Necesito insertar un carácter de nueva línea \ndespués de cada temperatura.

Puedo usar el interspersecombinador para hacer eso

import fs2._ Stream(1,2,3,4).intersperse("\n").toList

Otro bonito es zipWithNext

scala> Stream(1,2,3,4).zipWithNext.toList res1: List[(Int, Option[Int])] = List((1,Some(2)), (2,Some(3)), (3,Some(4)), (4,None))

Agrupa cosas consecutivas, muy útil si desea eliminar duplicados consecutivos.

Estos son solo algunos de muchos muy útiles, aquí está la lista completa.

Obviamente, se Streampueden hacer muchas cosas que Listno se pueden hacer, pero la mejor característica viene en la siguiente sección, se trata de cómo usar Streamen el mundo real con DB / Archivos / API ...

¿Cómo puedo alimentar datos desde una API / archivo / base de datos a Stream?

Digamos por ahora que este es nuestro programa

scala> Stream(1,2,3) res2: fs2.Stream[fs2.Pure,Int] = Stream(..)

¿Qué significa esto Pure? Aquí está el scaladoc del código fuente:

/** * Indicates that a stream evaluates no effects. * * A `Stream[Pure,O]` can be safely converted to a `Stream[F,O]` for all `F`. */ type Pure[A] <: Nothing

Significa que no hay efectos, ok ..., pero ¿Qué es un efecto? y más específicamente, ¿cuál es el efecto de nuestro programa Stream(1,2,3)?

Este programa, literalmente, no tiene ningún efecto en el mundo. ¡Su único efecto será que su CPU funcione y consuma algo de energía! No afecta al mundo que te rodea.

Al afectar al mundo me refiero a que consume cualquier recurso significativo como un archivo, una base de datos o produce algo como un archivo, carga algunos datos en algún lugar, escribe en su terminal, etc.

¿Cómo convierto una Puretransmisión en algo útil?

Digamos que quiero cargar identificadores de usuario desde una base de datos, me dan esta función, hace una llamada a la base de datos y devuelve el ID de usuario como Long.

import scala.concurrent.Future def loadUserIdByName(userName: String): Future[Long] = ???

Devuelve un Futureque indica que esta llamada es asincrónica y el valor estará disponible en algún momento en el futuro. Envuelve el valor devuelto por la base de datos.

Tengo esta Purecorriente.

scala> val names = Stream("bob", "alice", "joe") names: fs2.Stream[fs2.Pure,String] = Stream(..)

¿Cómo obtengo un StreamID?

The naive approach would be to use the map function, it should run the function for each value in the Stream.

scala> userIdsFromDB.compile res5: fs2.Stream.ToEffect[scala.concurrent.Future,Long] = [email protected]

I still got back a Pure! I gave the Stream a function that affects the world and I still got a Pure, not cool ... It would have been neat if FS2 would have detected automatically that the loadUserIdByName function has an effect on the world and returned me something that is NOT Pure but it does not work like that. You have to use a special combinator instead of map: you have to use evalMap.

scala> userIdsFromDB.toList :18: error: value toList is not a member of fs2.Stream[scala.concurrent.Future,Long] userIdsFromDB.toList ^

No more Pure! we got Future instead, yay! What just happened?

It took:

  • loadUserIdByName: Future[Long]
  • Stream[Pure, String]

And switched the types of the stream to

  • Stream[Future, Long]

It separated the Future and isolated it! The left side that was the Effect type parameter is now the concrete Future type.

Neat trick, but how does it help me?

You just witnessed true separation of concerns. You can continue to operate on the stream with all the nice List like combinators and you don't have to worry about if the DB is down, slow or all the stuff that is related to the network (effect) concerns.

It all works until I want to use toList to get the values back

scala> userIdsFromDB.toList :18: error: value toList is not a member of fs2.Stream[scala.concurrent.Future,Long] userIdsFromDB.toList ^

What???!!! I could swear that I used toList before and it worked, how can it say that toList is not a member of fs2.Stream[Future,String] any more? It is as if this function was removed the moment I started using an effect-ful stream, that's impressive! But how do I get my values back?

scala> userIdsFromDB.compile res5: fs2.Stream.ToEffect[scala.concurrent.Future,Long] = [email protected]

First we use compile to tell the Stream to combine all the effects into one, effectively it folds all the calls to loadUserIdByName into one big Future. This is needed by the framework, and it will become apparent why this step is needed soon.

Now toList should work

scala> userIdsFromDB.compile.toList :18: error: could not find implicit value for parameter F: cats.effect.Sync[scala.concurrent.Future] userIdsFromDB.compile.toList ^

What?! the compiler is still complaining. That’s because Future is not a good Effect type — it breaks the philosophy of separation of concerns as explained in the next very important section.

IMPORTANT: The ONE thing to take away from this post

A key point here, is that the DB has not been called at this point. Nothing happened really, the full program does not produce anything.

def loadUserIdByName(userName: String): Future[Long] = ??? Stream("bob", "alice", "joe").evalMap(loadUserIdByName).compile

Separating program description from evaluation

Yes it might be surprising but the major theme in FP is separating the

  • Description of your program: a good example is the program we just wrote, it’s a pure description of the problem “I give you names and a DB, give me back IDs”

And the

  • Execution of your program: running the actual code and asking it to go to the DB

One more time our program has literally no effect on the world besides making your computer warm, exactly like our Pure stream.

Code that does not have an effect is called pure and that’s what all Functional Programming is about: writing programs with functions that are pure. Bravo, you now know what FP is all about.

Why would you want write code this way? Simple: to achieve separation of concerns between the IO parts and the rest of our code.

Now let’s fix our program and take care of this Future problem.

As we said Future is a bad Effect type, it goes against the separation of concerns principle. Indeed, Future is eager in Scala: the moment you create one it starts to executes on some thread, you don't have control of the execution and thus it breaks. FS2 is well aware of that and does not let you compile. To fix this we have to use a type called IO that wraps our bad Future.

That brings us to the last part, what is this IO type? and how do I finally get my list of usedIds back?

scala> import cats.effect.IO import cats.effect.IO scala> Stream("bob", "alice", "joe").evalMap(name => IO.fromFuture(IO(loadUserIdByName(name)))).compile.toList res8: cats.effect.IO[List[Long]] = IO$2104439279

It now gives us back a List but still, we didn't get our IDs back, so one last thing must be missing.

What does IO really mean?

IO comes from cats-effect library. First let's finish our program and finally get out the ids back from the DB.

scala> userIds.compile.toList.unsafeRunSync :18: error: not found: value userIds userIds.compile.toList.unsafeRunSync ^

The proof that it’s doing something is the fact that it’s failing.

loadUserIdByName(userName: String): Future[Long] = ???

When ??? is called you will get this exception, it means the function was executed (as opposed to before when we made the point that nothing was really happening). When we implement this function it will go to the DB and load the ids, and it will have an effect on the world (network/files system).

IO[Long] is a description of how to get a value of type Long and it most certainly involves doing some I/O i.e going to the network, loading a file,...

It’s the How and not the What. It describes how to get the value from the network. If you want to execute this description, you can use unsafeRunSync (or other functions prefixed unsafe). You can guess why they are called this way: indeed a call to a DB is inherently unsafe as it could fail if, for example, your Internet connection is out.

Recap

Let’s take a last look at Stream[Effect,Output].

Output is the type that the stream emits (could be a stream of String, Long or whatever type you defined).

Effect is the way (the recipe) to produce the Output (i.e go to the DB and give me an id of type Long).

It’s important to understand that if these types are separated to make things easier, breaking down a problem in subproblems allows you to reason about the subproblems independently. You can then solve them and combine their solutions.

The link between these 2 types is the following :

In order for the Stream to emit an element of type

  • Output

It needs to evaluate a type

  • Effect

A special type that encodes an effective action as a value of type IO, this IO value allows the separation of 2 concerns:

  • Description:IO is a simple immutable value, it’s a recipe to get a type A by doing some kind of IO(network/filesystem/…)
  • Execution: in order forIO to do something, you need to execute/run it using io.unsafeRunSync

Putting it all together

Stream[IO,Long] says:

This is a Stream that emits values of type Long and in order to do so, it needs to run an effective function that producesIO[Long] for each value.

That’s a lot of details packed in this very short type. The more details you get about how things happen the fewer errors you make.

Takeaways

  • Stream is a super charged version of List
  • Stream(1,2,3) is of type Stream[Pure, Int] , the second type Int is the type of all values that this stream will emit
  • Pure means no effect on the world. It just makes your CPU work and consumes some power, but besides that it does not affect the world around you.
  • Use evalMap instead of map when you want to apply a function that has an effect like loadUserIdByName to a Stream.
  • Stream[IO, Long] separates the concerns of What and How by letting you work only with the values and not worrying about how to get them (loading from the db).
  • Separating program description from evaluation is a key aspect of FP.
  • All the programs you write with Stream will do nothing until you use unsafeRunSync. Before that your code is effectively pure.
  • IO[Long] is an effect type that tells you: you will get Long values from IO (could be a file, the network, the console ...). It's a description and not a wrapper!r
  • Future does not abide by this philosophy and thus is not compatible with FS2, you have to use IO type instead.

FS2 videos

  • Hands on screencast by Michael Pilquist: //www.youtube.com/watch?v=B1wb4fIdtn4
  • Talk by Fabio Labella //www.youtube.com/watch?v=x3GLwl1FxcA