Recuerda que puedes descargarte algunos de los ejemplos en la pestaña de Código Fuente
Mostrando entradas con la etiqueta Mapper. Mostrar todas las entradas
Mostrando entradas con la etiqueta Mapper. Mostrar todas las entradas

miércoles, 8 de mayo de 2013

Uso del Combiner para optimización de Jobs MapReduce

El Combiner es una función propia de Hadoop utilizada para optimizar los job MapReduce.

Cuando en un Job la salida del Mapper genera una gran cantidad de datos intermedios, éstos se tienen que transmitir por la red hacia los Reducer. Si la cantidad de datos es excesivamente grande aquí se puede producir un cuello de botella.

Las técnicas utilizadas para reducir la cantidad de datos y mejorar la eficiencia de los job MapReduce, se llama Local Aggregation.
Existen dos técnicas, el Combiner que vamos a ver en esta entrada y el In-Mapper Combining que veremos en la siguiente entrada.

Una buena solución para estos casos es la de implementar un Combiner, que se ejecuta a la salida de la fase Map y de forma local a este antes de enviar los datos a través de la red.

El Combiner implementa la misma interfaz que el Reducer e incluso muchas veces suele ser la misma clase que el Reducer.
Sin embargo hay que tener cuidado que la operación que se realiza en el Combiner sea asociativa y conmutativa. Por ejemplo la operación suma (es decir, el ejemplo del Wordcount) cumple con estos dos requisitos, pero por ejemplo la operación de cálculo de la media aritmética no es asociativa.

Para configurarlo se hace en el Driver a través de:
  job.setCombinerClass (MyCombiner.class);

¿Se puede implementar un Combiner distinto al Reducer?
Sí, pueden ser distintos. Hay que tener en cuenta que el Combiner va a seguir implementando la interfaz Reducer. Y también hay que tener cuidado de no poner dentro algún tipo de código "sensible" ya que el Combiner se puede ejecutar cero, una o más veces a la salida desde cualquier Mapper.

Si observamos los logs generados tras la ejecución de un Job del WordCount cuyo código he dejado publicado en este enlace.

Sin Combiner:

Map input records=1928
Map output records=187029
Combine input records=0
Combine output records=0
Reduce input records=187029
Reduce output records=22948


Con Combiner:

Map input records=1928
Map output records=187029
Combine input records=219738
Combine output records=55657
Reduce input records=22948
Reduce output records=22948   



Como he comentado anteriormente el Combiner se ejecuta a la salida del Mapper, antes de que los datos se transmitan por la red hacia el Reducer, la conclusión es que la cantidad de datos transmitida y por tanto el input del Reducer, es considerablemente menor si utilizamos ese Combiner.

Algo a tener en cuenta en el código publicado, es que he hecho un Combiner del WordCount, el código que contiene la clase WordCountCombiner.java es exactamente igual que el código de la clase WordCountReducer.java, en situaciones como esta no haría falta crear esa nueva clase, bastaría con definir en el Driver:

  job.setCombinerClass (WordCountReducer.class);

Así que creé una clase por separado para mostrar que es posible hacer el Combiner en una clase distinta (extendiendo de Reducer), recordando que no debe haber "código sensible" en ella ya que el Combiner puede llegar a ejecutarse varias veces o puede llegar a no ejecutarse.

domingo, 17 de marzo de 2013

Ejemplo de uso de Tipos de Datos propios con las interfaces Writable y WritableComparable

Continuando con la entrada anterior en la que explicaba qué son las interfaces Writable y WritableComparable y cómo es posible crear nuestros propios tipos usándolas, vamos a ver un ejemplo.

El código fuente y el fichero de ejemplo de esta entrada también los podréis encontrar en este enlace.

Ejemplo de aplicación MapReduce utilizando nuestra propia clase como key.
Al programar nuestra clase PersonaWritableComparable que implementa WritableComparable, en la fase del Shuffle and Sort se consigue que el Reducer reciba las key ordenadas y con sus valores correspondientes agrupados para poder operar con ellos.

Recibimos un fichero de texto cuya información es
Fecha [tab] Nombre Apellido1 Apellido2 Puntuación
Queremos como salida un listado de personas (con los nombres y apellidos) y la suma de todas sus puntuaciones.

Fichero de entrada score.txt:

01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez 14
01-11-2012 Maria Garcia Martinez 11
01-11-2012 Pablo Sanchez Rodriguez 9
01-11-2012 Angel Martin Hernandez 3
15-11-2012 Pepe Perez Gonzalez 22
15-11-2012 Maria Garcia Martinez 15
15-11-2012 John Smith 13
01-12-2012 Pepe Perez Gonzalez 25
01-12-2012 Ana Lopez Fernandez 15
01-12-2012 Pablo Sanchez Rodriguez 8
01-12-2012 Maria Garcia Martinez 32
15-12-2012 Maria Garcia Martinez 47
15-12-2012 Pepe Perez Gonzalez 13
15-12-2012 Angel Martin Hernandez 13
15-12-2012 John Smith 27
01-01-2013 Ana Lopez Fernandez 5
01-01-2013 Pablo Sanchez Rodriguez 2
01-01-2013 Pepe Perez Gonzalez 17
01-01-2013 Maria Garcia Martinez 3
01-01-2013 Angel Martin Hernandez 32
01-01-2013 John Smith 21


Nuestra propia clase PersonaWritableComparable:
 
public class PersonaWritableComparable 
  implements WritableComparable<PersonaWritableComparable>{

 Text nombre, primerApellido, segundoApellido;
 
 public void set(String nom, String prApell, String sgApell){
  nombre.set(nom);
  primerApellido.set(prApell);
  segundoApellido.set(sgApell);
 }
 
 public PersonaWritableComparable() {
  this.nombre = new Text();
  this.primerApellido = new Text();
  this.segundoApellido = new Text();
 }

 public PersonaWritableComparable(Text nombre, 
   Text primerApellido, Text segundoApellido) {
  this.nombre = nombre;
  this.primerApellido = primerApellido;
  this.segundoApellido = segundoApellido;
 }
 
 @Override
 public void readFields(DataInput arg0) throws IOException {
  this.nombre.readFields(arg0);
  this.primerApellido.readFields(arg0);
  this.segundoApellido.readFields(arg0);
  
 }

 @Override
 public void write(DataOutput arg0) throws IOException {
  this.nombre.write(arg0);
  this.primerApellido.write(arg0);
  this.segundoApellido.write(arg0);
 }

 @Override
 public int compareTo(PersonaWritableComparable o) {
  if(this.nombre.compareTo(o.nombre) != 0){
   return this.nombre.compareTo(o.nombre);
  }else if(this.primerApellido.compareTo(o.primerApellido) != 0){
   return this.primerApellido.compareTo(o.primerApellido);
  }else if(this.segundoApellido.compareTo(o.segundoApellido) != 0){
   return this.segundoApellido.compareTo(o.segundoApellido);
  }
  return 0;
 }

 @Override
 public boolean equals(Object obj) {
  if(obj instanceof PersonaWritableComparable){
   PersonaWritableComparable p = (PersonaWritableComparable) obj;
   return this.nombre.equals(p.nombre) && 
    this.primerApellido.equals(p.primerApellido) && 
    this.segundoApellido.equals(p.segundoApellido);
  }
  return false;
 }

 @Override
 public int hashCode() {
  return this.nombre.hashCode()*163 + 
    this.primerApellido.hashCode()*163 + 
    this.segundoApellido.hashCode()*163;
 }
 
 @Override
 public String toString() {
  return nombre.toString()+" "+primerApellido.toString()+" "
   +segundoApellido.toString();
 }
}


El Driver de la aplicación:
 
public class PersonaScoreDriver {
 public static void main(String[] args) throws Exception {
  if(args.length != 2){
   System.out.println("Ha ocurrido un error en la entrada");
   System.exit(-1);
  }
  
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  job.setJarByClass(PersonaScoreDriver.class);
  
  job.setJobName("Persona Score");
  
  job.setOutputKeyClass(PersonaWritableComparable.class);
  job.setOutputValueClass(IntWritable.class);

  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
  job.setMapperClass(PersonaScoreMapper.class);
  job.setReducerClass(PersonaScoreReducer.class);

  boolean success = job.waitForCompletion(true);
  System.exit(success ? 0:1);  
 }
}


La clase Mapper:
 
public class PersonaScoreMapper extends 
 Mapper<LongWritable, Text, 
 PersonaWritableComparable, IntWritable> {

 private IntWritable score = new IntWritable();
 PersonaWritableComparable persona = new PersonaWritableComparable();
 
 public void map(LongWritable key, Text values,
   Context context) throws IOException, InterruptedException {
  
  // El texto tiene este formato:
  // 01-11-2012 Maria Garcia Martinez 11
  // La fecha separada por tabulación, el resto con espacios
  String[] primerSplit = values.toString().split(" ");
  if(primerSplit.length == 2){
   String[] segundoSplit = primerSplit[1].split(" ");
   
   // Puede haber personas con un apellido o con dos
   if(segundoSplit.length == 3 || segundoSplit.length == 4){
    if(segundoSplit.length == 3){
     persona.set(segundoSplit[0], segundoSplit[1], "");
     score.set(Integer.valueOf(segundoSplit[2]));
    }else {
     persona.set(segundoSplit[0], segundoSplit[1], segundoSplit[2]);
     score.set(Integer.valueOf(segundoSplit[3]));
    }
    context.write(persona, score);
   } 
  }
 }
}


La clase Reducer
 
public class PersonaScoreReducer extends 
 Reducer<PersonaWritableComparable, IntWritable, 
 PersonaWritableComparable, IntWritable> {

 public void reduce(PersonaWritableComparable key, 
   Iterable<IntWritable> values,
   Context context) throws IOException, InterruptedException {
  
  int suma = 0;
  for (IntWritable value : values) {
   suma += value.get();
  }
  
  context.write(key, new IntWritable(suma));
 }
}

Ver también: Tipos de datos Hadoop e interfaces Writable y WritableComparable

jueves, 28 de febrero de 2013

Hadoop: Introducción al Desarrollo en Java (Parte II): El Mapper (Ejemplo Word Count)

El Mapper implementa el método map, es la parte del programa que se va a ejecutar en el lugar en el que se encuentran los bloques de datos, hará las operaciones necesarias con ellos, seleccionará sólo los datos que nos interesan y los emitirá como datos intermedios antes de que se sigan procesando
 
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Esta clase tiene que extender de la clase Mapper.
// Espera 4 tipos de datos: los 2 primeros definen 
// los tipos del key/value de entrada y los 2 últimos 
// definen los tipos del key/value de salida.
public class WordCountMapper extends 
 Mapper <LongWritable, Text, Text, IntWritable> {

// Una buena práctica es la reutilización de objetos. 
// Cuando necesitamos utilizar constantes, crear una 
// variable estática fuera del map.
// De esta forma, cada vez que el método map se llama,
// no se creará una nueva instancia de ese tipo.
 private final static IntWritable cuenta = new IntWritable(1);
 private Text palabra = new Text();

// La función que obligatoriamente tiene que 
// implementarse en el Mapper es la map, que va
// a recibir como parámetros: primero el tipo de
// la key, luego el tipo del value y finalmente un
// objeto Context que se usará para escribir los 
// datos intermedios
 public void map(LongWritable key, Text values, Context context) 
   throws IOException, InterruptedException{
// En el objeto "values" estamos recibiendo cada
// línea del fichero que estamos leyendo. Primero
// tenemos que pasarlo a String para poder 
// operar con él
  String linea = values.toString();
  
// Cada línea va a contener palabras separadas por
// "un separador", separador que se considera como
// una expresión regular y a partir del cual dividimos
// la línea. Vamos recorriendo elemento a elemento. 
  for(String word : linea.split(" ")){
   if (word.length() > 0){
//  Le damos el valor a nuestro objeto creado para la
//  reutilización (claramente, a 'palabra', ya que 
//  'cuenta' es una constante final static).
//  Con el write escribimos los datos intermedios, que
//  son como key la palabra y como valor un 1.
    palabra.set(word);
    context.write(palabra, cuenta);
   }
  }
 }
}

Ahora os muestro el código con la old API que vemos que también tiene algunas diferencias. Además, aprovecho para que os fijéis en la parte output.collect, ahí se está creando cada vez una nueva instancia al objeto IntWritable y mostrar que es bastante útil intentar reaprovechar los objetos:
 
import java.io.IOException;
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.MapReduceBase; 
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector; 
import org.apache.hadoop.mapred.Reporter;

public class WordCountMapper extends MapReduceBase implements 
   Mapper <LongWritable, Text, Text, IntWritable> {

    public void map(LongWritable key, Text value, 
     OutputCollector<Text, IntWritable> output, Reporter reporter) 
     throws IOException {

        String s = value.toString();
        for (String word : s.split("\\W+")) {
           if (word.length() > 0) {
              output.collect(new Text(word), new IntWritable(1));
           }
        }
    }
}


Las diferencias principales que observamos son:

  • En la new API la clase sólo extiende de Mapper, mientras que en la old API necesita extender de MapReduceBase e implementar Mapper.
  • La new API recibe 3 atributos: los 2 tipos del par key/value y el context. La old API recibía 4, los 2 tipos de la key/value, un OutputCollector que es donde se escribían los datos intermedios y un objeto Reporter que servía para devolver cierta información al Driver. En la new API este paso de información se puede hacer con el Context.

Ver también: