Recuerda que puedes descargarte algunos de los ejemplos en la pestaña de Código Fuente
Mostrando entradas con la etiqueta Driver. Mostrar todas las entradas
Mostrando entradas con la etiqueta Driver. Mostrar todas las entradas

domingo, 17 de marzo de 2013

Ejemplo de uso de Tipos de Datos propios con las interfaces Writable y WritableComparable

Continuando con la entrada anterior en la que explicaba qué son las interfaces Writable y WritableComparable y cómo es posible crear nuestros propios tipos usándolas, vamos a ver un ejemplo.

El código fuente y el fichero de ejemplo de esta entrada también los podréis encontrar en este enlace.

Ejemplo de aplicación MapReduce utilizando nuestra propia clase como key.
Al programar nuestra clase PersonaWritableComparable que implementa WritableComparable, en la fase del Shuffle and Sort se consigue que el Reducer reciba las key ordenadas y con sus valores correspondientes agrupados para poder operar con ellos.

Recibimos un fichero de texto cuya información es
Fecha [tab] Nombre Apellido1 Apellido2 Puntuación
Queremos como salida un listado de personas (con los nombres y apellidos) y la suma de todas sus puntuaciones.

Fichero de entrada score.txt:

01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez 14
01-11-2012 Maria Garcia Martinez 11
01-11-2012 Pablo Sanchez Rodriguez 9
01-11-2012 Angel Martin Hernandez 3
15-11-2012 Pepe Perez Gonzalez 22
15-11-2012 Maria Garcia Martinez 15
15-11-2012 John Smith 13
01-12-2012 Pepe Perez Gonzalez 25
01-12-2012 Ana Lopez Fernandez 15
01-12-2012 Pablo Sanchez Rodriguez 8
01-12-2012 Maria Garcia Martinez 32
15-12-2012 Maria Garcia Martinez 47
15-12-2012 Pepe Perez Gonzalez 13
15-12-2012 Angel Martin Hernandez 13
15-12-2012 John Smith 27
01-01-2013 Ana Lopez Fernandez 5
01-01-2013 Pablo Sanchez Rodriguez 2
01-01-2013 Pepe Perez Gonzalez 17
01-01-2013 Maria Garcia Martinez 3
01-01-2013 Angel Martin Hernandez 32
01-01-2013 John Smith 21


Nuestra propia clase PersonaWritableComparable:
 
public class PersonaWritableComparable 
  implements WritableComparable<PersonaWritableComparable>{

 Text nombre, primerApellido, segundoApellido;
 
 public void set(String nom, String prApell, String sgApell){
  nombre.set(nom);
  primerApellido.set(prApell);
  segundoApellido.set(sgApell);
 }
 
 public PersonaWritableComparable() {
  this.nombre = new Text();
  this.primerApellido = new Text();
  this.segundoApellido = new Text();
 }

 public PersonaWritableComparable(Text nombre, 
   Text primerApellido, Text segundoApellido) {
  this.nombre = nombre;
  this.primerApellido = primerApellido;
  this.segundoApellido = segundoApellido;
 }
 
 @Override
 public void readFields(DataInput arg0) throws IOException {
  this.nombre.readFields(arg0);
  this.primerApellido.readFields(arg0);
  this.segundoApellido.readFields(arg0);
  
 }

 @Override
 public void write(DataOutput arg0) throws IOException {
  this.nombre.write(arg0);
  this.primerApellido.write(arg0);
  this.segundoApellido.write(arg0);
 }

 @Override
 public int compareTo(PersonaWritableComparable o) {
  if(this.nombre.compareTo(o.nombre) != 0){
   return this.nombre.compareTo(o.nombre);
  }else if(this.primerApellido.compareTo(o.primerApellido) != 0){
   return this.primerApellido.compareTo(o.primerApellido);
  }else if(this.segundoApellido.compareTo(o.segundoApellido) != 0){
   return this.segundoApellido.compareTo(o.segundoApellido);
  }
  return 0;
 }

 @Override
 public boolean equals(Object obj) {
  if(obj instanceof PersonaWritableComparable){
   PersonaWritableComparable p = (PersonaWritableComparable) obj;
   return this.nombre.equals(p.nombre) && 
    this.primerApellido.equals(p.primerApellido) && 
    this.segundoApellido.equals(p.segundoApellido);
  }
  return false;
 }

 @Override
 public int hashCode() {
  return this.nombre.hashCode()*163 + 
    this.primerApellido.hashCode()*163 + 
    this.segundoApellido.hashCode()*163;
 }
 
 @Override
 public String toString() {
  return nombre.toString()+" "+primerApellido.toString()+" "
   +segundoApellido.toString();
 }
}


El Driver de la aplicación:
 
public class PersonaScoreDriver {
 public static void main(String[] args) throws Exception {
  if(args.length != 2){
   System.out.println("Ha ocurrido un error en la entrada");
   System.exit(-1);
  }
  
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  job.setJarByClass(PersonaScoreDriver.class);
  
  job.setJobName("Persona Score");
  
  job.setOutputKeyClass(PersonaWritableComparable.class);
  job.setOutputValueClass(IntWritable.class);

  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
  job.setMapperClass(PersonaScoreMapper.class);
  job.setReducerClass(PersonaScoreReducer.class);

  boolean success = job.waitForCompletion(true);
  System.exit(success ? 0:1);  
 }
}


La clase Mapper:
 
public class PersonaScoreMapper extends 
 Mapper<LongWritable, Text, 
 PersonaWritableComparable, IntWritable> {

 private IntWritable score = new IntWritable();
 PersonaWritableComparable persona = new PersonaWritableComparable();
 
 public void map(LongWritable key, Text values,
   Context context) throws IOException, InterruptedException {
  
  // El texto tiene este formato:
  // 01-11-2012 Maria Garcia Martinez 11
  // La fecha separada por tabulación, el resto con espacios
  String[] primerSplit = values.toString().split(" ");
  if(primerSplit.length == 2){
   String[] segundoSplit = primerSplit[1].split(" ");
   
   // Puede haber personas con un apellido o con dos
   if(segundoSplit.length == 3 || segundoSplit.length == 4){
    if(segundoSplit.length == 3){
     persona.set(segundoSplit[0], segundoSplit[1], "");
     score.set(Integer.valueOf(segundoSplit[2]));
    }else {
     persona.set(segundoSplit[0], segundoSplit[1], segundoSplit[2]);
     score.set(Integer.valueOf(segundoSplit[3]));
    }
    context.write(persona, score);
   } 
  }
 }
}


La clase Reducer
 
public class PersonaScoreReducer extends 
 Reducer<PersonaWritableComparable, IntWritable, 
 PersonaWritableComparable, IntWritable> {

 public void reduce(PersonaWritableComparable key, 
   Iterable<IntWritable> values,
   Context context) throws IOException, InterruptedException {
  
  int suma = 0;
  for (IntWritable value : values) {
   suma += value.get();
  }
  
  context.write(key, new IntWritable(suma));
 }
}

Ver también: Tipos de datos Hadoop e interfaces Writable y WritableComparable

lunes, 4 de marzo de 2013

Hadoop: Introducción al desarrollo en Java (Parte IV): El Driver (Ejemplo Word Count)

El driver se ejecuta en la máquina cliente, se trata de una función main que recibe como argumentos el input y el output y que configura el Job para finalmente enviarlo al clúster.

El Driver desarrollado con la new API:

 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
 public static void main(String[] args) throws Exception {
//  Lo primero debería ser comprobar que recibimos
//  2 argumentos (entrada y salida, si es un número
//  diferente sería erróneo
  if (args.length != 2) {
   Sysout.printf("Error");
   System.exit(-1);
  }
 
//  Se crea un nuevo Job indicando la clase que se llamará
//  al ejecutar y el nombre del Job.
//  Configuration servirá en programas más avanzados donde
//  queramos establecer configuraciones diferentes a las
//  que vienen por defecto o para el paso de parámetros.
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  job.setJarByClass(WordCountDriver.class);
  job.setJobName("Word Count");
  
//  Indicamos cuáles son las clases Mapper y Reducer
  job.setMapperClass(WordCountMapper.class);
  job.setReducerClass(wordcount.WordCountReducer.class);

//  Especificamos los directorios input y output, es decir, 
//  el directorio en HDFS donde se encuentra nuestro fichero 
//  de entrada, y dónde va a depositar los resultados
//  Recalcar que es muy importante que la ruta de output no
//  exista (el Job MapReduce la creará él solo).
  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
//  Se establecen los tipos de la key y del value a la
//  salida del reduce.
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  
//  Se establecen los tipos de la key y del value a la
//  salida del map.
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);
  
//  Otras configuraciones posibles:
//  Por defecto el tipo del fichero de entrada es 
//  TextInputFormat, se puede cambiar con:
//   job.setInputFormatClass(KeyValueTextInputFormat.class);
//  Por defecto la salida es un fichero de texto, 
//  se puede cambiar con:
//   job.setOutputFormatClass(TextOutputFormat.class);
   
//  Lanzamos el Job al cluster, hay varios modos, en 
//  waitForCompletion si hubiera más código implementado 
//  después de esta línea, no se ejecutaría
//  hasta que no finalizara el Job.
//  Hay otros modos en los que se puede lanzar el Job.
  boolean success = job.waitForCompletion(true);
  System.exit(success ? 0:1); 
 }
}

Algunos puntos a tener en cuenta (para las 2 APIS):
  • Se configuran sólo los tipos de las salidas, no de las entradas. Los tipos de entrada del Mapper están definidos por el InputFormat (en este ejemplo usamos el input format por defecto: TextInputFormat por lo cual las key son de tipo LongWritable y los value son de tipo Text). Los tipos de entrada del Reducer son los mismos que los de salida del Mapper. Igualmente el desarrollador tendrá que indicar cuáles son en los parámetro del Mapper y del Reducer.
  • Si las salidas del mapper y del reducer son del mismo tipo, no hace falta indicar el job.setMapOutputKeyClass ni el job.setMapOutputValueClass, basta con indicar el job.setOutputKeyClass y el job.setOutputValueClass.

Con respecto la old API han cambiado unas cuantas cosas, dejo aquí un código y luego explico las diferencias:

 
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;

public class WordCount  { 
 public static void main(String[] args) throws Exception {
  
  JobConf conf = new JobConf(WordCountDriver.class);
  conf.setJobName(this.getClass().getName());

  conf.setMapperClass(WordCountMapper.class);
  conf.setReducerClass(WordCountReducer.class);
  
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(IntWritable.class);
  conf.setMapOutputKeyClass(Text.class);
  conf.setMapOutputValueClass(IntWritable.class);

  FileInputFormat.setInputPaths(conf, new Path(args[0]));
  FileOutputFormat.setOutputPath(conf, new Path(args[1]));

  JobClient.runJob(conf);
 }
}

Como veréis, las diferencias principales son:
  • En los import, en la new API se utilizan las clases que pertenecen al paquete org.apache.hadoop.mapreduce, mientras que en la old API el paquete era org.apache.hadoop.mapred
  • En la new API el Job se ejecuta a través de la clase Job, en la old API se hace a través de JobClient.
  • En la new API el objeto de configuración es Job, en la old API es JobConf.