Recuerda que puedes descargarte algunos de los ejemplos en la pestaña de Código Fuente

viernes, 15 de marzo de 2013

Tipos de datos Hadoop e Interfaces Writable y WritableComparable

Writable y WritableComparable son las interfaces de Hadoop que se utilizan para la serialización.

Hadoop define sus propios tipos de objetos a partir de los tipos primitivos de Java. Todos ellos heredan de la interfaz WritableComparable, que a su vez hereda de la interfaz Writable y que permiten la Serialización (conversión de los datos en bytes para poder ser transmitidos por red o para su escritura en almacenes persistentes).

Algunos de los tipos son:
  • IntWritable                   ints
  • Text                             strings
  • DoubleWritable           doubles
  • FloatWritable               floats
  • LongWritable               longs
  • ByteWritable                bytes
  • NullWritable

Jerarquía de Clases (Hadoop the Definitive Guide)

De los tipos vistos, aunque son propios de Hadoop, nos son más o menos conocidos, todos excepto quizas el NullWritable. Este tipo de objeto se utiliza para cuando queremos eliminar la key o el value y que Hadoop lo pueda reconocer.
Por ejemplo, si en una salida de un Job MapReduce es de tipo <Text, Text> y tenemos la key siempre a nulo, entonces el fichero de salida tendrá un formato similar a:
[separador] mivalor1
[separador] mivalor2
...
siendo [separador] el carácter de separación (por defecto una tabulación).

Y si la key la definiésemos como NullWritable, la salida sería con este formato:
mivalor1
mivalor2
...
Lo cual nos da un fichero de salida más compacto y más apropiado para usarlo como entrada de otro Job MapReduce.



Durante los desarrollos de aplicaciones Hadoop, en algún momento tendremos la necesidad de crear nuestros propios objetos.
Por ejemplo, imaginemos que queremos como objeto los datos de una persona (nombre y los dos apellidos), sería fácil crear un objeto de tipo Text:

   Text persona = new Text (nombre+" "+primerApellido+" "+segundoApellido);

Como veis, separamos los datos con un espacio, y luego al hacer:

   String[] listaPersonas = persona.toString().split(" ");

Podría ser un problema si uno de los apellidos fuera compuesto y contuviera espacios.
Así que lo mejor sería crear tu propio objeto.

Es muy importante saber que las keys deben implementar siempre la interfaz WritableComparable, y los values la interfaz Writable.

La interfaz Writable

Contiene los métodos de serialización y deserialización readFields y write:
 
public interfaz Writable{
    void readFields(DataInput in);
    void write(DataOutput out);
}

Ejemplo:
 
public class PersonaWritable implements Writable {

 Text nombre, primerApellido, segundoApellido;
 
 public PersonaWritable() {
  this.nombre = new Text();
  this.primerApellido = new Text();
  this.segundoApellido = new Text();
 }

 public PersonaWritable(Text nombre, Text primerApellido,
   Text segundoApellido) {
  this.nombre = nombre;
  this.primerApellido = primerApellido;
  this.segundoApellido = segundoApellido;
 }

 @Override
 public void readFields(DataInput arg0) throws IOException {
  this.nombre.readFields(arg0);
  this.primerApellido.readFields(arg0);
  this.segundoApellido.readFields(arg0);
 }

 @Override
 public void write(DataOutput arg0) throws IOException {
  this.nombre.write(arg0);
  this.primerApellido.write(arg0);
  this.segundoApellido.write(arg0);
 }
 
 // Implementar getters y setters
}



La interfaz WritableComparable

Además de contener los métodos de serialización y deserialización readFields y write, debe implementar los métodos compareTo, hashCode y equals, ya que extiende, además de la interfaz Writable, de la Comparable <   >.

Ejemplo:
 
public class PersonaWritableComparable 
  implements WritableComparable<PersonaWritableComparable>{

 Text nombre, primerApellido, segundoApellido;
 
 public PersonaWritableComparable() {
  this.nombre = new Text();
  this.primerApellido = new Text();
  this.segundoApellido = new Text();
 }

 public PersonaWritableComparable(Text nombre, 
   Text primerApellido, Text segundoApellido) {
  this.nombre = nombre;
  this.primerApellido = primerApellido;
  this.segundoApellido = segundoApellido;
 }
 
 @Override
 public void readFields(DataInput arg0) throws IOException {
  this.nombre.readFields(arg0);
  this.primerApellido.readFields(arg0);
  this.segundoApellido.readFields(arg0);
  
 }

 @Override
 public void write(DataOutput arg0) throws IOException {
  this.nombre.write(arg0);
  this.primerApellido.write(arg0);
  this.segundoApellido.write(arg0);
 }

 @Override
 public int compareTo(PersonaWritableComparable o) {
  if(this.nombre.compareTo(o.nombre) != 0){
   return this.nombre.compareTo(o.nombre);
  }else if(this.primerApellido.compareTo(o.primerApellido) != 0){
   return this.primerApellido.compareTo(o.primerApellido);
  }else if(this.segundoApellido.compareTo(o.segundoApellido) != 0){
   return this.segundoApellido.compareTo(o.segundoApellido);
  }
  return 0;
 }

 @Override
 public boolean equals(Object obj) {
  if(obj instanceof PersonaWritable){
   PersonaWritableComparable p = (PersonaWritableComparable) obj;
   return this.nombre == p.nombre && 
     this.primerApellido == p.primerApellido && 
     this.segundoApellido == p.segundoApellido;
  }
  return false;
 }

 @Override
 public int hashCode() {
  return this.nombre.hashCode()*163 + 
    this.primerApellido.hashCode()*163 + 
    this.segundoApellido.hashCode()*163;
 }

 // Implementar getters y setters
}


Si en algún momento quieres desarrollar un tipo propio y la salida (output) va a ser con este tipo, también hará falta implementar el método toString()

Si tuviéramos que serializar objetos binarios, se haría a través de arrays de bytes

El método write sería:

  1. Serializar el objeto en un array de bytes
  2. Escribir el número de bytes
  3. Escribir el array de bytes
Y el método readFields sería:
  1. Leer el número de bytes
  2. Crear un array de bytes de ese tamaño
  3. Leer el array
  4. Deserializar el objeto

Ver también: Ejemplo de uso de Tipos de Datos propios con las interfaces Writable y WritableComparable

lunes, 11 de marzo de 2013

Flujo de ejecución de un Job MapReduce

Cuando un cliente envía un trabajo, primero su información de configuración se empaqueta en un fichero xml, que junto con el fichero jar (que contiene el código del programa) y todo eso es gestionado por el JobTracker.

El JobTracker envía tareas individuales al TaskTracker, el cual, cuando recibe una petición de ejecutar una tarea,  instancia una nueva JVM separada para esta tarea.

Los datos intermedios se generan en el disco local del TaskTracker. Luego los datos se distribuyen por la red hacia los Reducers, que escribirán la salida en HDFS.

Una vez finalizado el Job, el TaskTracker borra los datos intermedios del disco local.


Así es como actuaría de forma genérica, ahora veremos cómo sería el flujo de un MapReduce con más detalle:

Flujo MapReduce

Cuando un cliente envía un Job, se está enviando unos datos de entrada, un Input, que MapReduce primero divide en trozos (normalmente del mismo tamaño que los bloques HDFS) llamados Input Splits, Hadoop ejecuta una tarea Task por cada split generado.
El InputSplit se hace de forma automática, quiero decir, es el InputFormat el que se encarga de crear los InputSplits y de dividirlos en registros.

La ventaja de dividir en esos trozos es que la cantidad de datos a tratar es mucho menor que si se enviara el input entero, entonces al tratarlo en sistema distribuído el tiempo total del proceso será mucho menor.
También la ventaja está en que si se dividen en el tamaño del bloque HDFS, la tarea map se va a ejecutar (la mayor parte de las veces) en el nodo donde se encuentran esos datos almacenados en HDFS, de esta forma se ahorra ancho de banda, ya que no estamos transmitiendo los datos a través de la red.

Hay que tener en cuenta que el InputSplit no contiene los datos como tal, sino una referencia a los datos.

A continuación el TaskTracker pasa el split al RecordReader, que no es más que un iterador sobre los registros de ese split y es entonces cuando la tarea map trata cada registro de tipo par key/value y generando una salida.

Los datos intermedios se almacenan en el disco local, y no en HDFS porque esto supondría realizar el proceso de replicación y eso sería excesivo.

Después, los datos se enviarían al nodo donde la tarea reduce se está ejecutando. Por defecto hay una sola tarea reduce, pero podría haber más. Así que si disponemos de varios reducer, hay una función partition que se encarga de particionar la salida del map  (se suele dividir para que cada key con sus valores vayan al mismo reducer) y enviar cada trozo a un reducer. Hay un partitioner por defecto, pero podríamos crear el nuestro propio que ya veríamos en otro artículo.

También sería posible tener el programa configurado con cero tareas reducer.

El Reduce no dispone del concepto de localización de los datos

La salida del reducer se va guardando en un RecordWriter, que posteriormente va a pasar a generar el output que se almacena en HDFS para aportar fiabilidad. Por cada bloque del output, la primera réplica se almacena en el nodo local, las otras réplicas en el resto de nodos.





miércoles, 6 de marzo de 2013

Hadoop: Introducción al Desarrollo en Java (Parte V): Métodos setup() y cleanup()

Son métodos que se pueden implementar tanto en el Mapper como en el Reducer.

Teniendo en cuenta que cada vez que hay una tarea Map se crea un objeto de tipo mapper (o lo mismo con el reducer). Estos métodos se van a ejecutar antes o después de que todas las tareas map o reduce se hayan ejecutado, y sólo una vez.

setup()

Este método se ejecuta antes de que el método map y/o reduce (según en qué clase se haya implementado) sean llamados por primera vez.
Se usa para ejecutar código antes de que el Mapper o Reducer se ejecuten.

Se suele utilizar para leer datos de ficheros, inicializar estructuras, establecer parámetros, etc.

 
public class MyClass extends Mapper<Type, Type, Type, Type> {

 protected void setup(Context context) 
     throws IOException, InterruptedException{

       // TODO: Implementar código necesario
 }
 public void map(Type key, Type values, Context context) {...}
}


cleanup()

Es como el setup, pero se ejecuta antes de que el Mapper o el Reduccer finalicen.

 
public class MyClass extends Mapper<Type, Type, Type, Type> {

 public void map(Type key, Type values, Context context) {...}

 protected void cleanup(Context context) 
     throws IOException, InterruptedException{

       // TODO: Implementar código necesario
 }
}

Estos son los métodos utilizados en la new API, en la old API los métodos se llamaban configure() y close()