Recuerda que puedes descargarte algunos de los ejemplos en la pestaña de Código Fuente
Mostrando entradas con la etiqueta Hadoop. Mostrar todas las entradas
Mostrando entradas con la etiqueta Hadoop. Mostrar todas las entradas

sábado, 29 de junio de 2013

Counters

Los Counters se utilizan para que los mappers o los reducers pasen valores al Driver de forma que éstos se van sumando. Puede servir para analizar ciertos eventos que suceden durante la ejecución de jobs MapReduce, como registros erróneos, o contar el número de veces que aparece cierto tipo de datos.

Cuando todas las tareas Map o Reduce han finalizado se suman todos los valores del contador. Nunca tenemos que fiarnos de los valores intermedios que vayan generando las tareas si estamos mirando la evolución del job a través de la página de administración, ya que a causa de la speculative execution unos contadores se pueden sumar temporalmente de forma repetida.

El contador se utiliza añadiendo en el mapper o reducer la línea

context.getCounter(group, name).increment(cantidad);


El valor del contador se recupera en el driver a través de:

long tipoGrupo = job.getCounters().findCounter(group, name).getValue();

Además de recuperar los resultados en el driver, éstos también se pueden ver a través de la página web del jobtracker (en el puerto 50030) y a través del shell si se ha ejecutado el job map/reduce desde la línea de comando.


Vemos mejor el Counter en un ejemplo en el cual, a partir de un fichero scores.txt en el que tenemos información sobre jugadores, fechas del juego y sus puntuaciones, en el programa, además de contar el número de palabras que hay en el fichero, queremos saber cuántas veces aparecen algunos de los jugadores (Ana, Pepe, Maria y Pablo) y también, para estos jugadores, cuándo se ha hecho un registro incorrecto y no tienen asociada una puntuación (en el ejemplo siguiente Ana, María y Pepe aparecen y sumarían 1 cada uno, y además Ana sumaría un error):

01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez
01-11-2012 Maria Garcia Martinez 11
...


En el ejemplo también aprovecho para aplicar el paso de parámetros que habíamos visto en una entrada anterior.


public class WordCounterMapper extends 
  Mapper <LongWritable, Text, Text, IntWritable> {

  private final static IntWritable cuenta = new IntWritable(1);
  private Text palabra = new Text();
  private String GRUPO_JUGADOR;
  private String GRUPO_ERROR;
 
  @Override
  protected void setup(Context context) throws IOException,
   InterruptedException {
 Configuration conf = context.getConfiguration();
  
 GRUPO_JUGADOR = conf.getStrings("grupos")[0];
 GRUPO_ERROR = conf.getStrings("grupos")[1];
  }


  @Override
  public void map(LongWritable key, Text values, Context context) 
     throws IOException, InterruptedException{
 
  String linea = values.toString();
  String[] elems = linea.split("\t");
    
  for(String word : elems){
     if (word.length() > 0){
      String player = "";
      if(word.contains("Ana")){
       player = "Ana";
      }else if(word.contains("Pepe")){
       player = "Pepe";
      }else if(word.contains("Maria")){
       player = "Maria";
      }else if(word.contains("Pablo")){
       player = "Pablo";
      }
      
      if(!"".equals(player)){
       context.getCounter(GRUPO_JUGADOR, player).increment(1);
       if(elems.length < 3){
        context.getCounter(GRUPO_ERROR, player).increment(1);
       }
      }
      
      palabra.set(word);
      context.write(palabra, cuenta);
      }
  }

  }
}

public class WordCounterDriver extends Configured implements Tool{

 public int run(String[] args) throws Exception {

  Configuration conf = new Configuration();
  conf.setStrings("grupos", GRUPO_JUGADOR, GRUPO_ERROR);
  
  Job job = new Job(conf);
  job.setJarByClass(WordCounterDriver.class);
  
  job.setJobName("Word Count");
  
  job.setMapperClass(WordCounterMapper.class);
  job.setReducerClass(WordCounterReducer.class);

  FileInputFormat.setInputPaths(job, new Path("pruebas/score.txt"));
  FileOutputFormat.setOutputPath(job, new Path("pruebas/out"));
  
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);
  
  boolean success = job.waitForCompletion(true);
  
  long tipoAna = job.getCounters().findCounter(GRUPO_JUGADOR, "Ana").getValue();
  long tipoPepe = job.getCounters().findCounter(GRUPO_JUGADOR, "Pepe").getValue();
  long tipoMaria = job.getCounters().findCounter(GRUPO_JUGADOR, "Maria").getValue();
  long tipoPablo = job.getCounters().findCounter(GRUPO_JUGADOR, "Pablo").getValue();
  
  long tipoErrorAna = job.getCounters().findCounter(GRUPO_ERROR, "Ana").getValue();
  long tipoErrorPepe = job.getCounters().findCounter(GRUPO_ERROR, "Pepe").getValue();
  long tipoErrorMaria = job.getCounters().findCounter(GRUPO_ERROR, "Maria").getValue();
  long tipoErrorPablo = job.getCounters().findCounter(GRUPO_ERROR, "Pablo").getValue();
  
  System.out.println("Ana:   "+tipoAna+" - Errores: "+tipoErrorAna);
  System.out.println("Pepe:  "+tipoPepe+" - Errores: "+tipoErrorPepe);
  System.out.println("Maria: "+tipoMaria+" - Errores: "+tipoErrorMaria);
  System.out.println("Pablo: "+tipoPablo+" - Errores: "+tipoErrorPablo);
  
  return (success ? 0:1); 
 }
  
 public static void main(String[] args) throws Exception {
     int exitCode = ToolRunner.run(new Configuration(), new WordCounterDriver(), args);
     System.exit(exitCode);
 }

A tener en cuenta, no es necesario que el contador vaya sumando de 1 en 1, se pueden poner otros valores e incluso se puede hacer decrementar este valor poniendo un número negativo.

miércoles, 8 de mayo de 2013

Uso del Combiner para optimización de Jobs MapReduce

El Combiner es una función propia de Hadoop utilizada para optimizar los job MapReduce.

Cuando en un Job la salida del Mapper genera una gran cantidad de datos intermedios, éstos se tienen que transmitir por la red hacia los Reducer. Si la cantidad de datos es excesivamente grande aquí se puede producir un cuello de botella.

Las técnicas utilizadas para reducir la cantidad de datos y mejorar la eficiencia de los job MapReduce, se llama Local Aggregation.
Existen dos técnicas, el Combiner que vamos a ver en esta entrada y el In-Mapper Combining que veremos en la siguiente entrada.

Una buena solución para estos casos es la de implementar un Combiner, que se ejecuta a la salida de la fase Map y de forma local a este antes de enviar los datos a través de la red.

El Combiner implementa la misma interfaz que el Reducer e incluso muchas veces suele ser la misma clase que el Reducer.
Sin embargo hay que tener cuidado que la operación que se realiza en el Combiner sea asociativa y conmutativa. Por ejemplo la operación suma (es decir, el ejemplo del Wordcount) cumple con estos dos requisitos, pero por ejemplo la operación de cálculo de la media aritmética no es asociativa.

Para configurarlo se hace en el Driver a través de:
  job.setCombinerClass (MyCombiner.class);

¿Se puede implementar un Combiner distinto al Reducer?
Sí, pueden ser distintos. Hay que tener en cuenta que el Combiner va a seguir implementando la interfaz Reducer. Y también hay que tener cuidado de no poner dentro algún tipo de código "sensible" ya que el Combiner se puede ejecutar cero, una o más veces a la salida desde cualquier Mapper.

Si observamos los logs generados tras la ejecución de un Job del WordCount cuyo código he dejado publicado en este enlace.

Sin Combiner:

Map input records=1928
Map output records=187029
Combine input records=0
Combine output records=0
Reduce input records=187029
Reduce output records=22948


Con Combiner:

Map input records=1928
Map output records=187029
Combine input records=219738
Combine output records=55657
Reduce input records=22948
Reduce output records=22948   



Como he comentado anteriormente el Combiner se ejecuta a la salida del Mapper, antes de que los datos se transmitan por la red hacia el Reducer, la conclusión es que la cantidad de datos transmitida y por tanto el input del Reducer, es considerablemente menor si utilizamos ese Combiner.

Algo a tener en cuenta en el código publicado, es que he hecho un Combiner del WordCount, el código que contiene la clase WordCountCombiner.java es exactamente igual que el código de la clase WordCountReducer.java, en situaciones como esta no haría falta crear esa nueva clase, bastaría con definir en el Driver:

  job.setCombinerClass (WordCountReducer.class);

Así que creé una clase por separado para mostrar que es posible hacer el Combiner en una clase distinta (extendiendo de Reducer), recordando que no debe haber "código sensible" en ella ya que el Combiner puede llegar a ejecutarse varias veces o puede llegar a no ejecutarse.

domingo, 28 de abril de 2013

Speculative Execution

La meta del modelo MapReduce es dividir trabajos en tareas más pequeñas y ejecutarlas en paralelo para que el tiempo de ejecución total del trabajo sea menor que si se ejecutaran de forma secuencial.

Muchas veces, para grandes cantidades de datos, el número de divisiones es mayor que el número de nodos en el cluster, así que aunque las tareas se van ejecutan de forma paralela, otras tareas tienen que esperar a que las primeras finalicen para poder continuar.

Si por alguna causa una de las tareas se está ejecutando más lentamente de lo normal se podrían producir cuellos de botella y el tiempo dedicado para la ejecución total podría aumentar de forma considerable con respecto a si todo el clúster estuviera funcionando de manera correcta.

Hadoop no se va a encargar de buscar la causa de qué es lo que está retardando una tarea, sino que va a buscar soluciones para mitigar esto, y la Speculative Execution es la técnica que utiliza.

En el momento que JobTracker detecta que una tarea es más lenta de lo normal, va a lanzar una tarea especulativa (speculative task), pero sólo en el momento en el que el resto de las tareas del trabajo han finalizado, que la tarea haya estado funcionando al menos un minuto o que el tiempo medio de ejecución de esa tarea sea más alto que el del resto de tareas. Esto es, va a lanzar un duplicado de la tarea que está funcionando lentamente y las tareas se van a estar ejecutando al mismo tiempo.

Cuando una de las tareas finaliza su ejecución, el JobTracker se encargará de desechar el otro proceso (hace un kill de la tarea).

Si el defecto en la tarea se trata de un bug en el programa, éste va a suceder también en la tarea especulativa, así que tendría que ser el desarrollador quien corrigiera ese bug.

La Speculative Execution está activada por defecto, pero está permitido desactivarla en los ficheros de configuración.
En general es conveniente desactivar la speculative execution para las tareas Reduce porque, por un lado, siempre que se duplica una tarea Reduce ésta tiene que recuperar los datos intermedios de la red, lo que incrementaría el tráfico en la red.
Y por otro lado porque si la distribución de las key que reciben los Reducer no es equilibrada, es normal que una tarea Reduce tarde más que otra, por ejemplo, si pensamos en el WordCount, en inglés, la palabra "the" va a tener un array de valores mucho mayor que otras key, por lo que será normal que esta tarea Reduce tarde más.

Por el contrario, como regla general, se recomienda activar la Speculative Execution para las tareas Map. Una excepción podría ser para tareas que no son idempotentes




lunes, 22 de abril de 2013

Paso de Parámetros en Hadoop

Algo tan simple como un paso de parámetros entre el Driver y el Mapper o Reducer puede llevarnos a un error ya que como desarrolladores estamos acostumbrados a que pueda haber una cierta "comunicación" entre las clases.

En Hadoop esto no es correcto, y a estas alturas deberíamos tener claro que este tipo de aplicaciones se ejecuta de forma distribuída, que cada tarea se ejecuta en una JVM diferente e incluso en distintos nodos del clúster.

Voy a dejaros dos ejemplos, uno con la forma correcta y otro con la forma incorrecta por si queréis ver qué pasa, ya que forma la incorrecta no es que dé un error, el Job va a ejecutarse sin problemas, pero el parámetro no se pasará.

También hay que tener cuidado, ya que si se ejecuta en modo local, el paso de parámetros SÍ que funcionará. Sin embargo, no funcionará en el momento que llevemos nuestro programa al modo distribuído o pseudo-distribuído.

El ejemplo es muy simple y ni si quiera vamos a usar la función Reducer. A partir del fichero score.txt que ya he usado para otros ejemplos de la forma:

01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez 14
15-02-2013 Angel Martin Hernandez 3
01-11-2012 Maria Garcia Martinez 11
01-11-2012 Pablo Sanchez Rodriguez 9
01-11-2012 Angel Martin Hernandez 3
15-01-2013 Pepe Perez Gonzalez 17
15-01-2013 Maria Garcia Martinez 3
...

Sólo queremos como salida la lista de personas con una puntuación mayor de 25, y ese valor lo queremos pasar desde el Driver.

Forma Incorrecta

 
public class PersonaScoreDriver {
 
 private static int score;
 
 public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  //Indicamos el parámetro a pasar
  
  score = 25;
    
  Job job = new Job(conf);
  job.setJarByClass(PersonaScoreDriver.class);
  job.setJobName("Persona Score");
  
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
  job.setInputFormatClass(KeyValueTextInputFormat.class);
  
  job.setMapperClass(MyMapper.class);
  job.setNumReduceTasks(0);

  boolean success = job.waitForCompletion(true);
  System.exit(success ? 0:1);  
 }
 
 private static class MyMapper extends Mapper{
  @Override
  public void map(Text key, Text value,
    Context context) throws IOException, InterruptedException {
   
   int s = score;
   String[] personaSplit = value.toString().split(" ");
   
   if(personaSplit.length == 4){
    int mScore = Integer.valueOf(personaSplit[3]);
    
    if(mScore >= s)
     context.write(key, value);
   }
  }
 }
}


Forma Correcta

 
public class PersonaScoreDriver {
 
 private static class MyMapper extends Mapper{
  private int score;
  
  @Override
  protected void setup(Context context) throws IOException,
    InterruptedException {
   Configuration conf = context.getConfiguration();
   //Indicas cuál es el id del parámetro a recoger y el valor
   // por defecto.
   score = conf.getInt("score", 0);
  }
  
  @Override
  public void map(Text key, Text value,
    Context context) throws IOException, InterruptedException {
   
   int s = score;
   String[] personaSplit = value.toString().split(" ");
   
   if(personaSplit.length == 4){
    int mScore = Integer.valueOf(personaSplit[3]);
    
    if(mScore >= s)
     context.write(key, value);
   }
  }
 }
 
 public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  //Indicamos el parámetro a pasar
  conf.setInt("score", 25);
    
  Job job = new Job(conf);
  job.setJarByClass(PersonaScoreDriver.class);
  job.setJobName("Persona Score");
  
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
  job.setInputFormatClass(KeyValueTextInputFormat.class);
  
  job.setMapperClass(MyMapper.class);
  job.setNumReduceTasks(0);

  boolean success = job.waitForCompletion(true);
  System.exit(success ? 0:1);  
 }
}


Hay que tener en cuenta que esta forma de pasar información entre el Driver y las tareas (map o reduce) es muy útil siempre cuando esta información no ocupa demasiada memoria ya que esta información se copia en varios sitios: en la memoria del Driver, en la memoria del TaskTracker y en la memoria de la JVM que ejecuta la tarea.
Por lo tanto, si se trata de una información grande (por ejemplo, varios TB), conviene usar otro método (por ejemplo: almacenar esta información en un fichero HDFS y acceder a esta información vía el método setup(), o bien usar Distributed Cache).

Recordad que este código también lo podréis descargar desde este enlace.

martes, 16 de abril de 2013

Ejemplo de Partitioner

En la entrada anterior vimos qué es el Partitioner, ahora toca ver un ejemplo y el código desarrollado para hacer ese ejemplo.

Atención si vais a hacer las pruebas en local ya que no funciona, el modo local sólo tiene un Reducer y por eso es mejor usar por lo menos el modo pseudo distribuido.

En este ejemplo, a partir del fichero scores.txt con la forma:

01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez 14
15-02-2013 Angel Martin Hernandez 3
01-11-2012 Maria Garcia Martinez 11
01-11-2012 Pablo Sanchez Rodriguez 9
01-11-2012 Angel Martin Hernandez 3
15-01-2013 Pepe Perez Gonzalez 17
15-01-2013 Maria Garcia Martinez 3
...

Queremos dividir los datos por año y enviar a cada Reducer las personas que han jugado en ese año. A través de un Partitioner vamos a indicar a qué Reducer va cada registro.

El Driver es parecido a todo lo que hemos visto hasta ahora, lo único que hemos definido que el InputFormat será un KeyValueTextInputFormat (ya que la fecha está separado por una tabulación del resto de la línea, así que este formato reconocerá la entrada).
Y luego añadimos en las configuraciones nuestra clase Partitioner y el número de tareas Reduce que queremos (nuestro fichero sólo tiene datos de 2 años (2012 y 2013), entonces serán 2 tareas Reducer).


public class PersonaScoreDriver {
 public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  job.setJarByClass(PersonaScoreDriver.class);
  
  job.setJobName("Persona Score");
  
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
  job.setInputFormatClass(KeyValueTextInputFormat.class);
  //Establecemos el número de tareas Reduce
  job.setNumReduceTasks(2);
  
  job.setMapperClass(PersonaScoreMapper.class);
  job.setReducerClass(PersonaScoreReducer.class);
  //Indicamos cuál es nuestro partitioner
  job.setPartitionerClass(PersonaScorePartitioner.class);

  boolean success = job.waitForCompletion(true);
  System.exit(success ? 0:1);  
 }
}

En el Mapper lo único que hacemos es sacar el nombre y apellidos del value y emite un par key/value enviando como key la fecha y como value la persona.

public class PersonaScoreMapper extends 
 Mapper<Text, Text, Text, Text> {
 
 Text persona = new Text();
 
 @Override
 public void map(Text key, Text values,
   Context context) throws IOException, InterruptedException {
  
  String[] personaSplit = values.toString().split(" ");
  StringBuilder persBuilder = new StringBuilder();
  // Puede haber personas con un apellido o con dos
  if(personaSplit.length == 3 || personaSplit.length == 4){
   if(personaSplit.length == 3){
    persBuilder.append(personaSplit[0]).append(" ")
     .append(personaSplit[1]);
   }else {
    persBuilder.append(personaSplit[0]).append(" ")
     .append(personaSplit[1]).append(" ")
     .append(personaSplit[2]);
   }
   persona.set(persBuilder.toString());
   context.write(key, persona);
  }
 }
}


La clase Reducer lo único que hace es recoger la key con su lista de values correspondientes, recorrer esa lista y emitir cada par key/value con la fecha y el nombre. Al haber realizado el Partiiioner, un mismo reducer procesará las keys de un mismo año.

public class PersonaScoreReducer extends 
 Reducer<Text, Text, Text, Text> {

 @Override
 public void reduce(Text key, Iterable<Text> values,
   Context context) throws IOException, InterruptedException {
  for (Text value : values) {
   context.write(key, value);
  }  
 }
}


Por último el Partitioner, que lo que hace es devolver un entero indicando cuál es el Reducer al que irán los datos intermedios generados por el Mapper.

public class PersonaScorePartitioner extends Partitioner<Text, Text> {
 
 @Override
 public int getPartition(Text key, Text value, int numPartitions) {
  
  if(key.toString().endsWith("2012")){
   return 0;
  }else{
   return 1;
  }
 }
}


Una vez visto el desarrollo y cómo quedaría el código sólo quedaría exportar las clases como jar (tal y como vimos en esta entrada) al directorio que tengamos preparado para la ejecución de Jobs en modo pseudo-distribuído y lo lanzaríamos (previamente habiendo puesto en HDFS el fichero scorePartMezcla).
También os recuerdo que las clases las podréis encontrar en la sección de Código Fuente

hadoop jar training/jars/EjemploPartitioner.jar PersonaScoreDriver pruebas/scorePartMezcla pruebas/resultados/ejemploPartitioner

Y este debería ser el resultado si listamos el contenido del directorio ejemploPartitioner:

elena:hadoop elena$ hadoop fs -ls pruebas/resultados/ejemploPartitioner
Found 4 items
-rw-r--r--   1 elena supergroup          0 2013-04-02 19:34 /user/elena/pruebas/resultados/ejemploPartitioner/_SUCCESS
drwxr-xr-x   - elena supergroup          0 2013-04-02 19:34 /user/elena/pruebas/resultados/ejemploPartitioner/_logs
-rw-r--r--   1 elena supergroup        562 2013-04-02 19:34 /user/elena/pruebas/resultados/ejemploPartitioner/part-r-00000
-rw-r--r--   1 elena supergroup        684 2013-04-02 19:34 /user/elena/pruebas/resultados/ejemploPartitioner/part-r-00001



Y en cada fichero quedaría el siguiente contenido.

En el part-r-00000 que correspondería al año 2012 y que le habíamos asignado el valor 0:


elena:hadoop elena$ hadoop-1.0.4/bin/hadoop fs -cat pruebas/resultados/ejemploPartitioner/part-r-00000
01-11-2012 Angel Martin Hernandez
01-11-2012 Maria Garcia Martinez
01-11-2012 Ana Lopez Fernandez
01-11-2012 Pablo Sanchez Rodriguez
01-11-2012 Pepe Perez Gonzalez
01-12-2012 Maria Garcia Martinez
01-12-2012 Pepe Perez Gonzalez
01-12-2012 Pablo Sanchez Rodriguez
01-12-2012 Ana Lopez Fernandez
15-11-2012 Pepe Perez Gonzalez
15-11-2012 Maria Garcia Martinez
15-11-2012 John Smith
15-11-2012 Cristina Ruiz Gomez
15-12-2012 John Smith
15-12-2012 Cristina Ruiz Gomez
15-12-2012 Maria Garcia Martinez
15-12-2012 Pepe Perez Gonzalez
15-12-2012 Angel Martin Hernandez




En el part-r-00001 que correspondería al año 2013 y que le habíamos asignado el valor 1:

elena:hadoop elena$ hadoop-1.0.4/bin/hadoop fs -cat pruebas/resultados/ejemploPartitioner/part-r-00001
01-01-2013 Ana Lopez Fernandez
01-01-2013 John Smith
01-01-2013 Pablo Sanchez Rodriguez
01-01-2013 Pepe Perez Gonzalez
01-01-2013 Maria Garcia Martinez
01-01-2013 Angel Martin Hernandez
01-02-2013 Ana Lopez Fernandez
01-02-2013 Cristina Ruiz Gomez
01-02-2013 Maria Garcia Martinez
01-02-2013 Pepe Perez Gonzalez
15-01-2013 Angel Martin Hernandez
15-01-2013 Maria Garcia Martinez
15-01-2013 Pepe Perez Gonzalez
15-01-2013 John Smith
15-01-2013 Pablo Sanchez Rodriguez
15-02-2013 Pepe Perez Gonzalez
15-02-2013 John Smith
15-02-2013 Pablo Sanchez Rodriguez
15-02-2013 Maria Garcia Martinez
15-02-2013 Ana Lopez Fernandez
15-02-2013 Cristina Ruiz Gomez
15-02-2013 Angel Martin Hernandez


sábado, 13 de abril de 2013

Componentes de Hadoop: Partitioner

El partitioner te permite que las Key del mismo valor vayan al mismo Reducer, es decir, divide y distribuye el espacio de claves.

Hadoop tiene un Partitioner por defecto, el HashPartitioner, que a través de su método hashCode() determina a qué partición pertenece una determinada Key y por tanto a qué Reducer va a ser enviado el registro. El número de particiones es igual al número de tareas reduce del job.

A veces, por ciertas razones necesitamos implementar nuestro propio Partitioner para controlar que una serie de Keys vayan al mismo Reducer, es por esta razón que crearíamos nuestra propia clase MyPartitioner que heredaría de la interfaz Partitioner y que implementará el método getPartition.


public class  MyPartitioner<K2, V2> 
    extends Partitioner<KEY, VALUE> implements Configurable {
      public int getPartition(KEY key, VALUE value,  int  numPartitions){}  
}


getPartition recibe una key, un valor y el número de particiones en el que se deben dividir los datos cuyo rango está entre 0 y "numPartitions -1" y devolverá un número entre 0 y numPartitions indicando a qué partición pertenecen esos datos recibidos.

Para configurar un partitioner, basta con añadir en el Driver la línea:

job.setPartitionerClass(MyPartitioner.class);


También nos tenemos que acordar de configurar el número de Reducer al número de particiones que vamos a realizar:

job.setNumReduceTasks(numPartitions);


En la entrada siguiente mostraré un ejemplo concreto del Partitioner.

sábado, 6 de abril de 2013

Output Formats

Los Output Formats son muy parecidos a los tipos vistos en la entrada anterior de los Input Formats.
Pero esta vez la interfaz OutputFormat va a determinar cómo será la salida del Job que vamos a ejecutar.
Para establecer el output format se configura en el Driver a través de:

job.setOutputFormatClass(Tipo.class);

La clase base de salida en Hadoop es FileOutputFormat (que hereda de OutputFormat), y a partir de aquí existen diferentes tipos para poder implementar esa salida, estos son algunos:
  • TextOutputFormat
  • SequenceFileOutputFormat
    • SequenceFileAsBinaryOutputFormat
  • MultipleOutputFormat
El tipo por defecto de salida es el TextOutputFormat, que escribe cada registro (un par key/value) en líneas de texto separadas, y los tipos de los pares key/value pueden ser de cualquier tipo, siempre y cuando implementen el método toString().

También podría ser posible eliminar la key o el value de la salida a través del tipo NullWritable o los dos, que sería mejor definir la salida del Job con el tipo NullOutputFormat.
Si queremos que la salida sea nula, en el Driver definiríamos:
job.setOutputFormatClass(NullOutputFormat.class);

Si sólo queremos eliminar la key o el value se haría con:
job.setOutputKeyClass(NullWritable.class);


o con:
job.setOutputValueClass(NullWritable.class);



Como en los Input Formats, el output también dispone de salidas binarias como el SequenceFileOutputFormat, que como indica, escribe ficheros de tipo Sequence Files (ficheros binarios en los que se escriben pares key/value) y su subclase SequenceFileAsBinaryOutputFormat, que escribe sequence files en los que las key y values están codificados en binario.

En los tipos FileOutputFormat, por defecto se escribe un fichero por cada reducer, que normalmente está predeterminado a uno, pero se puede cambiar ese número de reducers. El nombre de cada fichero es de la forma part-r-00000, part-r-00001..., siendo la última parte el número de reducer. Así que una de las formas de dividir las salidas puede ser usando varios reducers, pero esta no es la solución que nos interesa ver aquí.
La solución sería utilizando la clase MultipleOutputs, se crearía un objeto de este tipo en el reducer y en vez de llamar al write del Context, se haría al write del MultipleOutputs.
También la ventaja de este tipo de Output es que puedes definir el nombre que deseas darle al fichero name-r-00000

Ejemplo de MultipleOutputs:

A partir de nuestro fichero score.txt queremos un programa que separe a los jugadores y sus puntuaciones por fecha agrupados en ficheros separados.
Recordamos que el fichero es de este tipo
 
01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez 14
15-11-2012 John Smith 13
01-12-2012 Pepe Perez Gonzalez 25
...


El Driver es como lo configuramos normalmente, no tiene ninguna configuración especial para hacer el MultipleOutput:
 
public class TestMultipleOutputDriver {
 public static void main(String[] args) throws Exception {
  
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  job.setJarByClass(TestMultipleOutputDriver.class);
  
  job.setJobName("Word Count");
  
  job.setMapperClass(TestMultipleOutputMapper.class);
  job.setReducerClass(TestMultipleOutputReducer.class);
  
  job.setInputFormatClass(KeyValueTextInputFormat.class);

  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  boolean success = job.waitForCompletion(true);
  System.exit(success ? 0:1); 
 }
}



En el Mapper lo único que hacemos es emitir el par key/value que recibimos, ya que no estamos haciendo ningún tratamiento de los datos como tal para este ejemplo.
 
public class TestMultipleOutputMapper extends Mapper<Text, Text, Text, Text> {
 public void map(Text key, Text values, Context context) 
   throws IOException, InterruptedException{
  
  context.write(key, values);
 }
}


En el Reducer creamos un objeto de tipo MultipleOutputs que vamos a inicializar en el método setup y es el que vamos a utilizar para escribir la salida.
name será el prefijo del fichero.
 
public class TestMultipleOutputReducer extends Reducer<Text, Text, Text, Text> {

 private MultipleOutputs<Text, Text> multipleOut;
 
 @Override
 protected void cleanup(Context context) throws IOException,
   InterruptedException {
  multipleOut.close();
 }
 @Override
 protected void setup(Context context) throws IOException,
   InterruptedException {
  multipleOut = new MultipleOutputs<Text, Text>(context);
 }
 @Override
 public void reduce(Text key, Iterable<Text> values, 
           Context context) 
     throws IOException, InterruptedException {
  String name = key.toString().replace("-", "");
  for(Text value:values){
   multipleOut.write( key, value, name);
  //Podría añadir más salidas según mis necesidades
  // a través de cláusulas if, o porque un par key/value
  // traiga diversas informaciones que quiero subdividir
  // en diferentes ficheros
  // if(caso1) multipleOut.write( key, value, name2);
  // multipleOut.write( key, value, name3);
  }
 }
}


En este ejemplo con el MultipleOutputs te obliga que aunque quieras que las salidas sean en distintos ficheros, los pares key/value que emites sean todos del mismo tipo del que has definido la clase MultipleOutputs<Text, Text>, es decir, la key debe ser de tipo Text, y el valor también debe ser de tipo Text.
También es posible emitir múltiples salidas en ficheros diferentes y que cada salida sea con tipos distintos para cada fichero.

En el siguiente ejemplo recibo como entrada un fichero con un listado de papers, en los que cada línea contiene la publicación del paper, los autores y el título del paper de la forma:

paper-id:::author1::author2::...::authorN:::title

journals/cl/SantoNR90:::Michele Di Santo::Libero Nigro::Wilma Russo:::Programmer-Defined Control Abstractions in Modula-2.



Quiero 3 ficheros en la salida de este algoritmo:
- Un fichero paper que contenga: String paper-id, String Title
- Un fichero autor que contenga: Int autor-id (se crea en el algoritmo), String nombre autor
- Un fichero paper/autor que los relacione: String paper-id, Int autor-id

Como vemos necesitamos que una salida sea <Text, Text>, otra salida sea <IntWritable, Text> y la última salida sea <Text, IntWritable>.
Esto se haría añadiendo en el Driver las siguientes líneas, donde se asigna un ID a la salida y de qué tipos son esas salidas:
(Podréis encontrar el código fuente completo en este enlace)
 
MultipleOutputs.addNamedOutput(job, "Autor",  TextOutputFormat.class, 
  IntWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "Paper",  TextOutputFormat.class, 
  Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "PaperAutor",  TextOutputFormat.class, 
  Text.class, IntWritable.class);


Posteriormente, en el Reducer se haría de la forma:

 
for (PaperWritable value : values) {
  
   //Output tabla Autor
   multipleOut.write("Autor", new IntWritable(contador), key);
   
   //Output tabla Paper
   multipleOut.write("Paper", value.getIdPaper(), 
    value.getTituloPaper());
   
   //Output tabla paper/autor
   multipleOut.write("PaperAutor", value.getIdPaper(), 
    new IntWritable(contador));
   
   contador ++;
  }



martes, 2 de abril de 2013

Input Formats

Los InputFormat son los formatos que definen los tipos de datos de entrada que recibe las función map en un programa MapReduce.

La clase base en la que están basados los InputFormat es FileInputFormat, que provee la definición de qué ficheros se incluyen como input en un Job y una implementación para dividir en partes los ficheros de entrada.

Los InputFormat pueden clasificar según el tipo de datos que van a recibir en:
  • Texto
  • Binarios
  • Múltiples
  • Databases

(Podréis encontrar los ejemplos con el código completo en la pestaña de Código Fuente)

InputFormat de tipo texto:

Hadoop se destaca por su capacidad de procesar ficheros de texto no estructurado y dispone de varios tipos según cómo están constituídos los ficheros de datos.

TextInputFormat

Es el formato por defecto de MapReduce, si no se indica nada en el Driver a la hora de programar, será el tipo que considera.

Cada registro es una línea de la entrada, la key será de tipo LongWritable indicando el offset de la línea (el offset es el número de bytes, no el número de la línea) y el value será el contenido de la línea, una cadena de tipo Text.
En este tipo de ficheros la key no suele tener ninguna utilidad a la hora de desarrollar los algoritmos.

Como ejemplo de este formato podemos ver el ejercicio WordCount publicado anteriormente, que como digo, en el Driver no se indica el formato porque toma el TextInputFormat por defecto.

KeyValueTextInputFormat

Muchas veces la línea de texto que recibimos a la entrada suele contener el par key/value que nos servirá para el algoritmo separados por un separador, normalmente una tabulación. Así que, el recibir como key el offset no nos es de ninguna utilidad.

Este tipo nos va a ayudar a recibir como key la primera parte y como value el resto de la línea después de la tabulación.

Como ejemplo tomamos de nuevo el ejercicio WordCount ya publicado. El programa va a recibir un fichero con este formato
 
01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez 14

En el que la fecha y el nombre están separados por una tabulación. En principio vamos a hacer algo simple, que es contar cuántas veces aparece cada fecha.

Esta vez en el Driver añadimos esta línea:
 
job.setInputFormatClass(KeyValueTextInputFormat.class);

La clase Mapper será:
 
public class TestKeyValueMapper 
     extends Mapper<Text, Text, Text, IntWritable> {
 private final static IntWritable cuenta = new IntWritable(1);
 public void map(Text key, Text values, Context context) 
   throws IOException, InterruptedException{
   context.write(key, cuenta);
 }
}

La clase Reducer del WordCount se puede quedar como está.


NLineInputFormat

En todos los casos anteriores y como ya hemos visto, el tamaño de un InputSplit corresponde al tamaño de un bloque HDFS y por lo tanto puede contenter un número indefinido de líneas de entrada.

El formato NLineInputFormat permite definir InputSplits con un número determinado de líneas de entrada (si no se indica nada, por defecto está a 1).

Con este formato podemos definir que en la división se envíe más de un par key/value y que nuestro Mapper reciba N pares key/value.
Los grupos de líneas que recibe el Mapper tendrán un formato par key/value de la forma TextInputFormat, es decir, con el offset como key y el resto de la línea como value.

Hay que tener cuidado, no quiere decir que se reciban 2 líneas en la función map, si no que la misma función map se va a ejecutar N veces recibiendo ese número de registros par key/value.


En el Driver añadimos esta línea:
 
job.setInputFormatClass(NLineInputFormat.class);
//Definimos el número de pares key/value
NLineInputFormat.setNumLinesPerSplit(job, 3);

La clase Mapper, como ya he comentado, estará declarado con el formato "estándar" (TextInputFormat), pero podemos operar en la función de tal forma que sabemos que va a ser llamada N veces, podemos concatenar textos, hacer operaciones, etc con los N pares que va a recibir.

Así que si, por ejemplo, hemos puesto nuestro número de líneas a 2 y tenemos esta entrada:
 
01-11-2012 Pepe Perez Gonzalez 21 
01-11-2012 Ana Lopez Fernandez 14
01-11-2012 Maria Garcia Martinez 11 
01-11-2012 Pablo Sanchez Rodriguez 9

Podemos hacer que la salida sea la concatenación de las entradas quedando:

01-11-2012 Pepe Perez Gonzalez 21 + 01-11-2012 Ana Lopez Fernandez 14
01-11-2012 Maria Garcia Martinez 11 + 01-11-2012 Pablo Sanchez Rodriguez 9


InputFormat de tipo binario:

Aunque MapReduce se destaca por su tratamiento de textos no estructurados, no es exclusivo a este tipo de ficheros, también es capaz de tratar ficheros binarios.

SequenceFileInputFormat

Los sequence files de Hadoop almacenan secuencias de datos binarios en forma de pares key/value. Son splittable (con sus puntos de sincronización sync), soportan compresión y se pueden almacenar múltiples tipos de datos.

Este formato de entrada, las key y value están determinados por el mismo sequence file y al desarrollar el programa hay que asegurarse que escogemos los tipos que corresponden.

Para probar un ejemplo sería conveniente haber leído el artículo de Secuences Files y haber hecho el ejercicio Crear un SequenceFile y así disponer de un fichero de este tipo en nuestro HDFS.
Así que deberíamos disponer de un fichero en HDFS pruebas/poemasequencefile en el que la key es un número (número de línea) y el value es una línea de texto (los versos del poema).

En el Driver se declara el input format:
 
job.setInputFormatClass(SequenceFileInputFormat.class);

Y las declaraciones del Mapper será con los tipos del key/value que sabemos que tiene el fichero (entero y texto) y lo único que haremos será emitir estos pares key/value:
 
public class TestSeqFileMapper 
      extends Mapper<IntWritable, Text, IntWritable, Text> {
 public void map(IntWritable key, Text values, Context context) 
   throws IOException, InterruptedException{
   context.write(key, values);
 }
}

En este ejemplo no he considerado la tarea reducer poniéndola a 0.

SequenceFileAsTextInputFormat

Es una variante del anterior que convierte los key/value en objetos de tipo Text.

Estableciendo el tipo en el Driver:
 
job.setInputFormatClass(SequenceFileAsTextInputFormat.class);

Y simplemente, el Mapper se declara poniendo los objetos de tipo Text:
 
public class TestSeqFileAsTextMapper 
      extends Mapper<Text, Text, Text, Text> {
 public void map(Text key, Text values, Context context) 
   throws IOException, InterruptedException{
   context.write(key, values);
 }
}


SequenceFileAsBinaryInputFormat

Otra variante más del SequenceFileInputFormat que recupera las key y los value en forma de objetos binarios (BytesWritable).

Y en el Driver lo configuramos:
 
job.setInputFormatClass(SequenceFileAsBinaryInputFormat.class);

Y esta vez el Mapper se declara poniendo los objetos de tipo BytesWritable:
public class TestSeqFileAsBinaryMapper 
      extends Mapper<BytesWritable, BytesWritable, Text, Text> {

 public void map(BytesWritable key,BytesWritable values,Context context)
   throws IOException, InterruptedException{
  ...
 }
}



InputFormat Múltiple:


MultipleInputFormat

Este tipo de Inputs sirven para cuando necesitas que haya diferentes fuentes de datos, e incluso que cada una de estos Input sea de tipo distinto.
Como es lógico, si cada entrada es de un formato diferente, va a necesitar una tarea Map distinta para cada uno de ellos, así que al declarar el formato MultipleInputFormat vas a poder definir para cada fichero de entrada, de qué tipo es y a qué Mapper se debe dirigir.

MultipleInputFormat puede ser muy útil cuando necesitas unificar información cuyo origen y formato es diferente.

Para utilizar este formato, en el Driver sobrarían las líneas:
 
job.setMapperClass(MiMapper.class);
FileInputFormat.setInputPaths(job, new Path("path"));

y añadiríamos las líneas:

 
MultipleInputs.addInputPath(job, new Path("path1_2"), 
      KeyValueTextInputFormat.class, TestKeyValueMapper.class);
MultipleInputs.addInputPath(job, new Path("path_2"), 
      SequenceFileInputFormat.class, TestSeqFileMapper.class);



viernes, 22 de marzo de 2013

Sequence Files

Los SequenceFiles son unos tipos de ficheros de datos propios de Hadoop almacenados en forma de pares key/value y codificados de forma binaria.
Proporcionan una estructura de datos persistente para pares key/value de forma binaria.

En el mismo fichero se almacenan los metadatos que contienen la información de ese fichero (tipo de los datos, nombre, fecha,...).

Este tipo de ficheros se usa muy a menudo en Jobs MapReduce, sobre todo cuando la salida de un Job es la entrada de otro.
También, podemos imaginarlos como ficheros de log donde cada registro es una nueva línea.

Son un tipo de fichero muy adecuados para MapReduce porque además son splittable (que se pueden fragmentar).
Pueden almacenar distintos tipos de datos gracias a que usan una gran variedad de frameworks de serialización.

Soportan compresión, que puede ser de 3 tipos:
  • Uncompressed (No comprimido)
  • Record Compressed (Compresión a nivel de cada par key/value)
  • Block Compressed (Se comprime por bloques)
Sea cual sea el tipo de compresión que utilice, la estructura del encabezado (header) va a ser el mismo, sólo que éste contendrá la información necesaria para su posterior lectura.

Problemas: 
  • Sólo se puede acceder a ellos a través de la AP Java de Hadoop.
  • Si la definición de la key o value cambian, el fichero no se podrá leer.

Para entender de forma visual cómo es la estructura de un SequenceFile, empezamos viendo cómo sería el Header:

Estructura del Header

Esta sería la estructura de un SequenceFile Uncompressed o Record Compressed:

Estructura de un SequenceFile

Así sería la estructura de un registro cuando no está comprimido (Uncompressed):

Registro no comprimido

Y así sería cuando está comprimido por registro (Record Compressed):

Estructura de la compresión por registro


Aunque es la misma estructura de SequenceFile, no pueden existir formatos de compresión distintos dentro de un mismo fichero.

A continuación es el formato de un SequenceFile Block Compressed:

Estructura de un SequenceFile comprimido por bloques


Y por último el formato del bloque:

Estructura del bloque comprimido




Una propiedad de los SequenceFiles es que en la creación introducen puntos de sincronización (sync points).
Estos puntos se pueden utilizar cuando el reader se pierde, por ejemplo, si nos hemos desplazado a buscar en una posición cualquiera en nuestro flujo de datos. Aún más importante, estos puntos de sincronización sirven para definir los InputSplit de los Jobs MapReduce.
Estos sync points se crean automáticamente introduciéndolos cada cierto número de registros cuando realizamos el SequenceFile.Writer.
Se pueden localizar durante la lectura del SequenceFile con el SecuenceFile.Reader a través de:

      reader.syncSeen();

Otra opción que nos dan los SequenceFiles es, como hemos dicho antes, buscar una posición dada en este tipo de ficheros.
Si sabemos la posición exacta a la que queremos acceder, utilizaríamos el método:

      reader.seek(posicion);
      reader.next(key, value);


Si ponemos una posición que no existe daría un error IOException.


Si no conocemos la posición exacta, podemos utilizar:

      reader.sync (posicion)

El reader se posicionará en el siguiente sync point que encuentre después de posicion.
Puede ser el caso que el fichero sea muy pequeño y no exista ningún sync point, lo que sucederá entonces es que el reader se posicionará al final del fichero.

domingo, 17 de marzo de 2013

Ejemplo de uso de Tipos de Datos propios con las interfaces Writable y WritableComparable

Continuando con la entrada anterior en la que explicaba qué son las interfaces Writable y WritableComparable y cómo es posible crear nuestros propios tipos usándolas, vamos a ver un ejemplo.

El código fuente y el fichero de ejemplo de esta entrada también los podréis encontrar en este enlace.

Ejemplo de aplicación MapReduce utilizando nuestra propia clase como key.
Al programar nuestra clase PersonaWritableComparable que implementa WritableComparable, en la fase del Shuffle and Sort se consigue que el Reducer reciba las key ordenadas y con sus valores correspondientes agrupados para poder operar con ellos.

Recibimos un fichero de texto cuya información es
Fecha [tab] Nombre Apellido1 Apellido2 Puntuación
Queremos como salida un listado de personas (con los nombres y apellidos) y la suma de todas sus puntuaciones.

Fichero de entrada score.txt:

01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez 14
01-11-2012 Maria Garcia Martinez 11
01-11-2012 Pablo Sanchez Rodriguez 9
01-11-2012 Angel Martin Hernandez 3
15-11-2012 Pepe Perez Gonzalez 22
15-11-2012 Maria Garcia Martinez 15
15-11-2012 John Smith 13
01-12-2012 Pepe Perez Gonzalez 25
01-12-2012 Ana Lopez Fernandez 15
01-12-2012 Pablo Sanchez Rodriguez 8
01-12-2012 Maria Garcia Martinez 32
15-12-2012 Maria Garcia Martinez 47
15-12-2012 Pepe Perez Gonzalez 13
15-12-2012 Angel Martin Hernandez 13
15-12-2012 John Smith 27
01-01-2013 Ana Lopez Fernandez 5
01-01-2013 Pablo Sanchez Rodriguez 2
01-01-2013 Pepe Perez Gonzalez 17
01-01-2013 Maria Garcia Martinez 3
01-01-2013 Angel Martin Hernandez 32
01-01-2013 John Smith 21


Nuestra propia clase PersonaWritableComparable:
 
public class PersonaWritableComparable 
  implements WritableComparable<PersonaWritableComparable>{

 Text nombre, primerApellido, segundoApellido;
 
 public void set(String nom, String prApell, String sgApell){
  nombre.set(nom);
  primerApellido.set(prApell);
  segundoApellido.set(sgApell);
 }
 
 public PersonaWritableComparable() {
  this.nombre = new Text();
  this.primerApellido = new Text();
  this.segundoApellido = new Text();
 }

 public PersonaWritableComparable(Text nombre, 
   Text primerApellido, Text segundoApellido) {
  this.nombre = nombre;
  this.primerApellido = primerApellido;
  this.segundoApellido = segundoApellido;
 }
 
 @Override
 public void readFields(DataInput arg0) throws IOException {
  this.nombre.readFields(arg0);
  this.primerApellido.readFields(arg0);
  this.segundoApellido.readFields(arg0);
  
 }

 @Override
 public void write(DataOutput arg0) throws IOException {
  this.nombre.write(arg0);
  this.primerApellido.write(arg0);
  this.segundoApellido.write(arg0);
 }

 @Override
 public int compareTo(PersonaWritableComparable o) {
  if(this.nombre.compareTo(o.nombre) != 0){
   return this.nombre.compareTo(o.nombre);
  }else if(this.primerApellido.compareTo(o.primerApellido) != 0){
   return this.primerApellido.compareTo(o.primerApellido);
  }else if(this.segundoApellido.compareTo(o.segundoApellido) != 0){
   return this.segundoApellido.compareTo(o.segundoApellido);
  }
  return 0;
 }

 @Override
 public boolean equals(Object obj) {
  if(obj instanceof PersonaWritableComparable){
   PersonaWritableComparable p = (PersonaWritableComparable) obj;
   return this.nombre.equals(p.nombre) && 
    this.primerApellido.equals(p.primerApellido) && 
    this.segundoApellido.equals(p.segundoApellido);
  }
  return false;
 }

 @Override
 public int hashCode() {
  return this.nombre.hashCode()*163 + 
    this.primerApellido.hashCode()*163 + 
    this.segundoApellido.hashCode()*163;
 }
 
 @Override
 public String toString() {
  return nombre.toString()+" "+primerApellido.toString()+" "
   +segundoApellido.toString();
 }
}


El Driver de la aplicación:
 
public class PersonaScoreDriver {
 public static void main(String[] args) throws Exception {
  if(args.length != 2){
   System.out.println("Ha ocurrido un error en la entrada");
   System.exit(-1);
  }
  
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  job.setJarByClass(PersonaScoreDriver.class);
  
  job.setJobName("Persona Score");
  
  job.setOutputKeyClass(PersonaWritableComparable.class);
  job.setOutputValueClass(IntWritable.class);

  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
  job.setMapperClass(PersonaScoreMapper.class);
  job.setReducerClass(PersonaScoreReducer.class);

  boolean success = job.waitForCompletion(true);
  System.exit(success ? 0:1);  
 }
}


La clase Mapper:
 
public class PersonaScoreMapper extends 
 Mapper<LongWritable, Text, 
 PersonaWritableComparable, IntWritable> {

 private IntWritable score = new IntWritable();
 PersonaWritableComparable persona = new PersonaWritableComparable();
 
 public void map(LongWritable key, Text values,
   Context context) throws IOException, InterruptedException {
  
  // El texto tiene este formato:
  // 01-11-2012 Maria Garcia Martinez 11
  // La fecha separada por tabulación, el resto con espacios
  String[] primerSplit = values.toString().split(" ");
  if(primerSplit.length == 2){
   String[] segundoSplit = primerSplit[1].split(" ");
   
   // Puede haber personas con un apellido o con dos
   if(segundoSplit.length == 3 || segundoSplit.length == 4){
    if(segundoSplit.length == 3){
     persona.set(segundoSplit[0], segundoSplit[1], "");
     score.set(Integer.valueOf(segundoSplit[2]));
    }else {
     persona.set(segundoSplit[0], segundoSplit[1], segundoSplit[2]);
     score.set(Integer.valueOf(segundoSplit[3]));
    }
    context.write(persona, score);
   } 
  }
 }
}


La clase Reducer
 
public class PersonaScoreReducer extends 
 Reducer<PersonaWritableComparable, IntWritable, 
 PersonaWritableComparable, IntWritable> {

 public void reduce(PersonaWritableComparable key, 
   Iterable<IntWritable> values,
   Context context) throws IOException, InterruptedException {
  
  int suma = 0;
  for (IntWritable value : values) {
   suma += value.get();
  }
  
  context.write(key, new IntWritable(suma));
 }
}

Ver también: Tipos de datos Hadoop e interfaces Writable y WritableComparable

viernes, 15 de marzo de 2013

Tipos de datos Hadoop e Interfaces Writable y WritableComparable

Writable y WritableComparable son las interfaces de Hadoop que se utilizan para la serialización.

Hadoop define sus propios tipos de objetos a partir de los tipos primitivos de Java. Todos ellos heredan de la interfaz WritableComparable, que a su vez hereda de la interfaz Writable y que permiten la Serialización (conversión de los datos en bytes para poder ser transmitidos por red o para su escritura en almacenes persistentes).

Algunos de los tipos son:
  • IntWritable                   ints
  • Text                             strings
  • DoubleWritable           doubles
  • FloatWritable               floats
  • LongWritable               longs
  • ByteWritable                bytes
  • NullWritable

Jerarquía de Clases (Hadoop the Definitive Guide)

De los tipos vistos, aunque son propios de Hadoop, nos son más o menos conocidos, todos excepto quizas el NullWritable. Este tipo de objeto se utiliza para cuando queremos eliminar la key o el value y que Hadoop lo pueda reconocer.
Por ejemplo, si en una salida de un Job MapReduce es de tipo <Text, Text> y tenemos la key siempre a nulo, entonces el fichero de salida tendrá un formato similar a:
[separador] mivalor1
[separador] mivalor2
...
siendo [separador] el carácter de separación (por defecto una tabulación).

Y si la key la definiésemos como NullWritable, la salida sería con este formato:
mivalor1
mivalor2
...
Lo cual nos da un fichero de salida más compacto y más apropiado para usarlo como entrada de otro Job MapReduce.



Durante los desarrollos de aplicaciones Hadoop, en algún momento tendremos la necesidad de crear nuestros propios objetos.
Por ejemplo, imaginemos que queremos como objeto los datos de una persona (nombre y los dos apellidos), sería fácil crear un objeto de tipo Text:

   Text persona = new Text (nombre+" "+primerApellido+" "+segundoApellido);

Como veis, separamos los datos con un espacio, y luego al hacer:

   String[] listaPersonas = persona.toString().split(" ");

Podría ser un problema si uno de los apellidos fuera compuesto y contuviera espacios.
Así que lo mejor sería crear tu propio objeto.

Es muy importante saber que las keys deben implementar siempre la interfaz WritableComparable, y los values la interfaz Writable.

La interfaz Writable

Contiene los métodos de serialización y deserialización readFields y write:
 
public interfaz Writable{
    void readFields(DataInput in);
    void write(DataOutput out);
}

Ejemplo:
 
public class PersonaWritable implements Writable {

 Text nombre, primerApellido, segundoApellido;
 
 public PersonaWritable() {
  this.nombre = new Text();
  this.primerApellido = new Text();
  this.segundoApellido = new Text();
 }

 public PersonaWritable(Text nombre, Text primerApellido,
   Text segundoApellido) {
  this.nombre = nombre;
  this.primerApellido = primerApellido;
  this.segundoApellido = segundoApellido;
 }

 @Override
 public void readFields(DataInput arg0) throws IOException {
  this.nombre.readFields(arg0);
  this.primerApellido.readFields(arg0);
  this.segundoApellido.readFields(arg0);
 }

 @Override
 public void write(DataOutput arg0) throws IOException {
  this.nombre.write(arg0);
  this.primerApellido.write(arg0);
  this.segundoApellido.write(arg0);
 }
 
 // Implementar getters y setters
}



La interfaz WritableComparable

Además de contener los métodos de serialización y deserialización readFields y write, debe implementar los métodos compareTo, hashCode y equals, ya que extiende, además de la interfaz Writable, de la Comparable <   >.

Ejemplo:
 
public class PersonaWritableComparable 
  implements WritableComparable<PersonaWritableComparable>{

 Text nombre, primerApellido, segundoApellido;
 
 public PersonaWritableComparable() {
  this.nombre = new Text();
  this.primerApellido = new Text();
  this.segundoApellido = new Text();
 }

 public PersonaWritableComparable(Text nombre, 
   Text primerApellido, Text segundoApellido) {
  this.nombre = nombre;
  this.primerApellido = primerApellido;
  this.segundoApellido = segundoApellido;
 }
 
 @Override
 public void readFields(DataInput arg0) throws IOException {
  this.nombre.readFields(arg0);
  this.primerApellido.readFields(arg0);
  this.segundoApellido.readFields(arg0);
  
 }

 @Override
 public void write(DataOutput arg0) throws IOException {
  this.nombre.write(arg0);
  this.primerApellido.write(arg0);
  this.segundoApellido.write(arg0);
 }

 @Override
 public int compareTo(PersonaWritableComparable o) {
  if(this.nombre.compareTo(o.nombre) != 0){
   return this.nombre.compareTo(o.nombre);
  }else if(this.primerApellido.compareTo(o.primerApellido) != 0){
   return this.primerApellido.compareTo(o.primerApellido);
  }else if(this.segundoApellido.compareTo(o.segundoApellido) != 0){
   return this.segundoApellido.compareTo(o.segundoApellido);
  }
  return 0;
 }

 @Override
 public boolean equals(Object obj) {
  if(obj instanceof PersonaWritable){
   PersonaWritableComparable p = (PersonaWritableComparable) obj;
   return this.nombre == p.nombre && 
     this.primerApellido == p.primerApellido && 
     this.segundoApellido == p.segundoApellido;
  }
  return false;
 }

 @Override
 public int hashCode() {
  return this.nombre.hashCode()*163 + 
    this.primerApellido.hashCode()*163 + 
    this.segundoApellido.hashCode()*163;
 }

 // Implementar getters y setters
}


Si en algún momento quieres desarrollar un tipo propio y la salida (output) va a ser con este tipo, también hará falta implementar el método toString()

Si tuviéramos que serializar objetos binarios, se haría a través de arrays de bytes

El método write sería:

  1. Serializar el objeto en un array de bytes
  2. Escribir el número de bytes
  3. Escribir el array de bytes
Y el método readFields sería:
  1. Leer el número de bytes
  2. Crear un array de bytes de ese tamaño
  3. Leer el array
  4. Deserializar el objeto

Ver también: Ejemplo de uso de Tipos de Datos propios con las interfaces Writable y WritableComparable

lunes, 11 de marzo de 2013

Flujo de ejecución de un Job MapReduce

Cuando un cliente envía un trabajo, primero su información de configuración se empaqueta en un fichero xml, que junto con el fichero jar (que contiene el código del programa) y todo eso es gestionado por el JobTracker.

El JobTracker envía tareas individuales al TaskTracker, el cual, cuando recibe una petición de ejecutar una tarea,  instancia una nueva JVM separada para esta tarea.

Los datos intermedios se generan en el disco local del TaskTracker. Luego los datos se distribuyen por la red hacia los Reducers, que escribirán la salida en HDFS.

Una vez finalizado el Job, el TaskTracker borra los datos intermedios del disco local.


Así es como actuaría de forma genérica, ahora veremos cómo sería el flujo de un MapReduce con más detalle:

Flujo MapReduce

Cuando un cliente envía un Job, se está enviando unos datos de entrada, un Input, que MapReduce primero divide en trozos (normalmente del mismo tamaño que los bloques HDFS) llamados Input Splits, Hadoop ejecuta una tarea Task por cada split generado.
El InputSplit se hace de forma automática, quiero decir, es el InputFormat el que se encarga de crear los InputSplits y de dividirlos en registros.

La ventaja de dividir en esos trozos es que la cantidad de datos a tratar es mucho menor que si se enviara el input entero, entonces al tratarlo en sistema distribuído el tiempo total del proceso será mucho menor.
También la ventaja está en que si se dividen en el tamaño del bloque HDFS, la tarea map se va a ejecutar (la mayor parte de las veces) en el nodo donde se encuentran esos datos almacenados en HDFS, de esta forma se ahorra ancho de banda, ya que no estamos transmitiendo los datos a través de la red.

Hay que tener en cuenta que el InputSplit no contiene los datos como tal, sino una referencia a los datos.

A continuación el TaskTracker pasa el split al RecordReader, que no es más que un iterador sobre los registros de ese split y es entonces cuando la tarea map trata cada registro de tipo par key/value y generando una salida.

Los datos intermedios se almacenan en el disco local, y no en HDFS porque esto supondría realizar el proceso de replicación y eso sería excesivo.

Después, los datos se enviarían al nodo donde la tarea reduce se está ejecutando. Por defecto hay una sola tarea reduce, pero podría haber más. Así que si disponemos de varios reducer, hay una función partition que se encarga de particionar la salida del map  (se suele dividir para que cada key con sus valores vayan al mismo reducer) y enviar cada trozo a un reducer. Hay un partitioner por defecto, pero podríamos crear el nuestro propio que ya veríamos en otro artículo.

También sería posible tener el programa configurado con cero tareas reducer.

El Reduce no dispone del concepto de localización de los datos

La salida del reducer se va guardando en un RecordWriter, que posteriormente va a pasar a generar el output que se almacena en HDFS para aportar fiabilidad. Por cada bloque del output, la primera réplica se almacena en el nodo local, las otras réplicas en el resto de nodos.





jueves, 28 de febrero de 2013

Hadoop: Introducción al Desarrollo en Java (Parte II): El Mapper (Ejemplo Word Count)

El Mapper implementa el método map, es la parte del programa que se va a ejecutar en el lugar en el que se encuentran los bloques de datos, hará las operaciones necesarias con ellos, seleccionará sólo los datos que nos interesan y los emitirá como datos intermedios antes de que se sigan procesando
 
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Esta clase tiene que extender de la clase Mapper.
// Espera 4 tipos de datos: los 2 primeros definen 
// los tipos del key/value de entrada y los 2 últimos 
// definen los tipos del key/value de salida.
public class WordCountMapper extends 
 Mapper <LongWritable, Text, Text, IntWritable> {

// Una buena práctica es la reutilización de objetos. 
// Cuando necesitamos utilizar constantes, crear una 
// variable estática fuera del map.
// De esta forma, cada vez que el método map se llama,
// no se creará una nueva instancia de ese tipo.
 private final static IntWritable cuenta = new IntWritable(1);
 private Text palabra = new Text();

// La función que obligatoriamente tiene que 
// implementarse en el Mapper es la map, que va
// a recibir como parámetros: primero el tipo de
// la key, luego el tipo del value y finalmente un
// objeto Context que se usará para escribir los 
// datos intermedios
 public void map(LongWritable key, Text values, Context context) 
   throws IOException, InterruptedException{
// En el objeto "values" estamos recibiendo cada
// línea del fichero que estamos leyendo. Primero
// tenemos que pasarlo a String para poder 
// operar con él
  String linea = values.toString();
  
// Cada línea va a contener palabras separadas por
// "un separador", separador que se considera como
// una expresión regular y a partir del cual dividimos
// la línea. Vamos recorriendo elemento a elemento. 
  for(String word : linea.split(" ")){
   if (word.length() > 0){
//  Le damos el valor a nuestro objeto creado para la
//  reutilización (claramente, a 'palabra', ya que 
//  'cuenta' es una constante final static).
//  Con el write escribimos los datos intermedios, que
//  son como key la palabra y como valor un 1.
    palabra.set(word);
    context.write(palabra, cuenta);
   }
  }
 }
}

Ahora os muestro el código con la old API que vemos que también tiene algunas diferencias. Además, aprovecho para que os fijéis en la parte output.collect, ahí se está creando cada vez una nueva instancia al objeto IntWritable y mostrar que es bastante útil intentar reaprovechar los objetos:
 
import java.io.IOException;
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.MapReduceBase; 
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector; 
import org.apache.hadoop.mapred.Reporter;

public class WordCountMapper extends MapReduceBase implements 
   Mapper <LongWritable, Text, Text, IntWritable> {

    public void map(LongWritable key, Text value, 
     OutputCollector<Text, IntWritable> output, Reporter reporter) 
     throws IOException {

        String s = value.toString();
        for (String word : s.split("\\W+")) {
           if (word.length() > 0) {
              output.collect(new Text(word), new IntWritable(1));
           }
        }
    }
}


Las diferencias principales que observamos son:

  • En la new API la clase sólo extiende de Mapper, mientras que en la old API necesita extender de MapReduceBase e implementar Mapper.
  • La new API recibe 3 atributos: los 2 tipos del par key/value y el context. La old API recibía 4, los 2 tipos de la key/value, un OutputCollector que es donde se escribían los datos intermedios y un objeto Reporter que servía para devolver cierta información al Driver. En la new API este paso de información se puede hacer con el Context.

Ver también:


martes, 26 de febrero de 2013

Hadoop: Introducción al desarrollo en Java (Parte I)

En esta entrada voy a explicar una introducción al desarrollo de un programa MapReduce en Java.

Este blog lo estoy desarrollando a partir de la versión 1.0.4. Esta versión permite usar tanto la nueva API de Hadoop como la vieja API (me referiré a ellas como "new API" -releases 1.x- y "old API" -releases 0.20-x). En esta entrada la voy a aprovechar para mostrar las diferencias entre estas dos APIs pero las próximas entradas y desarrollos ya estarán hechas únicamente con la nueva API.

Para explicar una introducción al desarrollo de un programa MapReduce utilizaré el "Hola Mundo" de Hadoop, que es el "Word Count", básicamente se coge un fichero, se cuenta las veces que aparece una palabra y la salida será un listado de palabras (keys) con el número de veces que aparece (values).

Para desarrollar este tipo de programas necesitamos básicamente tres clases:
También antes de empezar a explicar código, hay que tener en cuenta cuáles son los tipos de datos que vamos a utilizar, ya que las clases hay que definirlas con los tipos de entrada y de salida.

En el WordCount el Mapper va a recibir como key elementos de tipo numérico ya que, como ya he explicado en otros artículos, normalmente va a ser el offset del fichero y ese valor no lo vamos a utilizar.
El value que recibe el map es cada una de las líneas del fichero, es decir, valores de tipo texto.
El map va a retornar una lista de pares key/value, donde la key es la palabra localizada (de tipo texto) y el value es un número (en este caso siempre va a devolver un 1 por cada palabra que encuentre).

El Reducer va a recibir una key que será la palabra (dada en el Mapper) de tipo texto y como valores una lista de números. Y finalmente emitirá la palabra que estamos contanto de tipo texto como key, y como value el resultado de las veces que aparece esa palabra, es decir, de tipo número.

En la clase map y en la clase reduce los tipos de entrada no tienen por qué ser los mismos que los de salida. Pero es muy importante tener en cuenta los tipos de la salida del map deben ser los mismos que los de entrada del reduce.

Con el fin de que este artículo no quede demasiado largo, lo he dividido en varios artículos e iré explicando el código escribiendo los comentarios correspondientes.

Para ejecutar y probar el programa en Eclipse podéis leer este artículo.

Continuar con:
Introducción al desarrollo en Java (Parte II): El Mapper (Ejemplo Word Count)
Introducción al desarrollo en Java (Parte III): El Reducer (Ejemplo Word Count)
Introducción al desarrollo en Java (Parte IV): El Driver (Ejemplo Word Count)
Introducción al desarrollo en Java (Parte V): Métodos setup() y cleanup()

jueves, 21 de febrero de 2013

Configuración de Eclipse con Hadoop (Local y Pseudo-Distribuído)

En esta entrada voy a explicar cómo configurar eclipse para poder trabajar con Hadoop.

Por decirlo de alguna forma, hay dos formas de trabajar:
Una realizando una aplicación para ejecutarla en modo pseudo-distribuído a través de los demonios que hemos configurado e instalado (como hemos visto en esta entrada).
Y la otra instalándole al eclipse un plugin que nos permitirá trabajar en modo local, sin necesidad de lanzar los demonios.

Si no tenemos ya el eclipse instalado, descargamos la última versión disponible (actualmente Juno) en http://www.eclipse.org/downloads/





Crear una aplicación y ejecutarla en modo local


Descargamos el plugin de hadoop para eclipse en.
http://wiki.apache.org/hadoop/EclipsePlugIn

Guardamos el jar descargado en {ruta_eclipse}/eclipse/plugins
Arrancamos el eclipse, seleccionamos un workspace. Ahora, si vamos a Window-Open Perspective-Other, podremos seleccionar la vista MapReduce.

Primero hay que configurar Hadoop en el eclipse en Eclipse-Preferencias-Hadoop, ponemos la ruta de donde habíamos instalado hadoop (/usr/local/hadoop/hadoop-1.0.4)




Ahora ya podemos crear nuevas aplicaciones de tipo MapReduce.



Una vez creada la nueva aplicación, el plugin nos permite añadir clases de tipo Mapper, Reduccer y el Driver:


Además, a la hora de crear un nuevo Driver, si le indicamos cuál es el Mapper y el Reducer lo creará con las configuraciones y las relaciones a estas clases hechas.



Hay que tener cuidado que este plugin va a crear las clases con los encabezados y tipos de la "old API", si vamos a querer desarrollar con la "new API" vamos a tener que cambiarlos a mano, tanto los paquetes importados, como los tipos y los encabezados.

También otra cosa en la que hay que tener cuidado, que al desarrollar en Eclipse y en modo local hay tener cuidad con el paso de parámetros entre el Driver y el Mapper o el Reducer.




Crear una aplicación y ejecutarla en modo pseudo-distribuído


Para lanzar Jobs MapReduce hay que seguir todos estos pasos.


Si hemos instalado el plugin de Hadoop para Eclipse, crearíamos una nueva aplicación MapReduce (tal y como hemos visto en la parte de creación y ejecución para modo local).
Si no hemos instalado el plugin de Hadoop, crearíamos una aplicación Java estándar y tendríamos que añadir al build path las librerías que se encuentran en /usr/local/hadoop/hadoop-1.0.4/lib más las que se llaman hadoop-***.jar que se encuentran en la raíz /usr/local/hadoop/hadoop-1.0.4






La instalación de Hadoop la había hecho en el directorio /usr/local/hadoop, así que en ese nivel he creado este sistema de carpetas:
 /usr/local/hadoop/training/jars -> Donde depositaré mis aplicaciones
 /usr/local/hadoop/training/docs -> Donde depositaré ficheros sobre los que quiera trabajar posteriormente en HDFS.

Después de haber desarrollado nuestra aplicación, con el botón derecho sobre el proyecto vamos a Export, seleccionamos Jar File y como destino  /usr/local/hadoop/training/jars/nombreAplicacion.jar



Y por último, a través del terminal arrancar todos los demonios, y lanzar la aplicación a través de los comandos hadoop.

Ahora sólo queda que si la aplicación da algún tipo de error, volveremos al eclipse, corregiremos los cambios y tendremos que volver a exportar el nuevo jar.


Ejecución de Jobs HDFS

Si vamos usar Hadoop sin necesidad de lanzar un Job MapReduce (por ejemplo, si sólo estamos haciendo operaciones HDFS), hay una forma más fácil de lanzarlo.

Tras haber creado la nueva aplicación (bien sea a través del plugin, o a siendo una aplicación Java estándar a la que le hemos incluído las librerías), vamos a las propiedades del proyecto-Java Build Path-Libraries  y luego pulsando sobre "Add External Class Folder" y añadimos la carpeta conf de Hadoop en la ruta /usr/local/hadoop/hadoop-1.0.4/conf



También en este caso tendríamos arrancar el clúster Hadoop a partir de la línea de comandos del terminal.

Pero a partir de ahora, cuando desarrollemos con el Eclipse este tipo de aplicaciones, podremos ejecutarlas en modo pseudo-distribuído sin necesidad de exportar el Jar y haciendo simplemente un Run As Java Application (si es sin el plugin) y con el plugin valdría tanto como Java Application como Run On Hadoop


lunes, 18 de febrero de 2013

Hadoop Ecosystem

¿A qué llamamos Ecosistema Hadoop?
Desde que nació esta tecnología se han creado varios proyectos con distintas utilidades que nos pueden solucionar muchas cosas a la hora de trabajar con Hadoop.

Hadoop es la parte central del sistema y el ecosistema Hadoop es todo el conjunto de proyectos que ayudan a implementar esta solución.

Algunos de los proyectos del Ecosistema son:
  • Hive: Es un sistema de almacenamiento de datos (Data Warehouse) para Hadoop y basado en metadatos. Tiene un lenguaje parecido a SQL (HiveQL) y genera código MapReduce que funciona sobre Hadoop. En Facebook, más del 99% de los Jobs MapReduce se generan a partir de Hive.
  • Pig: Al igual que Hive, tiene un lenguaje propio (pero más tipo scripting) llamado PigLatin y que también se convierte en Jobs MapReduce que funcionan en el clúster. En este caso no tiene metadatos (por eso es más sencillo de instalar que Hive).
  • Impala: Permite queries en tiempo real sobre datos en HDFS. No usa MapReduce, usa sus propios demonios y luego ejecutándose en cada nodo esclavo. Su lenguaje es similar a HiveQL y tanto los datos como los metadatos que usa son los mismos que los de Hive.
  • Flume: Es un software para importar datos en tiempo real en un cluster Hadoop (sirve para la ETL: Extraction Transformation and Load). Ideal para recolectar logs de múltiples sistemas y almacenarlos a HDFS. Soporta una gran variedad de fuentes (como syslog, log4j, etc).
  • Chuckwa: Este proyecto realiza ETL de una forma parecida a Flume. También contiene un módulo de visualización de los mismos.
  • Sqoop: Es un proyecto desarrollado para importar datos desde bases de datos relacionales a HDFS y por lo tanto, es también un sistema de ETL. Se puede importar toda la BD, sólo algunas tablas o sólo partes de la tabla. También permite realizar la operación inversa: exportar datos en HDFS a una base de datos relacional.
  • Oozie: Se puede decir que es un motor de workflow utilizado para el encadenamiento de Jobs. Se ejecuta en un servidor fuera del clúster y se suele utilizar para workflows complejos, que sean lineales o gráfos. Puede encadenar Jobs MapReduce, Pig, Hive, etc.
  • HBase: Se podría decir que "es la Base de Datos NoSql Hadoop". Puede almacenar hasta petabytes de datos, está orientado a columnas (las relacionales a filas), permite miles de insert por segundo, las tablas pueden tener varios miles de columnas. Al contrario de Hive, su lenguaje de query no es intuitivo y no se parece a SQL.
  • Avro: Es un sistema para la serialización de datos que se usa como alternativa a los SquenceFiles. Tiene un formato propio de compresión, se puede exportar a otros lenguajes que no sean Java y tiene sus propios formatos para el Mapper y el Reducer (AvroMapper y AvroReducer).
  • ZooKeeper: Herramienta para la configuración y sincronización de aplicaciones distribuídas.

En otras entradas profundizaré más sobre algunos de estos proyectos.