Spark (IV) - Desarrollando programas en Spark. Conceptos Previos.
mayo 13, 2014Bienvenid@s al cuarto capítulo sobre Apache Spark.
Daremos un repaso al API de Spark, utilizaremos el Shell Interactivo para ver sus componentes clave y veremos en que consisten los principales conceptos básicos para poder escribir programas eficientemente. Aviso que hoy no nos vamos a meter en Eclipse todavía, pero en el siguiente ya si que haremos los primeros programas. Necesitaremos Maven por si queréis ir pensando en instalarlo.
Recomiendo que sigáis los posts anteriores y que dejéis instalado Spark en Standalone para poder seguir los pasos que aquí mencionaré.
Este post pertenece a la serie de Spark:
Spark (V) - Desarrollando programas en Spark. Empezando a programar.
Spark (VI) - Monitorización de Spark
Spark (VII) - Ecosistema alrededor de Spark
Sin más que decir por mi parte, comenzamos.
Resumen de los tres primeros posts
A alto nivel podríamos decir que las aplicaciones Spark consisten en un Driver Program que ejecuta la función principal que el usuario haya creado y que a su vez ejecuta varias operaciones paralelas en el cluster.Primera abstracción importante: RDD
Los RDDs son una colección de elementos particionados a lo largo de los nodos del cluster con los que se puede operar en paralelo. Los RDDs se pueden crear a partir de un archivo del sistema de ficheros de Hadoop (o de cualquier otro archivo soportado por el sistema de Hadoop), o de una colección de Scala existente a través del driver program, pudiendo realizar transformaciones sobre los mismos. Los usuarios además, pueden decir a Spark que persista un RDD en memoria, permitiendo reusar eficientemente las operaciones que se realizan en paralelo. Por último, los RDD se recuperan automágimanete de los fallos en los nodos (recordemos el "lineage").Segunda abstracción importante: Variables compartidas
Se pueden utilizar variables compartidas para las operaciones que se realizan en paralelo. Por defecto, cuando Spark ejecuta una función en paralelo como un conjunto de tareas en diferentes nodos, este lleva una copia de cada variable utilizada en la función a cada tarea. En algunas ocasiones, una variable necesita ser compartida entre todas las tareas, o entre las tareas y el driver program.
Spark soporta dos tipos de variables compartidas:
· Broadcast variables: las cuales se usan para cachear un valor en memoria a todos los nodos.
· Accumulators: las cuales son variables que solo admiten "añadir algo", como contadores y sumas.
Veremos estas cosillas y alguna más.
Requisitos previos
· Spark en Standalone
· Hadoop (HDFS) instalado y configurado en Pseudo-dristribuido. Recodad que debe corresponderse con la versión que Spark utiliza, en el caso de Spark 0.9.1 es la 1.0.4. Una guía básica aquí. Descargarlo de aquí.
Una vez tengamos todo instalado y configurado deberíamos tener algo parecido a esto:
Si queréis que ponga tutorial sobre como instalar hadoop en Pseudo-distribuido comentármelo en esta misma página y en un periquete os lo pongo aquí.
De igual manera si creéis conveniente que haga un videotutorial con todos estos posts, lo haré encantado.
Shell interactivo
Si seguisteis el tercer post de Spark ya tendréis listos el Shell interactivo en scala para jugar con el, por tanto recomiendo su lectura al detalle.
Cómo paralelizar colecciones
1. Abrimos el shell interactivo de Spark. Vamos al directorio donde tengamos descargado Spark y ejecutamos:
bin/spark-shell
Veremos que se ha cargado un SparkContext. Genial, ya podemos jugar con el.
Nota: antes de seguir leyendo, vamos a jugar un poquete con el API de scala porque como comenté en uno de los posts anteriores, es mucho más cómodo trabajar con Spark. Una vez veamos las transformaciones y operaciones paralelas básicas pasaremos a las funciones in-line de Java.
2. Vamos a crear un array y lo vamos a hacer paralelizable. Nuestra intención es sumarizar cada elemento del array de forma paralela. Utilizaremos la función parallelize() que se encuentra en el SparkContext. Para sumarizar los elementos, le aplicaremos un reduce a los datos distribuidos. En la consola escribimos:
scala> val data = Array(25, 20, 15, 10, 5)
data: Array[Int] = Array(25, 20, 15, 10, 5)
scala> val distData = sc.parallelize(data)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:14
scala> distData.reduce(_ + _)
res0: Int = 75
Cómo utilizar los Datasets de Hadoop
Spark puede crear datasets distribuidos para cualquier archivo almacenado en HDFS u otro sistema de almacenamiento soportado por Hadoop (incluyendo el sistema local de archivos, Amazon S3, Hypertable, HBase, etc). Spark soporta archivos de texto, Ficheros Secuenciales, y cualquier otro tipo de Hadoop InputFormat.
Se pueden crear RDDs a partir de archivos de texto utilizando el método textFile del SparkContext. Este método toma una URI de donde esté localizado el archivo (a menudo en el sistema de archivos local, o en hdfs://, s3n://, kfs://, etc).
Aquí un ejemplo de cómo invocarlo:
scala> val distFile = sc.textFile("README.md")
distFile: org.apache.spark.rdd.RDD[String] = MappedRDD[3] at textFile at <console>:12
scala> distFile.first
res5: String = # Apache Spark
Otro ejemplo pero ahora tirando de hdfs:
1. Primero cargamos el archivo README.txt del directorio de hadoop a hdfs. Para ello vamos al directorio donde descargamos hadoop y (para salir de la shell de spark anterior es con exit):
bin/hadoop fs -put README.txt /
2. Una vez cargado volvemos al directorio de spark y ejecutamos la shell de spark de nuevo. Creamos la variable distFile pero esta vez cargamos el README.txt que se encuentra en HDFS y obtenemos el primer elemento:
scala> val distFile = sc.textFile("hdfs://parallels-Parallels-Virtual-Platform:9000/README.txt")
distFile: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at textFile at <console>:12
scala> distFile.first
res2: String = For the latest information about Hadoop, please visit our website at:
Parece que hacemos poco con HDFS, pero si en este punto de la serie tenéis HDFS configurado, en el siguiente que ya nos metemos manos a la obra, nos va a ser muy útil.
Existe alguna documentación más al respecto sobre esto y por ello recomiendo ir a la sección "Hadoop Datasets" de Spark, a tarvés de este link.
Operaciones con RDDs
Los RDDs soportan dos tipos de operaciones: transformations, las cuales crearán un nuevo datasets de uno ya existente, y las actions, que devolverán un valor del driver program después de la ejecución de los cálculos en el dataset. Por ejemplo, map es una transformación que pasa cada elemento del dataset a través de una función y devuelve un nuevo dataset distribuido como resultado.
Por otra parte, reduce es una action que agrega todos los elementos del dataset utilizando alguna función y devolviendo como resultado final al driver program (aunque aquí también existe un reduceByKey paralelo que devuelve un dataset distribuido).
Todas las transformaciones en Spark son perezosas, esto quiere de decir que no va a recalcular los resultados una y otra vez. En vez de eso, se aplicarán las transformaciones al mismo dataset base (por ejemplo un archivo).
Las transformaciones son solamente calculadas cuando una action requiere devolver un resultado al driver program. Este diseño permite a Spark ser más eficiente - por ejemplo, podemos hacer que un dataset creado a través de un map sea usado como un reduce y que devuelva solamente el resultado del reduce al driver, en lugar de el conjunto de datos mapeados del dataset (que es más grande en tamaño).
Por defecto, cada RDD transformado es recalculado cada vez que ejectuamos una action sobre el. Sin embargo, podemos persistir un RDD en memoria utilizando el método persist (o cache), y que en ese caso Spark mantendrá los elementos del cluster para un acceso mucho más rápido la próxima vez que ejecutemos una consulta en el (esto requiere mucha memoria, lo que no cabe en RAM lo paginará a disco). También hay soporte para persistir dataset en disco, o replicarlos a lo largo del cluster.
Transformations
Para ver el listado de transformations visitar este link. Si veis necesario explicar alguna en concreto, me lo hacéis saber y hacemos un post en profundidad.
Actions
Igualmente con las actions. Visitar este link para ver un listado resumido.
Persistencia de los RDD
Una de las características más importantes de Spark es la persistencia (o cacheo) de un dataset en memoria para poder utilizar operaciones. Cuando persistimos un RDD, cada nodo almacena una porción (slice) del mismo y que permite operar con el en memoria y reutilizarlo para otras acciones en el dataset (o datasets derivados de el). Este permite aplicar actions a futuro mucho más rápido (unas 10 veces más rápido). Utilizar bien el cacheo es la herramienta clave para crear algoritmo iterativo en Spark y utilizarlos interactivamente para poder interpretarlos.
Puedes poner un RDD a persistente a través de los métodos persists() o cache(). La primera vez se calcula en un action, que mantendrá en memoria el dataset en los nodos. El cacheo es tolterante a fallos (si una partición de un RDD se pierde, este la recalculada automáticamente utilizando las transformaciones que originalmente lo crearon).
Además, cada RDD puede ser almacenado utilizando diferentes niveles de almacenamiento, permitiéndonos, por ejemplo, persistir el dataset en disco, o persistirlo en memoria pero como una serialización de objetos Java (para guarda espacio), o replicarlo en los nodos. Estos niveles son elegidos a través del objeto org.apache.spark.storage.StorageLevel cuando hacemos persist(). El método cache() abrevia esto utilizando por defecto el storageLevel.MEMORY_ONLY (solo almacenará objetos deserializados en memoria).
Podemos ver más StorageLevel aquí.
Y la pregunta sería: ¿Cuál Storage Level elijo?
Los Storage Leve del Spark están pensando para permitirnos tratar diferentes compensaciones entre el uso de la memoria y el uso eficiente de la CPU. Desde Spark recomiendan seguir el siguiente proceso para seleccionar el nuestro:
· Si nuestros RDDs encajan de manera confortable en el storage level por defecto (MEMORY_ONLY, es decir, que caben en la RAM), lo dejamos tal cual está. Esta es la opción CPU-Eficiciente, permitiéndonos ejecutar operaciones en los RDDs tan rápido como sea de rápida nuestra máquina.
· Si no encajan, intentamos utilizar MEMORY_ONLY_SER y seleccionamos una libreria de serialización que sea muy rápida para hace que los objectos sean mucho más espacio-eficientes, pero que se penazile el acceso a los mismos.
· No volcar a disco a menos que las funciones que operan en el dataset produzcan mucho costes, o filtre una gran cantidad de datos. De lo contrario, recalcular una partición es casi tan rápido como la lectura del disco.
· Usa storage leves replicados si quieres se rápido en tolerancia a fallos (por ejemplo, si utilizas Spark para realizar peticiones de un servicio web). Todos los storage leve son completamente tolerantes a fallos mediante el recálculo de datos perdidos, pero la replicación permite ejecutar tareas de manera continua sin tener que esperar a recalcular una partición perdida.
Variables compartidas
Normalmente, cuando se pasa una función a una operación de Spark (como un map o un reduce) es ejecutado en un nodo remoto del cluster, funcionando en copias separadas de todas las variables utilizadas en la función. Estas variables son copias de cada máquina, y no actualizan las variables de la máquina remota que además son propadadas hacia el driver program. Por regla general, leer-escribir variables en las tareas tendría que ser ineficiente. Sin embargo, Spark permite dos tipos limitados de variables compartidas para dos usos que se dan frecuentemente: broadcast y accumulators.
Variables Broadcast
Las variables Broadcast permiten al programador mantener cacheada una variable de sólo lectura en cada máquina en vez de enviar una copia de ella a las tareas. Estas se pueden usar, por ejemplo, para darnos a una copia de un dataset como entrada pero de un modo eficiente en cada nodo. Spark además intenta distribuir las variables broadcast utilizando algoritmos eficientes de broadcasting para reduce el coste de comunicación.
Las variables broadcast son creadas de una variable v a través de la llamada SparkContext.broadcast(v). Esta variable es un envoltorio sobre v, y su valor puede ser accediro mediante la llamada del método value. El shell interactivo mostrará algo parecido a esto:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(2)
scala> broadcastVar.value
res6: Array[Int] = Array(1, 2, 3)
Después de que una variable broadcast haya sido creada, esta debería utilizarse en vez de el valor v en cualquier funcion que ejecutemos en el cluster ya que v no será enviada a los nodos nada más que una vez. Además, el objeto v no debería ser modificado después de ser broadcasteado.
Variables Accumulators
Los accumulatos son variables que sólamente "añaden" a través de una operación asociantiva y puede ser paralelizadas de un modo eficiente. Estas puede ser usadas para implementar contadores (como en MapReduce) o sumas. Spark soporta de manera nativa los accumulators de tipos de valores numéricos y collections standars mutables, donde los programadores puede añadir soporte para nuevos tipos.
Un acumulador se crea de un valor inicial v llamando al SparkContext.accumulator(v). Las tareas que se ejecutan en el cluster puede ser añadidas utilizando el operador +=. Sin embargo, no pueden leer su valor. Sólamente el driver program puede leer el valor del accumulator, utilizando el método value.
A continuación veremos como se utiliza un accumulator:
scala> val accum = sc.accumulator(0)
accum: org.apache.spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
scala> accum.value
res8: Int = 10
Alguna cosa más
Antes de meter las manos en Eclipse, comentar que podéis ejecutar una prueba rápida de Spark a través de sus variados ejemplos:
./bin/run-example org.apache.spark.examples.SparkPi spark://parallels-Parallels-Virtual-Platform:7077
Obtendréis algo parecido a esto:
Como dije al principio, id instalando maven que en el próximo le metemos mano.
Saludos!
Fuentes:
· Spark: http://spark.apache.org/docs/latest/
6 comentarios
solo lo he empezado a leer, pero tiene na pinta estupenda. Un gran trabajo. cuando complete la serie estoy seguro que volveré a comentar lo mucho que me ha gustado.
ResponderEliminarPor favor, puedes poner el tutorial de instalación de hadoop.
Muchas gracias.
Felicidades por el trabajo.
Perfecto! me pongo a ello. En cuanto lo tenga te respondo por aquí también.
EliminarGracias por el feedback :)
Buenas, dicho y hecho, aquí lo tienes:
Eliminarhttp://www.franciscojavierpulido.com/2014/05/descargar-instalar-configurar-hadoop.html
Un saludo!!!!!
Hola Javier. Enhorabuena. Me ha gustado mucho leer tu blog y me viene fenomenal para repasar. Por cierto, has continuado con la V parte? Gracias y saludos
ResponderEliminarHola Francisco, todo este tema de Spark es bastante interesante, gracias por el aporte. Has continuado con el capitulo "Spark (V) - Desarrollando programas en Spark. Empezando a programar."? no lo encuentro.
ResponderEliminarHola.
ResponderEliminarCuando me sale el mensaje de SPARK instalado, no me dice "created spark context" , tampoco "spark context available as sc", y tal vez por eso me dice "Error not found value sc"
Sé respetuoso/a, en este blog caben todo tipo de opiniones con respeto y serenidad.