Recuerda que puedes descargarte algunos de los ejemplos en la pestaña de Código Fuente

sábado, 29 de junio de 2013

Counters

Los Counters se utilizan para que los mappers o los reducers pasen valores al Driver de forma que éstos se van sumando. Puede servir para analizar ciertos eventos que suceden durante la ejecución de jobs MapReduce, como registros erróneos, o contar el número de veces que aparece cierto tipo de datos.

Cuando todas las tareas Map o Reduce han finalizado se suman todos los valores del contador. Nunca tenemos que fiarnos de los valores intermedios que vayan generando las tareas si estamos mirando la evolución del job a través de la página de administración, ya que a causa de la speculative execution unos contadores se pueden sumar temporalmente de forma repetida.

El contador se utiliza añadiendo en el mapper o reducer la línea

context.getCounter(group, name).increment(cantidad);


El valor del contador se recupera en el driver a través de:

long tipoGrupo = job.getCounters().findCounter(group, name).getValue();

Además de recuperar los resultados en el driver, éstos también se pueden ver a través de la página web del jobtracker (en el puerto 50030) y a través del shell si se ha ejecutado el job map/reduce desde la línea de comando.


Vemos mejor el Counter en un ejemplo en el cual, a partir de un fichero scores.txt en el que tenemos información sobre jugadores, fechas del juego y sus puntuaciones, en el programa, además de contar el número de palabras que hay en el fichero, queremos saber cuántas veces aparecen algunos de los jugadores (Ana, Pepe, Maria y Pablo) y también, para estos jugadores, cuándo se ha hecho un registro incorrecto y no tienen asociada una puntuación (en el ejemplo siguiente Ana, María y Pepe aparecen y sumarían 1 cada uno, y además Ana sumaría un error):

01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez
01-11-2012 Maria Garcia Martinez 11
...


En el ejemplo también aprovecho para aplicar el paso de parámetros que habíamos visto en una entrada anterior.


public class WordCounterMapper extends 
  Mapper <LongWritable, Text, Text, IntWritable> {

  private final static IntWritable cuenta = new IntWritable(1);
  private Text palabra = new Text();
  private String GRUPO_JUGADOR;
  private String GRUPO_ERROR;
 
  @Override
  protected void setup(Context context) throws IOException,
   InterruptedException {
 Configuration conf = context.getConfiguration();
  
 GRUPO_JUGADOR = conf.getStrings("grupos")[0];
 GRUPO_ERROR = conf.getStrings("grupos")[1];
  }


  @Override
  public void map(LongWritable key, Text values, Context context) 
     throws IOException, InterruptedException{
 
  String linea = values.toString();
  String[] elems = linea.split("\t");
    
  for(String word : elems){
     if (word.length() > 0){
      String player = "";
      if(word.contains("Ana")){
       player = "Ana";
      }else if(word.contains("Pepe")){
       player = "Pepe";
      }else if(word.contains("Maria")){
       player = "Maria";
      }else if(word.contains("Pablo")){
       player = "Pablo";
      }
      
      if(!"".equals(player)){
       context.getCounter(GRUPO_JUGADOR, player).increment(1);
       if(elems.length < 3){
        context.getCounter(GRUPO_ERROR, player).increment(1);
       }
      }
      
      palabra.set(word);
      context.write(palabra, cuenta);
      }
  }

  }
}

public class WordCounterDriver extends Configured implements Tool{

 public int run(String[] args) throws Exception {

  Configuration conf = new Configuration();
  conf.setStrings("grupos", GRUPO_JUGADOR, GRUPO_ERROR);
  
  Job job = new Job(conf);
  job.setJarByClass(WordCounterDriver.class);
  
  job.setJobName("Word Count");
  
  job.setMapperClass(WordCounterMapper.class);
  job.setReducerClass(WordCounterReducer.class);

  FileInputFormat.setInputPaths(job, new Path("pruebas/score.txt"));
  FileOutputFormat.setOutputPath(job, new Path("pruebas/out"));
  
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);
  
  boolean success = job.waitForCompletion(true);
  
  long tipoAna = job.getCounters().findCounter(GRUPO_JUGADOR, "Ana").getValue();
  long tipoPepe = job.getCounters().findCounter(GRUPO_JUGADOR, "Pepe").getValue();
  long tipoMaria = job.getCounters().findCounter(GRUPO_JUGADOR, "Maria").getValue();
  long tipoPablo = job.getCounters().findCounter(GRUPO_JUGADOR, "Pablo").getValue();
  
  long tipoErrorAna = job.getCounters().findCounter(GRUPO_ERROR, "Ana").getValue();
  long tipoErrorPepe = job.getCounters().findCounter(GRUPO_ERROR, "Pepe").getValue();
  long tipoErrorMaria = job.getCounters().findCounter(GRUPO_ERROR, "Maria").getValue();
  long tipoErrorPablo = job.getCounters().findCounter(GRUPO_ERROR, "Pablo").getValue();
  
  System.out.println("Ana:   "+tipoAna+" - Errores: "+tipoErrorAna);
  System.out.println("Pepe:  "+tipoPepe+" - Errores: "+tipoErrorPepe);
  System.out.println("Maria: "+tipoMaria+" - Errores: "+tipoErrorMaria);
  System.out.println("Pablo: "+tipoPablo+" - Errores: "+tipoErrorPablo);
  
  return (success ? 0:1); 
 }
  
 public static void main(String[] args) throws Exception {
     int exitCode = ToolRunner.run(new Configuration(), new WordCounterDriver(), args);
     System.exit(exitCode);
 }

A tener en cuenta, no es necesario que el contador vaya sumando de 1 en 1, se pueden poner otros valores e incluso se puede hacer decrementar este valor poniendo un número negativo.

domingo, 16 de junio de 2013

Sqoop: Importar o exportar datos de HDFS a BBDD relacionales

Sqoop es el primer software del ecosistema de Hadoop que voy a ver.


Una de las grandes ventajas de Hadoop es que puede trabajar con datos cuya procedencia pueden ser distintos tipos de formas. Pero para acceder a estos datos Hadoop suele necesitar APIs externas para tratarlos y el que Hadoop acceda a la fuente original no suele ser muy eficiente, así que se han buscado formas para importar estos datos al sistema de ficheros HDFS.

Una de las formas en las que nos solemos encontrar los datos es en sistemas de bases de datos relacionales (RDBMS) y para extraer esta información se utiliza Apache Sqoop.

Sqoop es un software de open source que permite extraer datos de sistemas de almacenamiento de datos estructurados e importarlos en Hadoop para su posterior procesamiento. Permite exportar estos datos a formato texto, Sequence Files o a Avro. También dispone de opciones para en vez de escribir en el sistema de ficheros HDFS, escribir los datos en Hive.
También permite exportar datos de HDFS e importarlos a una BBDD relacional.

Con Sqoop puedes importar todas las tablas, una o incluso sólo porciones de ellas. Se puede configurar de tal forma que haga una importación inicial y posteriormente haga actualizaciones incrementales.

Utiliza un job MapReduce para hacer esa importación, dispone de 4 Mappers por defecto (es configurable) y los datos los importa a ficheros de texto o a Sequence Files.

Instalación de Sqoop

Esta instalación es sólo para la versión 1.x de Sqoop, y no para la 2.

Prerequisitos:

- Java 1.6
- Hadoop 0.20x o más instalado.
- Un RDBMS, en mi caso dispongo de MAMP con MySQL

Para instalarlo primero tenemos que descargar la última versión del software (en este momento la 1.4.3) desde este enlace.
Y lo descomprimimos en nuestra carpeta donde estamos trabajando con hadoop, en mi caso /usr/local/hadoop/sqoop-1.4.3.bin__hadoop-1.0.0

Una vez descomprimido nos posicionamos a través del terminal en la carpeta sqoop-1.4.3.bin__hadoop-1.0.0 y hay que compilarlo introduciendo el comando

ant -Dhadoopversion=20

-Dhadoopversion=20 se indica ya que al instalar Hadoop éste se instala con las versiones CDH3 y CDH4 -la vieja y la nueva-, a través de este comando indicamos que nos descargamos y estamos configurando sqoop para la versión CDH4, si no lo indicamos configuraríamos sqoop para la versión CDH3 y a la hora de realizar pruebas nos daría error.

Configuramos el PATH con el sqoop

export SQOOP_HOME=/usr/local/hadoop/sqoop-1.4.3.bin__hadoop-1.0.0
export PATH=$SQOOP_HOME/bin:$PATH


Y por último comprobamos que Sqoop funciona introduciendo el comando

sqoop help


Si no hemos incluído el HADOOP_HOME en el PATH del sistema, tendremos que configurarlo a través del fichero de configuración.
En la carpeta  /usr/local/hadoop/sqoop-1.4.3.bin__hadoop-1.0.0/conf/ nos encontramos con el fichero sqoop-env-template.sh, habrá que renombrarlo a sqoop-env.sh y editarlo quedando de la forma:

#Set path to where bin/hadoop is available
export HADOOP_COMMON_HOME=/usr/local/hadoop/hadoop-1.0.4
#Set path to where hadoop-*-core.jar is available
export HADOOP_MAPRED_HOME=/usr/local/hadoop/hadoop-1.0.4



Ahora, para poder conectar a la base de datos de MySQL necesitamos descargar el Driver en este enlace, descomprimimos el fichero y ponemos la librería mysql-connector-java-5.1.24-bin.jar en la carpeta /usr/local/hadoop/sqoop-1.4.3.bin__hadoop-1.0.0/lib.

Para comprobar que conecta ejecutamos el comando de listar para ver qué esquemas tenemos en la base de datos

sqoop list-databases --connect jdbc:mysql://localhost --username root --password root



Para probar que sqoop funciona importaremos una tabla.
He creado un esquema llamado 'HADOOP' y en él una tabla llamada 'score', en ella he introducido los datos de puntuaciones de juegos que he utilizado de ejemplo en otros artículos (os dejo el script de creación de la tabla y la inserción de los datos).
A continuación (anteriormente tenemos que haber arrancado los demonios hadoop) introducimos el comando:

sqoop import --connect jdbc:mysql://localhost/HADOOP --table score --username root --password root


Veremos cómo se van a ejecutar las tareas MapReduce y si ejecutamos el comando de listar el sistema de ficheros HDFS (hadoop fs -ls) veremos que se ha creado una carpeta score que contendrá:


elena$ hadoop fs -ls score
Found 6 items
-rw-r--r--   1 elena supergroup          0 2013-04-10 19:08 /user/elena/score/_SUCCESS
drwxr-xr-x   - elena supergroup          0 2013-04-10 19:06 /user/elena/score/_logs
-rw-r--r--   1 elena supergroup        333 2013-04-10 19:08 /user/elena/score/part-m-00000
-rw-r--r--   1 elena supergroup        344 2013-04-10 19:08 /user/elena/score/part-m-00001
-rw-r--r--   1 elena supergroup        306 2013-04-10 19:08 /user/elena/score/part-m-00002
-rw-r--r--   1 elena supergroup        342 2013-04-10 19:08 /user/elena/score/part-m-00003



Como comenté anteriormente, Sqoop por defecto está configurado con 4 mappers, es por esa razón que ha creado los 4 ficheros part. Si queremos que sólo se ejecute uno para tener una única salida ejecutaríamos el comando sqoop import añadiendo el atributo -m 1

elena$ sqoop import --connect jdbc:mysql://localhost/HADOOP --table score --username root --password root -m 1

elena$ hadoop fs -ls score
Found 3 items
-rw-r--r--   1 elena supergroup          0 2013-04-10 19:13 /user/elena/score/_SUCCESS
drwxr-xr-x   - elena supergroup          0 2013-04-10 19:13 /user/elena/score/_logs
-rw-r--r--   1 elena supergroup       1325 2013-04-10 19:13 /user/elena/score/part-m-00000


Si listamos estarán todos los datos que teníamos en nuestra base de datos MySQL:

elena$ hadoop fs -cat score/part-m-00000
1,2012-11-01,Pepe Perez Gonzalez,21
2,2012-11-01,Ana Lopez Fernandez,14
3,2012-11-01,Maria Garcia Martinez,11
4,2012-11-01,Pablo Sanchez Rodriguez,9
5,2012-11-01,Angel Martin Hernandez,3
6,2012-11-15,Pepe Perez Gonzalez,22
7,2012-11-15,Maria Garcia Martinez,15
8,2012-11-15,Cristina Ruiz Gomez,23
9,2012-12-01,Pepe Perez Gonzalez,25
10,2012-12-01,Ana Lopez Fernandez,15
11,2012-12-01,Pablo Sanchez Rodriguez,8
...