Recuerda que puedes descargarte algunos de los ejemplos en la pestaña de Código Fuente

sábado, 29 de junio de 2013

Counters

Los Counters se utilizan para que los mappers o los reducers pasen valores al Driver de forma que éstos se van sumando. Puede servir para analizar ciertos eventos que suceden durante la ejecución de jobs MapReduce, como registros erróneos, o contar el número de veces que aparece cierto tipo de datos.

Cuando todas las tareas Map o Reduce han finalizado se suman todos los valores del contador. Nunca tenemos que fiarnos de los valores intermedios que vayan generando las tareas si estamos mirando la evolución del job a través de la página de administración, ya que a causa de la speculative execution unos contadores se pueden sumar temporalmente de forma repetida.

El contador se utiliza añadiendo en el mapper o reducer la línea

context.getCounter(group, name).increment(cantidad);


El valor del contador se recupera en el driver a través de:

long tipoGrupo = job.getCounters().findCounter(group, name).getValue();

Además de recuperar los resultados en el driver, éstos también se pueden ver a través de la página web del jobtracker (en el puerto 50030) y a través del shell si se ha ejecutado el job map/reduce desde la línea de comando.


Vemos mejor el Counter en un ejemplo en el cual, a partir de un fichero scores.txt en el que tenemos información sobre jugadores, fechas del juego y sus puntuaciones, en el programa, además de contar el número de palabras que hay en el fichero, queremos saber cuántas veces aparecen algunos de los jugadores (Ana, Pepe, Maria y Pablo) y también, para estos jugadores, cuándo se ha hecho un registro incorrecto y no tienen asociada una puntuación (en el ejemplo siguiente Ana, María y Pepe aparecen y sumarían 1 cada uno, y además Ana sumaría un error):

01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez
01-11-2012 Maria Garcia Martinez 11
...


En el ejemplo también aprovecho para aplicar el paso de parámetros que habíamos visto en una entrada anterior.


public class WordCounterMapper extends 
  Mapper <LongWritable, Text, Text, IntWritable> {

  private final static IntWritable cuenta = new IntWritable(1);
  private Text palabra = new Text();
  private String GRUPO_JUGADOR;
  private String GRUPO_ERROR;
 
  @Override
  protected void setup(Context context) throws IOException,
   InterruptedException {
 Configuration conf = context.getConfiguration();
  
 GRUPO_JUGADOR = conf.getStrings("grupos")[0];
 GRUPO_ERROR = conf.getStrings("grupos")[1];
  }


  @Override
  public void map(LongWritable key, Text values, Context context) 
     throws IOException, InterruptedException{
 
  String linea = values.toString();
  String[] elems = linea.split("\t");
    
  for(String word : elems){
     if (word.length() > 0){
      String player = "";
      if(word.contains("Ana")){
       player = "Ana";
      }else if(word.contains("Pepe")){
       player = "Pepe";
      }else if(word.contains("Maria")){
       player = "Maria";
      }else if(word.contains("Pablo")){
       player = "Pablo";
      }
      
      if(!"".equals(player)){
       context.getCounter(GRUPO_JUGADOR, player).increment(1);
       if(elems.length < 3){
        context.getCounter(GRUPO_ERROR, player).increment(1);
       }
      }
      
      palabra.set(word);
      context.write(palabra, cuenta);
      }
  }

  }
}

public class WordCounterDriver extends Configured implements Tool{

 public int run(String[] args) throws Exception {

  Configuration conf = new Configuration();
  conf.setStrings("grupos", GRUPO_JUGADOR, GRUPO_ERROR);
  
  Job job = new Job(conf);
  job.setJarByClass(WordCounterDriver.class);
  
  job.setJobName("Word Count");
  
  job.setMapperClass(WordCounterMapper.class);
  job.setReducerClass(WordCounterReducer.class);

  FileInputFormat.setInputPaths(job, new Path("pruebas/score.txt"));
  FileOutputFormat.setOutputPath(job, new Path("pruebas/out"));
  
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);
  
  boolean success = job.waitForCompletion(true);
  
  long tipoAna = job.getCounters().findCounter(GRUPO_JUGADOR, "Ana").getValue();
  long tipoPepe = job.getCounters().findCounter(GRUPO_JUGADOR, "Pepe").getValue();
  long tipoMaria = job.getCounters().findCounter(GRUPO_JUGADOR, "Maria").getValue();
  long tipoPablo = job.getCounters().findCounter(GRUPO_JUGADOR, "Pablo").getValue();
  
  long tipoErrorAna = job.getCounters().findCounter(GRUPO_ERROR, "Ana").getValue();
  long tipoErrorPepe = job.getCounters().findCounter(GRUPO_ERROR, "Pepe").getValue();
  long tipoErrorMaria = job.getCounters().findCounter(GRUPO_ERROR, "Maria").getValue();
  long tipoErrorPablo = job.getCounters().findCounter(GRUPO_ERROR, "Pablo").getValue();
  
  System.out.println("Ana:   "+tipoAna+" - Errores: "+tipoErrorAna);
  System.out.println("Pepe:  "+tipoPepe+" - Errores: "+tipoErrorPepe);
  System.out.println("Maria: "+tipoMaria+" - Errores: "+tipoErrorMaria);
  System.out.println("Pablo: "+tipoPablo+" - Errores: "+tipoErrorPablo);
  
  return (success ? 0:1); 
 }
  
 public static void main(String[] args) throws Exception {
     int exitCode = ToolRunner.run(new Configuration(), new WordCounterDriver(), args);
     System.exit(exitCode);
 }

A tener en cuenta, no es necesario que el contador vaya sumando de 1 en 1, se pueden poner otros valores e incluso se puede hacer decrementar este valor poniendo un número negativo.

domingo, 16 de junio de 2013

Sqoop: Importar o exportar datos de HDFS a BBDD relacionales

Sqoop es el primer software del ecosistema de Hadoop que voy a ver.


Una de las grandes ventajas de Hadoop es que puede trabajar con datos cuya procedencia pueden ser distintos tipos de formas. Pero para acceder a estos datos Hadoop suele necesitar APIs externas para tratarlos y el que Hadoop acceda a la fuente original no suele ser muy eficiente, así que se han buscado formas para importar estos datos al sistema de ficheros HDFS.

Una de las formas en las que nos solemos encontrar los datos es en sistemas de bases de datos relacionales (RDBMS) y para extraer esta información se utiliza Apache Sqoop.

Sqoop es un software de open source que permite extraer datos de sistemas de almacenamiento de datos estructurados e importarlos en Hadoop para su posterior procesamiento. Permite exportar estos datos a formato texto, Sequence Files o a Avro. También dispone de opciones para en vez de escribir en el sistema de ficheros HDFS, escribir los datos en Hive.
También permite exportar datos de HDFS e importarlos a una BBDD relacional.

Con Sqoop puedes importar todas las tablas, una o incluso sólo porciones de ellas. Se puede configurar de tal forma que haga una importación inicial y posteriormente haga actualizaciones incrementales.

Utiliza un job MapReduce para hacer esa importación, dispone de 4 Mappers por defecto (es configurable) y los datos los importa a ficheros de texto o a Sequence Files.

Instalación de Sqoop

Esta instalación es sólo para la versión 1.x de Sqoop, y no para la 2.

Prerequisitos:

- Java 1.6
- Hadoop 0.20x o más instalado.
- Un RDBMS, en mi caso dispongo de MAMP con MySQL

Para instalarlo primero tenemos que descargar la última versión del software (en este momento la 1.4.3) desde este enlace.
Y lo descomprimimos en nuestra carpeta donde estamos trabajando con hadoop, en mi caso /usr/local/hadoop/sqoop-1.4.3.bin__hadoop-1.0.0

Una vez descomprimido nos posicionamos a través del terminal en la carpeta sqoop-1.4.3.bin__hadoop-1.0.0 y hay que compilarlo introduciendo el comando

ant -Dhadoopversion=20

-Dhadoopversion=20 se indica ya que al instalar Hadoop éste se instala con las versiones CDH3 y CDH4 -la vieja y la nueva-, a través de este comando indicamos que nos descargamos y estamos configurando sqoop para la versión CDH4, si no lo indicamos configuraríamos sqoop para la versión CDH3 y a la hora de realizar pruebas nos daría error.

Configuramos el PATH con el sqoop

export SQOOP_HOME=/usr/local/hadoop/sqoop-1.4.3.bin__hadoop-1.0.0
export PATH=$SQOOP_HOME/bin:$PATH


Y por último comprobamos que Sqoop funciona introduciendo el comando

sqoop help


Si no hemos incluído el HADOOP_HOME en el PATH del sistema, tendremos que configurarlo a través del fichero de configuración.
En la carpeta  /usr/local/hadoop/sqoop-1.4.3.bin__hadoop-1.0.0/conf/ nos encontramos con el fichero sqoop-env-template.sh, habrá que renombrarlo a sqoop-env.sh y editarlo quedando de la forma:

#Set path to where bin/hadoop is available
export HADOOP_COMMON_HOME=/usr/local/hadoop/hadoop-1.0.4
#Set path to where hadoop-*-core.jar is available
export HADOOP_MAPRED_HOME=/usr/local/hadoop/hadoop-1.0.4



Ahora, para poder conectar a la base de datos de MySQL necesitamos descargar el Driver en este enlace, descomprimimos el fichero y ponemos la librería mysql-connector-java-5.1.24-bin.jar en la carpeta /usr/local/hadoop/sqoop-1.4.3.bin__hadoop-1.0.0/lib.

Para comprobar que conecta ejecutamos el comando de listar para ver qué esquemas tenemos en la base de datos

sqoop list-databases --connect jdbc:mysql://localhost --username root --password root



Para probar que sqoop funciona importaremos una tabla.
He creado un esquema llamado 'HADOOP' y en él una tabla llamada 'score', en ella he introducido los datos de puntuaciones de juegos que he utilizado de ejemplo en otros artículos (os dejo el script de creación de la tabla y la inserción de los datos).
A continuación (anteriormente tenemos que haber arrancado los demonios hadoop) introducimos el comando:

sqoop import --connect jdbc:mysql://localhost/HADOOP --table score --username root --password root


Veremos cómo se van a ejecutar las tareas MapReduce y si ejecutamos el comando de listar el sistema de ficheros HDFS (hadoop fs -ls) veremos que se ha creado una carpeta score que contendrá:


elena$ hadoop fs -ls score
Found 6 items
-rw-r--r--   1 elena supergroup          0 2013-04-10 19:08 /user/elena/score/_SUCCESS
drwxr-xr-x   - elena supergroup          0 2013-04-10 19:06 /user/elena/score/_logs
-rw-r--r--   1 elena supergroup        333 2013-04-10 19:08 /user/elena/score/part-m-00000
-rw-r--r--   1 elena supergroup        344 2013-04-10 19:08 /user/elena/score/part-m-00001
-rw-r--r--   1 elena supergroup        306 2013-04-10 19:08 /user/elena/score/part-m-00002
-rw-r--r--   1 elena supergroup        342 2013-04-10 19:08 /user/elena/score/part-m-00003



Como comenté anteriormente, Sqoop por defecto está configurado con 4 mappers, es por esa razón que ha creado los 4 ficheros part. Si queremos que sólo se ejecute uno para tener una única salida ejecutaríamos el comando sqoop import añadiendo el atributo -m 1

elena$ sqoop import --connect jdbc:mysql://localhost/HADOOP --table score --username root --password root -m 1

elena$ hadoop fs -ls score
Found 3 items
-rw-r--r--   1 elena supergroup          0 2013-04-10 19:13 /user/elena/score/_SUCCESS
drwxr-xr-x   - elena supergroup          0 2013-04-10 19:13 /user/elena/score/_logs
-rw-r--r--   1 elena supergroup       1325 2013-04-10 19:13 /user/elena/score/part-m-00000


Si listamos estarán todos los datos que teníamos en nuestra base de datos MySQL:

elena$ hadoop fs -cat score/part-m-00000
1,2012-11-01,Pepe Perez Gonzalez,21
2,2012-11-01,Ana Lopez Fernandez,14
3,2012-11-01,Maria Garcia Martinez,11
4,2012-11-01,Pablo Sanchez Rodriguez,9
5,2012-11-01,Angel Martin Hernandez,3
6,2012-11-15,Pepe Perez Gonzalez,22
7,2012-11-15,Maria Garcia Martinez,15
8,2012-11-15,Cristina Ruiz Gomez,23
9,2012-12-01,Pepe Perez Gonzalez,25
10,2012-12-01,Ana Lopez Fernandez,15
11,2012-12-01,Pablo Sanchez Rodriguez,8
...









lunes, 27 de mayo de 2013

Patrón de Diseño "In-Mapper Combining" para programación de algoritmos MapReduce en Java

En esta entrada voy a tratar la otra técnica de Local Aggregation que comenté en la entrada anterior sobre Combiners, utilizada para optimizar los Job MapReduce y reducir la cantidad de tráfico entre el Mapper y el Reducer, el In-mapper combining, que es más bien un patrón de diseño para algoritmos MapReduce.
Este algoritmo nos lo proponen en el libro de J. Lin and C. Dyer "Data-Intensive Text Processing with MapReduce"

La meta de esta técnica es, como ya he dicho, el reducir el número de pares key/value que salen del Mapper y se envían al Reducer.

La ventaja del in-mapper combining sobre el Combiner tradicional que vimos en la entrada anterior, es que el primero puede que se ejecute o puede que no, no tenemos control sobre ello. Mientras que el in-mapper combining podemos hacer que se ejecute siempre. La segunda ventaja de este patrón de diseño es controlar cómo se lleva a cabo exactamente.

Otra ventaja es que con el Combiner se reduce el número de datos que llegan al Shuffle and Sort para luego enviarlos al Reducer, pero realmente no reduce el número de pares key/value emitidos por el Mapper.

Al in-mapper combining más bien se le considera como un patrón de diseño a la hora de desarrollar algoritmos MapReduce en Hadoop, es decir, una técnica a la hora de programar ampliando el algoritmo implementado en el Mapper para reducir el número de pares key/value que emite.

Esta técnica se puede dividir en dos formas: una local y una global.

Volviendo al ejemplo más básico que tenemos, el WordCount, en el que tenemos una clase Mapper y una Reducer, con esta técnica el Reducer va a ser exactamente igual, pero en el Mapper vamos a introducir directamente la funcionalidad del Combiner, más bien en lo que se basa. Y esto lo hace utilizando un objeto de tipo Map.
Este se refiere al algoritmo local, ya que es local respecto al método.
Con esta técnica hemos reducido desde el principio el número de pares key/value que emite el Mapper y que llegan al Shuffle and Sort para posteriormente ser enviados al Reducer.

 
public class WordCountMapper extends 
  Mapper <LongWritable, Text, Text, IntWritable>{

 private IntWritable cuenta = new IntWritable();
 private Text palabra = new Text();

 @Override
 public void map(LongWritable key, Text values, Context context) 
     throws IOException, InterruptedException{
     
   String linea = values.toString();
   Map<String, Integer> myWordMap = new HashMap<String, Integer>();  
   for(String word : linea.split(" ")){
    int sum = 1;
    if(myWordMap.containsKey(word)){
     sum += myWordMap.get(word);
    }
    myWordMap.put(word, sum);
   }
   
   Iterator<String> iterator = myWordMap.keySet().iterator();
   while (iterator.hasNext()){
    palabra.set(iterator.next());
    cuenta.set(myWordMap.get(palabra.toString()));
    context.write(palabra, cuenta);
   }
  }
}


Este método todavía conlleva a que el Mapper cree y destruya múltiples objetos o realice otras tareas que ocupan memoria de una forma considerable, y es por ello que se debe optimizar llegando a la forma global, de tal forma que el Mapper sólo emitirá los pares key/value necesarios al Shuffle and Sort y al Reducer.

Esa optimización se hace utilizando los métodos setup() y cleanup(), que como ya comenté en la entrada sobre estos métodos, se van a ejecutar sólo una vez, el primero antes de que todas las tareas map comiencen y el segundo justo cuando todas las tareas map finalizan y antes de que el Mapper sea destruído.

 
public class WordCountMapper extends 
  Mapper <LongWritable, Text, Text, IntWritable> {

 Map<String, Integer> myWordMap;
 @Override
 protected void setup(Context context)
   throws IOException, InterruptedException {
  if(myWordMap == null )
   myWordMap = new HashMap<String, Integer>();
 }
 
 @Override
  public void map(LongWritable key, Text values, Context context) 
     throws IOException, InterruptedException{
     
   String linea = values.toString();   
   for(String word : linea.split(" ")){
    int sum = 1;
    if(myWordMap.containsKey(word)){
     sum += myWordMap.get(word);
    }
    myWordMap.put(word, sum);
   }
  }

 @Override
 protected void cleanup(Context context)
   throws IOException, InterruptedException {
  Iterator<String> iterator = myWordMap.keySet().iterator();
  IntWritable cuenta = new IntWritable();
  Text palabra = new Text();
    
  while (iterator.hasNext()){
   palabra.set(iterator.next());
   cuenta.set(myWordMap.get(palabra.toString()));
   context.write(palabra, cuenta);
  }
 }
}

Desarrollar la clase Mapper de esta forma mejora la opción anterior pero sigue teniendo problemas de memoria.


Recordemos que cuando usamos el Combiner, primero el Mapper va a tratar los datos y va a emitir los pares key/value, los va a escribir en el disco local y después va a ser el Combiner quien va a actuar.
Si implementamos el algoritmo anterior nos puede surgir un problema con la memoria, ya que si estamos hablando de TB de datos, o no hay coincidencia en las palabras y se genera un registro para cada palabra, estos se van a cargar en el HashMap en memoria y no se van a liberar hasta que todas las tareas map han finalizado y el objeto HashMap se destruya.

La solución para limitar la memoria es "bloquear" la entrada de pares key/value y liberar la estructura HashMap cada cierto tiempo.
Así que la idea es en vez de emitir los datos intermedios una vez que todos los pares key/value han sido tratados, emitir los datos intermedios después de haber tratado N pares key/value.
Esto tendría una fácil solución, que es poner un contador FLUSH_COUNTER, y cuando el número de pares key/value tratados llegue a ese contador, emitir los datos y vaciar el HashMap.

 
public class WordCountMapper extends 
  Mapper <LongWritable, Text, Text, IntWritable> {

 private static Map<String, Integer> myWordMap;
 private static final int FLUSH_COUNTER = 1000;
  
 @Override
 protected void setup(Context context)
   throws IOException, InterruptedException {
  if(myWordMap == null )
   myWordMap = new HashMap<String, Integer>();
 }
 
 @Override
  public void map(LongWritable key, Text values, Context context) 
     throws IOException, InterruptedException{

  String linea = values.toString();
   for(String word : linea.split(" ")){
    if(myWordMap.containsKey(word)){
     myWordMap.put(word, myWordMap.get(word)+1);
    }else{
     myWordMap.put(word, 1);
    }
   }
   
   if(myWordMap.size() >= FLUSH_COUNTER)
    flush(context);
  }

 private void flush(Context context) throws IOException, InterruptedException{
  Iterator<String> iterator = myWordMap.keySet().iterator();
  IntWritable cuenta = new IntWritable();
  Text palabra = new Text();
    
  while (iterator.hasNext()){
   palabra.set(iterator.next());
   cuenta.set(myWordMap.get(palabra.toString()));
   context.write(palabra, cuenta);
  }
  
  myWordMap.clear();
 }

 @Override
 protected void cleanup(Context context)
   throws IOException, InterruptedException {
  flush(context);
 }
}



Estamos en un punto donde podemos seguir haciéndonos preguntas una tras otra para optimizar este algoritmo y aplicar este patrón de diseño correctamente, y la siguiente cuestión sería ¿cómo sabemos cuál es el número FLUSH_COUNT de pares que hay que tratar antes de que la memoria falle?.
El libro de referencia propone que más que crear un contador de pares key/value, poner un límite a la memoria que se utiliza y en el momento que el HashMap ha llegado a ese límite se liberaría. El problema es que en Java no es fácil determinar de forma rápida el tamaño de la memoria ocupada por un objeto, por eso, es mejor elegir un FLUSH_COUNT lo más alto posible y teniendo cuidado que el HashMap no provoque un Out Of Memory.