Recuerda que puedes descargarte algunos de los ejemplos en la pestaña de Código Fuente

domingo, 24 de marzo de 2013

Sequence Files: Ejemplo de creación y lectura a través del FileSystem

Como ya expliqué en esta entrada, FileSystem sirve para acceder a HDFS a través de la API de Java. Así que vamos a usar esta clase para crear y leer un SequenceFile.

Crear un SequenceFile

Crearemos un SequenceFile de un texto, que es un poema, y le pondremos como key el número correspondiente a cada línea.
 
public class CreateSequenceFile {

 private static final String[] POEMA = { 
  "El ciego sol se estrella",
  "en las duras aristas de las armas,",
  "llaga de luz los petos y espaldares",
  "y flamea en las puntas de las lanzas.",
  "El ciego sol, la sed y la fatiga",
  "Por la terrible estepa castellana,",
  "al destierro, con doce de los suyos",
  "-polvo, sudor y hierro- el Cid cabalga.",
  "Cerrado está el mesón a piedra y lodo.",
  "Nadie responde... Al pomo de la espada",
  "y al cuento de las picas el postigo",
  "va a ceder ¡Quema el sol, el aire abrasa!"};
 
 private static final String rutaDestino 
   = new String ("pruebas/poemasequencefile");
 
 public static void main(String[] args) throws IOException {
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get( conf);
  Path path = new Path(rutaDestino);
  
  IntWritable key = new IntWritable();
  Text value = new Text();
  
  //Creamos el writer del SequenceFile para poder ir añadiendo
  // los pares key/value al fichero.
  SequenceFile.Writer writer = new SequenceFile.Writer(fs,  
    conf,  path, key.getClass(), value.getClass());
  
  for (int i = 0; i < POEMA.length; i++) { 
   // La key es el número de línea
   key.set(i+1); 
   // El value es la línea del poema correspondiente
   value.set(POEMA[i]); 
   // Escribimos el par en el sequenceFile 
   writer.append(key, value);
  }
  
  writer.close();
 }
}




Leer un SequenceFile

En este ejemplo no sólo vamos a leer el fichero creado anteriormente, sino que también vamos a buscar y a usar los puntos de sincronización, vamos a ver las posiciones del fichero y vamos a desplazarnos a alguna.
 
public class ReadSequenceFile {

 private static final String rutaOrigen 
   = new String ("pruebas/poemasequencefile");
 
 public static void main(String[] args) 
   throws Exception {
  
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get( conf);
  Path path = new Path(rutaOrigen);
  
  //Creamos el Reader del SequenceFile
  SequenceFile.Reader reader = 
    new SequenceFile.Reader(fs, path, conf);
  // Leemos la key y value del SequenceFile, los tipos son conocidos,
  // por lo que se declaran variables de esos tipos.
  IntWritable key = 
    (IntWritable) reader.getKeyClass().newInstance();
  Text value = 
    (Text) reader.getValueClass().newInstance();
  
  StringBuilder strBuilder;
  boolean haySync = false;
  long posSync = 0;
  
  //Recorremos el reader recuperando los pares key/value
  while(reader.next(key,value)){
   
   // Comprobamos si la posición es un punto de sync
   // En principio en este fichero no encontrará ninguno ya que es muy
   // pequeño, si fuera uno más grande y tuviera varios puntos de sync
   // se guardará el último punto encontrado.
   if(reader.syncSeen()){
    haySync = true;
    posSync = reader.getPosition();
   }
   
   strBuilder = new StringBuilder("Posición: ").
     append(reader.getPosition()).append(" - Key: ").
     append(key.toString()).append(" Value: " ).
     append(value.toString());
   System.out.println(strBuilder);
  }
  
  if(haySync){
   // reader.sync posicionará el reader en el sync siguiente más próximo,
   // si no hay ninguno se posicionará al final del fichero.
   // En este caso se posicionará en el punto dado, ya que es de sync.
   strBuilder = new StringBuilder("Sync en el punto: ").
     append(posSync);
   System.out.println(strBuilder);
   reader.sync(posSync);
  }else{
   // Es un valor conocido, si no existiera, habría un error
   // al realizar el reader.next.
   posSync = 459;
   reader.seek(posSync);
  }
  
  // En un caso o en otro a pesar de haber finalizado la iteración 
  // hemos posicionado el reader en un punto intermedio, así que 
  // seguimos recorriéndolo (repetimos las líneas)
  // hasta finalizar de nuevo.
  strBuilder = new StringBuilder("Volvemos a la posición: ")
     .append(posSync);
  System.out.println(strBuilder);
  
  System.out.println("Seguimos recorriendo el reader: ");
  while(reader.next(key,value)){
   strBuilder = new StringBuilder("Posición: ").
     append(reader.getPosition()).append(" - Key: ").
     append(key.toString()).append(" Value: " ).
     append(value.toString());
   System.out.println(strBuilder);
  }
  
  reader.close();
 }

}

viernes, 22 de marzo de 2013

Sequence Files

Los SequenceFiles son unos tipos de ficheros de datos propios de Hadoop almacenados en forma de pares key/value y codificados de forma binaria.
Proporcionan una estructura de datos persistente para pares key/value de forma binaria.

En el mismo fichero se almacenan los metadatos que contienen la información de ese fichero (tipo de los datos, nombre, fecha,...).

Este tipo de ficheros se usa muy a menudo en Jobs MapReduce, sobre todo cuando la salida de un Job es la entrada de otro.
También, podemos imaginarlos como ficheros de log donde cada registro es una nueva línea.

Son un tipo de fichero muy adecuados para MapReduce porque además son splittable (que se pueden fragmentar).
Pueden almacenar distintos tipos de datos gracias a que usan una gran variedad de frameworks de serialización.

Soportan compresión, que puede ser de 3 tipos:
  • Uncompressed (No comprimido)
  • Record Compressed (Compresión a nivel de cada par key/value)
  • Block Compressed (Se comprime por bloques)
Sea cual sea el tipo de compresión que utilice, la estructura del encabezado (header) va a ser el mismo, sólo que éste contendrá la información necesaria para su posterior lectura.

Problemas: 
  • Sólo se puede acceder a ellos a través de la AP Java de Hadoop.
  • Si la definición de la key o value cambian, el fichero no se podrá leer.

Para entender de forma visual cómo es la estructura de un SequenceFile, empezamos viendo cómo sería el Header:

Estructura del Header

Esta sería la estructura de un SequenceFile Uncompressed o Record Compressed:

Estructura de un SequenceFile

Así sería la estructura de un registro cuando no está comprimido (Uncompressed):

Registro no comprimido

Y así sería cuando está comprimido por registro (Record Compressed):

Estructura de la compresión por registro


Aunque es la misma estructura de SequenceFile, no pueden existir formatos de compresión distintos dentro de un mismo fichero.

A continuación es el formato de un SequenceFile Block Compressed:

Estructura de un SequenceFile comprimido por bloques


Y por último el formato del bloque:

Estructura del bloque comprimido




Una propiedad de los SequenceFiles es que en la creación introducen puntos de sincronización (sync points).
Estos puntos se pueden utilizar cuando el reader se pierde, por ejemplo, si nos hemos desplazado a buscar en una posición cualquiera en nuestro flujo de datos. Aún más importante, estos puntos de sincronización sirven para definir los InputSplit de los Jobs MapReduce.
Estos sync points se crean automáticamente introduciéndolos cada cierto número de registros cuando realizamos el SequenceFile.Writer.
Se pueden localizar durante la lectura del SequenceFile con el SecuenceFile.Reader a través de:

      reader.syncSeen();

Otra opción que nos dan los SequenceFiles es, como hemos dicho antes, buscar una posición dada en este tipo de ficheros.
Si sabemos la posición exacta a la que queremos acceder, utilizaríamos el método:

      reader.seek(posicion);
      reader.next(key, value);


Si ponemos una posición que no existe daría un error IOException.


Si no conocemos la posición exacta, podemos utilizar:

      reader.sync (posicion)

El reader se posicionará en el siguiente sync point que encuentre después de posicion.
Puede ser el caso que el fichero sea muy pequeño y no exista ningún sync point, lo que sucederá entonces es que el reader se posicionará al final del fichero.

domingo, 17 de marzo de 2013

Ejemplo de uso de Tipos de Datos propios con las interfaces Writable y WritableComparable

Continuando con la entrada anterior en la que explicaba qué son las interfaces Writable y WritableComparable y cómo es posible crear nuestros propios tipos usándolas, vamos a ver un ejemplo.

El código fuente y el fichero de ejemplo de esta entrada también los podréis encontrar en este enlace.

Ejemplo de aplicación MapReduce utilizando nuestra propia clase como key.
Al programar nuestra clase PersonaWritableComparable que implementa WritableComparable, en la fase del Shuffle and Sort se consigue que el Reducer reciba las key ordenadas y con sus valores correspondientes agrupados para poder operar con ellos.

Recibimos un fichero de texto cuya información es
Fecha [tab] Nombre Apellido1 Apellido2 Puntuación
Queremos como salida un listado de personas (con los nombres y apellidos) y la suma de todas sus puntuaciones.

Fichero de entrada score.txt:

01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez 14
01-11-2012 Maria Garcia Martinez 11
01-11-2012 Pablo Sanchez Rodriguez 9
01-11-2012 Angel Martin Hernandez 3
15-11-2012 Pepe Perez Gonzalez 22
15-11-2012 Maria Garcia Martinez 15
15-11-2012 John Smith 13
01-12-2012 Pepe Perez Gonzalez 25
01-12-2012 Ana Lopez Fernandez 15
01-12-2012 Pablo Sanchez Rodriguez 8
01-12-2012 Maria Garcia Martinez 32
15-12-2012 Maria Garcia Martinez 47
15-12-2012 Pepe Perez Gonzalez 13
15-12-2012 Angel Martin Hernandez 13
15-12-2012 John Smith 27
01-01-2013 Ana Lopez Fernandez 5
01-01-2013 Pablo Sanchez Rodriguez 2
01-01-2013 Pepe Perez Gonzalez 17
01-01-2013 Maria Garcia Martinez 3
01-01-2013 Angel Martin Hernandez 32
01-01-2013 John Smith 21


Nuestra propia clase PersonaWritableComparable:
 
public class PersonaWritableComparable 
  implements WritableComparable<PersonaWritableComparable>{

 Text nombre, primerApellido, segundoApellido;
 
 public void set(String nom, String prApell, String sgApell){
  nombre.set(nom);
  primerApellido.set(prApell);
  segundoApellido.set(sgApell);
 }
 
 public PersonaWritableComparable() {
  this.nombre = new Text();
  this.primerApellido = new Text();
  this.segundoApellido = new Text();
 }

 public PersonaWritableComparable(Text nombre, 
   Text primerApellido, Text segundoApellido) {
  this.nombre = nombre;
  this.primerApellido = primerApellido;
  this.segundoApellido = segundoApellido;
 }
 
 @Override
 public void readFields(DataInput arg0) throws IOException {
  this.nombre.readFields(arg0);
  this.primerApellido.readFields(arg0);
  this.segundoApellido.readFields(arg0);
  
 }

 @Override
 public void write(DataOutput arg0) throws IOException {
  this.nombre.write(arg0);
  this.primerApellido.write(arg0);
  this.segundoApellido.write(arg0);
 }

 @Override
 public int compareTo(PersonaWritableComparable o) {
  if(this.nombre.compareTo(o.nombre) != 0){
   return this.nombre.compareTo(o.nombre);
  }else if(this.primerApellido.compareTo(o.primerApellido) != 0){
   return this.primerApellido.compareTo(o.primerApellido);
  }else if(this.segundoApellido.compareTo(o.segundoApellido) != 0){
   return this.segundoApellido.compareTo(o.segundoApellido);
  }
  return 0;
 }

 @Override
 public boolean equals(Object obj) {
  if(obj instanceof PersonaWritableComparable){
   PersonaWritableComparable p = (PersonaWritableComparable) obj;
   return this.nombre.equals(p.nombre) && 
    this.primerApellido.equals(p.primerApellido) && 
    this.segundoApellido.equals(p.segundoApellido);
  }
  return false;
 }

 @Override
 public int hashCode() {
  return this.nombre.hashCode()*163 + 
    this.primerApellido.hashCode()*163 + 
    this.segundoApellido.hashCode()*163;
 }
 
 @Override
 public String toString() {
  return nombre.toString()+" "+primerApellido.toString()+" "
   +segundoApellido.toString();
 }
}


El Driver de la aplicación:
 
public class PersonaScoreDriver {
 public static void main(String[] args) throws Exception {
  if(args.length != 2){
   System.out.println("Ha ocurrido un error en la entrada");
   System.exit(-1);
  }
  
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  job.setJarByClass(PersonaScoreDriver.class);
  
  job.setJobName("Persona Score");
  
  job.setOutputKeyClass(PersonaWritableComparable.class);
  job.setOutputValueClass(IntWritable.class);

  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
  job.setMapperClass(PersonaScoreMapper.class);
  job.setReducerClass(PersonaScoreReducer.class);

  boolean success = job.waitForCompletion(true);
  System.exit(success ? 0:1);  
 }
}


La clase Mapper:
 
public class PersonaScoreMapper extends 
 Mapper<LongWritable, Text, 
 PersonaWritableComparable, IntWritable> {

 private IntWritable score = new IntWritable();
 PersonaWritableComparable persona = new PersonaWritableComparable();
 
 public void map(LongWritable key, Text values,
   Context context) throws IOException, InterruptedException {
  
  // El texto tiene este formato:
  // 01-11-2012 Maria Garcia Martinez 11
  // La fecha separada por tabulación, el resto con espacios
  String[] primerSplit = values.toString().split(" ");
  if(primerSplit.length == 2){
   String[] segundoSplit = primerSplit[1].split(" ");
   
   // Puede haber personas con un apellido o con dos
   if(segundoSplit.length == 3 || segundoSplit.length == 4){
    if(segundoSplit.length == 3){
     persona.set(segundoSplit[0], segundoSplit[1], "");
     score.set(Integer.valueOf(segundoSplit[2]));
    }else {
     persona.set(segundoSplit[0], segundoSplit[1], segundoSplit[2]);
     score.set(Integer.valueOf(segundoSplit[3]));
    }
    context.write(persona, score);
   } 
  }
 }
}


La clase Reducer
 
public class PersonaScoreReducer extends 
 Reducer<PersonaWritableComparable, IntWritable, 
 PersonaWritableComparable, IntWritable> {

 public void reduce(PersonaWritableComparable key, 
   Iterable<IntWritable> values,
   Context context) throws IOException, InterruptedException {
  
  int suma = 0;
  for (IntWritable value : values) {
   suma += value.get();
  }
  
  context.write(key, new IntWritable(suma));
 }
}

Ver también: Tipos de datos Hadoop e interfaces Writable y WritableComparable