Recuerda que puedes descargarte algunos de los ejemplos en la pestaña de Código Fuente
Mostrando entradas con la etiqueta Terminología. Mostrar todas las entradas
Mostrando entradas con la etiqueta Terminología. Mostrar todas las entradas

miércoles, 8 de mayo de 2013

Uso del Combiner para optimización de Jobs MapReduce

El Combiner es una función propia de Hadoop utilizada para optimizar los job MapReduce.

Cuando en un Job la salida del Mapper genera una gran cantidad de datos intermedios, éstos se tienen que transmitir por la red hacia los Reducer. Si la cantidad de datos es excesivamente grande aquí se puede producir un cuello de botella.

Las técnicas utilizadas para reducir la cantidad de datos y mejorar la eficiencia de los job MapReduce, se llama Local Aggregation.
Existen dos técnicas, el Combiner que vamos a ver en esta entrada y el In-Mapper Combining que veremos en la siguiente entrada.

Una buena solución para estos casos es la de implementar un Combiner, que se ejecuta a la salida de la fase Map y de forma local a este antes de enviar los datos a través de la red.

El Combiner implementa la misma interfaz que el Reducer e incluso muchas veces suele ser la misma clase que el Reducer.
Sin embargo hay que tener cuidado que la operación que se realiza en el Combiner sea asociativa y conmutativa. Por ejemplo la operación suma (es decir, el ejemplo del Wordcount) cumple con estos dos requisitos, pero por ejemplo la operación de cálculo de la media aritmética no es asociativa.

Para configurarlo se hace en el Driver a través de:
  job.setCombinerClass (MyCombiner.class);

¿Se puede implementar un Combiner distinto al Reducer?
Sí, pueden ser distintos. Hay que tener en cuenta que el Combiner va a seguir implementando la interfaz Reducer. Y también hay que tener cuidado de no poner dentro algún tipo de código "sensible" ya que el Combiner se puede ejecutar cero, una o más veces a la salida desde cualquier Mapper.

Si observamos los logs generados tras la ejecución de un Job del WordCount cuyo código he dejado publicado en este enlace.

Sin Combiner:

Map input records=1928
Map output records=187029
Combine input records=0
Combine output records=0
Reduce input records=187029
Reduce output records=22948


Con Combiner:

Map input records=1928
Map output records=187029
Combine input records=219738
Combine output records=55657
Reduce input records=22948
Reduce output records=22948   



Como he comentado anteriormente el Combiner se ejecuta a la salida del Mapper, antes de que los datos se transmitan por la red hacia el Reducer, la conclusión es que la cantidad de datos transmitida y por tanto el input del Reducer, es considerablemente menor si utilizamos ese Combiner.

Algo a tener en cuenta en el código publicado, es que he hecho un Combiner del WordCount, el código que contiene la clase WordCountCombiner.java es exactamente igual que el código de la clase WordCountReducer.java, en situaciones como esta no haría falta crear esa nueva clase, bastaría con definir en el Driver:

  job.setCombinerClass (WordCountReducer.class);

Así que creé una clase por separado para mostrar que es posible hacer el Combiner en una clase distinta (extendiendo de Reducer), recordando que no debe haber "código sensible" en ella ya que el Combiner puede llegar a ejecutarse varias veces o puede llegar a no ejecutarse.

domingo, 28 de abril de 2013

Speculative Execution

La meta del modelo MapReduce es dividir trabajos en tareas más pequeñas y ejecutarlas en paralelo para que el tiempo de ejecución total del trabajo sea menor que si se ejecutaran de forma secuencial.

Muchas veces, para grandes cantidades de datos, el número de divisiones es mayor que el número de nodos en el cluster, así que aunque las tareas se van ejecutan de forma paralela, otras tareas tienen que esperar a que las primeras finalicen para poder continuar.

Si por alguna causa una de las tareas se está ejecutando más lentamente de lo normal se podrían producir cuellos de botella y el tiempo dedicado para la ejecución total podría aumentar de forma considerable con respecto a si todo el clúster estuviera funcionando de manera correcta.

Hadoop no se va a encargar de buscar la causa de qué es lo que está retardando una tarea, sino que va a buscar soluciones para mitigar esto, y la Speculative Execution es la técnica que utiliza.

En el momento que JobTracker detecta que una tarea es más lenta de lo normal, va a lanzar una tarea especulativa (speculative task), pero sólo en el momento en el que el resto de las tareas del trabajo han finalizado, que la tarea haya estado funcionando al menos un minuto o que el tiempo medio de ejecución de esa tarea sea más alto que el del resto de tareas. Esto es, va a lanzar un duplicado de la tarea que está funcionando lentamente y las tareas se van a estar ejecutando al mismo tiempo.

Cuando una de las tareas finaliza su ejecución, el JobTracker se encargará de desechar el otro proceso (hace un kill de la tarea).

Si el defecto en la tarea se trata de un bug en el programa, éste va a suceder también en la tarea especulativa, así que tendría que ser el desarrollador quien corrigiera ese bug.

La Speculative Execution está activada por defecto, pero está permitido desactivarla en los ficheros de configuración.
En general es conveniente desactivar la speculative execution para las tareas Reduce porque, por un lado, siempre que se duplica una tarea Reduce ésta tiene que recuperar los datos intermedios de la red, lo que incrementaría el tráfico en la red.
Y por otro lado porque si la distribución de las key que reciben los Reducer no es equilibrada, es normal que una tarea Reduce tarde más que otra, por ejemplo, si pensamos en el WordCount, en inglés, la palabra "the" va a tener un array de valores mucho mayor que otras key, por lo que será normal que esta tarea Reduce tarde más.

Por el contrario, como regla general, se recomienda activar la Speculative Execution para las tareas Map. Una excepción podría ser para tareas que no son idempotentes




sábado, 13 de abril de 2013

Componentes de Hadoop: Partitioner

El partitioner te permite que las Key del mismo valor vayan al mismo Reducer, es decir, divide y distribuye el espacio de claves.

Hadoop tiene un Partitioner por defecto, el HashPartitioner, que a través de su método hashCode() determina a qué partición pertenece una determinada Key y por tanto a qué Reducer va a ser enviado el registro. El número de particiones es igual al número de tareas reduce del job.

A veces, por ciertas razones necesitamos implementar nuestro propio Partitioner para controlar que una serie de Keys vayan al mismo Reducer, es por esta razón que crearíamos nuestra propia clase MyPartitioner que heredaría de la interfaz Partitioner y que implementará el método getPartition.


public class  MyPartitioner<K2, V2> 
    extends Partitioner<KEY, VALUE> implements Configurable {
      public int getPartition(KEY key, VALUE value,  int  numPartitions){}  
}


getPartition recibe una key, un valor y el número de particiones en el que se deben dividir los datos cuyo rango está entre 0 y "numPartitions -1" y devolverá un número entre 0 y numPartitions indicando a qué partición pertenecen esos datos recibidos.

Para configurar un partitioner, basta con añadir en el Driver la línea:

job.setPartitionerClass(MyPartitioner.class);


También nos tenemos que acordar de configurar el número de Reducer al número de particiones que vamos a realizar:

job.setNumReduceTasks(numPartitions);


En la entrada siguiente mostraré un ejemplo concreto del Partitioner.

lunes, 18 de febrero de 2013

Hadoop Ecosystem

¿A qué llamamos Ecosistema Hadoop?
Desde que nació esta tecnología se han creado varios proyectos con distintas utilidades que nos pueden solucionar muchas cosas a la hora de trabajar con Hadoop.

Hadoop es la parte central del sistema y el ecosistema Hadoop es todo el conjunto de proyectos que ayudan a implementar esta solución.

Algunos de los proyectos del Ecosistema son:
  • Hive: Es un sistema de almacenamiento de datos (Data Warehouse) para Hadoop y basado en metadatos. Tiene un lenguaje parecido a SQL (HiveQL) y genera código MapReduce que funciona sobre Hadoop. En Facebook, más del 99% de los Jobs MapReduce se generan a partir de Hive.
  • Pig: Al igual que Hive, tiene un lenguaje propio (pero más tipo scripting) llamado PigLatin y que también se convierte en Jobs MapReduce que funcionan en el clúster. En este caso no tiene metadatos (por eso es más sencillo de instalar que Hive).
  • Impala: Permite queries en tiempo real sobre datos en HDFS. No usa MapReduce, usa sus propios demonios y luego ejecutándose en cada nodo esclavo. Su lenguaje es similar a HiveQL y tanto los datos como los metadatos que usa son los mismos que los de Hive.
  • Flume: Es un software para importar datos en tiempo real en un cluster Hadoop (sirve para la ETL: Extraction Transformation and Load). Ideal para recolectar logs de múltiples sistemas y almacenarlos a HDFS. Soporta una gran variedad de fuentes (como syslog, log4j, etc).
  • Chuckwa: Este proyecto realiza ETL de una forma parecida a Flume. También contiene un módulo de visualización de los mismos.
  • Sqoop: Es un proyecto desarrollado para importar datos desde bases de datos relacionales a HDFS y por lo tanto, es también un sistema de ETL. Se puede importar toda la BD, sólo algunas tablas o sólo partes de la tabla. También permite realizar la operación inversa: exportar datos en HDFS a una base de datos relacional.
  • Oozie: Se puede decir que es un motor de workflow utilizado para el encadenamiento de Jobs. Se ejecuta en un servidor fuera del clúster y se suele utilizar para workflows complejos, que sean lineales o gráfos. Puede encadenar Jobs MapReduce, Pig, Hive, etc.
  • HBase: Se podría decir que "es la Base de Datos NoSql Hadoop". Puede almacenar hasta petabytes de datos, está orientado a columnas (las relacionales a filas), permite miles de insert por segundo, las tablas pueden tener varios miles de columnas. Al contrario de Hive, su lenguaje de query no es intuitivo y no se parece a SQL.
  • Avro: Es un sistema para la serialización de datos que se usa como alternativa a los SquenceFiles. Tiene un formato propio de compresión, se puede exportar a otros lenguajes que no sean Java y tiene sus propios formatos para el Mapper y el Reducer (AvroMapper y AvroReducer).
  • ZooKeeper: Herramienta para la configuración y sincronización de aplicaciones distribuídas.

En otras entradas profundizaré más sobre algunos de estos proyectos.

sábado, 9 de febrero de 2013

Alta Disponibilidad

En la última versión de Hadoop, la CDH4, se ha puesto en marcha la Alta Disponibilidad.

Hasta ahora, el NameNode era un punto de fallo crítico (SPOF en inglés) de nuestro sistema Hadoop. Si el NameNode falla, todos los clientes, incluído los Job MapReduce se inutilizan.
El administrador tendría que encargarse de crear un nuevo NameNode a partir de una réplica de los metadata y configurar los clientes y DataNodes para utilizar este nuevo NameNode, más todas las tareas que hay por detrás hasta que el NameNode está disponible para funcionar.
Hasta que no se hace todo eso, el sistema se encuentra bloqueado.

Un tiempo de recuperación largo puede ser un grabe problema en ciertas situaciones.

Como hemos visto, en un cluster Hadoop disponemos de un NameNode y de un Secondary Namenode, pero si la Alta Disponibilidad está activada, en vez de disponer de estos dos elementos dispondríamos de un Active NameNode y de un Standby NameNode que estarían haciendo las mismas funciones de principal y secundario y el Standby realizando las tareas que hemos visto que ejerce el Secondary.
Pero en este caso, si el Active NameNode falla, el Standby NameNode se activa automáticamente (para esto se necesita tener un cluster Zookeeper) y empezaría a funcionar como el NameNode ya que el Standby NameNode siempre tiene en memoria la misma información de metadata que el NameNode activo (en caso de alta disponibilidad, los DataNodes envían sus informes tanto al NameNode activo como al Standby).

sábado, 26 de enero de 2013

Hadoop Distributed File System (HDFS)


El HDFS es el sistema de archivos distribuído de Hadoop.

Los datos se dividen en bloques de 64MB o 128MB, estos bloques se replican en un número configurable de veces (por defecto 3) y se distribuyen en distintos nodos del clúster. Esto es porque HDFS supone que algún componente del sistema puede fallar, así que siempre va a tener disponible alguna de las copias. Por otro lado, tienen que existir tantos bloques como hayamos configurado, por eso, si HDFS detecta que un componente no funciona, se encargará de saber qué bloques estaban en ese componente y replicarlos por el resto del clúster.

Es importante saber que cuando un fichero se divide en bloques, si un bloque queda "incompleto" (respecto del tamaño del bloque establecido), ese espacio nunca se va a rellenar ni utilizar con datos de otro fichero.

Este sistema de archivos está más pensado para tener un número de ficheros razonable (no más de algunos millones) de un tamaño bastante grande (del orden del tamaño de bloque HDFS o más grande aún). Es decir, con HDFS es mejor tener menos ficheros, pero más grandes (un ítem de metadatos ocupa entre 150 y 200 bytes de memoria RAM en el NameNode).
También es más óptimo para lecturas en streaming.

Está escrito en Java. Y los bloques de datos HDFS se escriben en ficheros (dentro del sistema de archivos estándar del servidor, por ejemplo ext4) en unos directorios específicos de los nodos esclavos (se encargaría el administrador) sólo una vez y no se les puede añadir más información.

Para que HDFS pueda ser un sistema de archivos distribuído, se ha de separar los datos de los metadatos. Para esto utiliza dos tipos de nodos: el NameNode y los DataNodes de los que ya hablaré más detalle en otra entrada.
Resumidamente el NameNode contiene los metadatos, es quien se encarga de dividir y distribuir los bloques por los DataNodes del clúster. Cuando un cliente quiere hacer una lectura de los datos, primero va a preguntar al NameNode, que es quien sabe dónde está cada bloque, y luego va a leer los datos bloque a bloque y en cada uno de los nodos que los contienen.

Características:
  • Almacenamiento redundante
  • Tolerancia a fallos
  • Mecanismo de checksums
  • Política de una escritura (write-once), preparado para muchas lecturas.
  • Escalabilidad horizontal
  • No implementa POSIX
Bloques HDFS

Un cliente puede acceder a HDFS a través de un terminal utilizando los comandos propios de HDFS o vía la API de Java a través de una aplicación.
Y aunque yo o mucha gente que lea el blog sean perfiles de desarrollo, es muy importante saber moverse y tener unos conocimientos básicos de cómo funciona este sistema de ficheros a través del terminal.
Para acceder a través de la línea de comandos se hace con el comando:

     $ hadoop fs

Por sí solo este comando no hace nada (sólo muestra la ayuda), luego existen múltiples comandos que ayudarán al usuario a hacer ciertas interacciones.

Cuando accedemos a través del terminal hay que tener muy claro cuál es nuestro sistema de ficheros tradicional y cuándo estamos en HDFS.

Algunos de estos comandos asociados a hadoop son:
  • Comando encargado de copiar el archivo ficheroLocal.txt que se encuentra en el directorio actual de mi sistema de ficheros local a HDFS, el archivo se llamará ficheroHDFS.txt, pero se encontrará en el directorio HDFS configurado por nuestro administrador (nuestra carpeta de usuario en HDFS):
          $ hadoop fs -put ficheroLocal.txt ficheroHDFS.txt
  • Ahora el comando encargado de copiar un fichero desde HDFS a nuestro sistema de ficheros local:
                    $ hadoop fs -get ficheroHDFS.txt ficheroLocal.txt
  • Si queremos listar el contenido de nuestro directorio HDFS (nuestra carpeta de usuario):
          $ hadoop fs -ls
  • Para mostrar el contenido de un fichero en nuestra carpeta de usuario HDFS:
                    $ hadoop fs -cat ficheroHDFS.txt
  • Queremos crear un directorio en HDFS (en nuestra carpeta de usuario)
                    $ hadoop fs -mkdir miDirectorio
  • Queremos borrar un directorio y todo su contenido:
                    $ hadoop fs -rm -r miDirectorio


Sobre cómo acceder con la API de Java, sería a través del FileSystem API, pero es una parte extensa y ya lo mostraré en otra entrada más adelante.