Cuando todas las tareas Map o Reduce han finalizado se suman todos los valores del contador. Nunca tenemos que fiarnos de los valores intermedios que vayan generando las tareas si estamos mirando la evolución del job a través de la página de administración, ya que a causa de la speculative execution unos contadores se pueden sumar temporalmente de forma repetida.
El contador se utiliza añadiendo en el mapper o reducer la línea
context.getCounter(group, name).increment(cantidad);
El valor del contador se recupera en el driver a través de:
long tipoGrupo = job.getCounters().findCounter(group, name).getValue();
Además de recuperar los resultados en el driver, éstos también se pueden ver a través de la página web del jobtracker (en el puerto 50030) y a través del shell si se ha ejecutado el job map/reduce desde la línea de comando.
Vemos mejor el Counter en un ejemplo en el cual, a partir de un fichero scores.txt en el que tenemos información sobre jugadores, fechas del juego y sus puntuaciones, en el programa, además de contar el número de palabras que hay en el fichero, queremos saber cuántas veces aparecen algunos de los jugadores (Ana, Pepe, Maria y Pablo) y también, para estos jugadores, cuándo se ha hecho un registro incorrecto y no tienen asociada una puntuación (en el ejemplo siguiente Ana, María y Pepe aparecen y sumarían 1 cada uno, y además Ana sumaría un error):
01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez
01-11-2012 Maria Garcia Martinez 11
...
En el ejemplo también aprovecho para aplicar el paso de parámetros que habíamos visto en una entrada anterior.
public class WordCounterMapper extends
Mapper <LongWritable, Text, Text, IntWritable> {
private final static IntWritable cuenta = new IntWritable(1);
private Text palabra = new Text();
private String GRUPO_JUGADOR;
private String GRUPO_ERROR;
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
Configuration conf = context.getConfiguration();
GRUPO_JUGADOR = conf.getStrings("grupos")[0];
GRUPO_ERROR = conf.getStrings("grupos")[1];
}
@Override
public void map(LongWritable key, Text values, Context context)
throws IOException, InterruptedException{
String linea = values.toString();
String[] elems = linea.split("\t");
for(String word : elems){
if (word.length() > 0){
String player = "";
if(word.contains("Ana")){
player = "Ana";
}else if(word.contains("Pepe")){
player = "Pepe";
}else if(word.contains("Maria")){
player = "Maria";
}else if(word.contains("Pablo")){
player = "Pablo";
}
if(!"".equals(player)){
context.getCounter(GRUPO_JUGADOR, player).increment(1);
if(elems.length < 3){
context.getCounter(GRUPO_ERROR, player).increment(1);
}
}
palabra.set(word);
context.write(palabra, cuenta);
}
}
}
}
public class WordCounterDriver extends Configured implements Tool{
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setStrings("grupos", GRUPO_JUGADOR, GRUPO_ERROR);
Job job = new Job(conf);
job.setJarByClass(WordCounterDriver.class);
job.setJobName("Word Count");
job.setMapperClass(WordCounterMapper.class);
job.setReducerClass(WordCounterReducer.class);
FileInputFormat.setInputPaths(job, new Path("pruebas/score.txt"));
FileOutputFormat.setOutputPath(job, new Path("pruebas/out"));
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
boolean success = job.waitForCompletion(true);
long tipoAna = job.getCounters().findCounter(GRUPO_JUGADOR, "Ana").getValue();
long tipoPepe = job.getCounters().findCounter(GRUPO_JUGADOR, "Pepe").getValue();
long tipoMaria = job.getCounters().findCounter(GRUPO_JUGADOR, "Maria").getValue();
long tipoPablo = job.getCounters().findCounter(GRUPO_JUGADOR, "Pablo").getValue();
long tipoErrorAna = job.getCounters().findCounter(GRUPO_ERROR, "Ana").getValue();
long tipoErrorPepe = job.getCounters().findCounter(GRUPO_ERROR, "Pepe").getValue();
long tipoErrorMaria = job.getCounters().findCounter(GRUPO_ERROR, "Maria").getValue();
long tipoErrorPablo = job.getCounters().findCounter(GRUPO_ERROR, "Pablo").getValue();
System.out.println("Ana: "+tipoAna+" - Errores: "+tipoErrorAna);
System.out.println("Pepe: "+tipoPepe+" - Errores: "+tipoErrorPepe);
System.out.println("Maria: "+tipoMaria+" - Errores: "+tipoErrorMaria);
System.out.println("Pablo: "+tipoPablo+" - Errores: "+tipoErrorPablo);
return (success ? 0:1);
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(), new WordCounterDriver(), args);
System.exit(exitCode);
}
A tener en cuenta, no es necesario que el contador vaya sumando de 1 en 1, se pueden poner otros valores e incluso se puede hacer decrementar este valor poniendo un número negativo.


