Continuando con la
entrada anterior en la que explicaba qué son las interfaces Writable y WritableComparable y cómo es posible crear nuestros propios tipos usándolas, vamos a ver un ejemplo.
El código fuente y el fichero de ejemplo de esta entrada también los podréis encontrar en
este enlace.
Ejemplo de aplicación MapReduce utilizando nuestra propia clase como key.
Al programar nuestra clase PersonaWritableComparable que implementa WritableComparable, en la fase del Shuffle and Sort se consigue que el Reducer reciba las key ordenadas y con sus valores correspondientes agrupados para poder operar con ellos.
Recibimos un fichero de texto cuya información es
Fecha [tab] Nombre Apellido1 Apellido2 Puntuación
Queremos como salida un listado de personas (con los nombres y apellidos) y la suma de todas sus puntuaciones.
Fichero de entrada score.txt:
01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez 14
01-11-2012 Maria Garcia Martinez 11
01-11-2012 Pablo Sanchez Rodriguez 9
01-11-2012 Angel Martin Hernandez 3
15-11-2012 Pepe Perez Gonzalez 22
15-11-2012 Maria Garcia Martinez 15
15-11-2012 John Smith 13
01-12-2012 Pepe Perez Gonzalez 25
01-12-2012 Ana Lopez Fernandez 15
01-12-2012 Pablo Sanchez Rodriguez 8
01-12-2012 Maria Garcia Martinez 32
15-12-2012 Maria Garcia Martinez 47
15-12-2012 Pepe Perez Gonzalez 13
15-12-2012 Angel Martin Hernandez 13
15-12-2012 John Smith 27
01-01-2013 Ana Lopez Fernandez 5
01-01-2013 Pablo Sanchez Rodriguez 2
01-01-2013 Pepe Perez Gonzalez 17
01-01-2013 Maria Garcia Martinez 3
01-01-2013 Angel Martin Hernandez 32
01-01-2013 John Smith 21
Nuestra propia clase PersonaWritableComparable:
public class PersonaWritableComparable
implements WritableComparable<PersonaWritableComparable>{
Text nombre, primerApellido, segundoApellido;
public void set(String nom, String prApell, String sgApell){
nombre.set(nom);
primerApellido.set(prApell);
segundoApellido.set(sgApell);
}
public PersonaWritableComparable() {
this.nombre = new Text();
this.primerApellido = new Text();
this.segundoApellido = new Text();
}
public PersonaWritableComparable(Text nombre,
Text primerApellido, Text segundoApellido) {
this.nombre = nombre;
this.primerApellido = primerApellido;
this.segundoApellido = segundoApellido;
}
@Override
public void readFields(DataInput arg0) throws IOException {
this.nombre.readFields(arg0);
this.primerApellido.readFields(arg0);
this.segundoApellido.readFields(arg0);
}
@Override
public void write(DataOutput arg0) throws IOException {
this.nombre.write(arg0);
this.primerApellido.write(arg0);
this.segundoApellido.write(arg0);
}
@Override
public int compareTo(PersonaWritableComparable o) {
if(this.nombre.compareTo(o.nombre) != 0){
return this.nombre.compareTo(o.nombre);
}else if(this.primerApellido.compareTo(o.primerApellido) != 0){
return this.primerApellido.compareTo(o.primerApellido);
}else if(this.segundoApellido.compareTo(o.segundoApellido) != 0){
return this.segundoApellido.compareTo(o.segundoApellido);
}
return 0;
}
@Override
public boolean equals(Object obj) {
if(obj instanceof PersonaWritableComparable){
PersonaWritableComparable p = (PersonaWritableComparable) obj;
return this.nombre.equals(p.nombre) &&
this.primerApellido.equals(p.primerApellido) &&
this.segundoApellido.equals(p.segundoApellido);
}
return false;
}
@Override
public int hashCode() {
return this.nombre.hashCode()*163 +
this.primerApellido.hashCode()*163 +
this.segundoApellido.hashCode()*163;
}
@Override
public String toString() {
return nombre.toString()+" "+primerApellido.toString()+" "
+segundoApellido.toString();
}
}
El Driver de la aplicación:
public class PersonaScoreDriver {
public static void main(String[] args) throws Exception {
if(args.length != 2){
System.out.println("Ha ocurrido un error en la entrada");
System.exit(-1);
}
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJarByClass(PersonaScoreDriver.class);
job.setJobName("Persona Score");
job.setOutputKeyClass(PersonaWritableComparable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(PersonaScoreMapper.class);
job.setReducerClass(PersonaScoreReducer.class);
boolean success = job.waitForCompletion(true);
System.exit(success ? 0:1);
}
}
La clase Mapper:
public class PersonaScoreMapper extends
Mapper<LongWritable, Text,
PersonaWritableComparable, IntWritable> {
private IntWritable score = new IntWritable();
PersonaWritableComparable persona = new PersonaWritableComparable();
public void map(LongWritable key, Text values,
Context context) throws IOException, InterruptedException {
// El texto tiene este formato:
// 01-11-2012 Maria Garcia Martinez 11
// La fecha separada por tabulación, el resto con espacios
String[] primerSplit = values.toString().split(" ");
if(primerSplit.length == 2){
String[] segundoSplit = primerSplit[1].split(" ");
// Puede haber personas con un apellido o con dos
if(segundoSplit.length == 3 || segundoSplit.length == 4){
if(segundoSplit.length == 3){
persona.set(segundoSplit[0], segundoSplit[1], "");
score.set(Integer.valueOf(segundoSplit[2]));
}else {
persona.set(segundoSplit[0], segundoSplit[1], segundoSplit[2]);
score.set(Integer.valueOf(segundoSplit[3]));
}
context.write(persona, score);
}
}
}
}
La clase Reducer
public class PersonaScoreReducer extends
Reducer<PersonaWritableComparable, IntWritable,
PersonaWritableComparable, IntWritable> {
public void reduce(PersonaWritableComparable key,
Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int suma = 0;
for (IntWritable value : values) {
suma += value.get();
}
context.write(key, new IntWritable(suma));
}
}
Ver también:
Tipos de datos Hadoop e interfaces Writable y WritableComparable