El Reducer implementa el método reduce y es la parte del programa que va a recibir los datos intermedios y tras haber sufrido el proceso "Shuffle and Sort", es decir, va a recibir para cada key su lista de valores correspondiente. Devolveré pares key/value tras haber hecho ciertas operaciones y obtener los valores que necesitamos.
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
// El reducer debe extender de la clase Reducer,
// que espera 4 objetos que definen los tipos, los
// 2 primeros la key/value de entrada (que son los
// valores intermedios )y los 2
// últimos la key/value de salida
public class WordCountReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
// En el reducer, al igual que en el mapper se podrían
// reutilizar los objetos declarándolos aquí.
// Pero esta vez lo implemento sin usarlo para que
// podáis ver cómo quedaría.
// El método reduce recibe 3 atributos, el primero
// es la key de entrada y el segundo es una lista
// de los valores intermedios asociados a esa key.
// Al igual que el Mapper, recibe el objeto Context
// para escribir la salida y otras informaciones.
public void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
int count = 0;
// Se va recorriendo la lista de valores y para cada
// uno se extrae a través del .get() el valor correspondiente
// Se van sumando esos valores para obtener el total
// de veces que aparece una palabra.
for (IntWritable value : values) {
count += value.get();
}
// Finalmente escribimos el resultado en HDFS usando
// el context.write
context.write(key, new IntWritable(count));
}
}
Y este es el mismo código pero para la old API:
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class WordCountReducer extends MapReduceBase
implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int wordCount = 0;
while (values.hasNext()) {
IntWritable value = values.next();
wordCount += value.get();
}
output.collect(key, new IntWritable(wordCount));
}
}
Las diferencias principales son las mismas que en el Mapper, pero aquí las pongo:
- En la new API la clase sólo extiende de Reducer, mientras que en la old API necesita extender de MapReduceBase e implementar Reducer.
- La new API recibe 3 atributos: los 2 tipos del par key/value y el context. La old API recibía 4, los 2 tipos de la key/value, un OutputCollector que es donde se escribían los datos intermedios y un objeto Reporter que servía para devolver cierta información al Driver. En la new API este paso de información se puede hacer con el Context.
Ver también: