Recuerda que puedes descargarte algunos de los ejemplos en la pestaña de Código Fuente
Mostrando entradas con la etiqueta Ejemplo. Mostrar todas las entradas
Mostrando entradas con la etiqueta Ejemplo. Mostrar todas las entradas

martes, 16 de abril de 2013

Ejemplo de Partitioner

En la entrada anterior vimos qué es el Partitioner, ahora toca ver un ejemplo y el código desarrollado para hacer ese ejemplo.

Atención si vais a hacer las pruebas en local ya que no funciona, el modo local sólo tiene un Reducer y por eso es mejor usar por lo menos el modo pseudo distribuido.

En este ejemplo, a partir del fichero scores.txt con la forma:

01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez 14
15-02-2013 Angel Martin Hernandez 3
01-11-2012 Maria Garcia Martinez 11
01-11-2012 Pablo Sanchez Rodriguez 9
01-11-2012 Angel Martin Hernandez 3
15-01-2013 Pepe Perez Gonzalez 17
15-01-2013 Maria Garcia Martinez 3
...

Queremos dividir los datos por año y enviar a cada Reducer las personas que han jugado en ese año. A través de un Partitioner vamos a indicar a qué Reducer va cada registro.

El Driver es parecido a todo lo que hemos visto hasta ahora, lo único que hemos definido que el InputFormat será un KeyValueTextInputFormat (ya que la fecha está separado por una tabulación del resto de la línea, así que este formato reconocerá la entrada).
Y luego añadimos en las configuraciones nuestra clase Partitioner y el número de tareas Reduce que queremos (nuestro fichero sólo tiene datos de 2 años (2012 y 2013), entonces serán 2 tareas Reducer).


public class PersonaScoreDriver {
 public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  job.setJarByClass(PersonaScoreDriver.class);
  
  job.setJobName("Persona Score");
  
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
  job.setInputFormatClass(KeyValueTextInputFormat.class);
  //Establecemos el número de tareas Reduce
  job.setNumReduceTasks(2);
  
  job.setMapperClass(PersonaScoreMapper.class);
  job.setReducerClass(PersonaScoreReducer.class);
  //Indicamos cuál es nuestro partitioner
  job.setPartitionerClass(PersonaScorePartitioner.class);

  boolean success = job.waitForCompletion(true);
  System.exit(success ? 0:1);  
 }
}

En el Mapper lo único que hacemos es sacar el nombre y apellidos del value y emite un par key/value enviando como key la fecha y como value la persona.

public class PersonaScoreMapper extends 
 Mapper<Text, Text, Text, Text> {
 
 Text persona = new Text();
 
 @Override
 public void map(Text key, Text values,
   Context context) throws IOException, InterruptedException {
  
  String[] personaSplit = values.toString().split(" ");
  StringBuilder persBuilder = new StringBuilder();
  // Puede haber personas con un apellido o con dos
  if(personaSplit.length == 3 || personaSplit.length == 4){
   if(personaSplit.length == 3){
    persBuilder.append(personaSplit[0]).append(" ")
     .append(personaSplit[1]);
   }else {
    persBuilder.append(personaSplit[0]).append(" ")
     .append(personaSplit[1]).append(" ")
     .append(personaSplit[2]);
   }
   persona.set(persBuilder.toString());
   context.write(key, persona);
  }
 }
}


La clase Reducer lo único que hace es recoger la key con su lista de values correspondientes, recorrer esa lista y emitir cada par key/value con la fecha y el nombre. Al haber realizado el Partiiioner, un mismo reducer procesará las keys de un mismo año.

public class PersonaScoreReducer extends 
 Reducer<Text, Text, Text, Text> {

 @Override
 public void reduce(Text key, Iterable<Text> values,
   Context context) throws IOException, InterruptedException {
  for (Text value : values) {
   context.write(key, value);
  }  
 }
}


Por último el Partitioner, que lo que hace es devolver un entero indicando cuál es el Reducer al que irán los datos intermedios generados por el Mapper.

public class PersonaScorePartitioner extends Partitioner<Text, Text> {
 
 @Override
 public int getPartition(Text key, Text value, int numPartitions) {
  
  if(key.toString().endsWith("2012")){
   return 0;
  }else{
   return 1;
  }
 }
}


Una vez visto el desarrollo y cómo quedaría el código sólo quedaría exportar las clases como jar (tal y como vimos en esta entrada) al directorio que tengamos preparado para la ejecución de Jobs en modo pseudo-distribuído y lo lanzaríamos (previamente habiendo puesto en HDFS el fichero scorePartMezcla).
También os recuerdo que las clases las podréis encontrar en la sección de Código Fuente

hadoop jar training/jars/EjemploPartitioner.jar PersonaScoreDriver pruebas/scorePartMezcla pruebas/resultados/ejemploPartitioner

Y este debería ser el resultado si listamos el contenido del directorio ejemploPartitioner:

elena:hadoop elena$ hadoop fs -ls pruebas/resultados/ejemploPartitioner
Found 4 items
-rw-r--r--   1 elena supergroup          0 2013-04-02 19:34 /user/elena/pruebas/resultados/ejemploPartitioner/_SUCCESS
drwxr-xr-x   - elena supergroup          0 2013-04-02 19:34 /user/elena/pruebas/resultados/ejemploPartitioner/_logs
-rw-r--r--   1 elena supergroup        562 2013-04-02 19:34 /user/elena/pruebas/resultados/ejemploPartitioner/part-r-00000
-rw-r--r--   1 elena supergroup        684 2013-04-02 19:34 /user/elena/pruebas/resultados/ejemploPartitioner/part-r-00001



Y en cada fichero quedaría el siguiente contenido.

En el part-r-00000 que correspondería al año 2012 y que le habíamos asignado el valor 0:


elena:hadoop elena$ hadoop-1.0.4/bin/hadoop fs -cat pruebas/resultados/ejemploPartitioner/part-r-00000
01-11-2012 Angel Martin Hernandez
01-11-2012 Maria Garcia Martinez
01-11-2012 Ana Lopez Fernandez
01-11-2012 Pablo Sanchez Rodriguez
01-11-2012 Pepe Perez Gonzalez
01-12-2012 Maria Garcia Martinez
01-12-2012 Pepe Perez Gonzalez
01-12-2012 Pablo Sanchez Rodriguez
01-12-2012 Ana Lopez Fernandez
15-11-2012 Pepe Perez Gonzalez
15-11-2012 Maria Garcia Martinez
15-11-2012 John Smith
15-11-2012 Cristina Ruiz Gomez
15-12-2012 John Smith
15-12-2012 Cristina Ruiz Gomez
15-12-2012 Maria Garcia Martinez
15-12-2012 Pepe Perez Gonzalez
15-12-2012 Angel Martin Hernandez




En el part-r-00001 que correspondería al año 2013 y que le habíamos asignado el valor 1:

elena:hadoop elena$ hadoop-1.0.4/bin/hadoop fs -cat pruebas/resultados/ejemploPartitioner/part-r-00001
01-01-2013 Ana Lopez Fernandez
01-01-2013 John Smith
01-01-2013 Pablo Sanchez Rodriguez
01-01-2013 Pepe Perez Gonzalez
01-01-2013 Maria Garcia Martinez
01-01-2013 Angel Martin Hernandez
01-02-2013 Ana Lopez Fernandez
01-02-2013 Cristina Ruiz Gomez
01-02-2013 Maria Garcia Martinez
01-02-2013 Pepe Perez Gonzalez
15-01-2013 Angel Martin Hernandez
15-01-2013 Maria Garcia Martinez
15-01-2013 Pepe Perez Gonzalez
15-01-2013 John Smith
15-01-2013 Pablo Sanchez Rodriguez
15-02-2013 Pepe Perez Gonzalez
15-02-2013 John Smith
15-02-2013 Pablo Sanchez Rodriguez
15-02-2013 Maria Garcia Martinez
15-02-2013 Ana Lopez Fernandez
15-02-2013 Cristina Ruiz Gomez
15-02-2013 Angel Martin Hernandez


domingo, 24 de marzo de 2013

Sequence Files: Ejemplo de creación y lectura a través del FileSystem

Como ya expliqué en esta entrada, FileSystem sirve para acceder a HDFS a través de la API de Java. Así que vamos a usar esta clase para crear y leer un SequenceFile.

Crear un SequenceFile

Crearemos un SequenceFile de un texto, que es un poema, y le pondremos como key el número correspondiente a cada línea.
 
public class CreateSequenceFile {

 private static final String[] POEMA = { 
  "El ciego sol se estrella",
  "en las duras aristas de las armas,",
  "llaga de luz los petos y espaldares",
  "y flamea en las puntas de las lanzas.",
  "El ciego sol, la sed y la fatiga",
  "Por la terrible estepa castellana,",
  "al destierro, con doce de los suyos",
  "-polvo, sudor y hierro- el Cid cabalga.",
  "Cerrado está el mesón a piedra y lodo.",
  "Nadie responde... Al pomo de la espada",
  "y al cuento de las picas el postigo",
  "va a ceder ¡Quema el sol, el aire abrasa!"};
 
 private static final String rutaDestino 
   = new String ("pruebas/poemasequencefile");
 
 public static void main(String[] args) throws IOException {
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get( conf);
  Path path = new Path(rutaDestino);
  
  IntWritable key = new IntWritable();
  Text value = new Text();
  
  //Creamos el writer del SequenceFile para poder ir añadiendo
  // los pares key/value al fichero.
  SequenceFile.Writer writer = new SequenceFile.Writer(fs,  
    conf,  path, key.getClass(), value.getClass());
  
  for (int i = 0; i < POEMA.length; i++) { 
   // La key es el número de línea
   key.set(i+1); 
   // El value es la línea del poema correspondiente
   value.set(POEMA[i]); 
   // Escribimos el par en el sequenceFile 
   writer.append(key, value);
  }
  
  writer.close();
 }
}




Leer un SequenceFile

En este ejemplo no sólo vamos a leer el fichero creado anteriormente, sino que también vamos a buscar y a usar los puntos de sincronización, vamos a ver las posiciones del fichero y vamos a desplazarnos a alguna.
 
public class ReadSequenceFile {

 private static final String rutaOrigen 
   = new String ("pruebas/poemasequencefile");
 
 public static void main(String[] args) 
   throws Exception {
  
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get( conf);
  Path path = new Path(rutaOrigen);
  
  //Creamos el Reader del SequenceFile
  SequenceFile.Reader reader = 
    new SequenceFile.Reader(fs, path, conf);
  // Leemos la key y value del SequenceFile, los tipos son conocidos,
  // por lo que se declaran variables de esos tipos.
  IntWritable key = 
    (IntWritable) reader.getKeyClass().newInstance();
  Text value = 
    (Text) reader.getValueClass().newInstance();
  
  StringBuilder strBuilder;
  boolean haySync = false;
  long posSync = 0;
  
  //Recorremos el reader recuperando los pares key/value
  while(reader.next(key,value)){
   
   // Comprobamos si la posición es un punto de sync
   // En principio en este fichero no encontrará ninguno ya que es muy
   // pequeño, si fuera uno más grande y tuviera varios puntos de sync
   // se guardará el último punto encontrado.
   if(reader.syncSeen()){
    haySync = true;
    posSync = reader.getPosition();
   }
   
   strBuilder = new StringBuilder("Posición: ").
     append(reader.getPosition()).append(" - Key: ").
     append(key.toString()).append(" Value: " ).
     append(value.toString());
   System.out.println(strBuilder);
  }
  
  if(haySync){
   // reader.sync posicionará el reader en el sync siguiente más próximo,
   // si no hay ninguno se posicionará al final del fichero.
   // En este caso se posicionará en el punto dado, ya que es de sync.
   strBuilder = new StringBuilder("Sync en el punto: ").
     append(posSync);
   System.out.println(strBuilder);
   reader.sync(posSync);
  }else{
   // Es un valor conocido, si no existiera, habría un error
   // al realizar el reader.next.
   posSync = 459;
   reader.seek(posSync);
  }
  
  // En un caso o en otro a pesar de haber finalizado la iteración 
  // hemos posicionado el reader en un punto intermedio, así que 
  // seguimos recorriéndolo (repetimos las líneas)
  // hasta finalizar de nuevo.
  strBuilder = new StringBuilder("Volvemos a la posición: ")
     .append(posSync);
  System.out.println(strBuilder);
  
  System.out.println("Seguimos recorriendo el reader: ");
  while(reader.next(key,value)){
   strBuilder = new StringBuilder("Posición: ").
     append(reader.getPosition()).append(" - Key: ").
     append(key.toString()).append(" Value: " ).
     append(value.toString());
   System.out.println(strBuilder);
  }
  
  reader.close();
 }

}

domingo, 17 de marzo de 2013

Ejemplo de uso de Tipos de Datos propios con las interfaces Writable y WritableComparable

Continuando con la entrada anterior en la que explicaba qué son las interfaces Writable y WritableComparable y cómo es posible crear nuestros propios tipos usándolas, vamos a ver un ejemplo.

El código fuente y el fichero de ejemplo de esta entrada también los podréis encontrar en este enlace.

Ejemplo de aplicación MapReduce utilizando nuestra propia clase como key.
Al programar nuestra clase PersonaWritableComparable que implementa WritableComparable, en la fase del Shuffle and Sort se consigue que el Reducer reciba las key ordenadas y con sus valores correspondientes agrupados para poder operar con ellos.

Recibimos un fichero de texto cuya información es
Fecha [tab] Nombre Apellido1 Apellido2 Puntuación
Queremos como salida un listado de personas (con los nombres y apellidos) y la suma de todas sus puntuaciones.

Fichero de entrada score.txt:

01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez 14
01-11-2012 Maria Garcia Martinez 11
01-11-2012 Pablo Sanchez Rodriguez 9
01-11-2012 Angel Martin Hernandez 3
15-11-2012 Pepe Perez Gonzalez 22
15-11-2012 Maria Garcia Martinez 15
15-11-2012 John Smith 13
01-12-2012 Pepe Perez Gonzalez 25
01-12-2012 Ana Lopez Fernandez 15
01-12-2012 Pablo Sanchez Rodriguez 8
01-12-2012 Maria Garcia Martinez 32
15-12-2012 Maria Garcia Martinez 47
15-12-2012 Pepe Perez Gonzalez 13
15-12-2012 Angel Martin Hernandez 13
15-12-2012 John Smith 27
01-01-2013 Ana Lopez Fernandez 5
01-01-2013 Pablo Sanchez Rodriguez 2
01-01-2013 Pepe Perez Gonzalez 17
01-01-2013 Maria Garcia Martinez 3
01-01-2013 Angel Martin Hernandez 32
01-01-2013 John Smith 21


Nuestra propia clase PersonaWritableComparable:
 
public class PersonaWritableComparable 
  implements WritableComparable<PersonaWritableComparable>{

 Text nombre, primerApellido, segundoApellido;
 
 public void set(String nom, String prApell, String sgApell){
  nombre.set(nom);
  primerApellido.set(prApell);
  segundoApellido.set(sgApell);
 }
 
 public PersonaWritableComparable() {
  this.nombre = new Text();
  this.primerApellido = new Text();
  this.segundoApellido = new Text();
 }

 public PersonaWritableComparable(Text nombre, 
   Text primerApellido, Text segundoApellido) {
  this.nombre = nombre;
  this.primerApellido = primerApellido;
  this.segundoApellido = segundoApellido;
 }
 
 @Override
 public void readFields(DataInput arg0) throws IOException {
  this.nombre.readFields(arg0);
  this.primerApellido.readFields(arg0);
  this.segundoApellido.readFields(arg0);
  
 }

 @Override
 public void write(DataOutput arg0) throws IOException {
  this.nombre.write(arg0);
  this.primerApellido.write(arg0);
  this.segundoApellido.write(arg0);
 }

 @Override
 public int compareTo(PersonaWritableComparable o) {
  if(this.nombre.compareTo(o.nombre) != 0){
   return this.nombre.compareTo(o.nombre);
  }else if(this.primerApellido.compareTo(o.primerApellido) != 0){
   return this.primerApellido.compareTo(o.primerApellido);
  }else if(this.segundoApellido.compareTo(o.segundoApellido) != 0){
   return this.segundoApellido.compareTo(o.segundoApellido);
  }
  return 0;
 }

 @Override
 public boolean equals(Object obj) {
  if(obj instanceof PersonaWritableComparable){
   PersonaWritableComparable p = (PersonaWritableComparable) obj;
   return this.nombre.equals(p.nombre) && 
    this.primerApellido.equals(p.primerApellido) && 
    this.segundoApellido.equals(p.segundoApellido);
  }
  return false;
 }

 @Override
 public int hashCode() {
  return this.nombre.hashCode()*163 + 
    this.primerApellido.hashCode()*163 + 
    this.segundoApellido.hashCode()*163;
 }
 
 @Override
 public String toString() {
  return nombre.toString()+" "+primerApellido.toString()+" "
   +segundoApellido.toString();
 }
}


El Driver de la aplicación:
 
public class PersonaScoreDriver {
 public static void main(String[] args) throws Exception {
  if(args.length != 2){
   System.out.println("Ha ocurrido un error en la entrada");
   System.exit(-1);
  }
  
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  job.setJarByClass(PersonaScoreDriver.class);
  
  job.setJobName("Persona Score");
  
  job.setOutputKeyClass(PersonaWritableComparable.class);
  job.setOutputValueClass(IntWritable.class);

  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
  job.setMapperClass(PersonaScoreMapper.class);
  job.setReducerClass(PersonaScoreReducer.class);

  boolean success = job.waitForCompletion(true);
  System.exit(success ? 0:1);  
 }
}


La clase Mapper:
 
public class PersonaScoreMapper extends 
 Mapper<LongWritable, Text, 
 PersonaWritableComparable, IntWritable> {

 private IntWritable score = new IntWritable();
 PersonaWritableComparable persona = new PersonaWritableComparable();
 
 public void map(LongWritable key, Text values,
   Context context) throws IOException, InterruptedException {
  
  // El texto tiene este formato:
  // 01-11-2012 Maria Garcia Martinez 11
  // La fecha separada por tabulación, el resto con espacios
  String[] primerSplit = values.toString().split(" ");
  if(primerSplit.length == 2){
   String[] segundoSplit = primerSplit[1].split(" ");
   
   // Puede haber personas con un apellido o con dos
   if(segundoSplit.length == 3 || segundoSplit.length == 4){
    if(segundoSplit.length == 3){
     persona.set(segundoSplit[0], segundoSplit[1], "");
     score.set(Integer.valueOf(segundoSplit[2]));
    }else {
     persona.set(segundoSplit[0], segundoSplit[1], segundoSplit[2]);
     score.set(Integer.valueOf(segundoSplit[3]));
    }
    context.write(persona, score);
   } 
  }
 }
}


La clase Reducer
 
public class PersonaScoreReducer extends 
 Reducer<PersonaWritableComparable, IntWritable, 
 PersonaWritableComparable, IntWritable> {

 public void reduce(PersonaWritableComparable key, 
   Iterable<IntWritable> values,
   Context context) throws IOException, InterruptedException {
  
  int suma = 0;
  for (IntWritable value : values) {
   suma += value.get();
  }
  
  context.write(key, new IntWritable(suma));
 }
}

Ver también: Tipos de datos Hadoop e interfaces Writable y WritableComparable

jueves, 17 de enero de 2013

MapReduce: Ejemplo teórico

Nada mejor que un buen ejemplo para entender la teoría vista en la entrada anterior (MapReduce)

Imaginemos que tenemos un fichero score.txt con las puntuaciones de un salón de juego.
La estructura es sencilla: fecha del juego, nombre jugador y puntuación:

01-11-2012 Maria 11
01-11-2012 Pablo 9
01-11-2012 Angel 3
23-11-2012 Pablo 22
23-11-2012 Maria 15
15-12-2012 Pablo 32
15-12-2012 Maria 47
15-12-2012 Angel 13
01-01-2013 Pablo 2
01-01-2013 Maria 3
01-01-2013 Angel 32


A partir de estos datos queremos sacar la suma total de puntuaciones de cada usuario.

Las siguientes líneas van a representar los pares (key, value) que recibe el map teniendo en cuenta que las key son el offset de la línea (en bytes), y los value son la línea de texto entera.:

(0, 01-11-2012 Maria 11)
(22, 01-11-2012 Pablo 9)
(45, 01-11-2012 Angel 3)
(65, 23-11-2012 Pablo 22)
(86, 23-11-2012 Maria 15)
(110, 15-12-2012 Pablo 32)
(134, 15-12-2012 Maria 47)
(158, 15-12-2012 Angel 13)
(182, 01-01-2013 Pablo 2)
(205, 01-01-2013 Maria 3)
(225, 01-01-2013 Angel 32)


La función map lo que hará entonces será tratar cada línea recogiendo sólo la información que nos interesa y omitiendo el resto, para finalmente emitir para cada línea un par key/value con el nombre del jugador como key y la puntuación como value:

(Maria, 11)
(Pablo, 9)
(Angel, 3)
(Pablo, 22)
(Maria, 15)
(Pablo, 32)
(Maria, 47)
(Angel, 13)
(Pablo, 2)
(Maria, 3)
(Angel, 32)

Esta salida del map va a ser tratada a continuación por el Shuffle and Short antes de enviar los datos a la función reduce. En esta fase las key se van a ordenar y los value se van a agrupar por cada key, y el resultado será la entrada de la función reduce, quedando de la siguiente forma:

(Angel, [3, 13, 32])
(Maria, [11, 15, 47, 3])
(Pablo, [9, 22, 32, 2])

Así que la función reduce va a recibir como entrada cada una de estas líneas, siendo una key y por cada key una lista de value. Por cada línea el reduce emitirá la suma de las puntuaciones de cada usuario:

(Angel, 48)
(Maria, 76)
(Pablo, 65)


Flujo de Ejemplo de MapReduce