Recuerda que puedes descargarte algunos de los ejemplos en la pestaña de Código Fuente

miércoles, 10 de julio de 2013

Técnicas de Debugging y Logging

Debugging

Depurar los programas MapReduce es muy complicado, ya que cada instancia de un Mapper se ejecuta en una tarea diferente y quizás en máquinas diferentes, así que adjuntar una técnica de debug en el proceso no es tarea fácil.

El problema es que con cantidades tan grandes de datos puede haber una entrada inesperada o errónea y es un evento que hay que tener en cuenta y controlar.
Porque piénsalo de esta forma, estamos realizando un proyecto de Big Data, eso quiere decir que estamos tratando tales cantidades de datos que los procesos pueden tardar horas o incluso días, imaginemos que a mitad de ese proceso que tarda días hay una entrada de un dato inesperado y el programa falla por no haber tomado ninguna medida.
La conclusión es que al desarrollador sólo le queda tomar una serie de estrategias para evitar este tipo errores:
  • Código defensivo: siempre suponer que un dato puede ser incorrecto o esperar que las cosas vayan mal. Habría que añadir control de excepciones o código que controle los posibles fallos.
  • Empieza un desarrollo pequeño y ve agrandándolo poco a poco
  • Escribe pruebas unitarias, utilizando, por ejemplo, MRunit.
  • Prueba primero en local con cantidades reducidas de datos, luego pasa a modo pseudodistribuído (asegurándote que el entorno es similar al del clúster) y finalmente prueba en todo el clúster.
  • Utiliza Counters, por ejemplo, para conocer el número de entradas inválidas.

Para hacer pruebas en local no se usa HDFS, se usa el sistema de ficheros local y sólo se ejecuta un proceso llamado LocalJobRunner.


Logging

Como logging lo más efectivo es usar librerías como el log4java, ya que éste se puede configurar en niveles y desactivarlo en situaciones de producción.

Como buenas prácticas nunca se debería usar println como estrategia de loggin cuando hacemos nuestras primeras pruebas, ya que posteriormente deberíamos borrarlo en todo el código. Si dejáramos estas líneas y pasáramos el código al clúster en modo producción, podría afectar considerablemente a los tiempos de ejecución, piensa que son procesos que necesitamos que sean lo más rápidos posibles y el realizar un println supone un consumo de tiempo ejecutando el comando y realizando la escritura.
En el clúster deben estar desactivadas todas las escrituras de logging innecesarias (no perjudica el rendimiento el hacer loggin en los métodos setup y cleanup).







sábado, 29 de junio de 2013

Counters

Los Counters se utilizan para que los mappers o los reducers pasen valores al Driver de forma que éstos se van sumando. Puede servir para analizar ciertos eventos que suceden durante la ejecución de jobs MapReduce, como registros erróneos, o contar el número de veces que aparece cierto tipo de datos.

Cuando todas las tareas Map o Reduce han finalizado se suman todos los valores del contador. Nunca tenemos que fiarnos de los valores intermedios que vayan generando las tareas si estamos mirando la evolución del job a través de la página de administración, ya que a causa de la speculative execution unos contadores se pueden sumar temporalmente de forma repetida.

El contador se utiliza añadiendo en el mapper o reducer la línea

context.getCounter(group, name).increment(cantidad);


El valor del contador se recupera en el driver a través de:

long tipoGrupo = job.getCounters().findCounter(group, name).getValue();

Además de recuperar los resultados en el driver, éstos también se pueden ver a través de la página web del jobtracker (en el puerto 50030) y a través del shell si se ha ejecutado el job map/reduce desde la línea de comando.


Vemos mejor el Counter en un ejemplo en el cual, a partir de un fichero scores.txt en el que tenemos información sobre jugadores, fechas del juego y sus puntuaciones, en el programa, además de contar el número de palabras que hay en el fichero, queremos saber cuántas veces aparecen algunos de los jugadores (Ana, Pepe, Maria y Pablo) y también, para estos jugadores, cuándo se ha hecho un registro incorrecto y no tienen asociada una puntuación (en el ejemplo siguiente Ana, María y Pepe aparecen y sumarían 1 cada uno, y además Ana sumaría un error):

01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez
01-11-2012 Maria Garcia Martinez 11
...


En el ejemplo también aprovecho para aplicar el paso de parámetros que habíamos visto en una entrada anterior.


public class WordCounterMapper extends 
  Mapper <LongWritable, Text, Text, IntWritable> {

  private final static IntWritable cuenta = new IntWritable(1);
  private Text palabra = new Text();
  private String GRUPO_JUGADOR;
  private String GRUPO_ERROR;
 
  @Override
  protected void setup(Context context) throws IOException,
   InterruptedException {
 Configuration conf = context.getConfiguration();
  
 GRUPO_JUGADOR = conf.getStrings("grupos")[0];
 GRUPO_ERROR = conf.getStrings("grupos")[1];
  }


  @Override
  public void map(LongWritable key, Text values, Context context) 
     throws IOException, InterruptedException{
 
  String linea = values.toString();
  String[] elems = linea.split("\t");
    
  for(String word : elems){
     if (word.length() > 0){
      String player = "";
      if(word.contains("Ana")){
       player = "Ana";
      }else if(word.contains("Pepe")){
       player = "Pepe";
      }else if(word.contains("Maria")){
       player = "Maria";
      }else if(word.contains("Pablo")){
       player = "Pablo";
      }
      
      if(!"".equals(player)){
       context.getCounter(GRUPO_JUGADOR, player).increment(1);
       if(elems.length < 3){
        context.getCounter(GRUPO_ERROR, player).increment(1);
       }
      }
      
      palabra.set(word);
      context.write(palabra, cuenta);
      }
  }

  }
}

public class WordCounterDriver extends Configured implements Tool{

 public int run(String[] args) throws Exception {

  Configuration conf = new Configuration();
  conf.setStrings("grupos", GRUPO_JUGADOR, GRUPO_ERROR);
  
  Job job = new Job(conf);
  job.setJarByClass(WordCounterDriver.class);
  
  job.setJobName("Word Count");
  
  job.setMapperClass(WordCounterMapper.class);
  job.setReducerClass(WordCounterReducer.class);

  FileInputFormat.setInputPaths(job, new Path("pruebas/score.txt"));
  FileOutputFormat.setOutputPath(job, new Path("pruebas/out"));
  
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);
  
  boolean success = job.waitForCompletion(true);
  
  long tipoAna = job.getCounters().findCounter(GRUPO_JUGADOR, "Ana").getValue();
  long tipoPepe = job.getCounters().findCounter(GRUPO_JUGADOR, "Pepe").getValue();
  long tipoMaria = job.getCounters().findCounter(GRUPO_JUGADOR, "Maria").getValue();
  long tipoPablo = job.getCounters().findCounter(GRUPO_JUGADOR, "Pablo").getValue();
  
  long tipoErrorAna = job.getCounters().findCounter(GRUPO_ERROR, "Ana").getValue();
  long tipoErrorPepe = job.getCounters().findCounter(GRUPO_ERROR, "Pepe").getValue();
  long tipoErrorMaria = job.getCounters().findCounter(GRUPO_ERROR, "Maria").getValue();
  long tipoErrorPablo = job.getCounters().findCounter(GRUPO_ERROR, "Pablo").getValue();
  
  System.out.println("Ana:   "+tipoAna+" - Errores: "+tipoErrorAna);
  System.out.println("Pepe:  "+tipoPepe+" - Errores: "+tipoErrorPepe);
  System.out.println("Maria: "+tipoMaria+" - Errores: "+tipoErrorMaria);
  System.out.println("Pablo: "+tipoPablo+" - Errores: "+tipoErrorPablo);
  
  return (success ? 0:1); 
 }
  
 public static void main(String[] args) throws Exception {
     int exitCode = ToolRunner.run(new Configuration(), new WordCounterDriver(), args);
     System.exit(exitCode);
 }

A tener en cuenta, no es necesario que el contador vaya sumando de 1 en 1, se pueden poner otros valores e incluso se puede hacer decrementar este valor poniendo un número negativo.

domingo, 16 de junio de 2013

Sqoop: Importar o exportar datos de HDFS a BBDD relacionales

Sqoop es el primer software del ecosistema de Hadoop que voy a ver.


Una de las grandes ventajas de Hadoop es que puede trabajar con datos cuya procedencia pueden ser distintos tipos de formas. Pero para acceder a estos datos Hadoop suele necesitar APIs externas para tratarlos y el que Hadoop acceda a la fuente original no suele ser muy eficiente, así que se han buscado formas para importar estos datos al sistema de ficheros HDFS.

Una de las formas en las que nos solemos encontrar los datos es en sistemas de bases de datos relacionales (RDBMS) y para extraer esta información se utiliza Apache Sqoop.

Sqoop es un software de open source que permite extraer datos de sistemas de almacenamiento de datos estructurados e importarlos en Hadoop para su posterior procesamiento. Permite exportar estos datos a formato texto, Sequence Files o a Avro. También dispone de opciones para en vez de escribir en el sistema de ficheros HDFS, escribir los datos en Hive.
También permite exportar datos de HDFS e importarlos a una BBDD relacional.

Con Sqoop puedes importar todas las tablas, una o incluso sólo porciones de ellas. Se puede configurar de tal forma que haga una importación inicial y posteriormente haga actualizaciones incrementales.

Utiliza un job MapReduce para hacer esa importación, dispone de 4 Mappers por defecto (es configurable) y los datos los importa a ficheros de texto o a Sequence Files.

Instalación de Sqoop

Esta instalación es sólo para la versión 1.x de Sqoop, y no para la 2.

Prerequisitos:

- Java 1.6
- Hadoop 0.20x o más instalado.
- Un RDBMS, en mi caso dispongo de MAMP con MySQL

Para instalarlo primero tenemos que descargar la última versión del software (en este momento la 1.4.3) desde este enlace.
Y lo descomprimimos en nuestra carpeta donde estamos trabajando con hadoop, en mi caso /usr/local/hadoop/sqoop-1.4.3.bin__hadoop-1.0.0

Una vez descomprimido nos posicionamos a través del terminal en la carpeta sqoop-1.4.3.bin__hadoop-1.0.0 y hay que compilarlo introduciendo el comando

ant -Dhadoopversion=20

-Dhadoopversion=20 se indica ya que al instalar Hadoop éste se instala con las versiones CDH3 y CDH4 -la vieja y la nueva-, a través de este comando indicamos que nos descargamos y estamos configurando sqoop para la versión CDH4, si no lo indicamos configuraríamos sqoop para la versión CDH3 y a la hora de realizar pruebas nos daría error.

Configuramos el PATH con el sqoop

export SQOOP_HOME=/usr/local/hadoop/sqoop-1.4.3.bin__hadoop-1.0.0
export PATH=$SQOOP_HOME/bin:$PATH


Y por último comprobamos que Sqoop funciona introduciendo el comando

sqoop help


Si no hemos incluído el HADOOP_HOME en el PATH del sistema, tendremos que configurarlo a través del fichero de configuración.
En la carpeta  /usr/local/hadoop/sqoop-1.4.3.bin__hadoop-1.0.0/conf/ nos encontramos con el fichero sqoop-env-template.sh, habrá que renombrarlo a sqoop-env.sh y editarlo quedando de la forma:

#Set path to where bin/hadoop is available
export HADOOP_COMMON_HOME=/usr/local/hadoop/hadoop-1.0.4
#Set path to where hadoop-*-core.jar is available
export HADOOP_MAPRED_HOME=/usr/local/hadoop/hadoop-1.0.4



Ahora, para poder conectar a la base de datos de MySQL necesitamos descargar el Driver en este enlace, descomprimimos el fichero y ponemos la librería mysql-connector-java-5.1.24-bin.jar en la carpeta /usr/local/hadoop/sqoop-1.4.3.bin__hadoop-1.0.0/lib.

Para comprobar que conecta ejecutamos el comando de listar para ver qué esquemas tenemos en la base de datos

sqoop list-databases --connect jdbc:mysql://localhost --username root --password root



Para probar que sqoop funciona importaremos una tabla.
He creado un esquema llamado 'HADOOP' y en él una tabla llamada 'score', en ella he introducido los datos de puntuaciones de juegos que he utilizado de ejemplo en otros artículos (os dejo el script de creación de la tabla y la inserción de los datos).
A continuación (anteriormente tenemos que haber arrancado los demonios hadoop) introducimos el comando:

sqoop import --connect jdbc:mysql://localhost/HADOOP --table score --username root --password root


Veremos cómo se van a ejecutar las tareas MapReduce y si ejecutamos el comando de listar el sistema de ficheros HDFS (hadoop fs -ls) veremos que se ha creado una carpeta score que contendrá:


elena$ hadoop fs -ls score
Found 6 items
-rw-r--r--   1 elena supergroup          0 2013-04-10 19:08 /user/elena/score/_SUCCESS
drwxr-xr-x   - elena supergroup          0 2013-04-10 19:06 /user/elena/score/_logs
-rw-r--r--   1 elena supergroup        333 2013-04-10 19:08 /user/elena/score/part-m-00000
-rw-r--r--   1 elena supergroup        344 2013-04-10 19:08 /user/elena/score/part-m-00001
-rw-r--r--   1 elena supergroup        306 2013-04-10 19:08 /user/elena/score/part-m-00002
-rw-r--r--   1 elena supergroup        342 2013-04-10 19:08 /user/elena/score/part-m-00003



Como comenté anteriormente, Sqoop por defecto está configurado con 4 mappers, es por esa razón que ha creado los 4 ficheros part. Si queremos que sólo se ejecute uno para tener una única salida ejecutaríamos el comando sqoop import añadiendo el atributo -m 1

elena$ sqoop import --connect jdbc:mysql://localhost/HADOOP --table score --username root --password root -m 1

elena$ hadoop fs -ls score
Found 3 items
-rw-r--r--   1 elena supergroup          0 2013-04-10 19:13 /user/elena/score/_SUCCESS
drwxr-xr-x   - elena supergroup          0 2013-04-10 19:13 /user/elena/score/_logs
-rw-r--r--   1 elena supergroup       1325 2013-04-10 19:13 /user/elena/score/part-m-00000


Si listamos estarán todos los datos que teníamos en nuestra base de datos MySQL:

elena$ hadoop fs -cat score/part-m-00000
1,2012-11-01,Pepe Perez Gonzalez,21
2,2012-11-01,Ana Lopez Fernandez,14
3,2012-11-01,Maria Garcia Martinez,11
4,2012-11-01,Pablo Sanchez Rodriguez,9
5,2012-11-01,Angel Martin Hernandez,3
6,2012-11-15,Pepe Perez Gonzalez,22
7,2012-11-15,Maria Garcia Martinez,15
8,2012-11-15,Cristina Ruiz Gomez,23
9,2012-12-01,Pepe Perez Gonzalez,25
10,2012-12-01,Ana Lopez Fernandez,15
11,2012-12-01,Pablo Sanchez Rodriguez,8
...