Recuerda que puedes descargarte algunos de los ejemplos en la pestaña de Código Fuente

lunes, 27 de mayo de 2013

Patrón de Diseño "In-Mapper Combining" para programación de algoritmos MapReduce en Java

En esta entrada voy a tratar la otra técnica de Local Aggregation que comenté en la entrada anterior sobre Combiners, utilizada para optimizar los Job MapReduce y reducir la cantidad de tráfico entre el Mapper y el Reducer, el In-mapper combining, que es más bien un patrón de diseño para algoritmos MapReduce.
Este algoritmo nos lo proponen en el libro de J. Lin and C. Dyer "Data-Intensive Text Processing with MapReduce"

La meta de esta técnica es, como ya he dicho, el reducir el número de pares key/value que salen del Mapper y se envían al Reducer.

La ventaja del in-mapper combining sobre el Combiner tradicional que vimos en la entrada anterior, es que el primero puede que se ejecute o puede que no, no tenemos control sobre ello. Mientras que el in-mapper combining podemos hacer que se ejecute siempre. La segunda ventaja de este patrón de diseño es controlar cómo se lleva a cabo exactamente.

Otra ventaja es que con el Combiner se reduce el número de datos que llegan al Shuffle and Sort para luego enviarlos al Reducer, pero realmente no reduce el número de pares key/value emitidos por el Mapper.

Al in-mapper combining más bien se le considera como un patrón de diseño a la hora de desarrollar algoritmos MapReduce en Hadoop, es decir, una técnica a la hora de programar ampliando el algoritmo implementado en el Mapper para reducir el número de pares key/value que emite.

Esta técnica se puede dividir en dos formas: una local y una global.

Volviendo al ejemplo más básico que tenemos, el WordCount, en el que tenemos una clase Mapper y una Reducer, con esta técnica el Reducer va a ser exactamente igual, pero en el Mapper vamos a introducir directamente la funcionalidad del Combiner, más bien en lo que se basa. Y esto lo hace utilizando un objeto de tipo Map.
Este se refiere al algoritmo local, ya que es local respecto al método.
Con esta técnica hemos reducido desde el principio el número de pares key/value que emite el Mapper y que llegan al Shuffle and Sort para posteriormente ser enviados al Reducer.

 
public class WordCountMapper extends 
  Mapper <LongWritable, Text, Text, IntWritable>{

 private IntWritable cuenta = new IntWritable();
 private Text palabra = new Text();

 @Override
 public void map(LongWritable key, Text values, Context context) 
     throws IOException, InterruptedException{
     
   String linea = values.toString();
   Map<String, Integer> myWordMap = new HashMap<String, Integer>();  
   for(String word : linea.split(" ")){
    int sum = 1;
    if(myWordMap.containsKey(word)){
     sum += myWordMap.get(word);
    }
    myWordMap.put(word, sum);
   }
   
   Iterator<String> iterator = myWordMap.keySet().iterator();
   while (iterator.hasNext()){
    palabra.set(iterator.next());
    cuenta.set(myWordMap.get(palabra.toString()));
    context.write(palabra, cuenta);
   }
  }
}


Este método todavía conlleva a que el Mapper cree y destruya múltiples objetos o realice otras tareas que ocupan memoria de una forma considerable, y es por ello que se debe optimizar llegando a la forma global, de tal forma que el Mapper sólo emitirá los pares key/value necesarios al Shuffle and Sort y al Reducer.

Esa optimización se hace utilizando los métodos setup() y cleanup(), que como ya comenté en la entrada sobre estos métodos, se van a ejecutar sólo una vez, el primero antes de que todas las tareas map comiencen y el segundo justo cuando todas las tareas map finalizan y antes de que el Mapper sea destruído.

 
public class WordCountMapper extends 
  Mapper <LongWritable, Text, Text, IntWritable> {

 Map<String, Integer> myWordMap;
 @Override
 protected void setup(Context context)
   throws IOException, InterruptedException {
  if(myWordMap == null )
   myWordMap = new HashMap<String, Integer>();
 }
 
 @Override
  public void map(LongWritable key, Text values, Context context) 
     throws IOException, InterruptedException{
     
   String linea = values.toString();   
   for(String word : linea.split(" ")){
    int sum = 1;
    if(myWordMap.containsKey(word)){
     sum += myWordMap.get(word);
    }
    myWordMap.put(word, sum);
   }
  }

 @Override
 protected void cleanup(Context context)
   throws IOException, InterruptedException {
  Iterator<String> iterator = myWordMap.keySet().iterator();
  IntWritable cuenta = new IntWritable();
  Text palabra = new Text();
    
  while (iterator.hasNext()){
   palabra.set(iterator.next());
   cuenta.set(myWordMap.get(palabra.toString()));
   context.write(palabra, cuenta);
  }
 }
}

Desarrollar la clase Mapper de esta forma mejora la opción anterior pero sigue teniendo problemas de memoria.


Recordemos que cuando usamos el Combiner, primero el Mapper va a tratar los datos y va a emitir los pares key/value, los va a escribir en el disco local y después va a ser el Combiner quien va a actuar.
Si implementamos el algoritmo anterior nos puede surgir un problema con la memoria, ya que si estamos hablando de TB de datos, o no hay coincidencia en las palabras y se genera un registro para cada palabra, estos se van a cargar en el HashMap en memoria y no se van a liberar hasta que todas las tareas map han finalizado y el objeto HashMap se destruya.

La solución para limitar la memoria es "bloquear" la entrada de pares key/value y liberar la estructura HashMap cada cierto tiempo.
Así que la idea es en vez de emitir los datos intermedios una vez que todos los pares key/value han sido tratados, emitir los datos intermedios después de haber tratado N pares key/value.
Esto tendría una fácil solución, que es poner un contador FLUSH_COUNTER, y cuando el número de pares key/value tratados llegue a ese contador, emitir los datos y vaciar el HashMap.

 
public class WordCountMapper extends 
  Mapper <LongWritable, Text, Text, IntWritable> {

 private static Map<String, Integer> myWordMap;
 private static final int FLUSH_COUNTER = 1000;
  
 @Override
 protected void setup(Context context)
   throws IOException, InterruptedException {
  if(myWordMap == null )
   myWordMap = new HashMap<String, Integer>();
 }
 
 @Override
  public void map(LongWritable key, Text values, Context context) 
     throws IOException, InterruptedException{

  String linea = values.toString();
   for(String word : linea.split(" ")){
    if(myWordMap.containsKey(word)){
     myWordMap.put(word, myWordMap.get(word)+1);
    }else{
     myWordMap.put(word, 1);
    }
   }
   
   if(myWordMap.size() >= FLUSH_COUNTER)
    flush(context);
  }

 private void flush(Context context) throws IOException, InterruptedException{
  Iterator<String> iterator = myWordMap.keySet().iterator();
  IntWritable cuenta = new IntWritable();
  Text palabra = new Text();
    
  while (iterator.hasNext()){
   palabra.set(iterator.next());
   cuenta.set(myWordMap.get(palabra.toString()));
   context.write(palabra, cuenta);
  }
  
  myWordMap.clear();
 }

 @Override
 protected void cleanup(Context context)
   throws IOException, InterruptedException {
  flush(context);
 }
}



Estamos en un punto donde podemos seguir haciéndonos preguntas una tras otra para optimizar este algoritmo y aplicar este patrón de diseño correctamente, y la siguiente cuestión sería ¿cómo sabemos cuál es el número FLUSH_COUNT de pares que hay que tratar antes de que la memoria falle?.
El libro de referencia propone que más que crear un contador de pares key/value, poner un límite a la memoria que se utiliza y en el momento que el HashMap ha llegado a ese límite se liberaría. El problema es que en Java no es fácil determinar de forma rápida el tamaño de la memoria ocupada por un objeto, por eso, es mejor elegir un FLUSH_COUNT lo más alto posible y teniendo cuidado que el HashMap no provoque un Out Of Memory.


martes, 14 de mayo de 2013

Map Files

Los Map Files son un tipo de ficheros Sequence Files que contienen un index que permite hacer búsquedas por key. Realmente es un directorio que contiene dos sequence files, uno con el índice, y el otro con los datos.
Se puede pensar en este tipo de ficheros como si fueran una especie de java.util.Map.
Si listamos la carpeta de salida donde se generan estos dos ficheros:


$ hadoop-1.0.4/bin/hadoop fs -ls pruebas/poemamapfile
Found 2 items
-rw-r--r--   1 elena supergroup        780 2013-04-22 19:42 /user/elena/pruebas/poemamapfile/data
-rw-r--r--   1 elena supergroup        203 2013-04-22 19:42 /user/elena/pruebas/poemamapfile/index
$


El índice no guarda todos los registros, sino que guarda el valor cada cierto número de registros, por defecto 128. Es decir, el fichero de índice va a tener un formato parecido a este:

1          128
129      6079
257      12054
385      18030
513      ...


Es decir, el número de registro y el offset de éste. Como vemos, el índice contiene registros cada 128.
Si quisiéramos cambiar este intervalo, que en vez los 128 por defecto, poner otro número, usaríamos:

writer.setIndexInterval(nIntervalos);

Siendo nIntervalos cada cuántos registros queremos que se indexen. Si nIntervalos  = 5, la visualización del índice sería:

1        128
6        1215
11      2410
16      3606
21      ...



Crear un MapFile


public class CreateMapFile {

 private static final String[] POEMA = { 
  "El ciego sol se estrella",
  "en las duras aristas de las armas,",
  "llaga de luz los petos y espaldares",
  "y flamea en las puntas de las lanzas.",
  "El ciego sol, la sed y la fatiga",
  "Por la terrible estepa castellana,",
  "al destierro, con doce de los suyos",
  "-polvo, sudor y hierro- el Cid cabalga.",
  "Cerrado está el mesón a piedra y lodo.",
  "Nadie responde... Al pomo de la espada",
  "y al cuento de las picas el postigo",
  "va a ceder ¡Quema el sol, el aire abrasa!"};
 
 private static final String rutaDestino = new String ("pruebas/poemamapfile");

 public static void main(String[] args) throws IOException {
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get( conf);
  //Path path = new Path(rutaDestino);

  IntWritable key = new IntWritable();
  Text value = new Text();
  
  MapFile.Writer writer = new MapFile.Writer(conf, fs, 
   rutaDestino, key.getClass(), value.getClass());

  //Indico un intervalo de índice diferente a 128 (por defecto)
  writer.setIndexInterval(5);
   
  //No usaré posiciones consecutivas.
  int pos = 1;
  for (int i = 0; i < POEMA.length; i++) { 
   // La key es el número de línea
   key.set(pos); 
   // El value es la línea del poema correspondiente
   value.set(POEMA[i]); 
   // Escribimos el par en el sequenceFile 
   
   writer.append(key, value);
   pos += 3;
  }
  writer.close();
 }

}


Leer un MapFile


public class ReadMapFile {

 private static final String rutaDestino = new String ("pruebas/poemamapfile");
 
 public static void main(String[] args) throws IOException, InstantiationException, IllegalAccessException {
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);

  MapFile.Reader reader = new MapFile.Reader(fs, rutaDestino, conf);
  
  IntWritable key = (IntWritable) reader.getKeyClass().newInstance();
  Text value = (Text) reader.getValueClass().newInstance();
  
  StringBuilder strBuilder;
  while(reader.next(key, value)){
   strBuilder = new StringBuilder(key.toString()).append(" - ").append(value.toString());
   System.out.println(strBuilder);
  }

  // Posiciono el reader en una key dada y conocida, si no existe la key daría un error
  value = (Text)reader.get(new IntWritable(7), new Text());
  strBuilder = new StringBuilder(" Probando get() ").append(value.toString());
  System.out.println(strBuilder);
  
  // Busco una key a partir de un valor, que si no existe, me daría el valor
  // siguiente más próximo y me posiciona el reader en ese lugar.
  key = (IntWritable) reader.getClosest(new IntWritable(11), new Text());
  strBuilder = new StringBuilder(" Probando getClosest() ").append(key.toString());
  System.out.println(strBuilder);

  
  reader.close();
 }

}


En la creación las key deben estar ordenadas, si intentamos crear un mapfile con un orden aleatorio, dará error de tipo:
Exception in thread "main" java.io.IOException: key out of order: n after m


Al igual que en los Sequence Files el MapFile.Reader nos da opciones para posicionarnos en el reader.
A través del método get(), dada una key conocida, aunque si esa key no existe al intentar acceder al Objeto devuelto nos dará error.
Y a través del método getClosest(), nos posicionaremos en el valor más próximo después de la key introducida.

miércoles, 8 de mayo de 2013

Uso del Combiner para optimización de Jobs MapReduce

El Combiner es una función propia de Hadoop utilizada para optimizar los job MapReduce.

Cuando en un Job la salida del Mapper genera una gran cantidad de datos intermedios, éstos se tienen que transmitir por la red hacia los Reducer. Si la cantidad de datos es excesivamente grande aquí se puede producir un cuello de botella.

Las técnicas utilizadas para reducir la cantidad de datos y mejorar la eficiencia de los job MapReduce, se llama Local Aggregation.
Existen dos técnicas, el Combiner que vamos a ver en esta entrada y el In-Mapper Combining que veremos en la siguiente entrada.

Una buena solución para estos casos es la de implementar un Combiner, que se ejecuta a la salida de la fase Map y de forma local a este antes de enviar los datos a través de la red.

El Combiner implementa la misma interfaz que el Reducer e incluso muchas veces suele ser la misma clase que el Reducer.
Sin embargo hay que tener cuidado que la operación que se realiza en el Combiner sea asociativa y conmutativa. Por ejemplo la operación suma (es decir, el ejemplo del Wordcount) cumple con estos dos requisitos, pero por ejemplo la operación de cálculo de la media aritmética no es asociativa.

Para configurarlo se hace en el Driver a través de:
  job.setCombinerClass (MyCombiner.class);

¿Se puede implementar un Combiner distinto al Reducer?
Sí, pueden ser distintos. Hay que tener en cuenta que el Combiner va a seguir implementando la interfaz Reducer. Y también hay que tener cuidado de no poner dentro algún tipo de código "sensible" ya que el Combiner se puede ejecutar cero, una o más veces a la salida desde cualquier Mapper.

Si observamos los logs generados tras la ejecución de un Job del WordCount cuyo código he dejado publicado en este enlace.

Sin Combiner:

Map input records=1928
Map output records=187029
Combine input records=0
Combine output records=0
Reduce input records=187029
Reduce output records=22948


Con Combiner:

Map input records=1928
Map output records=187029
Combine input records=219738
Combine output records=55657
Reduce input records=22948
Reduce output records=22948   



Como he comentado anteriormente el Combiner se ejecuta a la salida del Mapper, antes de que los datos se transmitan por la red hacia el Reducer, la conclusión es que la cantidad de datos transmitida y por tanto el input del Reducer, es considerablemente menor si utilizamos ese Combiner.

Algo a tener en cuenta en el código publicado, es que he hecho un Combiner del WordCount, el código que contiene la clase WordCountCombiner.java es exactamente igual que el código de la clase WordCountReducer.java, en situaciones como esta no haría falta crear esa nueva clase, bastaría con definir en el Driver:

  job.setCombinerClass (WordCountReducer.class);

Así que creé una clase por separado para mostrar que es posible hacer el Combiner en una clase distinta (extendiendo de Reducer), recordando que no debe haber "código sensible" en ella ya que el Combiner puede llegar a ejecutarse varias veces o puede llegar a no ejecutarse.