
Apache Spark is an open-source, fault-tolerant cluster computing framework that also supports SQL analytics, machine learning, and graph processing.
It works by splitting your data into partitions and then processing those partitions in parallel across all the nodes in the cluster. If any node goes down, it reassigns that node's tasks to another node, which is how it provides fault tolerance.
Being 100 times faster than Hadoop has made it hugely popular for Big Data processing. Spark is written in Scala and runs on the JVM, but the good news is that it also provides APIs for Python and R, as well as C#. It is well documented with examples that you should check out.
When you are ready to give it a try, this article will guide you from download and setup all the way to performance tuning. My tiny Spark cluster performed 100 million string matches across all of Wikipedia's articles in under two hours.
It is when you move past the tutorials and do some serious work that you realize all the pain points of the technology stack you are using. Learning through mistakes is the best way to learn, but sometimes you are short on time and wish you knew every possible thing that could go wrong.
Below, I describe some of the problems I ran into when getting started with Spark, and how you can avoid them.
Getting started
Download the Spark binary that comes with bundled Hadoop dependencies
If you set out to download Spark, you will notice that there are several binaries available for the same version. Spark advertises that it doesn't need Hadoop, so you might download the user-provided-Hadoop version, which is smaller. Don't do that.
Although Spark doesn't use Hadoop's MapReduce framework, it does depend on other Hadoop libraries such as HDFS and YARN. The without-hadoop version is for when you already have the Hadoop libraries provided somewhere else.
Use standalone cluster mode, not Mesos or YARN
Once you have run the built-in examples on the local cluster and made sure everything is installed and working properly, proceed to set up your cluster.
Spark gives you three options: Mesos, YARN, and standalone.
The first two are resource allocators that control your replica nodes. Spark has to ask them to allocate its own instances. As a beginner, don't add to your complexity by going down that road.
The standalone cluster is the easiest to set up. It comes with sensible defaults, such as using all of your cores for executors. It is part of the Spark distribution itself and has a sbin/start-all.sh script that can bring up the primary along with all the replicas listed in conf/slaves, using ssh.
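For example, a minimal standalone setup might look something like this (the hostnames are placeholders for your own machines):

# conf/slaves -- one replica hostname per line
node1
node2
node3

# bring up the primary and every replica listed above, over ssh
> sbin/start-all.sh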
Mesos/YARN are separate programs that are used when your cluster isn't just a Spark cluster. Also, they don't come with sensible defaults: executors won't use all the cores on the replicas unless that is explicitly specified.
You also have the option of a high-availability mode using ZooKeeper, which keeps a list of backup primaries in case one fails. If you are a beginner, you are probably not managing a thousand-node cluster where the risk of node failure is significant. You are more likely to set up a cluster on a managed cloud platform like Amazon or Google, which already takes care of node failures.
You don't need high availability with cloud infrastructure or a small cluster
I did set up my cluster in a hostile environment where human factors were responsible for power failures and nodes dropping off the network. (Basically, my university computer lab, where diligent students shut the machines down and careless students pull out the LAN cables.) I still managed without high availability through a careful choice of the primary node. You shouldn't have to worry about it.
Check the Java version you use to run Spark
A very important aspect is the Java version you use to run Spark. Normally, a later version of Java works with something compiled for older versions.
But with Project Jigsaw, modularity introduced stricter isolation and boundaries in Java 9, which breaks certain things that use reflection. With Spark 2.3.0 running on Java 9, I got illegal reflective access errors. Java 8 had no problems.
This will certainly change in the near future, but keep it in mind until then.
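If you are unsure which Java your machines will pick up, it is worth checking explicitly and pinning the JDK; a quick sketch (the install path is just an example):

# check the default Java on each machine
> java -version

# pin Spark to a specific JDK in conf/spark-env.sh
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64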
Specify the primary URL exactly as it is. Do not resolve domain names to IP addresses, or vice versa
The standalone cluster is very sensitive to the URLs used to resolve the primary and replica nodes. Suppose you start the primary node as below:
> sbin/start-master.sh
and your primary is up at localhost:8080

By default, your PC's hostname is picked as the primary URL. x360 resolves to localhost, but starting a replica like the first command below will not work.
# does not work
> sbin/start-slave.sh spark://localhost:7077

# works
> sbin/start-slave.sh spark://x360:7077
This works, and our replica gets added to the cluster:

Our replica has an IP address in the 172.17.x.x subnet, which is actually the subnet set up by Docker on my machine.
The primary can communicate with this replica because both are on the same machine. But the replica cannot communicate with other replicas on the network, or with a primary on a different machine, because its IP address is not routable.
As with the primary above, a replica on a machine without a primary will take up the machine's hostname. When you have identical machines, they all end up using the same hostname as their address. This creates a total mess and nobody can communicate with anybody else.
So the commands above would change to:
# start master
> sbin/start-master.sh -h $myIP

# start slave
> sbin/start-slave.sh -h $myIP spark://<masterIP>:7077

# submit a job
> SPARK_LOCAL_IP=$myIP bin/spark-submit ...
where myIP is the IP address of the machine, routable between the cluster nodes. All the nodes are most likely on the same network, so you can write a script that sets myIP on each machine.
# assume all nodes are in the 10.1.26.x subnet
> myIP=`hostname -I | tr " " "\n" | grep 10.1.26. | head`
Code flow
So far we have set up our cluster and seen that it is functional. Now it's time to code. Spark is quite well documented and comes with lots of examples, so it's very easy to get started with coding. What is less obvious is how the whole thing works, and that results in some errors that are very hard to debug at runtime. Suppose you coded something like this:
class SomeClass {
    static SparkSession spark;
    static LongAccumulator numSentences;

    public static void main(String[] args) {
        spark = SparkSession.builder()
                .appName("Sparkl")
                .getOrCreate();                          (1)
        numSentences = spark.sparkContext()
                .longAccumulator("sentences");           (2)
        spark.read()
             .textFile(args[0])
             .foreach(SomeClass::countSentences);        (3)
    }

    static void countSentences(String s) { numSentences.add(1); }   (4)
}
1 create a spark session
2 create a long counter to keep track of job progress
3 traverse a file line by line calling countSentences for each line
4 add 1 to the accumulator for each sentence
The above code works on a local cluster but will fail with a null pointer exception when run on a multinode cluster. Both spark as well as numSentences will be null on the replica machine.
To solve this problem, encapsulate all initialized state in non-static fields of an object. Use main to create the object and defer further processing to it.
What you need to understand is that the code you write is run by the driver node exactly as it is, but what the replica nodes execute is a serialized job that Spark gives them. Your classes will be loaded by the JVM on the replica.
Static initializers will run as expected, but functions like main won't, so static values initialized in the driver won't be seen in the replica. I am not sure how the whole thing works, and am only inferring from experience, so take my explanation with a grain of salt. Your code now looks like this:
class SomeClass {
    SparkSession spark;                                  (1)
    LongAccumulator numSentences;
    String[] args;

    SomeClass(String[] args) {
        this.args = args;
    }

    public static void main(String[] args) {
        new SomeClass(args).process();                   (2)
    }

    void process() {
        spark = SparkSession.builder().appName("Sparkl").getOrCreate();
        numSentences = spark.sparkContext().longAccumulator("sentences");
        spark.read().textFile(args[0]).foreach(this::countSentences);   (3)
    }

    void countSentences(String s) { numSentences.add(1); }
}
1 Make fields non static
2 create instance of the class and then execute spark jobs
3 referencing this in the foreach call brings the object into the closure of accessible objects, so it gets serialized and sent to all replicas.
Those of you who are programming in Scala might use Scala objects which are singleton classes and hence may never come across this problem. Nevertheless, it is something you should know.
Submit app and dependencies
There is more to say about the code, but before that you need to submit your application to the cluster. Unless your app is extremely trivial, chances are you are using external libraries.
When you submit your app jar, you also need to tell Spark the dependent libraries that you are using, so it will make them available on all nodes. It is pretty straightforward. The syntax is:
bin/spark-submit --packages groupId:artifactId:version,...
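For example, a submission might look something like this; the master URL, class name, jar, and package coordinates are all placeholders for your own:

> bin/spark-submit \
    --master spark://10.1.26.5:7077 \
    --packages org.apache.commons:commons-lang3:3.8.1,com.google.code.gson:gson:2.8.5 \
    --class com.example.Sparkl \
    target/sparkl-1.0.jar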
I have had no issues with this scheme. It works flawlessly. I generally develop on my laptop and then submit jobs from a node on the cluster. So I need to transfer the app and its dependencies to whatever node I ssh into.
Spark looks for dependencies in the local Maven repo, then the central repo, and any repos you specify with the --repositories option. It is a little cumbersome to sync all of that on the driver and then type out all those dependencies on the command line, so I prefer to have all dependencies packaged in a single jar, called an uber jar.
Use Maven shade plugin to generate an uber jar with all dependencies so job submitting becomes easier
Just include the following lines in your pom.xml:

<!-- goes under <build><plugins> in your pom.xml -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <executions>
        <execution>
            <goals>
                <goal>shade</goal>
            </goals>
        </execution>
    </executions>
</plugin>
When you build and package your project, the default distribution jar will have all dependencies included.
As you submit jobs, the application jars accumulate in the work directory and fill it up over time.
Set spark.worker.cleanup.enabled to true in conf/spark-defaults.conf
This option is false by default and is applicable to the stand-alone mode.
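Following that tip, the entry is a single line:

# conf/spark-defaults.conf
spark.worker.cleanup.enabled   true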
Input and Output files
This was the part that confused me the most and was the hardest to diagnose.
Spark supports reading and writing from various sources such as hdfs, ftp, jdbc, or local files on the system when the protocol is file:// or missing. My first attempt was to read from a file on my driver. I assumed that the driver would read the file, turn it into partitions, and then distribute those across the cluster. Turns out it doesn't work that way.
When you read a file from the local filesystem, ensure that the file is present on all the worker nodes at exactly the same location. Spark does not implicitly distribute files from the driver to the workers.
So I had to copy the file to every worker at the same location. The location of the file was passed as an argument to my app. Since the file was located in the parent folder, I specified its path as ../wikiArticles.txt. This did not work on the worker nodes.
Always pass absolute file paths for reading
It could have been a mistake on my side, but I know that the filepath made it as-is into the textFile function and caused "file not found" errors.
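What did work, conceptually, was passing an absolute path with an explicit file:// scheme and making sure that exact path exists on every worker. A minimal sketch (the path is illustrative, and spark is the SparkSession from the earlier examples):

import org.apache.spark.sql.Dataset;

// the same absolute path must exist on every worker node
Dataset<String> lines = spark.read()
        .textFile("file:///home/spark/data/wikiArticles.txt");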
Spark supports common compression schemes, so most gzipped or bzipped text files will be uncompressed before use. It might seem that compressed files will be more efficient, but do not fall for that trap.
Don't read from compressed text files, especially gzip. Uncompressed files are faster to process.
Gzip files cannot be decompressed in parallel the way bzip2 files can, so nodes spend the bulk of their time decompressing large files.
It is a hassle to make the input files available on all workers. You can instead use Spark's file broadcast mechanism. When submitting a job, specify a comma-separated list of input files with the --files option. Accessing these files requires SparkFiles.get(filename). I could not find enough documentation on this feature.
To read a file broadcast with the --files option, use SparkFiles.get("filename")
So a file submitted as --files /opt/data/wikiAbstracts.txt would be accessed as SparkFiles.get("wikiAbstracts.txt"). This returns a string which you can use in any read function that expects a path. Again, remember to specify absolute paths.
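Putting that together, a sketch of the pattern described above (names follow the example):

import org.apache.spark.SparkFiles;
import org.apache.spark.sql.Dataset;

// submitted with: --files /opt/data/wikiAbstracts.txt
String path = SparkFiles.get("wikiAbstracts.txt");   // absolute path on the local node
Dataset<String> abstracts = spark.read().textFile(path);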
Since my input file was 5GB gzipped, and my network was quite slow at 12MB/s, I tried to use Spark's file broadcast feature, but the decompression itself was taking so long that I manually copied the file to every worker instead. If your network is fast enough, you can use uncompressed files. Alternatively, use an HDFS or FTP server.
Writing files also follows the semantics of reading. I was saving my DataFrame to a csv file on the local filesystem, again assuming that the results would be sent back to the driver node. That didn't work for me.
When a DataFrame is saved to a local file path, each worker saves its computed partitions to its own disk. No data is sent back to the driver.
I was only getting a fraction of the results I was expecting. Initially I had misdiagnosed this problem as an error in my code. Later I found out that each worker was storing its computed results on its own disk.
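A sketch to make that concrete, with df standing in for the DataFrame I was computing and an illustrative output path:

import java.util.List;
import org.apache.spark.sql.Row;

// each worker writes its own part-* files under this local path
df.write().csv("file:///home/spark/output/results");

// small results can instead be collected back to the driver explicitly
List<Row> rows = df.collectAsList();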
Partitions
The number of partitions you make affects the performance. By default, Spark will make as many partitions as there are cores in the cluster. This is not always optimal.
Keep an eye on how many workers are actively processing tasks. If too few, increase the number of partitions.
If you read from a gzipped file, Spark creates just one partition, which will be processed by only one worker. That is also one reason why gzipped files are slow to process. I have observed slower performance with a small number of large partitions compared to a large number of small partitions.
It’s better to explicitly set the number of partitions while reading data.
You may not have to do this when reading from HDFS, as Hadoop files are already partitioned.
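For example, since a gzipped file comes in as a single partition, I would repartition right after reading; the number 64 below is only an illustration and should be tuned to your cluster:

import org.apache.spark.sql.Dataset;

// a .gz file is read as one partition; spread the work across the cluster
Dataset<String> lines = spark.read()
        .textFile("file:///home/spark/data/wikiArticles.txt.gz")
        .repartition(64);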
Wikipedia and DBpedia
There are no gotchas here, but I thought it would be good to make you aware of the alternatives. The entire Wikipedia XML dump is 14GB compressed and 65GB uncompressed. Most of the time you only want the plain text of the articles, but the dump is in MediaWiki markup, so it needs some preprocessing. There are many tools available for this in various languages. Although I haven't used them personally, I am pretty sure it is a time-consuming task. But there are alternatives.
If all you want is the Wikipedia article plaintext, mostly for NLP, then download the dataset made available by DBpedia.
I used the full article dump (NIF Context) available from DBpedia. This dataset gets rid of unwanted stuff like tables, infoboxes, and references. The compressed download is 4.3GB in the turtle format, which you can convert to tsv if that is easier to work with.
Similar datasets are available for other properties like page links, anchor texts, and so on. Do check out DBpedia.
A word about databases
I never quite understood why there is a plethora of databases, all so similar, and on top of that why people buy database licenses. Until this project I hadn't seriously used any; I had only ever used MySQL and Apache Derby.
For my project I used a SPARQL triple store database, Apache Jena TDB, accessed over a REST API served by Jena Fuseki. This database would give me RDF urls, labels, and predicates for all the resources mentioned in the supplied article. Every node would make a database call and only then would proceed with further processing.
My workload had become IO bound, as I could see near 0% CPU utilization on worker nodes. Each partition of the data would result in two SPARQL queries. In the worst case scenario, one of the two queries was taking 500–1000 seconds to process. Thankfully, the TDB database relies on Linux’s memory mapping. I could map the whole DB into RAM and significantly improve performance.
If you are IO bound and your database can fit into RAM, run it in memory.
I found a tool called vmtouch which shows what percentage of the database directory has been mapped into memory. The tool also lets you explicitly map files or directories into RAM and optionally lock them so they won't get paged out.
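The commands I have in mind look roughly like this (the database path is an example):

# show what fraction of the TDB directory is resident in RAM
> vmtouch -v /data/tdb

# load the directory into RAM, lock it, and stay resident as a daemon
> vmtouch -dl /data/tdb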
My 16GB database could easily fit into my server's 32GB of RAM. This boosted query performance by orders of magnitude, to 1–2 seconds per query. Using a rudimentary form of database load balancing based on partition number, I cut my execution time in half by using 2 SPARQL servers instead of one.
Conclusion
I truly enjoyed distributed computing with Spark. Without it I could not have completed my project. It was quite easy to take my existing app and run it on Spark. I would definitely recommend giving it a try.
Originally published at siddheshrane.github.io.
