Recuerda que puedes descargarte algunos de los ejemplos en la pestaña de Código Fuente
Mostrando entradas con la etiqueta Conceptos Básicos. Mostrar todas las entradas
Mostrando entradas con la etiqueta Conceptos Básicos. Mostrar todas las entradas

martes, 26 de febrero de 2013

Hadoop: Introducción al desarrollo en Java (Parte I)

En esta entrada voy a explicar una introducción al desarrollo de un programa MapReduce en Java.

Este blog lo estoy desarrollando a partir de la versión 1.0.4. Esta versión permite usar tanto la nueva API de Hadoop como la vieja API (me referiré a ellas como "new API" -releases 1.x- y "old API" -releases 0.20-x). En esta entrada la voy a aprovechar para mostrar las diferencias entre estas dos APIs pero las próximas entradas y desarrollos ya estarán hechas únicamente con la nueva API.

Para explicar una introducción al desarrollo de un programa MapReduce utilizaré el "Hola Mundo" de Hadoop, que es el "Word Count", básicamente se coge un fichero, se cuenta las veces que aparece una palabra y la salida será un listado de palabras (keys) con el número de veces que aparece (values).

Para desarrollar este tipo de programas necesitamos básicamente tres clases:
También antes de empezar a explicar código, hay que tener en cuenta cuáles son los tipos de datos que vamos a utilizar, ya que las clases hay que definirlas con los tipos de entrada y de salida.

En el WordCount el Mapper va a recibir como key elementos de tipo numérico ya que, como ya he explicado en otros artículos, normalmente va a ser el offset del fichero y ese valor no lo vamos a utilizar.
El value que recibe el map es cada una de las líneas del fichero, es decir, valores de tipo texto.
El map va a retornar una lista de pares key/value, donde la key es la palabra localizada (de tipo texto) y el value es un número (en este caso siempre va a devolver un 1 por cada palabra que encuentre).

El Reducer va a recibir una key que será la palabra (dada en el Mapper) de tipo texto y como valores una lista de números. Y finalmente emitirá la palabra que estamos contanto de tipo texto como key, y como value el resultado de las veces que aparece esa palabra, es decir, de tipo número.

En la clase map y en la clase reduce los tipos de entrada no tienen por qué ser los mismos que los de salida. Pero es muy importante tener en cuenta los tipos de la salida del map deben ser los mismos que los de entrada del reduce.

Con el fin de que este artículo no quede demasiado largo, lo he dividido en varios artículos e iré explicando el código escribiendo los comentarios correspondientes.

Para ejecutar y probar el programa en Eclipse podéis leer este artículo.

Continuar con:
Introducción al desarrollo en Java (Parte II): El Mapper (Ejemplo Word Count)
Introducción al desarrollo en Java (Parte III): El Reducer (Ejemplo Word Count)
Introducción al desarrollo en Java (Parte IV): El Driver (Ejemplo Word Count)
Introducción al desarrollo en Java (Parte V): Métodos setup() y cleanup()

sábado, 26 de enero de 2013

Hadoop Distributed File System (HDFS)


El HDFS es el sistema de archivos distribuído de Hadoop.

Los datos se dividen en bloques de 64MB o 128MB, estos bloques se replican en un número configurable de veces (por defecto 3) y se distribuyen en distintos nodos del clúster. Esto es porque HDFS supone que algún componente del sistema puede fallar, así que siempre va a tener disponible alguna de las copias. Por otro lado, tienen que existir tantos bloques como hayamos configurado, por eso, si HDFS detecta que un componente no funciona, se encargará de saber qué bloques estaban en ese componente y replicarlos por el resto del clúster.

Es importante saber que cuando un fichero se divide en bloques, si un bloque queda "incompleto" (respecto del tamaño del bloque establecido), ese espacio nunca se va a rellenar ni utilizar con datos de otro fichero.

Este sistema de archivos está más pensado para tener un número de ficheros razonable (no más de algunos millones) de un tamaño bastante grande (del orden del tamaño de bloque HDFS o más grande aún). Es decir, con HDFS es mejor tener menos ficheros, pero más grandes (un ítem de metadatos ocupa entre 150 y 200 bytes de memoria RAM en el NameNode).
También es más óptimo para lecturas en streaming.

Está escrito en Java. Y los bloques de datos HDFS se escriben en ficheros (dentro del sistema de archivos estándar del servidor, por ejemplo ext4) en unos directorios específicos de los nodos esclavos (se encargaría el administrador) sólo una vez y no se les puede añadir más información.

Para que HDFS pueda ser un sistema de archivos distribuído, se ha de separar los datos de los metadatos. Para esto utiliza dos tipos de nodos: el NameNode y los DataNodes de los que ya hablaré más detalle en otra entrada.
Resumidamente el NameNode contiene los metadatos, es quien se encarga de dividir y distribuir los bloques por los DataNodes del clúster. Cuando un cliente quiere hacer una lectura de los datos, primero va a preguntar al NameNode, que es quien sabe dónde está cada bloque, y luego va a leer los datos bloque a bloque y en cada uno de los nodos que los contienen.

Características:
  • Almacenamiento redundante
  • Tolerancia a fallos
  • Mecanismo de checksums
  • Política de una escritura (write-once), preparado para muchas lecturas.
  • Escalabilidad horizontal
  • No implementa POSIX
Bloques HDFS

Un cliente puede acceder a HDFS a través de un terminal utilizando los comandos propios de HDFS o vía la API de Java a través de una aplicación.
Y aunque yo o mucha gente que lea el blog sean perfiles de desarrollo, es muy importante saber moverse y tener unos conocimientos básicos de cómo funciona este sistema de ficheros a través del terminal.
Para acceder a través de la línea de comandos se hace con el comando:

     $ hadoop fs

Por sí solo este comando no hace nada (sólo muestra la ayuda), luego existen múltiples comandos que ayudarán al usuario a hacer ciertas interacciones.

Cuando accedemos a través del terminal hay que tener muy claro cuál es nuestro sistema de ficheros tradicional y cuándo estamos en HDFS.

Algunos de estos comandos asociados a hadoop son:
  • Comando encargado de copiar el archivo ficheroLocal.txt que se encuentra en el directorio actual de mi sistema de ficheros local a HDFS, el archivo se llamará ficheroHDFS.txt, pero se encontrará en el directorio HDFS configurado por nuestro administrador (nuestra carpeta de usuario en HDFS):
          $ hadoop fs -put ficheroLocal.txt ficheroHDFS.txt
  • Ahora el comando encargado de copiar un fichero desde HDFS a nuestro sistema de ficheros local:
                    $ hadoop fs -get ficheroHDFS.txt ficheroLocal.txt
  • Si queremos listar el contenido de nuestro directorio HDFS (nuestra carpeta de usuario):
          $ hadoop fs -ls
  • Para mostrar el contenido de un fichero en nuestra carpeta de usuario HDFS:
                    $ hadoop fs -cat ficheroHDFS.txt
  • Queremos crear un directorio en HDFS (en nuestra carpeta de usuario)
                    $ hadoop fs -mkdir miDirectorio
  • Queremos borrar un directorio y todo su contenido:
                    $ hadoop fs -rm -r miDirectorio


Sobre cómo acceder con la API de Java, sería a través del FileSystem API, pero es una parte extensa y ya lo mostraré en otra entrada más adelante.

martes, 15 de enero de 2013

MapReduce

MapReduce es un entorno de desarrollo para el procesado de datos.
Se caracteriza por ser capaz de trabajar con grandes cantidades de datos en paralelo dentro de sistemas distribuídos encargándose de distribuir las tareas por diversos nodos del clúster.
Hay que tener en cuenta que no todos los problemas pueden ser solucionados con este framework. Por regla general se utiliza para abordar problemas de grandes cantidades de datos de hasta petabytes o exabytes de tamaño. Por esa razón MapReduce suele ejecutarse sobre el sistema de ficheros HDFS (otros sistemas de ficheros distribuídos son posibles aunque no son recomendados porque se perdería la "localización de los datos").

Sus principales características son:
  • Distribución y paralelización automáticas
  • Tolerancia a fallos
  • Disponer de herramientas de monitorización
  • Su funcionamiento interno y mantenimiento es transparente para los desarrolladores que sólo se preocupan de escribir los algoritmos, normalmente en Java. Es decir, los desarrolladores sólo tienen que programar la lógica de negocio del algoritmo y no tienen que perder el tiempo gestionando errores o parámetros de la computación distribuída.
  • Escalabilidad horizontal: Si se necesita más potencia de computación, basta con añadir más nodos en el clúster.
  • Localización de los datos: Se desplaza el algoritmo a los datos (y no lo contrario, como suele ser en sistemas distribuídos tradicionales).
Funcionamiento de MapReduce


Su ejecución consta de dos fases principales: Map y Reduce, que programa el desarrollador. Y una fase "interna" Shuffle and Sort que permite vincular las dos fases anteriores.

La fase Map

La fase Map está en contacto directo con los ficheros de entrada del programa. Consta de un método (Java) llamado map que recibe como parámetros un par de key/value (llave/valor) por cada línea de los ficheros de entrada.
Se encarga del tratamiento de cada par key/value recibido y finalmente emite cero, o más pares key/value en cada llamada.

Al escribir el método map el desarrollador puede elegir si usar o ignorar la key de entrada, ya que normalmente se trata del valor de offset de la línea.
Los pares key/value de salida no tienen por qué ser necesariamente del mismo tipo que el par de entrada.

map (input_key, input_value) -> (output_key, output_value) list


Shuffle and Sort

Una vez finalizado el Mapper, los datos intermedios se envían a través de la red para continuar con las siguientes fases.
Entre el Map y el Reduce existe una fase intermedia transparente al cliente/desarrollador llamada Shuffle and Sort.

Como su nombre indica, este proceso se encarga de ordenar por key todos los resultados emitidos por el Mapper y recoger todos los valores intermedios pertenecientes a una key y combinarlos en una lista asociada a esa key.
La lista de keys intermedias y sus valores se envía a los procesos Reducer (que puede ser uno o múltiples según la configuración realizada).

En esta fase Shuffle and Sort, todos los datos intermedios generados por el Mapper se tienen que mandar a través de la red a los Reducers, por lo tanto la red se puede transformar en un cuello de botella para los procesos MapReduce. Por eso suele ser importante intentar reducir el tamaño de esos datos intermedios a través de técnicas como compresión (Snappy), Combiner o "in-mapper combining".

La fase Reducer

La fase Reducer sólo se ejecuta a partir del momento en el que la fase del Mapper ha finalizado por completo.
Los datos intermedios generados se envían a través de la red y se escriben en disco local para que el Reducer pueda realizar su tarea. Una vez finalizado el proceso, estos datos se eliminarán.

La función reducer recibe una key y por cada key una lista de valores asociados, tras realizar las operaciones deseadas emitirá uno o más pares key/value que no tienen por qué ser del mismo tipo que los de entrada

reducer (input_key, (input_values)iterator) -> (output_key, output_value) list

Hay ciertos algoritmos donde es posible hacer trabajos "map-only" es decir, sin realizar el Reducer. Esto es muy sencillo de hacer simplemente indicando en la configuración que deseamos 0 tareas Reducer. La ventaja de un trabajo "map-only" es que sólo ejecutamos una de las tres fases vistas hasta ahora y por tanto el algoritmo se suele ejecutar más rápido.

Algunas situaciones en las que deseamos tener sólo las tareas map pueden ser para el procesado de imágenes o la conversión de formatos de fichero.


En esta otra entrada podréis ver un ejemplo teórico del MapReduce para apoyar toda esta teoría.

viernes, 11 de enero de 2013

Introducción a Hadoop

En esta (mi primera) entrada voy a realizar un pequeño resumen de qué es Hadoop y sus componentes. Quizás muchos de los que lleguéis hasta aquí ya sabéis cuál es la historia de Hadoop, Doug Cutting y el peluche en forma de elefante. Pero creo que un blog hay que empezarlo por donde se debe, el principio, y al fin y al cabo el principio de Hadoop es entender de dónde viene, sus conceptos básicos, sus componentes, sus características y el por qué de su existencia.

¿Qué es Hadoop?

Hadoop es una tecnología open source perteneciente a la Apache Software Fundation inspirado en el sistema de ficheros de Google (GFS) y en el modelo de programación MapReduce.
De las tres empresas (HortonWorks, MapR y Cloudera) que se dedican a Hadoop, Cloudera es la referencia a nivel mundial en soluciones BigData Hadoop

Ha llegado un momento en el que muchas organizaciones tienen que tratar diariamente enormes cantidades de datos llegando a terabytes, petabytes e incluso a exabytes, para lo cual los sistemas tradicionales de computación a gran escala (sean sistemas monolíticos o sistemas distribuídos tipo MPI) se están quedando cortos y empiezan a sufrir varios cuellos de botella.

Hadoop ofrece un cambio radical en la computación distribuída y se le considera como una estrategia completamente nueva a la hora de gestionar y tratar todos estos datos. Además ofrece otras ventajas como la tolerancia a fallos, la recuperabilidad de los datos y de los componentes, la consistencia y la escalabilidad.

Su concepto base se puede resumir en que los datos no van al programa, sino que el programa va a los datos. Gracias a esta premisa el tiempo de computación es considerablemente menor.

Hadoop se compone de dos partes principales:
  • El Hadoop Distributed File System (HDFS): Se trata de un sistema de ficheros responsable de almacenar los datos en el clúster dividiéndolos en bloques y distribuyéndolos a través de los nodos del clúster.
  • MapReduce: Es un modelo de programación encargado de distribuir una tarea por diversos nodos del clúster y procesar los datos que se encuentran en esos nodos. Consta de dos fases: Map y Reduce.
En otras entradas explicaré más detalladamente cada una de estas partes.

Ventajas que ofrece Hadoop respecto a los sistemas distribuídos tradicionales:
  • Tolerancia a fallos parciales: Si falla un elemento del sistema (un nodo) Hadoop es capaz de recuperarse mediante distintas técnicas.
  • Recuperabilidad de los datos: Si un componente del sistema falla el resultado no será la pérdida de ningún dato, el sistema es capaz de recuperarlo.
  • Recuperación de componentes: Si un componente del sistema falla y luego se recupera, tendrá que ser capar de volver a unirse al sistema sin tener que reiniciar todo el sistema.
  • Consistencia: Si un componente falla durante la ejecución de un trabajo, Hadoop tiene medios para que este fallo no afecte a la salida de la ejecución.
  • Escalabilidad: Añadir nuevos componentes es transparente al resto del sistema y estos componentes deben unirse al cluster sin ningún tipo de problema.
  • Otra ventaja puede considerarse el bajo coste: Está preparado para ejecutarse en clústeres de ordenadores que no requieren una potencia demasiado alta. También teniendo en cuenta que Hadoop es una tecnología Open Source.
Finalmente, se pueden considerar los conceptos básicos de Hadoop los siguientes:
  • Las aplicaciones se escriben en lenguajes de alto nivel (Java)
  • Su política es de comunicación prácticamente nula entre nodos.
  • Los datos se extienden por el sistema al principio dividiéndose en bloques de 64Mb o 128Mb.
  • Las tareas Map trabajan sobre cantidades pequeñas de datos, normalmente un bloque.
  • Cuando un cliente hace una petición de lectura, las tareas Map se ejecutan en los nodos donde se encuentran los bloques.
  • Si un nodo falla, el máster lo detecta y reasigna el trabajo a otro nodo.
  • Reiniciar una tarea no afecta al resto de tareas que se están ejecutando sobre otros nodos.
  • Si se reinicia un nodo fallido, éste se unirá automáticamente al sistema y empezará a asignársele tareas.
  • Si un nodo es lento, el máster puede ejecutar otra instancia de la misma tarea.