Recuerda que puedes descargarte algunos de los ejemplos en la pestaña de Código Fuente

miércoles, 6 de marzo de 2013

Hadoop: Introducción al Desarrollo en Java (Parte V): Métodos setup() y cleanup()

Son métodos que se pueden implementar tanto en el Mapper como en el Reducer.

Teniendo en cuenta que cada vez que hay una tarea Map se crea un objeto de tipo mapper (o lo mismo con el reducer). Estos métodos se van a ejecutar antes o después de que todas las tareas map o reduce se hayan ejecutado, y sólo una vez.

setup()

Este método se ejecuta antes de que el método map y/o reduce (según en qué clase se haya implementado) sean llamados por primera vez.
Se usa para ejecutar código antes de que el Mapper o Reducer se ejecuten.

Se suele utilizar para leer datos de ficheros, inicializar estructuras, establecer parámetros, etc.

 
public class MyClass extends Mapper<Type, Type, Type, Type> {

 protected void setup(Context context) 
     throws IOException, InterruptedException{

       // TODO: Implementar código necesario
 }
 public void map(Type key, Type values, Context context) {...}
}


cleanup()

Es como el setup, pero se ejecuta antes de que el Mapper o el Reduccer finalicen.

 
public class MyClass extends Mapper<Type, Type, Type, Type> {

 public void map(Type key, Type values, Context context) {...}

 protected void cleanup(Context context) 
     throws IOException, InterruptedException{

       // TODO: Implementar código necesario
 }
}

Estos son los métodos utilizados en la new API, en la old API los métodos se llamaban configure() y close()



lunes, 4 de marzo de 2013

Hadoop: Introducción al desarrollo en Java (Parte IV): El Driver (Ejemplo Word Count)

El driver se ejecuta en la máquina cliente, se trata de una función main que recibe como argumentos el input y el output y que configura el Job para finalmente enviarlo al clúster.

El Driver desarrollado con la new API:

 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
 public static void main(String[] args) throws Exception {
//  Lo primero debería ser comprobar que recibimos
//  2 argumentos (entrada y salida, si es un número
//  diferente sería erróneo
  if (args.length != 2) {
   Sysout.printf("Error");
   System.exit(-1);
  }
 
//  Se crea un nuevo Job indicando la clase que se llamará
//  al ejecutar y el nombre del Job.
//  Configuration servirá en programas más avanzados donde
//  queramos establecer configuraciones diferentes a las
//  que vienen por defecto o para el paso de parámetros.
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  job.setJarByClass(WordCountDriver.class);
  job.setJobName("Word Count");
  
//  Indicamos cuáles son las clases Mapper y Reducer
  job.setMapperClass(WordCountMapper.class);
  job.setReducerClass(wordcount.WordCountReducer.class);

//  Especificamos los directorios input y output, es decir, 
//  el directorio en HDFS donde se encuentra nuestro fichero 
//  de entrada, y dónde va a depositar los resultados
//  Recalcar que es muy importante que la ruta de output no
//  exista (el Job MapReduce la creará él solo).
  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
//  Se establecen los tipos de la key y del value a la
//  salida del reduce.
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  
//  Se establecen los tipos de la key y del value a la
//  salida del map.
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);
  
//  Otras configuraciones posibles:
//  Por defecto el tipo del fichero de entrada es 
//  TextInputFormat, se puede cambiar con:
//   job.setInputFormatClass(KeyValueTextInputFormat.class);
//  Por defecto la salida es un fichero de texto, 
//  se puede cambiar con:
//   job.setOutputFormatClass(TextOutputFormat.class);
   
//  Lanzamos el Job al cluster, hay varios modos, en 
//  waitForCompletion si hubiera más código implementado 
//  después de esta línea, no se ejecutaría
//  hasta que no finalizara el Job.
//  Hay otros modos en los que se puede lanzar el Job.
  boolean success = job.waitForCompletion(true);
  System.exit(success ? 0:1); 
 }
}

Algunos puntos a tener en cuenta (para las 2 APIS):
  • Se configuran sólo los tipos de las salidas, no de las entradas. Los tipos de entrada del Mapper están definidos por el InputFormat (en este ejemplo usamos el input format por defecto: TextInputFormat por lo cual las key son de tipo LongWritable y los value son de tipo Text). Los tipos de entrada del Reducer son los mismos que los de salida del Mapper. Igualmente el desarrollador tendrá que indicar cuáles son en los parámetro del Mapper y del Reducer.
  • Si las salidas del mapper y del reducer son del mismo tipo, no hace falta indicar el job.setMapOutputKeyClass ni el job.setMapOutputValueClass, basta con indicar el job.setOutputKeyClass y el job.setOutputValueClass.

Con respecto la old API han cambiado unas cuantas cosas, dejo aquí un código y luego explico las diferencias:

 
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;

public class WordCount  { 
 public static void main(String[] args) throws Exception {
  
  JobConf conf = new JobConf(WordCountDriver.class);
  conf.setJobName(this.getClass().getName());

  conf.setMapperClass(WordCountMapper.class);
  conf.setReducerClass(WordCountReducer.class);
  
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(IntWritable.class);
  conf.setMapOutputKeyClass(Text.class);
  conf.setMapOutputValueClass(IntWritable.class);

  FileInputFormat.setInputPaths(conf, new Path(args[0]));
  FileOutputFormat.setOutputPath(conf, new Path(args[1]));

  JobClient.runJob(conf);
 }
}

Como veréis, las diferencias principales son:
  • En los import, en la new API se utilizan las clases que pertenecen al paquete org.apache.hadoop.mapreduce, mientras que en la old API el paquete era org.apache.hadoop.mapred
  • En la new API el Job se ejecuta a través de la clase Job, en la old API se hace a través de JobClient.
  • En la new API el objeto de configuración es Job, en la old API es JobConf.

sábado, 2 de marzo de 2013

Hadoop: Introducción al desarrollo en Java (Parte III): El Reducer (Ejemplo Word Count)

El Reducer implementa el método reduce y es la parte del programa que va a recibir los datos intermedios y tras haber sufrido el proceso "Shuffle and Sort", es decir, va a recibir para cada key su lista de valores correspondiente. Devolveré pares key/value tras haber hecho ciertas operaciones y obtener los valores que necesitamos.

 
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// El reducer debe extender de la clase Reducer, 
// que espera 4 objetos que definen los tipos, los 
// 2 primeros la key/value de entrada (que son los
// valores intermedios )y los 2
// últimos la key/value de salida
public class WordCountReducer extends 
 Reducer<Text, IntWritable, Text, IntWritable> {
// En el reducer, al igual que en el mapper se podrían
// reutilizar los objetos declarándolos aquí. 
// Pero esta vez lo implemento sin usarlo para que 
// podáis ver cómo quedaría.


// El método reduce recibe 3 atributos, el primero
// es la key de entrada y el segundo es una lista
// de los valores intermedios asociados a esa key.
// Al igual que el Mapper, recibe el objeto Context
// para escribir la salida y otras informaciones.
 public void reduce(Text key, Iterable<IntWritable> values, 
   Context context) 
   throws IOException, InterruptedException {
  
  int count = 0;

// Se va recorriendo la lista de valores y para cada
// uno se extrae a través del .get() el valor correspondiente
// Se van sumando esos valores para obtener el total
// de veces que aparece una palabra.
  for (IntWritable value : values) {
   count += value.get();
  }
// Finalmente escribimos el resultado en HDFS usando 
// el context.write
  context.write(key, new IntWritable(count));
 }
}

Y este es el mismo código pero para la old API:
 
import java.io.IOException; 
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.OutputCollector; 
import org.apache.hadoop.mapred.MapReduceBase; 
import org.apache.hadoop.mapred.Reducer; 
import org.apache.hadoop.mapred.Reporter;

public class WordCountReducer extends MapReduceBase 
  implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values, 
      OutputCollector<Text, IntWritable> output, Reporter reporter) 
      throws IOException {

         int wordCount = 0; 
         while (values.hasNext()) {
            IntWritable value = values.next(); 
            wordCount += value.get();
         } 
         output.collect(key, new IntWritable(wordCount));
    }
}


Las diferencias principales son las mismas que en el Mapper, pero aquí las pongo:
  • En la new API la clase sólo extiende de Reducer, mientras que en la old API necesita extender de MapReduceBase e implementar Reducer.
  • La new API recibe 3 atributos: los 2 tipos del par key/value y el context. La old API recibía 4, los 2 tipos de la key/value, un OutputCollector que es donde se escribían los datos intermedios y un objeto Reporter que servía para devolver cierta información al Driver. En la new API este paso de información se puede hacer con el Context.


Ver también: