El código fuente y el fichero de ejemplo de esta entrada también los podréis encontrar en este enlace.
Ejemplo de aplicación MapReduce utilizando nuestra propia clase como key.
Al programar nuestra clase PersonaWritableComparable que implementa WritableComparable, en la fase del Shuffle and Sort se consigue que el Reducer reciba las key ordenadas y con sus valores correspondientes agrupados para poder operar con ellos.
Recibimos un fichero de texto cuya información es
Fecha [tab] Nombre Apellido1 Apellido2 Puntuación
Queremos como salida un listado de personas (con los nombres y apellidos) y la suma de todas sus puntuaciones.
Fichero de entrada score.txt:
01-11-2012 Pepe Perez Gonzalez 21 01-11-2012 Ana Lopez Fernandez 14 01-11-2012 Maria Garcia Martinez 11 01-11-2012 Pablo Sanchez Rodriguez 9 01-11-2012 Angel Martin Hernandez 3 15-11-2012 Pepe Perez Gonzalez 22 15-11-2012 Maria Garcia Martinez 15 15-11-2012 John Smith 13 01-12-2012 Pepe Perez Gonzalez 25 01-12-2012 Ana Lopez Fernandez 15 01-12-2012 Pablo Sanchez Rodriguez 8 01-12-2012 Maria Garcia Martinez 32 15-12-2012 Maria Garcia Martinez 47 15-12-2012 Pepe Perez Gonzalez 13 15-12-2012 Angel Martin Hernandez 13 15-12-2012 John Smith 27 01-01-2013 Ana Lopez Fernandez 5 01-01-2013 Pablo Sanchez Rodriguez 2 01-01-2013 Pepe Perez Gonzalez 17 01-01-2013 Maria Garcia Martinez 3 01-01-2013 Angel Martin Hernandez 32 01-01-2013 John Smith 21
Nuestra propia clase PersonaWritableComparable:
public class PersonaWritableComparable implements WritableComparable<PersonaWritableComparable>{ Text nombre, primerApellido, segundoApellido; public void set(String nom, String prApell, String sgApell){ nombre.set(nom); primerApellido.set(prApell); segundoApellido.set(sgApell); } public PersonaWritableComparable() { this.nombre = new Text(); this.primerApellido = new Text(); this.segundoApellido = new Text(); } public PersonaWritableComparable(Text nombre, Text primerApellido, Text segundoApellido) { this.nombre = nombre; this.primerApellido = primerApellido; this.segundoApellido = segundoApellido; } @Override public void readFields(DataInput arg0) throws IOException { this.nombre.readFields(arg0); this.primerApellido.readFields(arg0); this.segundoApellido.readFields(arg0); } @Override public void write(DataOutput arg0) throws IOException { this.nombre.write(arg0); this.primerApellido.write(arg0); this.segundoApellido.write(arg0); } @Override public int compareTo(PersonaWritableComparable o) { if(this.nombre.compareTo(o.nombre) != 0){ return this.nombre.compareTo(o.nombre); }else if(this.primerApellido.compareTo(o.primerApellido) != 0){ return this.primerApellido.compareTo(o.primerApellido); }else if(this.segundoApellido.compareTo(o.segundoApellido) != 0){ return this.segundoApellido.compareTo(o.segundoApellido); } return 0; } @Override public boolean equals(Object obj) { if(obj instanceof PersonaWritableComparable){ PersonaWritableComparable p = (PersonaWritableComparable) obj; return this.nombre.equals(p.nombre) && this.primerApellido.equals(p.primerApellido) && this.segundoApellido.equals(p.segundoApellido); } return false; } @Override public int hashCode() { return this.nombre.hashCode()*163 + this.primerApellido.hashCode()*163 + this.segundoApellido.hashCode()*163; } @Override public String toString() { return nombre.toString()+" "+primerApellido.toString()+" " +segundoApellido.toString(); } }
El Driver de la aplicación:
public class PersonaScoreDriver { public static void main(String[] args) throws Exception { if(args.length != 2){ System.out.println("Ha ocurrido un error en la entrada"); System.exit(-1); } Configuration conf = new Configuration(); Job job = new Job(conf); job.setJarByClass(PersonaScoreDriver.class); job.setJobName("Persona Score"); job.setOutputKeyClass(PersonaWritableComparable.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setMapperClass(PersonaScoreMapper.class); job.setReducerClass(PersonaScoreReducer.class); boolean success = job.waitForCompletion(true); System.exit(success ? 0:1); } }
La clase Mapper:
public class PersonaScoreMapper extends Mapper<LongWritable, Text, PersonaWritableComparable, IntWritable> { private IntWritable score = new IntWritable(); PersonaWritableComparable persona = new PersonaWritableComparable(); public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException { // El texto tiene este formato: // 01-11-2012 Maria Garcia Martinez 11 // La fecha separada por tabulación, el resto con espacios String[] primerSplit = values.toString().split(" "); if(primerSplit.length == 2){ String[] segundoSplit = primerSplit[1].split(" "); // Puede haber personas con un apellido o con dos if(segundoSplit.length == 3 || segundoSplit.length == 4){ if(segundoSplit.length == 3){ persona.set(segundoSplit[0], segundoSplit[1], ""); score.set(Integer.valueOf(segundoSplit[2])); }else { persona.set(segundoSplit[0], segundoSplit[1], segundoSplit[2]); score.set(Integer.valueOf(segundoSplit[3])); } context.write(persona, score); } } } }
La clase Reducer
public class PersonaScoreReducer extends Reducer<PersonaWritableComparable, IntWritable, PersonaWritableComparable, IntWritable> { public void reduce(PersonaWritableComparable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int suma = 0; for (IntWritable value : values) { suma += value.get(); } context.write(key, new IntWritable(suma)); } }
Ver también: Tipos de datos Hadoop e interfaces Writable y WritableComparable