Cuando todas las tareas Map o Reduce han finalizado se suman todos los valores del contador. Nunca tenemos que fiarnos de los valores intermedios que vayan generando las tareas si estamos mirando la evolución del job a través de la página de administración, ya que a causa de la speculative execution unos contadores se pueden sumar temporalmente de forma repetida.
El contador se utiliza añadiendo en el mapper o reducer la línea
context.getCounter(group, name).increment(cantidad);
El valor del contador se recupera en el driver a través de:
long tipoGrupo = job.getCounters().findCounter(group, name).getValue();
Además de recuperar los resultados en el driver, éstos también se pueden ver a través de la página web del jobtracker (en el puerto 50030) y a través del shell si se ha ejecutado el job map/reduce desde la línea de comando.
Vemos mejor el Counter en un ejemplo en el cual, a partir de un fichero scores.txt en el que tenemos información sobre jugadores, fechas del juego y sus puntuaciones, en el programa, además de contar el número de palabras que hay en el fichero, queremos saber cuántas veces aparecen algunos de los jugadores (Ana, Pepe, Maria y Pablo) y también, para estos jugadores, cuándo se ha hecho un registro incorrecto y no tienen asociada una puntuación (en el ejemplo siguiente Ana, María y Pepe aparecen y sumarían 1 cada uno, y además Ana sumaría un error):
01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez
01-11-2012 Maria Garcia Martinez 11
...
En el ejemplo también aprovecho para aplicar el paso de parámetros que habíamos visto en una entrada anterior.
public class WordCounterMapper extends Mapper <LongWritable, Text, Text, IntWritable> { private final static IntWritable cuenta = new IntWritable(1); private Text palabra = new Text(); private String GRUPO_JUGADOR; private String GRUPO_ERROR; @Override protected void setup(Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); GRUPO_JUGADOR = conf.getStrings("grupos")[0]; GRUPO_ERROR = conf.getStrings("grupos")[1]; } @Override public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException{ String linea = values.toString(); String[] elems = linea.split("\t"); for(String word : elems){ if (word.length() > 0){ String player = ""; if(word.contains("Ana")){ player = "Ana"; }else if(word.contains("Pepe")){ player = "Pepe"; }else if(word.contains("Maria")){ player = "Maria"; }else if(word.contains("Pablo")){ player = "Pablo"; } if(!"".equals(player)){ context.getCounter(GRUPO_JUGADOR, player).increment(1); if(elems.length < 3){ context.getCounter(GRUPO_ERROR, player).increment(1); } } palabra.set(word); context.write(palabra, cuenta); } } } }
public class WordCounterDriver extends Configured implements Tool{ public int run(String[] args) throws Exception { Configuration conf = new Configuration(); conf.setStrings("grupos", GRUPO_JUGADOR, GRUPO_ERROR); Job job = new Job(conf); job.setJarByClass(WordCounterDriver.class); job.setJobName("Word Count"); job.setMapperClass(WordCounterMapper.class); job.setReducerClass(WordCounterReducer.class); FileInputFormat.setInputPaths(job, new Path("pruebas/score.txt")); FileOutputFormat.setOutputPath(job, new Path("pruebas/out")); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); boolean success = job.waitForCompletion(true); long tipoAna = job.getCounters().findCounter(GRUPO_JUGADOR, "Ana").getValue(); long tipoPepe = job.getCounters().findCounter(GRUPO_JUGADOR, "Pepe").getValue(); long tipoMaria = job.getCounters().findCounter(GRUPO_JUGADOR, "Maria").getValue(); long tipoPablo = job.getCounters().findCounter(GRUPO_JUGADOR, "Pablo").getValue(); long tipoErrorAna = job.getCounters().findCounter(GRUPO_ERROR, "Ana").getValue(); long tipoErrorPepe = job.getCounters().findCounter(GRUPO_ERROR, "Pepe").getValue(); long tipoErrorMaria = job.getCounters().findCounter(GRUPO_ERROR, "Maria").getValue(); long tipoErrorPablo = job.getCounters().findCounter(GRUPO_ERROR, "Pablo").getValue(); System.out.println("Ana: "+tipoAna+" - Errores: "+tipoErrorAna); System.out.println("Pepe: "+tipoPepe+" - Errores: "+tipoErrorPepe); System.out.println("Maria: "+tipoMaria+" - Errores: "+tipoErrorMaria); System.out.println("Pablo: "+tipoPablo+" - Errores: "+tipoErrorPablo); return (success ? 0:1); } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new Configuration(), new WordCounterDriver(), args); System.exit(exitCode); }
A tener en cuenta, no es necesario que el contador vaya sumando de 1 en 1, se pueden poner otros valores e incluso se puede hacer decrementar este valor poniendo un número negativo.