Recuerda que puedes descargarte algunos de los ejemplos en la pestaña de Código Fuente

sábado, 2 de marzo de 2013

Hadoop: Introducción al desarrollo en Java (Parte III): El Reducer (Ejemplo Word Count)

El Reducer implementa el método reduce y es la parte del programa que va a recibir los datos intermedios y tras haber sufrido el proceso "Shuffle and Sort", es decir, va a recibir para cada key su lista de valores correspondiente. Devolveré pares key/value tras haber hecho ciertas operaciones y obtener los valores que necesitamos.

 
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// El reducer debe extender de la clase Reducer, 
// que espera 4 objetos que definen los tipos, los 
// 2 primeros la key/value de entrada (que son los
// valores intermedios )y los 2
// últimos la key/value de salida
public class WordCountReducer extends 
 Reducer<Text, IntWritable, Text, IntWritable> {
// En el reducer, al igual que en el mapper se podrían
// reutilizar los objetos declarándolos aquí. 
// Pero esta vez lo implemento sin usarlo para que 
// podáis ver cómo quedaría.


// El método reduce recibe 3 atributos, el primero
// es la key de entrada y el segundo es una lista
// de los valores intermedios asociados a esa key.
// Al igual que el Mapper, recibe el objeto Context
// para escribir la salida y otras informaciones.
 public void reduce(Text key, Iterable<IntWritable> values, 
   Context context) 
   throws IOException, InterruptedException {
  
  int count = 0;

// Se va recorriendo la lista de valores y para cada
// uno se extrae a través del .get() el valor correspondiente
// Se van sumando esos valores para obtener el total
// de veces que aparece una palabra.
  for (IntWritable value : values) {
   count += value.get();
  }
// Finalmente escribimos el resultado en HDFS usando 
// el context.write
  context.write(key, new IntWritable(count));
 }
}

Y este es el mismo código pero para la old API:
 
import java.io.IOException; 
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.OutputCollector; 
import org.apache.hadoop.mapred.MapReduceBase; 
import org.apache.hadoop.mapred.Reducer; 
import org.apache.hadoop.mapred.Reporter;

public class WordCountReducer extends MapReduceBase 
  implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values, 
      OutputCollector<Text, IntWritable> output, Reporter reporter) 
      throws IOException {

         int wordCount = 0; 
         while (values.hasNext()) {
            IntWritable value = values.next(); 
            wordCount += value.get();
         } 
         output.collect(key, new IntWritable(wordCount));
    }
}


Las diferencias principales son las mismas que en el Mapper, pero aquí las pongo:
  • En la new API la clase sólo extiende de Reducer, mientras que en la old API necesita extender de MapReduceBase e implementar Reducer.
  • La new API recibe 3 atributos: los 2 tipos del par key/value y el context. La old API recibía 4, los 2 tipos de la key/value, un OutputCollector que es donde se escribían los datos intermedios y un objeto Reporter que servía para devolver cierta información al Driver. En la new API este paso de información se puede hacer con el Context.


Ver también:

No hay comentarios:

Publicar un comentario

Gracias por dejar vuestras sugerencias, dudas, críticas o comentarios en general