Recuerda que puedes descargarte algunos de los ejemplos en la pestaña de Código Fuente

domingo, 24 de marzo de 2013

Sequence Files: Ejemplo de creación y lectura a través del FileSystem

Como ya expliqué en esta entrada, FileSystem sirve para acceder a HDFS a través de la API de Java. Así que vamos a usar esta clase para crear y leer un SequenceFile.

Crear un SequenceFile

Crearemos un SequenceFile de un texto, que es un poema, y le pondremos como key el número correspondiente a cada línea.
 
public class CreateSequenceFile {

 private static final String[] POEMA = { 
  "El ciego sol se estrella",
  "en las duras aristas de las armas,",
  "llaga de luz los petos y espaldares",
  "y flamea en las puntas de las lanzas.",
  "El ciego sol, la sed y la fatiga",
  "Por la terrible estepa castellana,",
  "al destierro, con doce de los suyos",
  "-polvo, sudor y hierro- el Cid cabalga.",
  "Cerrado está el mesón a piedra y lodo.",
  "Nadie responde... Al pomo de la espada",
  "y al cuento de las picas el postigo",
  "va a ceder ¡Quema el sol, el aire abrasa!"};
 
 private static final String rutaDestino 
   = new String ("pruebas/poemasequencefile");
 
 public static void main(String[] args) throws IOException {
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get( conf);
  Path path = new Path(rutaDestino);
  
  IntWritable key = new IntWritable();
  Text value = new Text();
  
  //Creamos el writer del SequenceFile para poder ir añadiendo
  // los pares key/value al fichero.
  SequenceFile.Writer writer = new SequenceFile.Writer(fs,  
    conf,  path, key.getClass(), value.getClass());
  
  for (int i = 0; i < POEMA.length; i++) { 
   // La key es el número de línea
   key.set(i+1); 
   // El value es la línea del poema correspondiente
   value.set(POEMA[i]); 
   // Escribimos el par en el sequenceFile 
   writer.append(key, value);
  }
  
  writer.close();
 }
}




Leer un SequenceFile

En este ejemplo no sólo vamos a leer el fichero creado anteriormente, sino que también vamos a buscar y a usar los puntos de sincronización, vamos a ver las posiciones del fichero y vamos a desplazarnos a alguna.
 
public class ReadSequenceFile {

 private static final String rutaOrigen 
   = new String ("pruebas/poemasequencefile");
 
 public static void main(String[] args) 
   throws Exception {
  
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get( conf);
  Path path = new Path(rutaOrigen);
  
  //Creamos el Reader del SequenceFile
  SequenceFile.Reader reader = 
    new SequenceFile.Reader(fs, path, conf);
  // Leemos la key y value del SequenceFile, los tipos son conocidos,
  // por lo que se declaran variables de esos tipos.
  IntWritable key = 
    (IntWritable) reader.getKeyClass().newInstance();
  Text value = 
    (Text) reader.getValueClass().newInstance();
  
  StringBuilder strBuilder;
  boolean haySync = false;
  long posSync = 0;
  
  //Recorremos el reader recuperando los pares key/value
  while(reader.next(key,value)){
   
   // Comprobamos si la posición es un punto de sync
   // En principio en este fichero no encontrará ninguno ya que es muy
   // pequeño, si fuera uno más grande y tuviera varios puntos de sync
   // se guardará el último punto encontrado.
   if(reader.syncSeen()){
    haySync = true;
    posSync = reader.getPosition();
   }
   
   strBuilder = new StringBuilder("Posición: ").
     append(reader.getPosition()).append(" - Key: ").
     append(key.toString()).append(" Value: " ).
     append(value.toString());
   System.out.println(strBuilder);
  }
  
  if(haySync){
   // reader.sync posicionará el reader en el sync siguiente más próximo,
   // si no hay ninguno se posicionará al final del fichero.
   // En este caso se posicionará en el punto dado, ya que es de sync.
   strBuilder = new StringBuilder("Sync en el punto: ").
     append(posSync);
   System.out.println(strBuilder);
   reader.sync(posSync);
  }else{
   // Es un valor conocido, si no existiera, habría un error
   // al realizar el reader.next.
   posSync = 459;
   reader.seek(posSync);
  }
  
  // En un caso o en otro a pesar de haber finalizado la iteración 
  // hemos posicionado el reader en un punto intermedio, así que 
  // seguimos recorriéndolo (repetimos las líneas)
  // hasta finalizar de nuevo.
  strBuilder = new StringBuilder("Volvemos a la posición: ")
     .append(posSync);
  System.out.println(strBuilder);
  
  System.out.println("Seguimos recorriendo el reader: ");
  while(reader.next(key,value)){
   strBuilder = new StringBuilder("Posición: ").
     append(reader.getPosition()).append(" - Key: ").
     append(key.toString()).append(" Value: " ).
     append(value.toString());
   System.out.println(strBuilder);
  }
  
  reader.close();
 }

}

viernes, 22 de marzo de 2013

Sequence Files

Los SequenceFiles son unos tipos de ficheros de datos propios de Hadoop almacenados en forma de pares key/value y codificados de forma binaria.
Proporcionan una estructura de datos persistente para pares key/value de forma binaria.

En el mismo fichero se almacenan los metadatos que contienen la información de ese fichero (tipo de los datos, nombre, fecha,...).

Este tipo de ficheros se usa muy a menudo en Jobs MapReduce, sobre todo cuando la salida de un Job es la entrada de otro.
También, podemos imaginarlos como ficheros de log donde cada registro es una nueva línea.

Son un tipo de fichero muy adecuados para MapReduce porque además son splittable (que se pueden fragmentar).
Pueden almacenar distintos tipos de datos gracias a que usan una gran variedad de frameworks de serialización.

Soportan compresión, que puede ser de 3 tipos:
  • Uncompressed (No comprimido)
  • Record Compressed (Compresión a nivel de cada par key/value)
  • Block Compressed (Se comprime por bloques)
Sea cual sea el tipo de compresión que utilice, la estructura del encabezado (header) va a ser el mismo, sólo que éste contendrá la información necesaria para su posterior lectura.

Problemas: 
  • Sólo se puede acceder a ellos a través de la AP Java de Hadoop.
  • Si la definición de la key o value cambian, el fichero no se podrá leer.

Para entender de forma visual cómo es la estructura de un SequenceFile, empezamos viendo cómo sería el Header:

Estructura del Header

Esta sería la estructura de un SequenceFile Uncompressed o Record Compressed:

Estructura de un SequenceFile

Así sería la estructura de un registro cuando no está comprimido (Uncompressed):

Registro no comprimido

Y así sería cuando está comprimido por registro (Record Compressed):

Estructura de la compresión por registro


Aunque es la misma estructura de SequenceFile, no pueden existir formatos de compresión distintos dentro de un mismo fichero.

A continuación es el formato de un SequenceFile Block Compressed:

Estructura de un SequenceFile comprimido por bloques


Y por último el formato del bloque:

Estructura del bloque comprimido




Una propiedad de los SequenceFiles es que en la creación introducen puntos de sincronización (sync points).
Estos puntos se pueden utilizar cuando el reader se pierde, por ejemplo, si nos hemos desplazado a buscar en una posición cualquiera en nuestro flujo de datos. Aún más importante, estos puntos de sincronización sirven para definir los InputSplit de los Jobs MapReduce.
Estos sync points se crean automáticamente introduciéndolos cada cierto número de registros cuando realizamos el SequenceFile.Writer.
Se pueden localizar durante la lectura del SequenceFile con el SecuenceFile.Reader a través de:

      reader.syncSeen();

Otra opción que nos dan los SequenceFiles es, como hemos dicho antes, buscar una posición dada en este tipo de ficheros.
Si sabemos la posición exacta a la que queremos acceder, utilizaríamos el método:

      reader.seek(posicion);
      reader.next(key, value);


Si ponemos una posición que no existe daría un error IOException.


Si no conocemos la posición exacta, podemos utilizar:

      reader.sync (posicion)

El reader se posicionará en el siguiente sync point que encuentre después de posicion.
Puede ser el caso que el fichero sea muy pequeño y no exista ningún sync point, lo que sucederá entonces es que el reader se posicionará al final del fichero.

domingo, 17 de marzo de 2013

Ejemplo de uso de Tipos de Datos propios con las interfaces Writable y WritableComparable

Continuando con la entrada anterior en la que explicaba qué son las interfaces Writable y WritableComparable y cómo es posible crear nuestros propios tipos usándolas, vamos a ver un ejemplo.

El código fuente y el fichero de ejemplo de esta entrada también los podréis encontrar en este enlace.

Ejemplo de aplicación MapReduce utilizando nuestra propia clase como key.
Al programar nuestra clase PersonaWritableComparable que implementa WritableComparable, en la fase del Shuffle and Sort se consigue que el Reducer reciba las key ordenadas y con sus valores correspondientes agrupados para poder operar con ellos.

Recibimos un fichero de texto cuya información es
Fecha [tab] Nombre Apellido1 Apellido2 Puntuación
Queremos como salida un listado de personas (con los nombres y apellidos) y la suma de todas sus puntuaciones.

Fichero de entrada score.txt:

01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez 14
01-11-2012 Maria Garcia Martinez 11
01-11-2012 Pablo Sanchez Rodriguez 9
01-11-2012 Angel Martin Hernandez 3
15-11-2012 Pepe Perez Gonzalez 22
15-11-2012 Maria Garcia Martinez 15
15-11-2012 John Smith 13
01-12-2012 Pepe Perez Gonzalez 25
01-12-2012 Ana Lopez Fernandez 15
01-12-2012 Pablo Sanchez Rodriguez 8
01-12-2012 Maria Garcia Martinez 32
15-12-2012 Maria Garcia Martinez 47
15-12-2012 Pepe Perez Gonzalez 13
15-12-2012 Angel Martin Hernandez 13
15-12-2012 John Smith 27
01-01-2013 Ana Lopez Fernandez 5
01-01-2013 Pablo Sanchez Rodriguez 2
01-01-2013 Pepe Perez Gonzalez 17
01-01-2013 Maria Garcia Martinez 3
01-01-2013 Angel Martin Hernandez 32
01-01-2013 John Smith 21


Nuestra propia clase PersonaWritableComparable:
 
public class PersonaWritableComparable 
  implements WritableComparable<PersonaWritableComparable>{

 Text nombre, primerApellido, segundoApellido;
 
 public void set(String nom, String prApell, String sgApell){
  nombre.set(nom);
  primerApellido.set(prApell);
  segundoApellido.set(sgApell);
 }
 
 public PersonaWritableComparable() {
  this.nombre = new Text();
  this.primerApellido = new Text();
  this.segundoApellido = new Text();
 }

 public PersonaWritableComparable(Text nombre, 
   Text primerApellido, Text segundoApellido) {
  this.nombre = nombre;
  this.primerApellido = primerApellido;
  this.segundoApellido = segundoApellido;
 }
 
 @Override
 public void readFields(DataInput arg0) throws IOException {
  this.nombre.readFields(arg0);
  this.primerApellido.readFields(arg0);
  this.segundoApellido.readFields(arg0);
  
 }

 @Override
 public void write(DataOutput arg0) throws IOException {
  this.nombre.write(arg0);
  this.primerApellido.write(arg0);
  this.segundoApellido.write(arg0);
 }

 @Override
 public int compareTo(PersonaWritableComparable o) {
  if(this.nombre.compareTo(o.nombre) != 0){
   return this.nombre.compareTo(o.nombre);
  }else if(this.primerApellido.compareTo(o.primerApellido) != 0){
   return this.primerApellido.compareTo(o.primerApellido);
  }else if(this.segundoApellido.compareTo(o.segundoApellido) != 0){
   return this.segundoApellido.compareTo(o.segundoApellido);
  }
  return 0;
 }

 @Override
 public boolean equals(Object obj) {
  if(obj instanceof PersonaWritableComparable){
   PersonaWritableComparable p = (PersonaWritableComparable) obj;
   return this.nombre.equals(p.nombre) && 
    this.primerApellido.equals(p.primerApellido) && 
    this.segundoApellido.equals(p.segundoApellido);
  }
  return false;
 }

 @Override
 public int hashCode() {
  return this.nombre.hashCode()*163 + 
    this.primerApellido.hashCode()*163 + 
    this.segundoApellido.hashCode()*163;
 }
 
 @Override
 public String toString() {
  return nombre.toString()+" "+primerApellido.toString()+" "
   +segundoApellido.toString();
 }
}


El Driver de la aplicación:
 
public class PersonaScoreDriver {
 public static void main(String[] args) throws Exception {
  if(args.length != 2){
   System.out.println("Ha ocurrido un error en la entrada");
   System.exit(-1);
  }
  
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  job.setJarByClass(PersonaScoreDriver.class);
  
  job.setJobName("Persona Score");
  
  job.setOutputKeyClass(PersonaWritableComparable.class);
  job.setOutputValueClass(IntWritable.class);

  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
  job.setMapperClass(PersonaScoreMapper.class);
  job.setReducerClass(PersonaScoreReducer.class);

  boolean success = job.waitForCompletion(true);
  System.exit(success ? 0:1);  
 }
}


La clase Mapper:
 
public class PersonaScoreMapper extends 
 Mapper<LongWritable, Text, 
 PersonaWritableComparable, IntWritable> {

 private IntWritable score = new IntWritable();
 PersonaWritableComparable persona = new PersonaWritableComparable();
 
 public void map(LongWritable key, Text values,
   Context context) throws IOException, InterruptedException {
  
  // El texto tiene este formato:
  // 01-11-2012 Maria Garcia Martinez 11
  // La fecha separada por tabulación, el resto con espacios
  String[] primerSplit = values.toString().split(" ");
  if(primerSplit.length == 2){
   String[] segundoSplit = primerSplit[1].split(" ");
   
   // Puede haber personas con un apellido o con dos
   if(segundoSplit.length == 3 || segundoSplit.length == 4){
    if(segundoSplit.length == 3){
     persona.set(segundoSplit[0], segundoSplit[1], "");
     score.set(Integer.valueOf(segundoSplit[2]));
    }else {
     persona.set(segundoSplit[0], segundoSplit[1], segundoSplit[2]);
     score.set(Integer.valueOf(segundoSplit[3]));
    }
    context.write(persona, score);
   } 
  }
 }
}


La clase Reducer
 
public class PersonaScoreReducer extends 
 Reducer<PersonaWritableComparable, IntWritable, 
 PersonaWritableComparable, IntWritable> {

 public void reduce(PersonaWritableComparable key, 
   Iterable<IntWritable> values,
   Context context) throws IOException, InterruptedException {
  
  int suma = 0;
  for (IntWritable value : values) {
   suma += value.get();
  }
  
  context.write(key, new IntWritable(suma));
 }
}

Ver también: Tipos de datos Hadoop e interfaces Writable y WritableComparable

viernes, 15 de marzo de 2013

Tipos de datos Hadoop e Interfaces Writable y WritableComparable

Writable y WritableComparable son las interfaces de Hadoop que se utilizan para la serialización.

Hadoop define sus propios tipos de objetos a partir de los tipos primitivos de Java. Todos ellos heredan de la interfaz WritableComparable, que a su vez hereda de la interfaz Writable y que permiten la Serialización (conversión de los datos en bytes para poder ser transmitidos por red o para su escritura en almacenes persistentes).

Algunos de los tipos son:
  • IntWritable                   ints
  • Text                             strings
  • DoubleWritable           doubles
  • FloatWritable               floats
  • LongWritable               longs
  • ByteWritable                bytes
  • NullWritable

Jerarquía de Clases (Hadoop the Definitive Guide)

De los tipos vistos, aunque son propios de Hadoop, nos son más o menos conocidos, todos excepto quizas el NullWritable. Este tipo de objeto se utiliza para cuando queremos eliminar la key o el value y que Hadoop lo pueda reconocer.
Por ejemplo, si en una salida de un Job MapReduce es de tipo <Text, Text> y tenemos la key siempre a nulo, entonces el fichero de salida tendrá un formato similar a:
[separador] mivalor1
[separador] mivalor2
...
siendo [separador] el carácter de separación (por defecto una tabulación).

Y si la key la definiésemos como NullWritable, la salida sería con este formato:
mivalor1
mivalor2
...
Lo cual nos da un fichero de salida más compacto y más apropiado para usarlo como entrada de otro Job MapReduce.



Durante los desarrollos de aplicaciones Hadoop, en algún momento tendremos la necesidad de crear nuestros propios objetos.
Por ejemplo, imaginemos que queremos como objeto los datos de una persona (nombre y los dos apellidos), sería fácil crear un objeto de tipo Text:

   Text persona = new Text (nombre+" "+primerApellido+" "+segundoApellido);

Como veis, separamos los datos con un espacio, y luego al hacer:

   String[] listaPersonas = persona.toString().split(" ");

Podría ser un problema si uno de los apellidos fuera compuesto y contuviera espacios.
Así que lo mejor sería crear tu propio objeto.

Es muy importante saber que las keys deben implementar siempre la interfaz WritableComparable, y los values la interfaz Writable.

La interfaz Writable

Contiene los métodos de serialización y deserialización readFields y write:
 
public interfaz Writable{
    void readFields(DataInput in);
    void write(DataOutput out);
}

Ejemplo:
 
public class PersonaWritable implements Writable {

 Text nombre, primerApellido, segundoApellido;
 
 public PersonaWritable() {
  this.nombre = new Text();
  this.primerApellido = new Text();
  this.segundoApellido = new Text();
 }

 public PersonaWritable(Text nombre, Text primerApellido,
   Text segundoApellido) {
  this.nombre = nombre;
  this.primerApellido = primerApellido;
  this.segundoApellido = segundoApellido;
 }

 @Override
 public void readFields(DataInput arg0) throws IOException {
  this.nombre.readFields(arg0);
  this.primerApellido.readFields(arg0);
  this.segundoApellido.readFields(arg0);
 }

 @Override
 public void write(DataOutput arg0) throws IOException {
  this.nombre.write(arg0);
  this.primerApellido.write(arg0);
  this.segundoApellido.write(arg0);
 }
 
 // Implementar getters y setters
}



La interfaz WritableComparable

Además de contener los métodos de serialización y deserialización readFields y write, debe implementar los métodos compareTo, hashCode y equals, ya que extiende, además de la interfaz Writable, de la Comparable <   >.

Ejemplo:
 
public class PersonaWritableComparable 
  implements WritableComparable<PersonaWritableComparable>{

 Text nombre, primerApellido, segundoApellido;
 
 public PersonaWritableComparable() {
  this.nombre = new Text();
  this.primerApellido = new Text();
  this.segundoApellido = new Text();
 }

 public PersonaWritableComparable(Text nombre, 
   Text primerApellido, Text segundoApellido) {
  this.nombre = nombre;
  this.primerApellido = primerApellido;
  this.segundoApellido = segundoApellido;
 }
 
 @Override
 public void readFields(DataInput arg0) throws IOException {
  this.nombre.readFields(arg0);
  this.primerApellido.readFields(arg0);
  this.segundoApellido.readFields(arg0);
  
 }

 @Override
 public void write(DataOutput arg0) throws IOException {
  this.nombre.write(arg0);
  this.primerApellido.write(arg0);
  this.segundoApellido.write(arg0);
 }

 @Override
 public int compareTo(PersonaWritableComparable o) {
  if(this.nombre.compareTo(o.nombre) != 0){
   return this.nombre.compareTo(o.nombre);
  }else if(this.primerApellido.compareTo(o.primerApellido) != 0){
   return this.primerApellido.compareTo(o.primerApellido);
  }else if(this.segundoApellido.compareTo(o.segundoApellido) != 0){
   return this.segundoApellido.compareTo(o.segundoApellido);
  }
  return 0;
 }

 @Override
 public boolean equals(Object obj) {
  if(obj instanceof PersonaWritable){
   PersonaWritableComparable p = (PersonaWritableComparable) obj;
   return this.nombre == p.nombre && 
     this.primerApellido == p.primerApellido && 
     this.segundoApellido == p.segundoApellido;
  }
  return false;
 }

 @Override
 public int hashCode() {
  return this.nombre.hashCode()*163 + 
    this.primerApellido.hashCode()*163 + 
    this.segundoApellido.hashCode()*163;
 }

 // Implementar getters y setters
}


Si en algún momento quieres desarrollar un tipo propio y la salida (output) va a ser con este tipo, también hará falta implementar el método toString()

Si tuviéramos que serializar objetos binarios, se haría a través de arrays de bytes

El método write sería:

  1. Serializar el objeto en un array de bytes
  2. Escribir el número de bytes
  3. Escribir el array de bytes
Y el método readFields sería:
  1. Leer el número de bytes
  2. Crear un array de bytes de ese tamaño
  3. Leer el array
  4. Deserializar el objeto

Ver también: Ejemplo de uso de Tipos de Datos propios con las interfaces Writable y WritableComparable

lunes, 11 de marzo de 2013

Flujo de ejecución de un Job MapReduce

Cuando un cliente envía un trabajo, primero su información de configuración se empaqueta en un fichero xml, que junto con el fichero jar (que contiene el código del programa) y todo eso es gestionado por el JobTracker.

El JobTracker envía tareas individuales al TaskTracker, el cual, cuando recibe una petición de ejecutar una tarea,  instancia una nueva JVM separada para esta tarea.

Los datos intermedios se generan en el disco local del TaskTracker. Luego los datos se distribuyen por la red hacia los Reducers, que escribirán la salida en HDFS.

Una vez finalizado el Job, el TaskTracker borra los datos intermedios del disco local.


Así es como actuaría de forma genérica, ahora veremos cómo sería el flujo de un MapReduce con más detalle:

Flujo MapReduce

Cuando un cliente envía un Job, se está enviando unos datos de entrada, un Input, que MapReduce primero divide en trozos (normalmente del mismo tamaño que los bloques HDFS) llamados Input Splits, Hadoop ejecuta una tarea Task por cada split generado.
El InputSplit se hace de forma automática, quiero decir, es el InputFormat el que se encarga de crear los InputSplits y de dividirlos en registros.

La ventaja de dividir en esos trozos es que la cantidad de datos a tratar es mucho menor que si se enviara el input entero, entonces al tratarlo en sistema distribuído el tiempo total del proceso será mucho menor.
También la ventaja está en que si se dividen en el tamaño del bloque HDFS, la tarea map se va a ejecutar (la mayor parte de las veces) en el nodo donde se encuentran esos datos almacenados en HDFS, de esta forma se ahorra ancho de banda, ya que no estamos transmitiendo los datos a través de la red.

Hay que tener en cuenta que el InputSplit no contiene los datos como tal, sino una referencia a los datos.

A continuación el TaskTracker pasa el split al RecordReader, que no es más que un iterador sobre los registros de ese split y es entonces cuando la tarea map trata cada registro de tipo par key/value y generando una salida.

Los datos intermedios se almacenan en el disco local, y no en HDFS porque esto supondría realizar el proceso de replicación y eso sería excesivo.

Después, los datos se enviarían al nodo donde la tarea reduce se está ejecutando. Por defecto hay una sola tarea reduce, pero podría haber más. Así que si disponemos de varios reducer, hay una función partition que se encarga de particionar la salida del map  (se suele dividir para que cada key con sus valores vayan al mismo reducer) y enviar cada trozo a un reducer. Hay un partitioner por defecto, pero podríamos crear el nuestro propio que ya veríamos en otro artículo.

También sería posible tener el programa configurado con cero tareas reducer.

El Reduce no dispone del concepto de localización de los datos

La salida del reducer se va guardando en un RecordWriter, que posteriormente va a pasar a generar el output que se almacena en HDFS para aportar fiabilidad. Por cada bloque del output, la primera réplica se almacena en el nodo local, las otras réplicas en el resto de nodos.





miércoles, 6 de marzo de 2013

Hadoop: Introducción al Desarrollo en Java (Parte V): Métodos setup() y cleanup()

Son métodos que se pueden implementar tanto en el Mapper como en el Reducer.

Teniendo en cuenta que cada vez que hay una tarea Map se crea un objeto de tipo mapper (o lo mismo con el reducer). Estos métodos se van a ejecutar antes o después de que todas las tareas map o reduce se hayan ejecutado, y sólo una vez.

setup()

Este método se ejecuta antes de que el método map y/o reduce (según en qué clase se haya implementado) sean llamados por primera vez.
Se usa para ejecutar código antes de que el Mapper o Reducer se ejecuten.

Se suele utilizar para leer datos de ficheros, inicializar estructuras, establecer parámetros, etc.

 
public class MyClass extends Mapper<Type, Type, Type, Type> {

 protected void setup(Context context) 
     throws IOException, InterruptedException{

       // TODO: Implementar código necesario
 }
 public void map(Type key, Type values, Context context) {...}
}


cleanup()

Es como el setup, pero se ejecuta antes de que el Mapper o el Reduccer finalicen.

 
public class MyClass extends Mapper<Type, Type, Type, Type> {

 public void map(Type key, Type values, Context context) {...}

 protected void cleanup(Context context) 
     throws IOException, InterruptedException{

       // TODO: Implementar código necesario
 }
}

Estos son los métodos utilizados en la new API, en la old API los métodos se llamaban configure() y close()



lunes, 4 de marzo de 2013

Hadoop: Introducción al desarrollo en Java (Parte IV): El Driver (Ejemplo Word Count)

El driver se ejecuta en la máquina cliente, se trata de una función main que recibe como argumentos el input y el output y que configura el Job para finalmente enviarlo al clúster.

El Driver desarrollado con la new API:

 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
 public static void main(String[] args) throws Exception {
//  Lo primero debería ser comprobar que recibimos
//  2 argumentos (entrada y salida, si es un número
//  diferente sería erróneo
  if (args.length != 2) {
   Sysout.printf("Error");
   System.exit(-1);
  }
 
//  Se crea un nuevo Job indicando la clase que se llamará
//  al ejecutar y el nombre del Job.
//  Configuration servirá en programas más avanzados donde
//  queramos establecer configuraciones diferentes a las
//  que vienen por defecto o para el paso de parámetros.
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  job.setJarByClass(WordCountDriver.class);
  job.setJobName("Word Count");
  
//  Indicamos cuáles son las clases Mapper y Reducer
  job.setMapperClass(WordCountMapper.class);
  job.setReducerClass(wordcount.WordCountReducer.class);

//  Especificamos los directorios input y output, es decir, 
//  el directorio en HDFS donde se encuentra nuestro fichero 
//  de entrada, y dónde va a depositar los resultados
//  Recalcar que es muy importante que la ruta de output no
//  exista (el Job MapReduce la creará él solo).
  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
//  Se establecen los tipos de la key y del value a la
//  salida del reduce.
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  
//  Se establecen los tipos de la key y del value a la
//  salida del map.
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);
  
//  Otras configuraciones posibles:
//  Por defecto el tipo del fichero de entrada es 
//  TextInputFormat, se puede cambiar con:
//   job.setInputFormatClass(KeyValueTextInputFormat.class);
//  Por defecto la salida es un fichero de texto, 
//  se puede cambiar con:
//   job.setOutputFormatClass(TextOutputFormat.class);
   
//  Lanzamos el Job al cluster, hay varios modos, en 
//  waitForCompletion si hubiera más código implementado 
//  después de esta línea, no se ejecutaría
//  hasta que no finalizara el Job.
//  Hay otros modos en los que se puede lanzar el Job.
  boolean success = job.waitForCompletion(true);
  System.exit(success ? 0:1); 
 }
}

Algunos puntos a tener en cuenta (para las 2 APIS):
  • Se configuran sólo los tipos de las salidas, no de las entradas. Los tipos de entrada del Mapper están definidos por el InputFormat (en este ejemplo usamos el input format por defecto: TextInputFormat por lo cual las key son de tipo LongWritable y los value son de tipo Text). Los tipos de entrada del Reducer son los mismos que los de salida del Mapper. Igualmente el desarrollador tendrá que indicar cuáles son en los parámetro del Mapper y del Reducer.
  • Si las salidas del mapper y del reducer son del mismo tipo, no hace falta indicar el job.setMapOutputKeyClass ni el job.setMapOutputValueClass, basta con indicar el job.setOutputKeyClass y el job.setOutputValueClass.

Con respecto la old API han cambiado unas cuantas cosas, dejo aquí un código y luego explico las diferencias:

 
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;

public class WordCount  { 
 public static void main(String[] args) throws Exception {
  
  JobConf conf = new JobConf(WordCountDriver.class);
  conf.setJobName(this.getClass().getName());

  conf.setMapperClass(WordCountMapper.class);
  conf.setReducerClass(WordCountReducer.class);
  
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(IntWritable.class);
  conf.setMapOutputKeyClass(Text.class);
  conf.setMapOutputValueClass(IntWritable.class);

  FileInputFormat.setInputPaths(conf, new Path(args[0]));
  FileOutputFormat.setOutputPath(conf, new Path(args[1]));

  JobClient.runJob(conf);
 }
}

Como veréis, las diferencias principales son:
  • En los import, en la new API se utilizan las clases que pertenecen al paquete org.apache.hadoop.mapreduce, mientras que en la old API el paquete era org.apache.hadoop.mapred
  • En la new API el Job se ejecuta a través de la clase Job, en la old API se hace a través de JobClient.
  • En la new API el objeto de configuración es Job, en la old API es JobConf.

sábado, 2 de marzo de 2013

Hadoop: Introducción al desarrollo en Java (Parte III): El Reducer (Ejemplo Word Count)

El Reducer implementa el método reduce y es la parte del programa que va a recibir los datos intermedios y tras haber sufrido el proceso "Shuffle and Sort", es decir, va a recibir para cada key su lista de valores correspondiente. Devolveré pares key/value tras haber hecho ciertas operaciones y obtener los valores que necesitamos.

 
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// El reducer debe extender de la clase Reducer, 
// que espera 4 objetos que definen los tipos, los 
// 2 primeros la key/value de entrada (que son los
// valores intermedios )y los 2
// últimos la key/value de salida
public class WordCountReducer extends 
 Reducer<Text, IntWritable, Text, IntWritable> {
// En el reducer, al igual que en el mapper se podrían
// reutilizar los objetos declarándolos aquí. 
// Pero esta vez lo implemento sin usarlo para que 
// podáis ver cómo quedaría.


// El método reduce recibe 3 atributos, el primero
// es la key de entrada y el segundo es una lista
// de los valores intermedios asociados a esa key.
// Al igual que el Mapper, recibe el objeto Context
// para escribir la salida y otras informaciones.
 public void reduce(Text key, Iterable<IntWritable> values, 
   Context context) 
   throws IOException, InterruptedException {
  
  int count = 0;

// Se va recorriendo la lista de valores y para cada
// uno se extrae a través del .get() el valor correspondiente
// Se van sumando esos valores para obtener el total
// de veces que aparece una palabra.
  for (IntWritable value : values) {
   count += value.get();
  }
// Finalmente escribimos el resultado en HDFS usando 
// el context.write
  context.write(key, new IntWritable(count));
 }
}

Y este es el mismo código pero para la old API:
 
import java.io.IOException; 
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.OutputCollector; 
import org.apache.hadoop.mapred.MapReduceBase; 
import org.apache.hadoop.mapred.Reducer; 
import org.apache.hadoop.mapred.Reporter;

public class WordCountReducer extends MapReduceBase 
  implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values, 
      OutputCollector<Text, IntWritable> output, Reporter reporter) 
      throws IOException {

         int wordCount = 0; 
         while (values.hasNext()) {
            IntWritable value = values.next(); 
            wordCount += value.get();
         } 
         output.collect(key, new IntWritable(wordCount));
    }
}


Las diferencias principales son las mismas que en el Mapper, pero aquí las pongo:
  • En la new API la clase sólo extiende de Reducer, mientras que en la old API necesita extender de MapReduceBase e implementar Reducer.
  • La new API recibe 3 atributos: los 2 tipos del par key/value y el context. La old API recibía 4, los 2 tipos de la key/value, un OutputCollector que es donde se escribían los datos intermedios y un objeto Reporter que servía para devolver cierta información al Driver. En la new API este paso de información se puede hacer con el Context.


Ver también: