Recuerda que puedes descargarte algunos de los ejemplos en la pestaña de Código Fuente

lunes, 11 de marzo de 2013

Flujo de ejecución de un Job MapReduce

Cuando un cliente envía un trabajo, primero su información de configuración se empaqueta en un fichero xml, que junto con el fichero jar (que contiene el código del programa) y todo eso es gestionado por el JobTracker.

El JobTracker envía tareas individuales al TaskTracker, el cual, cuando recibe una petición de ejecutar una tarea,  instancia una nueva JVM separada para esta tarea.

Los datos intermedios se generan en el disco local del TaskTracker. Luego los datos se distribuyen por la red hacia los Reducers, que escribirán la salida en HDFS.

Una vez finalizado el Job, el TaskTracker borra los datos intermedios del disco local.


Así es como actuaría de forma genérica, ahora veremos cómo sería el flujo de un MapReduce con más detalle:

Flujo MapReduce

Cuando un cliente envía un Job, se está enviando unos datos de entrada, un Input, que MapReduce primero divide en trozos (normalmente del mismo tamaño que los bloques HDFS) llamados Input Splits, Hadoop ejecuta una tarea Task por cada split generado.
El InputSplit se hace de forma automática, quiero decir, es el InputFormat el que se encarga de crear los InputSplits y de dividirlos en registros.

La ventaja de dividir en esos trozos es que la cantidad de datos a tratar es mucho menor que si se enviara el input entero, entonces al tratarlo en sistema distribuído el tiempo total del proceso será mucho menor.
También la ventaja está en que si se dividen en el tamaño del bloque HDFS, la tarea map se va a ejecutar (la mayor parte de las veces) en el nodo donde se encuentran esos datos almacenados en HDFS, de esta forma se ahorra ancho de banda, ya que no estamos transmitiendo los datos a través de la red.

Hay que tener en cuenta que el InputSplit no contiene los datos como tal, sino una referencia a los datos.

A continuación el TaskTracker pasa el split al RecordReader, que no es más que un iterador sobre los registros de ese split y es entonces cuando la tarea map trata cada registro de tipo par key/value y generando una salida.

Los datos intermedios se almacenan en el disco local, y no en HDFS porque esto supondría realizar el proceso de replicación y eso sería excesivo.

Después, los datos se enviarían al nodo donde la tarea reduce se está ejecutando. Por defecto hay una sola tarea reduce, pero podría haber más. Así que si disponemos de varios reducer, hay una función partition que se encarga de particionar la salida del map  (se suele dividir para que cada key con sus valores vayan al mismo reducer) y enviar cada trozo a un reducer. Hay un partitioner por defecto, pero podríamos crear el nuestro propio que ya veríamos en otro artículo.

También sería posible tener el programa configurado con cero tareas reducer.

El Reduce no dispone del concepto de localización de los datos

La salida del reducer se va guardando en un RecordWriter, que posteriormente va a pasar a generar el output que se almacena en HDFS para aportar fiabilidad. Por cada bloque del output, la primera réplica se almacena en el nodo local, las otras réplicas en el resto de nodos.





miércoles, 6 de marzo de 2013

Hadoop: Introducción al Desarrollo en Java (Parte V): Métodos setup() y cleanup()

Son métodos que se pueden implementar tanto en el Mapper como en el Reducer.

Teniendo en cuenta que cada vez que hay una tarea Map se crea un objeto de tipo mapper (o lo mismo con el reducer). Estos métodos se van a ejecutar antes o después de que todas las tareas map o reduce se hayan ejecutado, y sólo una vez.

setup()

Este método se ejecuta antes de que el método map y/o reduce (según en qué clase se haya implementado) sean llamados por primera vez.
Se usa para ejecutar código antes de que el Mapper o Reducer se ejecuten.

Se suele utilizar para leer datos de ficheros, inicializar estructuras, establecer parámetros, etc.

 
public class MyClass extends Mapper<Type, Type, Type, Type> {

 protected void setup(Context context) 
     throws IOException, InterruptedException{

       // TODO: Implementar código necesario
 }
 public void map(Type key, Type values, Context context) {...}
}


cleanup()

Es como el setup, pero se ejecuta antes de que el Mapper o el Reduccer finalicen.

 
public class MyClass extends Mapper<Type, Type, Type, Type> {

 public void map(Type key, Type values, Context context) {...}

 protected void cleanup(Context context) 
     throws IOException, InterruptedException{

       // TODO: Implementar código necesario
 }
}

Estos son los métodos utilizados en la new API, en la old API los métodos se llamaban configure() y close()



lunes, 4 de marzo de 2013

Hadoop: Introducción al desarrollo en Java (Parte IV): El Driver (Ejemplo Word Count)

El driver se ejecuta en la máquina cliente, se trata de una función main que recibe como argumentos el input y el output y que configura el Job para finalmente enviarlo al clúster.

El Driver desarrollado con la new API:

 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
 public static void main(String[] args) throws Exception {
//  Lo primero debería ser comprobar que recibimos
//  2 argumentos (entrada y salida, si es un número
//  diferente sería erróneo
  if (args.length != 2) {
   Sysout.printf("Error");
   System.exit(-1);
  }
 
//  Se crea un nuevo Job indicando la clase que se llamará
//  al ejecutar y el nombre del Job.
//  Configuration servirá en programas más avanzados donde
//  queramos establecer configuraciones diferentes a las
//  que vienen por defecto o para el paso de parámetros.
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  job.setJarByClass(WordCountDriver.class);
  job.setJobName("Word Count");
  
//  Indicamos cuáles son las clases Mapper y Reducer
  job.setMapperClass(WordCountMapper.class);
  job.setReducerClass(wordcount.WordCountReducer.class);

//  Especificamos los directorios input y output, es decir, 
//  el directorio en HDFS donde se encuentra nuestro fichero 
//  de entrada, y dónde va a depositar los resultados
//  Recalcar que es muy importante que la ruta de output no
//  exista (el Job MapReduce la creará él solo).
  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
//  Se establecen los tipos de la key y del value a la
//  salida del reduce.
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  
//  Se establecen los tipos de la key y del value a la
//  salida del map.
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);
  
//  Otras configuraciones posibles:
//  Por defecto el tipo del fichero de entrada es 
//  TextInputFormat, se puede cambiar con:
//   job.setInputFormatClass(KeyValueTextInputFormat.class);
//  Por defecto la salida es un fichero de texto, 
//  se puede cambiar con:
//   job.setOutputFormatClass(TextOutputFormat.class);
   
//  Lanzamos el Job al cluster, hay varios modos, en 
//  waitForCompletion si hubiera más código implementado 
//  después de esta línea, no se ejecutaría
//  hasta que no finalizara el Job.
//  Hay otros modos en los que se puede lanzar el Job.
  boolean success = job.waitForCompletion(true);
  System.exit(success ? 0:1); 
 }
}

Algunos puntos a tener en cuenta (para las 2 APIS):
  • Se configuran sólo los tipos de las salidas, no de las entradas. Los tipos de entrada del Mapper están definidos por el InputFormat (en este ejemplo usamos el input format por defecto: TextInputFormat por lo cual las key son de tipo LongWritable y los value son de tipo Text). Los tipos de entrada del Reducer son los mismos que los de salida del Mapper. Igualmente el desarrollador tendrá que indicar cuáles son en los parámetro del Mapper y del Reducer.
  • Si las salidas del mapper y del reducer son del mismo tipo, no hace falta indicar el job.setMapOutputKeyClass ni el job.setMapOutputValueClass, basta con indicar el job.setOutputKeyClass y el job.setOutputValueClass.

Con respecto la old API han cambiado unas cuantas cosas, dejo aquí un código y luego explico las diferencias:

 
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;

public class WordCount  { 
 public static void main(String[] args) throws Exception {
  
  JobConf conf = new JobConf(WordCountDriver.class);
  conf.setJobName(this.getClass().getName());

  conf.setMapperClass(WordCountMapper.class);
  conf.setReducerClass(WordCountReducer.class);
  
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(IntWritable.class);
  conf.setMapOutputKeyClass(Text.class);
  conf.setMapOutputValueClass(IntWritable.class);

  FileInputFormat.setInputPaths(conf, new Path(args[0]));
  FileOutputFormat.setOutputPath(conf, new Path(args[1]));

  JobClient.runJob(conf);
 }
}

Como veréis, las diferencias principales son:
  • En los import, en la new API se utilizan las clases que pertenecen al paquete org.apache.hadoop.mapreduce, mientras que en la old API el paquete era org.apache.hadoop.mapred
  • En la new API el Job se ejecuta a través de la clase Job, en la old API se hace a través de JobClient.
  • En la new API el objeto de configuración es Job, en la old API es JobConf.