Recuerda que puedes descargarte algunos de los ejemplos en la pestaña de Código Fuente

jueves, 31 de enero de 2013

El Clúster Hadoop y sus 5 demonios


Una vez que hemos visto teóricamente qué es Hadoop y sus dos partes principales HDFS y MapReduce, vamos a ver quién hace que todo esto funcione.

Un Clúster Hadoop es un conjunto de máquinas donde se ejecuta HDFS. Cada máquina es un nodo del clúster. La ventaja de este tipo de sistemas distribuídos es que las máquinas pueden ser de bajo coste y que la infraestructura puede escalar de forma horizontal facilmente.

Un Clúster puede tener como poco un sólo nodo y como máximo varios miles (hasta 30 000 a día de hoy según la web de Apache). Cuantos más nodos, mejor rendimiento.

Para que Hadoop funcione hay 5 demonios que lo manejan repartidos en dos tipos de máquinas.
Los 5 demonios son:
  • NameNode
  • Secondary Namenode
  • DataNode
  • JobTracker
  • TaskTracker

Cada demonio se ejecuta en su propia máquina virtual de Java (JVM)

Y los dos tipos de máquinas son:
  • Master Node: Que incluye el NameNode, el Secondary NameNode y el JobTracker
  • Slave Node: Que incluye los DataNode y el TaskTracker.

Algo muy importante que hay que tener claro es que no es lo mismo HDFS que MapReduce, en una aplicación Hadoop participan las dos partes, y quizás varios de los demonios se están ejecutando en las mismas máquinas, pero hay que tener claro que no son lo mismo, cuál pertenece a qué parte y cómo funcionan.

Demonios Hadoop

Antes de empezar a describir los distintos demonios estaría bien aclarar la terminología:

  • Job: Es una ejecución completa de una aplicación. La ejecución de todos los Mappers y Reducers de todos los datos.
  • Task: Es la ejecución de un sólo Mapper o de un sólo Reducer sobre un conjunto de datos.


NameNode

Es el nodo máster encargado de gestionar el namespace del sistema de ficheros.
También se encarga del mantenimiento de los metadata de todos los ficheros y directorios que forman parte del sistema HDFS.

Conoce los DataNodes del clúster y sabe cómo están repartidos en estos DataNodes los diferentes bloques que pertenecen a un fichero.

Todos los metadata los tiene en RAM para acceder más rápido, aunque también existe una copia de esta información en el disco duro (de esta forma, no se pierden estos metadata si se apaga el NameNode),

Tiene dos funciones:

Por un lado, cuando llega un nuevo fichero, es quien se encarga de decir qué bloques van y dónde en el clúster. No hay movimiento de datos a través del NameNode, sino que ese fichero se particiona y los bloques van directamente a guardarse en los DataNodes. Luego el NameNode actualiza su metadata.

Por otro lado, cuando una aplicación cliente quiere leer un fichero ésta se comunica con el NameNode para determinar qué bloques forman el fichero y dónde están y posteriormente ir a buscarlos a los DataNodes correspondientes.

Si el NameNode deja de funcionar, el sistema no puede seguir usándose. Y si además el NameNode se destruye, todos los datos del clúster se perderían ya que no habría forma de reconstruir los ficheros en base a los bloques que se encuentran en los DataNodes (a no ser que tengamos una copia de seguridad de los metadata de HDFS en otro servidor).

Secondary Namenode

Este demonio causa un poco de confusión, ya que a primera vista parece una copia de seguridad o backup del NameNode, pero hay que tener mucho cuidado, ya que NO lo es.

Su papel consiste en realizar una tarea de mantenimento de los ficheros de metadata del NameNode (consolidando el fichero "fsimage" con el fichero de logs "edits").

También hay que decir que hay un cambio entre la versión CDH3 y la versión CDH4 de Hadoop.
En la versión CDH3, sigue siendo el Secondary NameNode que se encarga tan sólo de tareas de mantenimiento del NameNode.
En la versión CDH4, se introduce un nuevo concepto: la Alta Disponibilidad. Si esta está activa, el Secondary NameNode desaparece y pasa a ser el Standby NameNode.

DataNode

Los DataNode son los nodos encargados de almacenar los bloques de datos en el clúster Hadoop.
Almacenan los bloques e informan periódicamente al NameNode las listas de bloques que están almacenando.

Cuando un cliente quiere leer un fichero, primero se comunica con el metadata del NameNode quien le dice dónde están y cómo se distribuye el fichero a leer.
Luego el cliente pasaría a conectarse directamente con los DataNode correspondientes sin que los datos pasen por el NameNode.

JobTracker

Se encarga de gestionar todo el Job (uno o varios) que ha solicitado un cliente.
Es decir, cuando recibe la petición, asigna tareas (task) Map y Reduce a los nodos del clúster.

También se encarga de saber cuáles son TaskTracker existentes. En caso de fallo de un TaskTracker, recupera las tareas map o reduce que ejecutaba ese TaskTracker y las asigna a otro.

Finalmente, gestiona el scheduling entre los distintos jobs MapReduce.

TaskTracker

Cada nodo ejecuta un demonio del TaskTracker, y es el responsable de crear instancias del Map o del Reduce  y de reportar la situación del proceso al JobTracker.

Para disfrutar de la localización de los datos, es imprescidible que el TaskTracker esté instalado en cada nodo que contenga un DataNode.


Por último vamos a ver que existen tres modos de ejecutar un cluster Hadoop:

  • Local: No se usa HDFS, sino el sistema de ficheros nativo del ordenador. Todo Hadoop se ejecuta en un único ordenador. Tampoco existe ningún demonio MapReduce. Cuando un cliente ejecuta un Job MapReduce en este modo el programa cliente "contiene" el demonio JobTracker y TasckTracker.
  • Pseudo-Distribuído: En una máquina tienes los cinco demonios funcionando.
  • Distribuído: Dispones de un clúster de dos o más nodos con los demonios separados en Master Nodes y Slave Nodes y funcionando en paralelo.

sábado, 26 de enero de 2013

Hadoop Distributed File System (HDFS)


El HDFS es el sistema de archivos distribuído de Hadoop.

Los datos se dividen en bloques de 64MB o 128MB, estos bloques se replican en un número configurable de veces (por defecto 3) y se distribuyen en distintos nodos del clúster. Esto es porque HDFS supone que algún componente del sistema puede fallar, así que siempre va a tener disponible alguna de las copias. Por otro lado, tienen que existir tantos bloques como hayamos configurado, por eso, si HDFS detecta que un componente no funciona, se encargará de saber qué bloques estaban en ese componente y replicarlos por el resto del clúster.

Es importante saber que cuando un fichero se divide en bloques, si un bloque queda "incompleto" (respecto del tamaño del bloque establecido), ese espacio nunca se va a rellenar ni utilizar con datos de otro fichero.

Este sistema de archivos está más pensado para tener un número de ficheros razonable (no más de algunos millones) de un tamaño bastante grande (del orden del tamaño de bloque HDFS o más grande aún). Es decir, con HDFS es mejor tener menos ficheros, pero más grandes (un ítem de metadatos ocupa entre 150 y 200 bytes de memoria RAM en el NameNode).
También es más óptimo para lecturas en streaming.

Está escrito en Java. Y los bloques de datos HDFS se escriben en ficheros (dentro del sistema de archivos estándar del servidor, por ejemplo ext4) en unos directorios específicos de los nodos esclavos (se encargaría el administrador) sólo una vez y no se les puede añadir más información.

Para que HDFS pueda ser un sistema de archivos distribuído, se ha de separar los datos de los metadatos. Para esto utiliza dos tipos de nodos: el NameNode y los DataNodes de los que ya hablaré más detalle en otra entrada.
Resumidamente el NameNode contiene los metadatos, es quien se encarga de dividir y distribuir los bloques por los DataNodes del clúster. Cuando un cliente quiere hacer una lectura de los datos, primero va a preguntar al NameNode, que es quien sabe dónde está cada bloque, y luego va a leer los datos bloque a bloque y en cada uno de los nodos que los contienen.

Características:
  • Almacenamiento redundante
  • Tolerancia a fallos
  • Mecanismo de checksums
  • Política de una escritura (write-once), preparado para muchas lecturas.
  • Escalabilidad horizontal
  • No implementa POSIX
Bloques HDFS

Un cliente puede acceder a HDFS a través de un terminal utilizando los comandos propios de HDFS o vía la API de Java a través de una aplicación.
Y aunque yo o mucha gente que lea el blog sean perfiles de desarrollo, es muy importante saber moverse y tener unos conocimientos básicos de cómo funciona este sistema de ficheros a través del terminal.
Para acceder a través de la línea de comandos se hace con el comando:

     $ hadoop fs

Por sí solo este comando no hace nada (sólo muestra la ayuda), luego existen múltiples comandos que ayudarán al usuario a hacer ciertas interacciones.

Cuando accedemos a través del terminal hay que tener muy claro cuál es nuestro sistema de ficheros tradicional y cuándo estamos en HDFS.

Algunos de estos comandos asociados a hadoop son:
  • Comando encargado de copiar el archivo ficheroLocal.txt que se encuentra en el directorio actual de mi sistema de ficheros local a HDFS, el archivo se llamará ficheroHDFS.txt, pero se encontrará en el directorio HDFS configurado por nuestro administrador (nuestra carpeta de usuario en HDFS):
          $ hadoop fs -put ficheroLocal.txt ficheroHDFS.txt
  • Ahora el comando encargado de copiar un fichero desde HDFS a nuestro sistema de ficheros local:
                    $ hadoop fs -get ficheroHDFS.txt ficheroLocal.txt
  • Si queremos listar el contenido de nuestro directorio HDFS (nuestra carpeta de usuario):
          $ hadoop fs -ls
  • Para mostrar el contenido de un fichero en nuestra carpeta de usuario HDFS:
                    $ hadoop fs -cat ficheroHDFS.txt
  • Queremos crear un directorio en HDFS (en nuestra carpeta de usuario)
                    $ hadoop fs -mkdir miDirectorio
  • Queremos borrar un directorio y todo su contenido:
                    $ hadoop fs -rm -r miDirectorio


Sobre cómo acceder con la API de Java, sería a través del FileSystem API, pero es una parte extensa y ya lo mostraré en otra entrada más adelante.

martes, 22 de enero de 2013

Desventajas de Hadoop


Como hemos visto en entradas anteriores, Hadoop es una tecnología que mejora considerablemente respecto de los típicos sistemas distribuídos.
Eso no quita que tenga una serie de desventajas y que si en algún momento estamos estudiando si usarlo o no, hay que tenerlas en cuenta.

En lo que respecta al HDFS:
  • Latencia para el acceso a datos: HDFS está orientado a procesos batch y operaciones en streaming. Por lo tanto, la latencia de cualquier operación IO no ha sido optimizada y sistemas de archivos tradicionales (como ext4, XFS...) suelen ser más rápidos en estos aspectos.
  • Cantidades grandes de ficheros pequeños: El límite del número de ficheros en este sistema está limitado por la memoria del NameNode, que es en su RAM donde se encuentran los metadata. Cada fichero, directorio y bloque ocupa un tamaño de entre 150 y 200 bytes en los metadata, lo que quiere decir, que si hay millones de ficheros pequeños va a ocupar mucho más espacio en la RAM que si tenemos menos cantidad de ficheros de gran tamaño (recomendable 100 MB o más).
  • Escribe una vez, lee varias: En HDFS los ficheros solo se pueden escribir una vez (aunque HDFS se ha mejorado con el modo "append", Cloudera no suele recomendarlo porque no lo considera estable).
  • No se puede acceder con los comandos tradicionales de Linux (ls, cat, vim...). Esto complica mucho la integración con otras herramientas comerciales (como sistemas de backup, por ejemplo). Y aunque exista "HDFS fuse" para montar HDFS como cualquier otro sistema de archivo Linux, esta solución no ofrece un buen rendimiento.

En lo que respecta a MapReduce:
  • Es muy difícil de depurar: Al procesarse el programa en los nodos donde se encuentran los bloques de datos, no es fácil encontrar los fallos de código. Tampoco es conveniente utilizar funciones de escritura de logs en el código ya que eso podría suponer un gran aumento en la ejecución de procesos MapReduce.
  • No todos los algoritmos se pueden escribir con el paradigma MapReduce. Por ejemplo, el famoso algoritmo de grafos Dijkstra, al necesitar un procesamiento secuencial, no se puede escribir en MapReduce.
  • Latencia: cualquier job MapReduce suele tardar por lo menos 10 segundos. Por lo tanto, si el volumen de información a tratar es pequeño, es posible que Hadoop no sea la solución más rápida.

jueves, 17 de enero de 2013

MapReduce: Ejemplo teórico

Nada mejor que un buen ejemplo para entender la teoría vista en la entrada anterior (MapReduce)

Imaginemos que tenemos un fichero score.txt con las puntuaciones de un salón de juego.
La estructura es sencilla: fecha del juego, nombre jugador y puntuación:

01-11-2012 Maria 11
01-11-2012 Pablo 9
01-11-2012 Angel 3
23-11-2012 Pablo 22
23-11-2012 Maria 15
15-12-2012 Pablo 32
15-12-2012 Maria 47
15-12-2012 Angel 13
01-01-2013 Pablo 2
01-01-2013 Maria 3
01-01-2013 Angel 32


A partir de estos datos queremos sacar la suma total de puntuaciones de cada usuario.

Las siguientes líneas van a representar los pares (key, value) que recibe el map teniendo en cuenta que las key son el offset de la línea (en bytes), y los value son la línea de texto entera.:

(0, 01-11-2012 Maria 11)
(22, 01-11-2012 Pablo 9)
(45, 01-11-2012 Angel 3)
(65, 23-11-2012 Pablo 22)
(86, 23-11-2012 Maria 15)
(110, 15-12-2012 Pablo 32)
(134, 15-12-2012 Maria 47)
(158, 15-12-2012 Angel 13)
(182, 01-01-2013 Pablo 2)
(205, 01-01-2013 Maria 3)
(225, 01-01-2013 Angel 32)


La función map lo que hará entonces será tratar cada línea recogiendo sólo la información que nos interesa y omitiendo el resto, para finalmente emitir para cada línea un par key/value con el nombre del jugador como key y la puntuación como value:

(Maria, 11)
(Pablo, 9)
(Angel, 3)
(Pablo, 22)
(Maria, 15)
(Pablo, 32)
(Maria, 47)
(Angel, 13)
(Pablo, 2)
(Maria, 3)
(Angel, 32)

Esta salida del map va a ser tratada a continuación por el Shuffle and Short antes de enviar los datos a la función reduce. En esta fase las key se van a ordenar y los value se van a agrupar por cada key, y el resultado será la entrada de la función reduce, quedando de la siguiente forma:

(Angel, [3, 13, 32])
(Maria, [11, 15, 47, 3])
(Pablo, [9, 22, 32, 2])

Así que la función reduce va a recibir como entrada cada una de estas líneas, siendo una key y por cada key una lista de value. Por cada línea el reduce emitirá la suma de las puntuaciones de cada usuario:

(Angel, 48)
(Maria, 76)
(Pablo, 65)


Flujo de Ejemplo de MapReduce



martes, 15 de enero de 2013

MapReduce

MapReduce es un entorno de desarrollo para el procesado de datos.
Se caracteriza por ser capaz de trabajar con grandes cantidades de datos en paralelo dentro de sistemas distribuídos encargándose de distribuir las tareas por diversos nodos del clúster.
Hay que tener en cuenta que no todos los problemas pueden ser solucionados con este framework. Por regla general se utiliza para abordar problemas de grandes cantidades de datos de hasta petabytes o exabytes de tamaño. Por esa razón MapReduce suele ejecutarse sobre el sistema de ficheros HDFS (otros sistemas de ficheros distribuídos son posibles aunque no son recomendados porque se perdería la "localización de los datos").

Sus principales características son:
  • Distribución y paralelización automáticas
  • Tolerancia a fallos
  • Disponer de herramientas de monitorización
  • Su funcionamiento interno y mantenimiento es transparente para los desarrolladores que sólo se preocupan de escribir los algoritmos, normalmente en Java. Es decir, los desarrolladores sólo tienen que programar la lógica de negocio del algoritmo y no tienen que perder el tiempo gestionando errores o parámetros de la computación distribuída.
  • Escalabilidad horizontal: Si se necesita más potencia de computación, basta con añadir más nodos en el clúster.
  • Localización de los datos: Se desplaza el algoritmo a los datos (y no lo contrario, como suele ser en sistemas distribuídos tradicionales).
Funcionamiento de MapReduce


Su ejecución consta de dos fases principales: Map y Reduce, que programa el desarrollador. Y una fase "interna" Shuffle and Sort que permite vincular las dos fases anteriores.

La fase Map

La fase Map está en contacto directo con los ficheros de entrada del programa. Consta de un método (Java) llamado map que recibe como parámetros un par de key/value (llave/valor) por cada línea de los ficheros de entrada.
Se encarga del tratamiento de cada par key/value recibido y finalmente emite cero, o más pares key/value en cada llamada.

Al escribir el método map el desarrollador puede elegir si usar o ignorar la key de entrada, ya que normalmente se trata del valor de offset de la línea.
Los pares key/value de salida no tienen por qué ser necesariamente del mismo tipo que el par de entrada.

map (input_key, input_value) -> (output_key, output_value) list


Shuffle and Sort

Una vez finalizado el Mapper, los datos intermedios se envían a través de la red para continuar con las siguientes fases.
Entre el Map y el Reduce existe una fase intermedia transparente al cliente/desarrollador llamada Shuffle and Sort.

Como su nombre indica, este proceso se encarga de ordenar por key todos los resultados emitidos por el Mapper y recoger todos los valores intermedios pertenecientes a una key y combinarlos en una lista asociada a esa key.
La lista de keys intermedias y sus valores se envía a los procesos Reducer (que puede ser uno o múltiples según la configuración realizada).

En esta fase Shuffle and Sort, todos los datos intermedios generados por el Mapper se tienen que mandar a través de la red a los Reducers, por lo tanto la red se puede transformar en un cuello de botella para los procesos MapReduce. Por eso suele ser importante intentar reducir el tamaño de esos datos intermedios a través de técnicas como compresión (Snappy), Combiner o "in-mapper combining".

La fase Reducer

La fase Reducer sólo se ejecuta a partir del momento en el que la fase del Mapper ha finalizado por completo.
Los datos intermedios generados se envían a través de la red y se escriben en disco local para que el Reducer pueda realizar su tarea. Una vez finalizado el proceso, estos datos se eliminarán.

La función reducer recibe una key y por cada key una lista de valores asociados, tras realizar las operaciones deseadas emitirá uno o más pares key/value que no tienen por qué ser del mismo tipo que los de entrada

reducer (input_key, (input_values)iterator) -> (output_key, output_value) list

Hay ciertos algoritmos donde es posible hacer trabajos "map-only" es decir, sin realizar el Reducer. Esto es muy sencillo de hacer simplemente indicando en la configuración que deseamos 0 tareas Reducer. La ventaja de un trabajo "map-only" es que sólo ejecutamos una de las tres fases vistas hasta ahora y por tanto el algoritmo se suele ejecutar más rápido.

Algunas situaciones en las que deseamos tener sólo las tareas map pueden ser para el procesado de imágenes o la conversión de formatos de fichero.


En esta otra entrada podréis ver un ejemplo teórico del MapReduce para apoyar toda esta teoría.

viernes, 11 de enero de 2013

Introducción a Hadoop

En esta (mi primera) entrada voy a realizar un pequeño resumen de qué es Hadoop y sus componentes. Quizás muchos de los que lleguéis hasta aquí ya sabéis cuál es la historia de Hadoop, Doug Cutting y el peluche en forma de elefante. Pero creo que un blog hay que empezarlo por donde se debe, el principio, y al fin y al cabo el principio de Hadoop es entender de dónde viene, sus conceptos básicos, sus componentes, sus características y el por qué de su existencia.

¿Qué es Hadoop?

Hadoop es una tecnología open source perteneciente a la Apache Software Fundation inspirado en el sistema de ficheros de Google (GFS) y en el modelo de programación MapReduce.
De las tres empresas (HortonWorks, MapR y Cloudera) que se dedican a Hadoop, Cloudera es la referencia a nivel mundial en soluciones BigData Hadoop

Ha llegado un momento en el que muchas organizaciones tienen que tratar diariamente enormes cantidades de datos llegando a terabytes, petabytes e incluso a exabytes, para lo cual los sistemas tradicionales de computación a gran escala (sean sistemas monolíticos o sistemas distribuídos tipo MPI) se están quedando cortos y empiezan a sufrir varios cuellos de botella.

Hadoop ofrece un cambio radical en la computación distribuída y se le considera como una estrategia completamente nueva a la hora de gestionar y tratar todos estos datos. Además ofrece otras ventajas como la tolerancia a fallos, la recuperabilidad de los datos y de los componentes, la consistencia y la escalabilidad.

Su concepto base se puede resumir en que los datos no van al programa, sino que el programa va a los datos. Gracias a esta premisa el tiempo de computación es considerablemente menor.

Hadoop se compone de dos partes principales:
  • El Hadoop Distributed File System (HDFS): Se trata de un sistema de ficheros responsable de almacenar los datos en el clúster dividiéndolos en bloques y distribuyéndolos a través de los nodos del clúster.
  • MapReduce: Es un modelo de programación encargado de distribuir una tarea por diversos nodos del clúster y procesar los datos que se encuentran en esos nodos. Consta de dos fases: Map y Reduce.
En otras entradas explicaré más detalladamente cada una de estas partes.

Ventajas que ofrece Hadoop respecto a los sistemas distribuídos tradicionales:
  • Tolerancia a fallos parciales: Si falla un elemento del sistema (un nodo) Hadoop es capaz de recuperarse mediante distintas técnicas.
  • Recuperabilidad de los datos: Si un componente del sistema falla el resultado no será la pérdida de ningún dato, el sistema es capaz de recuperarlo.
  • Recuperación de componentes: Si un componente del sistema falla y luego se recupera, tendrá que ser capar de volver a unirse al sistema sin tener que reiniciar todo el sistema.
  • Consistencia: Si un componente falla durante la ejecución de un trabajo, Hadoop tiene medios para que este fallo no afecte a la salida de la ejecución.
  • Escalabilidad: Añadir nuevos componentes es transparente al resto del sistema y estos componentes deben unirse al cluster sin ningún tipo de problema.
  • Otra ventaja puede considerarse el bajo coste: Está preparado para ejecutarse en clústeres de ordenadores que no requieren una potencia demasiado alta. También teniendo en cuenta que Hadoop es una tecnología Open Source.
Finalmente, se pueden considerar los conceptos básicos de Hadoop los siguientes:
  • Las aplicaciones se escriben en lenguajes de alto nivel (Java)
  • Su política es de comunicación prácticamente nula entre nodos.
  • Los datos se extienden por el sistema al principio dividiéndose en bloques de 64Mb o 128Mb.
  • Las tareas Map trabajan sobre cantidades pequeñas de datos, normalmente un bloque.
  • Cuando un cliente hace una petición de lectura, las tareas Map se ejecutan en los nodos donde se encuentran los bloques.
  • Si un nodo falla, el máster lo detecta y reasigna el trabajo a otro nodo.
  • Reiniciar una tarea no afecta al resto de tareas que se están ejecutando sobre otros nodos.
  • Si se reinicia un nodo fallido, éste se unirá automáticamente al sistema y empezará a asignársele tareas.
  • Si un nodo es lento, el máster puede ejecutar otra instancia de la misma tarea.