Recuerda que puedes descargarte algunos de los ejemplos en la pestaña de Código Fuente

viernes, 22 de marzo de 2013

Sequence Files

Los SequenceFiles son unos tipos de ficheros de datos propios de Hadoop almacenados en forma de pares key/value y codificados de forma binaria.
Proporcionan una estructura de datos persistente para pares key/value de forma binaria.

En el mismo fichero se almacenan los metadatos que contienen la información de ese fichero (tipo de los datos, nombre, fecha,...).

Este tipo de ficheros se usa muy a menudo en Jobs MapReduce, sobre todo cuando la salida de un Job es la entrada de otro.
También, podemos imaginarlos como ficheros de log donde cada registro es una nueva línea.

Son un tipo de fichero muy adecuados para MapReduce porque además son splittable (que se pueden fragmentar).
Pueden almacenar distintos tipos de datos gracias a que usan una gran variedad de frameworks de serialización.

Soportan compresión, que puede ser de 3 tipos:
  • Uncompressed (No comprimido)
  • Record Compressed (Compresión a nivel de cada par key/value)
  • Block Compressed (Se comprime por bloques)
Sea cual sea el tipo de compresión que utilice, la estructura del encabezado (header) va a ser el mismo, sólo que éste contendrá la información necesaria para su posterior lectura.

Problemas: 
  • Sólo se puede acceder a ellos a través de la AP Java de Hadoop.
  • Si la definición de la key o value cambian, el fichero no se podrá leer.

Para entender de forma visual cómo es la estructura de un SequenceFile, empezamos viendo cómo sería el Header:

Estructura del Header

Esta sería la estructura de un SequenceFile Uncompressed o Record Compressed:

Estructura de un SequenceFile

Así sería la estructura de un registro cuando no está comprimido (Uncompressed):

Registro no comprimido

Y así sería cuando está comprimido por registro (Record Compressed):

Estructura de la compresión por registro


Aunque es la misma estructura de SequenceFile, no pueden existir formatos de compresión distintos dentro de un mismo fichero.

A continuación es el formato de un SequenceFile Block Compressed:

Estructura de un SequenceFile comprimido por bloques


Y por último el formato del bloque:

Estructura del bloque comprimido




Una propiedad de los SequenceFiles es que en la creación introducen puntos de sincronización (sync points).
Estos puntos se pueden utilizar cuando el reader se pierde, por ejemplo, si nos hemos desplazado a buscar en una posición cualquiera en nuestro flujo de datos. Aún más importante, estos puntos de sincronización sirven para definir los InputSplit de los Jobs MapReduce.
Estos sync points se crean automáticamente introduciéndolos cada cierto número de registros cuando realizamos el SequenceFile.Writer.
Se pueden localizar durante la lectura del SequenceFile con el SecuenceFile.Reader a través de:

      reader.syncSeen();

Otra opción que nos dan los SequenceFiles es, como hemos dicho antes, buscar una posición dada en este tipo de ficheros.
Si sabemos la posición exacta a la que queremos acceder, utilizaríamos el método:

      reader.seek(posicion);
      reader.next(key, value);


Si ponemos una posición que no existe daría un error IOException.


Si no conocemos la posición exacta, podemos utilizar:

      reader.sync (posicion)

El reader se posicionará en el siguiente sync point que encuentre después de posicion.
Puede ser el caso que el fichero sea muy pequeño y no exista ningún sync point, lo que sucederá entonces es que el reader se posicionará al final del fichero.

domingo, 17 de marzo de 2013

Ejemplo de uso de Tipos de Datos propios con las interfaces Writable y WritableComparable

Continuando con la entrada anterior en la que explicaba qué son las interfaces Writable y WritableComparable y cómo es posible crear nuestros propios tipos usándolas, vamos a ver un ejemplo.

El código fuente y el fichero de ejemplo de esta entrada también los podréis encontrar en este enlace.

Ejemplo de aplicación MapReduce utilizando nuestra propia clase como key.
Al programar nuestra clase PersonaWritableComparable que implementa WritableComparable, en la fase del Shuffle and Sort se consigue que el Reducer reciba las key ordenadas y con sus valores correspondientes agrupados para poder operar con ellos.

Recibimos un fichero de texto cuya información es
Fecha [tab] Nombre Apellido1 Apellido2 Puntuación
Queremos como salida un listado de personas (con los nombres y apellidos) y la suma de todas sus puntuaciones.

Fichero de entrada score.txt:

01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez 14
01-11-2012 Maria Garcia Martinez 11
01-11-2012 Pablo Sanchez Rodriguez 9
01-11-2012 Angel Martin Hernandez 3
15-11-2012 Pepe Perez Gonzalez 22
15-11-2012 Maria Garcia Martinez 15
15-11-2012 John Smith 13
01-12-2012 Pepe Perez Gonzalez 25
01-12-2012 Ana Lopez Fernandez 15
01-12-2012 Pablo Sanchez Rodriguez 8
01-12-2012 Maria Garcia Martinez 32
15-12-2012 Maria Garcia Martinez 47
15-12-2012 Pepe Perez Gonzalez 13
15-12-2012 Angel Martin Hernandez 13
15-12-2012 John Smith 27
01-01-2013 Ana Lopez Fernandez 5
01-01-2013 Pablo Sanchez Rodriguez 2
01-01-2013 Pepe Perez Gonzalez 17
01-01-2013 Maria Garcia Martinez 3
01-01-2013 Angel Martin Hernandez 32
01-01-2013 John Smith 21


Nuestra propia clase PersonaWritableComparable:
 
public class PersonaWritableComparable 
  implements WritableComparable<PersonaWritableComparable>{

 Text nombre, primerApellido, segundoApellido;
 
 public void set(String nom, String prApell, String sgApell){
  nombre.set(nom);
  primerApellido.set(prApell);
  segundoApellido.set(sgApell);
 }
 
 public PersonaWritableComparable() {
  this.nombre = new Text();
  this.primerApellido = new Text();
  this.segundoApellido = new Text();
 }

 public PersonaWritableComparable(Text nombre, 
   Text primerApellido, Text segundoApellido) {
  this.nombre = nombre;
  this.primerApellido = primerApellido;
  this.segundoApellido = segundoApellido;
 }
 
 @Override
 public void readFields(DataInput arg0) throws IOException {
  this.nombre.readFields(arg0);
  this.primerApellido.readFields(arg0);
  this.segundoApellido.readFields(arg0);
  
 }

 @Override
 public void write(DataOutput arg0) throws IOException {
  this.nombre.write(arg0);
  this.primerApellido.write(arg0);
  this.segundoApellido.write(arg0);
 }

 @Override
 public int compareTo(PersonaWritableComparable o) {
  if(this.nombre.compareTo(o.nombre) != 0){
   return this.nombre.compareTo(o.nombre);
  }else if(this.primerApellido.compareTo(o.primerApellido) != 0){
   return this.primerApellido.compareTo(o.primerApellido);
  }else if(this.segundoApellido.compareTo(o.segundoApellido) != 0){
   return this.segundoApellido.compareTo(o.segundoApellido);
  }
  return 0;
 }

 @Override
 public boolean equals(Object obj) {
  if(obj instanceof PersonaWritableComparable){
   PersonaWritableComparable p = (PersonaWritableComparable) obj;
   return this.nombre.equals(p.nombre) && 
    this.primerApellido.equals(p.primerApellido) && 
    this.segundoApellido.equals(p.segundoApellido);
  }
  return false;
 }

 @Override
 public int hashCode() {
  return this.nombre.hashCode()*163 + 
    this.primerApellido.hashCode()*163 + 
    this.segundoApellido.hashCode()*163;
 }
 
 @Override
 public String toString() {
  return nombre.toString()+" "+primerApellido.toString()+" "
   +segundoApellido.toString();
 }
}


El Driver de la aplicación:
 
public class PersonaScoreDriver {
 public static void main(String[] args) throws Exception {
  if(args.length != 2){
   System.out.println("Ha ocurrido un error en la entrada");
   System.exit(-1);
  }
  
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  job.setJarByClass(PersonaScoreDriver.class);
  
  job.setJobName("Persona Score");
  
  job.setOutputKeyClass(PersonaWritableComparable.class);
  job.setOutputValueClass(IntWritable.class);

  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
  job.setMapperClass(PersonaScoreMapper.class);
  job.setReducerClass(PersonaScoreReducer.class);

  boolean success = job.waitForCompletion(true);
  System.exit(success ? 0:1);  
 }
}


La clase Mapper:
 
public class PersonaScoreMapper extends 
 Mapper<LongWritable, Text, 
 PersonaWritableComparable, IntWritable> {

 private IntWritable score = new IntWritable();
 PersonaWritableComparable persona = new PersonaWritableComparable();
 
 public void map(LongWritable key, Text values,
   Context context) throws IOException, InterruptedException {
  
  // El texto tiene este formato:
  // 01-11-2012 Maria Garcia Martinez 11
  // La fecha separada por tabulación, el resto con espacios
  String[] primerSplit = values.toString().split(" ");
  if(primerSplit.length == 2){
   String[] segundoSplit = primerSplit[1].split(" ");
   
   // Puede haber personas con un apellido o con dos
   if(segundoSplit.length == 3 || segundoSplit.length == 4){
    if(segundoSplit.length == 3){
     persona.set(segundoSplit[0], segundoSplit[1], "");
     score.set(Integer.valueOf(segundoSplit[2]));
    }else {
     persona.set(segundoSplit[0], segundoSplit[1], segundoSplit[2]);
     score.set(Integer.valueOf(segundoSplit[3]));
    }
    context.write(persona, score);
   } 
  }
 }
}


La clase Reducer
 
public class PersonaScoreReducer extends 
 Reducer<PersonaWritableComparable, IntWritable, 
 PersonaWritableComparable, IntWritable> {

 public void reduce(PersonaWritableComparable key, 
   Iterable<IntWritable> values,
   Context context) throws IOException, InterruptedException {
  
  int suma = 0;
  for (IntWritable value : values) {
   suma += value.get();
  }
  
  context.write(key, new IntWritable(suma));
 }
}

Ver también: Tipos de datos Hadoop e interfaces Writable y WritableComparable

viernes, 15 de marzo de 2013

Tipos de datos Hadoop e Interfaces Writable y WritableComparable

Writable y WritableComparable son las interfaces de Hadoop que se utilizan para la serialización.

Hadoop define sus propios tipos de objetos a partir de los tipos primitivos de Java. Todos ellos heredan de la interfaz WritableComparable, que a su vez hereda de la interfaz Writable y que permiten la Serialización (conversión de los datos en bytes para poder ser transmitidos por red o para su escritura en almacenes persistentes).

Algunos de los tipos son:
  • IntWritable                   ints
  • Text                             strings
  • DoubleWritable           doubles
  • FloatWritable               floats
  • LongWritable               longs
  • ByteWritable                bytes
  • NullWritable

Jerarquía de Clases (Hadoop the Definitive Guide)

De los tipos vistos, aunque son propios de Hadoop, nos son más o menos conocidos, todos excepto quizas el NullWritable. Este tipo de objeto se utiliza para cuando queremos eliminar la key o el value y que Hadoop lo pueda reconocer.
Por ejemplo, si en una salida de un Job MapReduce es de tipo <Text, Text> y tenemos la key siempre a nulo, entonces el fichero de salida tendrá un formato similar a:
[separador] mivalor1
[separador] mivalor2
...
siendo [separador] el carácter de separación (por defecto una tabulación).

Y si la key la definiésemos como NullWritable, la salida sería con este formato:
mivalor1
mivalor2
...
Lo cual nos da un fichero de salida más compacto y más apropiado para usarlo como entrada de otro Job MapReduce.



Durante los desarrollos de aplicaciones Hadoop, en algún momento tendremos la necesidad de crear nuestros propios objetos.
Por ejemplo, imaginemos que queremos como objeto los datos de una persona (nombre y los dos apellidos), sería fácil crear un objeto de tipo Text:

   Text persona = new Text (nombre+" "+primerApellido+" "+segundoApellido);

Como veis, separamos los datos con un espacio, y luego al hacer:

   String[] listaPersonas = persona.toString().split(" ");

Podría ser un problema si uno de los apellidos fuera compuesto y contuviera espacios.
Así que lo mejor sería crear tu propio objeto.

Es muy importante saber que las keys deben implementar siempre la interfaz WritableComparable, y los values la interfaz Writable.

La interfaz Writable

Contiene los métodos de serialización y deserialización readFields y write:
 
public interfaz Writable{
    void readFields(DataInput in);
    void write(DataOutput out);
}

Ejemplo:
 
public class PersonaWritable implements Writable {

 Text nombre, primerApellido, segundoApellido;
 
 public PersonaWritable() {
  this.nombre = new Text();
  this.primerApellido = new Text();
  this.segundoApellido = new Text();
 }

 public PersonaWritable(Text nombre, Text primerApellido,
   Text segundoApellido) {
  this.nombre = nombre;
  this.primerApellido = primerApellido;
  this.segundoApellido = segundoApellido;
 }

 @Override
 public void readFields(DataInput arg0) throws IOException {
  this.nombre.readFields(arg0);
  this.primerApellido.readFields(arg0);
  this.segundoApellido.readFields(arg0);
 }

 @Override
 public void write(DataOutput arg0) throws IOException {
  this.nombre.write(arg0);
  this.primerApellido.write(arg0);
  this.segundoApellido.write(arg0);
 }
 
 // Implementar getters y setters
}



La interfaz WritableComparable

Además de contener los métodos de serialización y deserialización readFields y write, debe implementar los métodos compareTo, hashCode y equals, ya que extiende, además de la interfaz Writable, de la Comparable <   >.

Ejemplo:
 
public class PersonaWritableComparable 
  implements WritableComparable<PersonaWritableComparable>{

 Text nombre, primerApellido, segundoApellido;
 
 public PersonaWritableComparable() {
  this.nombre = new Text();
  this.primerApellido = new Text();
  this.segundoApellido = new Text();
 }

 public PersonaWritableComparable(Text nombre, 
   Text primerApellido, Text segundoApellido) {
  this.nombre = nombre;
  this.primerApellido = primerApellido;
  this.segundoApellido = segundoApellido;
 }
 
 @Override
 public void readFields(DataInput arg0) throws IOException {
  this.nombre.readFields(arg0);
  this.primerApellido.readFields(arg0);
  this.segundoApellido.readFields(arg0);
  
 }

 @Override
 public void write(DataOutput arg0) throws IOException {
  this.nombre.write(arg0);
  this.primerApellido.write(arg0);
  this.segundoApellido.write(arg0);
 }

 @Override
 public int compareTo(PersonaWritableComparable o) {
  if(this.nombre.compareTo(o.nombre) != 0){
   return this.nombre.compareTo(o.nombre);
  }else if(this.primerApellido.compareTo(o.primerApellido) != 0){
   return this.primerApellido.compareTo(o.primerApellido);
  }else if(this.segundoApellido.compareTo(o.segundoApellido) != 0){
   return this.segundoApellido.compareTo(o.segundoApellido);
  }
  return 0;
 }

 @Override
 public boolean equals(Object obj) {
  if(obj instanceof PersonaWritable){
   PersonaWritableComparable p = (PersonaWritableComparable) obj;
   return this.nombre == p.nombre && 
     this.primerApellido == p.primerApellido && 
     this.segundoApellido == p.segundoApellido;
  }
  return false;
 }

 @Override
 public int hashCode() {
  return this.nombre.hashCode()*163 + 
    this.primerApellido.hashCode()*163 + 
    this.segundoApellido.hashCode()*163;
 }

 // Implementar getters y setters
}


Si en algún momento quieres desarrollar un tipo propio y la salida (output) va a ser con este tipo, también hará falta implementar el método toString()

Si tuviéramos que serializar objetos binarios, se haría a través de arrays de bytes

El método write sería:

  1. Serializar el objeto en un array de bytes
  2. Escribir el número de bytes
  3. Escribir el array de bytes
Y el método readFields sería:
  1. Leer el número de bytes
  2. Crear un array de bytes de ese tamaño
  3. Leer el array
  4. Deserializar el objeto

Ver también: Ejemplo de uso de Tipos de Datos propios con las interfaces Writable y WritableComparable