Recuerda que puedes descargarte algunos de los ejemplos en la pestaña de Código Fuente
Mostrando entradas con la etiqueta Programación. Mostrar todas las entradas
Mostrando entradas con la etiqueta Programación. Mostrar todas las entradas

miércoles, 10 de julio de 2013

Técnicas de Debugging y Logging

Debugging

Depurar los programas MapReduce es muy complicado, ya que cada instancia de un Mapper se ejecuta en una tarea diferente y quizás en máquinas diferentes, así que adjuntar una técnica de debug en el proceso no es tarea fácil.

El problema es que con cantidades tan grandes de datos puede haber una entrada inesperada o errónea y es un evento que hay que tener en cuenta y controlar.
Porque piénsalo de esta forma, estamos realizando un proyecto de Big Data, eso quiere decir que estamos tratando tales cantidades de datos que los procesos pueden tardar horas o incluso días, imaginemos que a mitad de ese proceso que tarda días hay una entrada de un dato inesperado y el programa falla por no haber tomado ninguna medida.
La conclusión es que al desarrollador sólo le queda tomar una serie de estrategias para evitar este tipo errores:
  • Código defensivo: siempre suponer que un dato puede ser incorrecto o esperar que las cosas vayan mal. Habría que añadir control de excepciones o código que controle los posibles fallos.
  • Empieza un desarrollo pequeño y ve agrandándolo poco a poco
  • Escribe pruebas unitarias, utilizando, por ejemplo, MRunit.
  • Prueba primero en local con cantidades reducidas de datos, luego pasa a modo pseudodistribuído (asegurándote que el entorno es similar al del clúster) y finalmente prueba en todo el clúster.
  • Utiliza Counters, por ejemplo, para conocer el número de entradas inválidas.

Para hacer pruebas en local no se usa HDFS, se usa el sistema de ficheros local y sólo se ejecuta un proceso llamado LocalJobRunner.


Logging

Como logging lo más efectivo es usar librerías como el log4java, ya que éste se puede configurar en niveles y desactivarlo en situaciones de producción.

Como buenas prácticas nunca se debería usar println como estrategia de loggin cuando hacemos nuestras primeras pruebas, ya que posteriormente deberíamos borrarlo en todo el código. Si dejáramos estas líneas y pasáramos el código al clúster en modo producción, podría afectar considerablemente a los tiempos de ejecución, piensa que son procesos que necesitamos que sean lo más rápidos posibles y el realizar un println supone un consumo de tiempo ejecutando el comando y realizando la escritura.
En el clúster deben estar desactivadas todas las escrituras de logging innecesarias (no perjudica el rendimiento el hacer loggin en los métodos setup y cleanup).







lunes, 27 de mayo de 2013

Patrón de Diseño "In-Mapper Combining" para programación de algoritmos MapReduce en Java

En esta entrada voy a tratar la otra técnica de Local Aggregation que comenté en la entrada anterior sobre Combiners, utilizada para optimizar los Job MapReduce y reducir la cantidad de tráfico entre el Mapper y el Reducer, el In-mapper combining, que es más bien un patrón de diseño para algoritmos MapReduce.
Este algoritmo nos lo proponen en el libro de J. Lin and C. Dyer "Data-Intensive Text Processing with MapReduce"

La meta de esta técnica es, como ya he dicho, el reducir el número de pares key/value que salen del Mapper y se envían al Reducer.

La ventaja del in-mapper combining sobre el Combiner tradicional que vimos en la entrada anterior, es que el primero puede que se ejecute o puede que no, no tenemos control sobre ello. Mientras que el in-mapper combining podemos hacer que se ejecute siempre. La segunda ventaja de este patrón de diseño es controlar cómo se lleva a cabo exactamente.

Otra ventaja es que con el Combiner se reduce el número de datos que llegan al Shuffle and Sort para luego enviarlos al Reducer, pero realmente no reduce el número de pares key/value emitidos por el Mapper.

Al in-mapper combining más bien se le considera como un patrón de diseño a la hora de desarrollar algoritmos MapReduce en Hadoop, es decir, una técnica a la hora de programar ampliando el algoritmo implementado en el Mapper para reducir el número de pares key/value que emite.

Esta técnica se puede dividir en dos formas: una local y una global.

Volviendo al ejemplo más básico que tenemos, el WordCount, en el que tenemos una clase Mapper y una Reducer, con esta técnica el Reducer va a ser exactamente igual, pero en el Mapper vamos a introducir directamente la funcionalidad del Combiner, más bien en lo que se basa. Y esto lo hace utilizando un objeto de tipo Map.
Este se refiere al algoritmo local, ya que es local respecto al método.
Con esta técnica hemos reducido desde el principio el número de pares key/value que emite el Mapper y que llegan al Shuffle and Sort para posteriormente ser enviados al Reducer.

 
public class WordCountMapper extends 
  Mapper <LongWritable, Text, Text, IntWritable>{

 private IntWritable cuenta = new IntWritable();
 private Text palabra = new Text();

 @Override
 public void map(LongWritable key, Text values, Context context) 
     throws IOException, InterruptedException{
     
   String linea = values.toString();
   Map<String, Integer> myWordMap = new HashMap<String, Integer>();  
   for(String word : linea.split(" ")){
    int sum = 1;
    if(myWordMap.containsKey(word)){
     sum += myWordMap.get(word);
    }
    myWordMap.put(word, sum);
   }
   
   Iterator<String> iterator = myWordMap.keySet().iterator();
   while (iterator.hasNext()){
    palabra.set(iterator.next());
    cuenta.set(myWordMap.get(palabra.toString()));
    context.write(palabra, cuenta);
   }
  }
}


Este método todavía conlleva a que el Mapper cree y destruya múltiples objetos o realice otras tareas que ocupan memoria de una forma considerable, y es por ello que se debe optimizar llegando a la forma global, de tal forma que el Mapper sólo emitirá los pares key/value necesarios al Shuffle and Sort y al Reducer.

Esa optimización se hace utilizando los métodos setup() y cleanup(), que como ya comenté en la entrada sobre estos métodos, se van a ejecutar sólo una vez, el primero antes de que todas las tareas map comiencen y el segundo justo cuando todas las tareas map finalizan y antes de que el Mapper sea destruído.

 
public class WordCountMapper extends 
  Mapper <LongWritable, Text, Text, IntWritable> {

 Map<String, Integer> myWordMap;
 @Override
 protected void setup(Context context)
   throws IOException, InterruptedException {
  if(myWordMap == null )
   myWordMap = new HashMap<String, Integer>();
 }
 
 @Override
  public void map(LongWritable key, Text values, Context context) 
     throws IOException, InterruptedException{
     
   String linea = values.toString();   
   for(String word : linea.split(" ")){
    int sum = 1;
    if(myWordMap.containsKey(word)){
     sum += myWordMap.get(word);
    }
    myWordMap.put(word, sum);
   }
  }

 @Override
 protected void cleanup(Context context)
   throws IOException, InterruptedException {
  Iterator<String> iterator = myWordMap.keySet().iterator();
  IntWritable cuenta = new IntWritable();
  Text palabra = new Text();
    
  while (iterator.hasNext()){
   palabra.set(iterator.next());
   cuenta.set(myWordMap.get(palabra.toString()));
   context.write(palabra, cuenta);
  }
 }
}

Desarrollar la clase Mapper de esta forma mejora la opción anterior pero sigue teniendo problemas de memoria.


Recordemos que cuando usamos el Combiner, primero el Mapper va a tratar los datos y va a emitir los pares key/value, los va a escribir en el disco local y después va a ser el Combiner quien va a actuar.
Si implementamos el algoritmo anterior nos puede surgir un problema con la memoria, ya que si estamos hablando de TB de datos, o no hay coincidencia en las palabras y se genera un registro para cada palabra, estos se van a cargar en el HashMap en memoria y no se van a liberar hasta que todas las tareas map han finalizado y el objeto HashMap se destruya.

La solución para limitar la memoria es "bloquear" la entrada de pares key/value y liberar la estructura HashMap cada cierto tiempo.
Así que la idea es en vez de emitir los datos intermedios una vez que todos los pares key/value han sido tratados, emitir los datos intermedios después de haber tratado N pares key/value.
Esto tendría una fácil solución, que es poner un contador FLUSH_COUNTER, y cuando el número de pares key/value tratados llegue a ese contador, emitir los datos y vaciar el HashMap.

 
public class WordCountMapper extends 
  Mapper <LongWritable, Text, Text, IntWritable> {

 private static Map<String, Integer> myWordMap;
 private static final int FLUSH_COUNTER = 1000;
  
 @Override
 protected void setup(Context context)
   throws IOException, InterruptedException {
  if(myWordMap == null )
   myWordMap = new HashMap<String, Integer>();
 }
 
 @Override
  public void map(LongWritable key, Text values, Context context) 
     throws IOException, InterruptedException{

  String linea = values.toString();
   for(String word : linea.split(" ")){
    if(myWordMap.containsKey(word)){
     myWordMap.put(word, myWordMap.get(word)+1);
    }else{
     myWordMap.put(word, 1);
    }
   }
   
   if(myWordMap.size() >= FLUSH_COUNTER)
    flush(context);
  }

 private void flush(Context context) throws IOException, InterruptedException{
  Iterator<String> iterator = myWordMap.keySet().iterator();
  IntWritable cuenta = new IntWritable();
  Text palabra = new Text();
    
  while (iterator.hasNext()){
   palabra.set(iterator.next());
   cuenta.set(myWordMap.get(palabra.toString()));
   context.write(palabra, cuenta);
  }
  
  myWordMap.clear();
 }

 @Override
 protected void cleanup(Context context)
   throws IOException, InterruptedException {
  flush(context);
 }
}



Estamos en un punto donde podemos seguir haciéndonos preguntas una tras otra para optimizar este algoritmo y aplicar este patrón de diseño correctamente, y la siguiente cuestión sería ¿cómo sabemos cuál es el número FLUSH_COUNT de pares que hay que tratar antes de que la memoria falle?.
El libro de referencia propone que más que crear un contador de pares key/value, poner un límite a la memoria que se utiliza y en el momento que el HashMap ha llegado a ese límite se liberaría. El problema es que en Java no es fácil determinar de forma rápida el tamaño de la memoria ocupada por un objeto, por eso, es mejor elegir un FLUSH_COUNT lo más alto posible y teniendo cuidado que el HashMap no provoque un Out Of Memory.


martes, 14 de mayo de 2013

Map Files

Los Map Files son un tipo de ficheros Sequence Files que contienen un index que permite hacer búsquedas por key. Realmente es un directorio que contiene dos sequence files, uno con el índice, y el otro con los datos.
Se puede pensar en este tipo de ficheros como si fueran una especie de java.util.Map.
Si listamos la carpeta de salida donde se generan estos dos ficheros:


$ hadoop-1.0.4/bin/hadoop fs -ls pruebas/poemamapfile
Found 2 items
-rw-r--r--   1 elena supergroup        780 2013-04-22 19:42 /user/elena/pruebas/poemamapfile/data
-rw-r--r--   1 elena supergroup        203 2013-04-22 19:42 /user/elena/pruebas/poemamapfile/index
$


El índice no guarda todos los registros, sino que guarda el valor cada cierto número de registros, por defecto 128. Es decir, el fichero de índice va a tener un formato parecido a este:

1          128
129      6079
257      12054
385      18030
513      ...


Es decir, el número de registro y el offset de éste. Como vemos, el índice contiene registros cada 128.
Si quisiéramos cambiar este intervalo, que en vez los 128 por defecto, poner otro número, usaríamos:

writer.setIndexInterval(nIntervalos);

Siendo nIntervalos cada cuántos registros queremos que se indexen. Si nIntervalos  = 5, la visualización del índice sería:

1        128
6        1215
11      2410
16      3606
21      ...



Crear un MapFile


public class CreateMapFile {

 private static final String[] POEMA = { 
  "El ciego sol se estrella",
  "en las duras aristas de las armas,",
  "llaga de luz los petos y espaldares",
  "y flamea en las puntas de las lanzas.",
  "El ciego sol, la sed y la fatiga",
  "Por la terrible estepa castellana,",
  "al destierro, con doce de los suyos",
  "-polvo, sudor y hierro- el Cid cabalga.",
  "Cerrado está el mesón a piedra y lodo.",
  "Nadie responde... Al pomo de la espada",
  "y al cuento de las picas el postigo",
  "va a ceder ¡Quema el sol, el aire abrasa!"};
 
 private static final String rutaDestino = new String ("pruebas/poemamapfile");

 public static void main(String[] args) throws IOException {
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get( conf);
  //Path path = new Path(rutaDestino);

  IntWritable key = new IntWritable();
  Text value = new Text();
  
  MapFile.Writer writer = new MapFile.Writer(conf, fs, 
   rutaDestino, key.getClass(), value.getClass());

  //Indico un intervalo de índice diferente a 128 (por defecto)
  writer.setIndexInterval(5);
   
  //No usaré posiciones consecutivas.
  int pos = 1;
  for (int i = 0; i < POEMA.length; i++) { 
   // La key es el número de línea
   key.set(pos); 
   // El value es la línea del poema correspondiente
   value.set(POEMA[i]); 
   // Escribimos el par en el sequenceFile 
   
   writer.append(key, value);
   pos += 3;
  }
  writer.close();
 }

}


Leer un MapFile


public class ReadMapFile {

 private static final String rutaDestino = new String ("pruebas/poemamapfile");
 
 public static void main(String[] args) throws IOException, InstantiationException, IllegalAccessException {
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);

  MapFile.Reader reader = new MapFile.Reader(fs, rutaDestino, conf);
  
  IntWritable key = (IntWritable) reader.getKeyClass().newInstance();
  Text value = (Text) reader.getValueClass().newInstance();
  
  StringBuilder strBuilder;
  while(reader.next(key, value)){
   strBuilder = new StringBuilder(key.toString()).append(" - ").append(value.toString());
   System.out.println(strBuilder);
  }

  // Posiciono el reader en una key dada y conocida, si no existe la key daría un error
  value = (Text)reader.get(new IntWritable(7), new Text());
  strBuilder = new StringBuilder(" Probando get() ").append(value.toString());
  System.out.println(strBuilder);
  
  // Busco una key a partir de un valor, que si no existe, me daría el valor
  // siguiente más próximo y me posiciona el reader en ese lugar.
  key = (IntWritable) reader.getClosest(new IntWritable(11), new Text());
  strBuilder = new StringBuilder(" Probando getClosest() ").append(key.toString());
  System.out.println(strBuilder);

  
  reader.close();
 }

}


En la creación las key deben estar ordenadas, si intentamos crear un mapfile con un orden aleatorio, dará error de tipo:
Exception in thread "main" java.io.IOException: key out of order: n after m


Al igual que en los Sequence Files el MapFile.Reader nos da opciones para posicionarnos en el reader.
A través del método get(), dada una key conocida, aunque si esa key no existe al intentar acceder al Objeto devuelto nos dará error.
Y a través del método getClosest(), nos posicionaremos en el valor más próximo después de la key introducida.

viernes, 3 de mayo de 2013

ToolRunner

Cuando desarrollamos una aplicación Hadoop, quizás necesitemos en algún momento utilizar opciones de la línea de comandos Hadoop, para lo cual se utiliza la clase ToolRunner.

GenericOptionsParser es quien interpreta esas opciones de línea de comandos de Hadoop.
Normalmente no se usa esta clase, sino que se usa ToolRunner que la utiliza internamente y se suele usar en el Driver de un programa MapReduce

Así que si en algún momento queremos pasar opciones de configuración a través de la línea de comandos, en nuestro programa deberemos utilizar ToolRunner e invocando al método ToolRunner.run().

Para pasar opciones de configuración a través de línea de comando se usaría a través de la opción:
-D propiedad=valor

Supongamos que lanzamos la ejecución de un programa MapReduce a través de la línea de comandos de esta forma:

hadoop jar WordCount.jar WordCountDriver -D mapreduce.job.reduces=5 -D mapreduce.job.maps=10 directorioInput directorioOutput


Si no usáramos el ToolRunner tendríamos que controlar las propiedades introducidas haciendo algo parecido a esto:

public static void main(String[] args) throws Exception {

  Configuration conf = new Configuration();
  ...
  conf.set("prop1",args[0]);  
  conf.set("prop2", args[1]);
  ...
  Job job = new Job(conf);
  ...
  FileInputFormat.setInputPaths(job, new Path(args[2]));
  FileOutputFormat.setOutputPath(job, new Path(args[3]));
  ...
}


Mientras que si utilizamos el ToolRunner, la función run se encarga de parsear por si sólo los argumentos de configuración que hayamos introducido, siempre y cuando hayamos puesto -D delante de la propiedad.
Para desarrollar a través del ToolRunner la clase debe extender de Configured e implementar Tools. Resultando de la forma:


public class WordCountDriver extends Configured implements Tool{
 public int run(String[] args) throws Exception {
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  ...
  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
   ...
 }

 public static void main(String[] args) throws Exception {
       int exitCode = ToolRunner.run(new Configuration(), 
            new WordCountDriver(), args);
       System.exit(exitCode);
 }
}


Si os preguntáis cuál es la diferencia y las ventajas entre usar ToolRunner o llamar directamente a la función main con todas las configuraciones, realmente el funcionamiento es el mismo, pero utilizando el ToolRunner es mucho más limpio y no usarlo puede llevarnos a errores como asignar las propiedades después de haber instanciado el Job (que esa propiedad sería nula).

public static void main(String[] args) throws Exception {

  Configuration conf = new Configuration();
  Job job = new Job(conf);
  ...
  conf.set("prop1",args[0]);  
  ...
  // Aquí "prop1 sería nulo en el job
  job.getConfiguration().get("prop1"); 
}


Además de la opción -D que nos permite pasar parámetros a la ejecución, hay que saber que existen otras opciones de línea de comandos que nos permiten:
-files fich1, fich2, ... : Que copia los ficheros indicados del sistema de ficheros local al sistema de ficheros HDFS para utilizarlos en el programa MapReduce.
-libjars jar1, jar2, ... : Que copia las librerías jar especificadas del sistema de ficheros local a HDFS y lo añade al Classpath de la tarea MapReduce (muy útil para añadir dependencias Jar de los Job).
-archives archive1, archive2, ... : Que copia los archivos del sistema de ficheros local a HDFS y poniéndolos a disposición de los programas MapReduce.




lunes, 22 de abril de 2013

Paso de Parámetros en Hadoop

Algo tan simple como un paso de parámetros entre el Driver y el Mapper o Reducer puede llevarnos a un error ya que como desarrolladores estamos acostumbrados a que pueda haber una cierta "comunicación" entre las clases.

En Hadoop esto no es correcto, y a estas alturas deberíamos tener claro que este tipo de aplicaciones se ejecuta de forma distribuída, que cada tarea se ejecuta en una JVM diferente e incluso en distintos nodos del clúster.

Voy a dejaros dos ejemplos, uno con la forma correcta y otro con la forma incorrecta por si queréis ver qué pasa, ya que forma la incorrecta no es que dé un error, el Job va a ejecutarse sin problemas, pero el parámetro no se pasará.

También hay que tener cuidado, ya que si se ejecuta en modo local, el paso de parámetros SÍ que funcionará. Sin embargo, no funcionará en el momento que llevemos nuestro programa al modo distribuído o pseudo-distribuído.

El ejemplo es muy simple y ni si quiera vamos a usar la función Reducer. A partir del fichero score.txt que ya he usado para otros ejemplos de la forma:

01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez 14
15-02-2013 Angel Martin Hernandez 3
01-11-2012 Maria Garcia Martinez 11
01-11-2012 Pablo Sanchez Rodriguez 9
01-11-2012 Angel Martin Hernandez 3
15-01-2013 Pepe Perez Gonzalez 17
15-01-2013 Maria Garcia Martinez 3
...

Sólo queremos como salida la lista de personas con una puntuación mayor de 25, y ese valor lo queremos pasar desde el Driver.

Forma Incorrecta

 
public class PersonaScoreDriver {
 
 private static int score;
 
 public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  //Indicamos el parámetro a pasar
  
  score = 25;
    
  Job job = new Job(conf);
  job.setJarByClass(PersonaScoreDriver.class);
  job.setJobName("Persona Score");
  
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
  job.setInputFormatClass(KeyValueTextInputFormat.class);
  
  job.setMapperClass(MyMapper.class);
  job.setNumReduceTasks(0);

  boolean success = job.waitForCompletion(true);
  System.exit(success ? 0:1);  
 }
 
 private static class MyMapper extends Mapper{
  @Override
  public void map(Text key, Text value,
    Context context) throws IOException, InterruptedException {
   
   int s = score;
   String[] personaSplit = value.toString().split(" ");
   
   if(personaSplit.length == 4){
    int mScore = Integer.valueOf(personaSplit[3]);
    
    if(mScore >= s)
     context.write(key, value);
   }
  }
 }
}


Forma Correcta

 
public class PersonaScoreDriver {
 
 private static class MyMapper extends Mapper{
  private int score;
  
  @Override
  protected void setup(Context context) throws IOException,
    InterruptedException {
   Configuration conf = context.getConfiguration();
   //Indicas cuál es el id del parámetro a recoger y el valor
   // por defecto.
   score = conf.getInt("score", 0);
  }
  
  @Override
  public void map(Text key, Text value,
    Context context) throws IOException, InterruptedException {
   
   int s = score;
   String[] personaSplit = value.toString().split(" ");
   
   if(personaSplit.length == 4){
    int mScore = Integer.valueOf(personaSplit[3]);
    
    if(mScore >= s)
     context.write(key, value);
   }
  }
 }
 
 public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  //Indicamos el parámetro a pasar
  conf.setInt("score", 25);
    
  Job job = new Job(conf);
  job.setJarByClass(PersonaScoreDriver.class);
  job.setJobName("Persona Score");
  
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
  job.setInputFormatClass(KeyValueTextInputFormat.class);
  
  job.setMapperClass(MyMapper.class);
  job.setNumReduceTasks(0);

  boolean success = job.waitForCompletion(true);
  System.exit(success ? 0:1);  
 }
}


Hay que tener en cuenta que esta forma de pasar información entre el Driver y las tareas (map o reduce) es muy útil siempre cuando esta información no ocupa demasiada memoria ya que esta información se copia en varios sitios: en la memoria del Driver, en la memoria del TaskTracker y en la memoria de la JVM que ejecuta la tarea.
Por lo tanto, si se trata de una información grande (por ejemplo, varios TB), conviene usar otro método (por ejemplo: almacenar esta información en un fichero HDFS y acceder a esta información vía el método setup(), o bien usar Distributed Cache).

Recordad que este código también lo podréis descargar desde este enlace.

martes, 16 de abril de 2013

Ejemplo de Partitioner

En la entrada anterior vimos qué es el Partitioner, ahora toca ver un ejemplo y el código desarrollado para hacer ese ejemplo.

Atención si vais a hacer las pruebas en local ya que no funciona, el modo local sólo tiene un Reducer y por eso es mejor usar por lo menos el modo pseudo distribuido.

En este ejemplo, a partir del fichero scores.txt con la forma:

01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez 14
15-02-2013 Angel Martin Hernandez 3
01-11-2012 Maria Garcia Martinez 11
01-11-2012 Pablo Sanchez Rodriguez 9
01-11-2012 Angel Martin Hernandez 3
15-01-2013 Pepe Perez Gonzalez 17
15-01-2013 Maria Garcia Martinez 3
...

Queremos dividir los datos por año y enviar a cada Reducer las personas que han jugado en ese año. A través de un Partitioner vamos a indicar a qué Reducer va cada registro.

El Driver es parecido a todo lo que hemos visto hasta ahora, lo único que hemos definido que el InputFormat será un KeyValueTextInputFormat (ya que la fecha está separado por una tabulación del resto de la línea, así que este formato reconocerá la entrada).
Y luego añadimos en las configuraciones nuestra clase Partitioner y el número de tareas Reduce que queremos (nuestro fichero sólo tiene datos de 2 años (2012 y 2013), entonces serán 2 tareas Reducer).


public class PersonaScoreDriver {
 public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  job.setJarByClass(PersonaScoreDriver.class);
  
  job.setJobName("Persona Score");
  
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
  job.setInputFormatClass(KeyValueTextInputFormat.class);
  //Establecemos el número de tareas Reduce
  job.setNumReduceTasks(2);
  
  job.setMapperClass(PersonaScoreMapper.class);
  job.setReducerClass(PersonaScoreReducer.class);
  //Indicamos cuál es nuestro partitioner
  job.setPartitionerClass(PersonaScorePartitioner.class);

  boolean success = job.waitForCompletion(true);
  System.exit(success ? 0:1);  
 }
}

En el Mapper lo único que hacemos es sacar el nombre y apellidos del value y emite un par key/value enviando como key la fecha y como value la persona.

public class PersonaScoreMapper extends 
 Mapper<Text, Text, Text, Text> {
 
 Text persona = new Text();
 
 @Override
 public void map(Text key, Text values,
   Context context) throws IOException, InterruptedException {
  
  String[] personaSplit = values.toString().split(" ");
  StringBuilder persBuilder = new StringBuilder();
  // Puede haber personas con un apellido o con dos
  if(personaSplit.length == 3 || personaSplit.length == 4){
   if(personaSplit.length == 3){
    persBuilder.append(personaSplit[0]).append(" ")
     .append(personaSplit[1]);
   }else {
    persBuilder.append(personaSplit[0]).append(" ")
     .append(personaSplit[1]).append(" ")
     .append(personaSplit[2]);
   }
   persona.set(persBuilder.toString());
   context.write(key, persona);
  }
 }
}


La clase Reducer lo único que hace es recoger la key con su lista de values correspondientes, recorrer esa lista y emitir cada par key/value con la fecha y el nombre. Al haber realizado el Partiiioner, un mismo reducer procesará las keys de un mismo año.

public class PersonaScoreReducer extends 
 Reducer<Text, Text, Text, Text> {

 @Override
 public void reduce(Text key, Iterable<Text> values,
   Context context) throws IOException, InterruptedException {
  for (Text value : values) {
   context.write(key, value);
  }  
 }
}


Por último el Partitioner, que lo que hace es devolver un entero indicando cuál es el Reducer al que irán los datos intermedios generados por el Mapper.

public class PersonaScorePartitioner extends Partitioner<Text, Text> {
 
 @Override
 public int getPartition(Text key, Text value, int numPartitions) {
  
  if(key.toString().endsWith("2012")){
   return 0;
  }else{
   return 1;
  }
 }
}


Una vez visto el desarrollo y cómo quedaría el código sólo quedaría exportar las clases como jar (tal y como vimos en esta entrada) al directorio que tengamos preparado para la ejecución de Jobs en modo pseudo-distribuído y lo lanzaríamos (previamente habiendo puesto en HDFS el fichero scorePartMezcla).
También os recuerdo que las clases las podréis encontrar en la sección de Código Fuente

hadoop jar training/jars/EjemploPartitioner.jar PersonaScoreDriver pruebas/scorePartMezcla pruebas/resultados/ejemploPartitioner

Y este debería ser el resultado si listamos el contenido del directorio ejemploPartitioner:

elena:hadoop elena$ hadoop fs -ls pruebas/resultados/ejemploPartitioner
Found 4 items
-rw-r--r--   1 elena supergroup          0 2013-04-02 19:34 /user/elena/pruebas/resultados/ejemploPartitioner/_SUCCESS
drwxr-xr-x   - elena supergroup          0 2013-04-02 19:34 /user/elena/pruebas/resultados/ejemploPartitioner/_logs
-rw-r--r--   1 elena supergroup        562 2013-04-02 19:34 /user/elena/pruebas/resultados/ejemploPartitioner/part-r-00000
-rw-r--r--   1 elena supergroup        684 2013-04-02 19:34 /user/elena/pruebas/resultados/ejemploPartitioner/part-r-00001



Y en cada fichero quedaría el siguiente contenido.

En el part-r-00000 que correspondería al año 2012 y que le habíamos asignado el valor 0:


elena:hadoop elena$ hadoop-1.0.4/bin/hadoop fs -cat pruebas/resultados/ejemploPartitioner/part-r-00000
01-11-2012 Angel Martin Hernandez
01-11-2012 Maria Garcia Martinez
01-11-2012 Ana Lopez Fernandez
01-11-2012 Pablo Sanchez Rodriguez
01-11-2012 Pepe Perez Gonzalez
01-12-2012 Maria Garcia Martinez
01-12-2012 Pepe Perez Gonzalez
01-12-2012 Pablo Sanchez Rodriguez
01-12-2012 Ana Lopez Fernandez
15-11-2012 Pepe Perez Gonzalez
15-11-2012 Maria Garcia Martinez
15-11-2012 John Smith
15-11-2012 Cristina Ruiz Gomez
15-12-2012 John Smith
15-12-2012 Cristina Ruiz Gomez
15-12-2012 Maria Garcia Martinez
15-12-2012 Pepe Perez Gonzalez
15-12-2012 Angel Martin Hernandez




En el part-r-00001 que correspondería al año 2013 y que le habíamos asignado el valor 1:

elena:hadoop elena$ hadoop-1.0.4/bin/hadoop fs -cat pruebas/resultados/ejemploPartitioner/part-r-00001
01-01-2013 Ana Lopez Fernandez
01-01-2013 John Smith
01-01-2013 Pablo Sanchez Rodriguez
01-01-2013 Pepe Perez Gonzalez
01-01-2013 Maria Garcia Martinez
01-01-2013 Angel Martin Hernandez
01-02-2013 Ana Lopez Fernandez
01-02-2013 Cristina Ruiz Gomez
01-02-2013 Maria Garcia Martinez
01-02-2013 Pepe Perez Gonzalez
15-01-2013 Angel Martin Hernandez
15-01-2013 Maria Garcia Martinez
15-01-2013 Pepe Perez Gonzalez
15-01-2013 John Smith
15-01-2013 Pablo Sanchez Rodriguez
15-02-2013 Pepe Perez Gonzalez
15-02-2013 John Smith
15-02-2013 Pablo Sanchez Rodriguez
15-02-2013 Maria Garcia Martinez
15-02-2013 Ana Lopez Fernandez
15-02-2013 Cristina Ruiz Gomez
15-02-2013 Angel Martin Hernandez


sábado, 6 de abril de 2013

Output Formats

Los Output Formats son muy parecidos a los tipos vistos en la entrada anterior de los Input Formats.
Pero esta vez la interfaz OutputFormat va a determinar cómo será la salida del Job que vamos a ejecutar.
Para establecer el output format se configura en el Driver a través de:

job.setOutputFormatClass(Tipo.class);

La clase base de salida en Hadoop es FileOutputFormat (que hereda de OutputFormat), y a partir de aquí existen diferentes tipos para poder implementar esa salida, estos son algunos:
  • TextOutputFormat
  • SequenceFileOutputFormat
    • SequenceFileAsBinaryOutputFormat
  • MultipleOutputFormat
El tipo por defecto de salida es el TextOutputFormat, que escribe cada registro (un par key/value) en líneas de texto separadas, y los tipos de los pares key/value pueden ser de cualquier tipo, siempre y cuando implementen el método toString().

También podría ser posible eliminar la key o el value de la salida a través del tipo NullWritable o los dos, que sería mejor definir la salida del Job con el tipo NullOutputFormat.
Si queremos que la salida sea nula, en el Driver definiríamos:
job.setOutputFormatClass(NullOutputFormat.class);

Si sólo queremos eliminar la key o el value se haría con:
job.setOutputKeyClass(NullWritable.class);


o con:
job.setOutputValueClass(NullWritable.class);



Como en los Input Formats, el output también dispone de salidas binarias como el SequenceFileOutputFormat, que como indica, escribe ficheros de tipo Sequence Files (ficheros binarios en los que se escriben pares key/value) y su subclase SequenceFileAsBinaryOutputFormat, que escribe sequence files en los que las key y values están codificados en binario.

En los tipos FileOutputFormat, por defecto se escribe un fichero por cada reducer, que normalmente está predeterminado a uno, pero se puede cambiar ese número de reducers. El nombre de cada fichero es de la forma part-r-00000, part-r-00001..., siendo la última parte el número de reducer. Así que una de las formas de dividir las salidas puede ser usando varios reducers, pero esta no es la solución que nos interesa ver aquí.
La solución sería utilizando la clase MultipleOutputs, se crearía un objeto de este tipo en el reducer y en vez de llamar al write del Context, se haría al write del MultipleOutputs.
También la ventaja de este tipo de Output es que puedes definir el nombre que deseas darle al fichero name-r-00000

Ejemplo de MultipleOutputs:

A partir de nuestro fichero score.txt queremos un programa que separe a los jugadores y sus puntuaciones por fecha agrupados en ficheros separados.
Recordamos que el fichero es de este tipo
 
01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez 14
15-11-2012 John Smith 13
01-12-2012 Pepe Perez Gonzalez 25
...


El Driver es como lo configuramos normalmente, no tiene ninguna configuración especial para hacer el MultipleOutput:
 
public class TestMultipleOutputDriver {
 public static void main(String[] args) throws Exception {
  
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  job.setJarByClass(TestMultipleOutputDriver.class);
  
  job.setJobName("Word Count");
  
  job.setMapperClass(TestMultipleOutputMapper.class);
  job.setReducerClass(TestMultipleOutputReducer.class);
  
  job.setInputFormatClass(KeyValueTextInputFormat.class);

  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  boolean success = job.waitForCompletion(true);
  System.exit(success ? 0:1); 
 }
}



En el Mapper lo único que hacemos es emitir el par key/value que recibimos, ya que no estamos haciendo ningún tratamiento de los datos como tal para este ejemplo.
 
public class TestMultipleOutputMapper extends Mapper<Text, Text, Text, Text> {
 public void map(Text key, Text values, Context context) 
   throws IOException, InterruptedException{
  
  context.write(key, values);
 }
}


En el Reducer creamos un objeto de tipo MultipleOutputs que vamos a inicializar en el método setup y es el que vamos a utilizar para escribir la salida.
name será el prefijo del fichero.
 
public class TestMultipleOutputReducer extends Reducer<Text, Text, Text, Text> {

 private MultipleOutputs<Text, Text> multipleOut;
 
 @Override
 protected void cleanup(Context context) throws IOException,
   InterruptedException {
  multipleOut.close();
 }
 @Override
 protected void setup(Context context) throws IOException,
   InterruptedException {
  multipleOut = new MultipleOutputs<Text, Text>(context);
 }
 @Override
 public void reduce(Text key, Iterable<Text> values, 
           Context context) 
     throws IOException, InterruptedException {
  String name = key.toString().replace("-", "");
  for(Text value:values){
   multipleOut.write( key, value, name);
  //Podría añadir más salidas según mis necesidades
  // a través de cláusulas if, o porque un par key/value
  // traiga diversas informaciones que quiero subdividir
  // en diferentes ficheros
  // if(caso1) multipleOut.write( key, value, name2);
  // multipleOut.write( key, value, name3);
  }
 }
}


En este ejemplo con el MultipleOutputs te obliga que aunque quieras que las salidas sean en distintos ficheros, los pares key/value que emites sean todos del mismo tipo del que has definido la clase MultipleOutputs<Text, Text>, es decir, la key debe ser de tipo Text, y el valor también debe ser de tipo Text.
También es posible emitir múltiples salidas en ficheros diferentes y que cada salida sea con tipos distintos para cada fichero.

En el siguiente ejemplo recibo como entrada un fichero con un listado de papers, en los que cada línea contiene la publicación del paper, los autores y el título del paper de la forma:

paper-id:::author1::author2::...::authorN:::title

journals/cl/SantoNR90:::Michele Di Santo::Libero Nigro::Wilma Russo:::Programmer-Defined Control Abstractions in Modula-2.



Quiero 3 ficheros en la salida de este algoritmo:
- Un fichero paper que contenga: String paper-id, String Title
- Un fichero autor que contenga: Int autor-id (se crea en el algoritmo), String nombre autor
- Un fichero paper/autor que los relacione: String paper-id, Int autor-id

Como vemos necesitamos que una salida sea <Text, Text>, otra salida sea <IntWritable, Text> y la última salida sea <Text, IntWritable>.
Esto se haría añadiendo en el Driver las siguientes líneas, donde se asigna un ID a la salida y de qué tipos son esas salidas:
(Podréis encontrar el código fuente completo en este enlace)
 
MultipleOutputs.addNamedOutput(job, "Autor",  TextOutputFormat.class, 
  IntWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "Paper",  TextOutputFormat.class, 
  Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "PaperAutor",  TextOutputFormat.class, 
  Text.class, IntWritable.class);


Posteriormente, en el Reducer se haría de la forma:

 
for (PaperWritable value : values) {
  
   //Output tabla Autor
   multipleOut.write("Autor", new IntWritable(contador), key);
   
   //Output tabla Paper
   multipleOut.write("Paper", value.getIdPaper(), 
    value.getTituloPaper());
   
   //Output tabla paper/autor
   multipleOut.write("PaperAutor", value.getIdPaper(), 
    new IntWritable(contador));
   
   contador ++;
  }



domingo, 24 de marzo de 2013

Sequence Files: Ejemplo de creación y lectura a través del FileSystem

Como ya expliqué en esta entrada, FileSystem sirve para acceder a HDFS a través de la API de Java. Así que vamos a usar esta clase para crear y leer un SequenceFile.

Crear un SequenceFile

Crearemos un SequenceFile de un texto, que es un poema, y le pondremos como key el número correspondiente a cada línea.
 
public class CreateSequenceFile {

 private static final String[] POEMA = { 
  "El ciego sol se estrella",
  "en las duras aristas de las armas,",
  "llaga de luz los petos y espaldares",
  "y flamea en las puntas de las lanzas.",
  "El ciego sol, la sed y la fatiga",
  "Por la terrible estepa castellana,",
  "al destierro, con doce de los suyos",
  "-polvo, sudor y hierro- el Cid cabalga.",
  "Cerrado está el mesón a piedra y lodo.",
  "Nadie responde... Al pomo de la espada",
  "y al cuento de las picas el postigo",
  "va a ceder ¡Quema el sol, el aire abrasa!"};
 
 private static final String rutaDestino 
   = new String ("pruebas/poemasequencefile");
 
 public static void main(String[] args) throws IOException {
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get( conf);
  Path path = new Path(rutaDestino);
  
  IntWritable key = new IntWritable();
  Text value = new Text();
  
  //Creamos el writer del SequenceFile para poder ir añadiendo
  // los pares key/value al fichero.
  SequenceFile.Writer writer = new SequenceFile.Writer(fs,  
    conf,  path, key.getClass(), value.getClass());
  
  for (int i = 0; i < POEMA.length; i++) { 
   // La key es el número de línea
   key.set(i+1); 
   // El value es la línea del poema correspondiente
   value.set(POEMA[i]); 
   // Escribimos el par en el sequenceFile 
   writer.append(key, value);
  }
  
  writer.close();
 }
}




Leer un SequenceFile

En este ejemplo no sólo vamos a leer el fichero creado anteriormente, sino que también vamos a buscar y a usar los puntos de sincronización, vamos a ver las posiciones del fichero y vamos a desplazarnos a alguna.
 
public class ReadSequenceFile {

 private static final String rutaOrigen 
   = new String ("pruebas/poemasequencefile");
 
 public static void main(String[] args) 
   throws Exception {
  
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get( conf);
  Path path = new Path(rutaOrigen);
  
  //Creamos el Reader del SequenceFile
  SequenceFile.Reader reader = 
    new SequenceFile.Reader(fs, path, conf);
  // Leemos la key y value del SequenceFile, los tipos son conocidos,
  // por lo que se declaran variables de esos tipos.
  IntWritable key = 
    (IntWritable) reader.getKeyClass().newInstance();
  Text value = 
    (Text) reader.getValueClass().newInstance();
  
  StringBuilder strBuilder;
  boolean haySync = false;
  long posSync = 0;
  
  //Recorremos el reader recuperando los pares key/value
  while(reader.next(key,value)){
   
   // Comprobamos si la posición es un punto de sync
   // En principio en este fichero no encontrará ninguno ya que es muy
   // pequeño, si fuera uno más grande y tuviera varios puntos de sync
   // se guardará el último punto encontrado.
   if(reader.syncSeen()){
    haySync = true;
    posSync = reader.getPosition();
   }
   
   strBuilder = new StringBuilder("Posición: ").
     append(reader.getPosition()).append(" - Key: ").
     append(key.toString()).append(" Value: " ).
     append(value.toString());
   System.out.println(strBuilder);
  }
  
  if(haySync){
   // reader.sync posicionará el reader en el sync siguiente más próximo,
   // si no hay ninguno se posicionará al final del fichero.
   // En este caso se posicionará en el punto dado, ya que es de sync.
   strBuilder = new StringBuilder("Sync en el punto: ").
     append(posSync);
   System.out.println(strBuilder);
   reader.sync(posSync);
  }else{
   // Es un valor conocido, si no existiera, habría un error
   // al realizar el reader.next.
   posSync = 459;
   reader.seek(posSync);
  }
  
  // En un caso o en otro a pesar de haber finalizado la iteración 
  // hemos posicionado el reader en un punto intermedio, así que 
  // seguimos recorriéndolo (repetimos las líneas)
  // hasta finalizar de nuevo.
  strBuilder = new StringBuilder("Volvemos a la posición: ")
     .append(posSync);
  System.out.println(strBuilder);
  
  System.out.println("Seguimos recorriendo el reader: ");
  while(reader.next(key,value)){
   strBuilder = new StringBuilder("Posición: ").
     append(reader.getPosition()).append(" - Key: ").
     append(key.toString()).append(" Value: " ).
     append(value.toString());
   System.out.println(strBuilder);
  }
  
  reader.close();
 }

}

domingo, 17 de marzo de 2013

Ejemplo de uso de Tipos de Datos propios con las interfaces Writable y WritableComparable

Continuando con la entrada anterior en la que explicaba qué son las interfaces Writable y WritableComparable y cómo es posible crear nuestros propios tipos usándolas, vamos a ver un ejemplo.

El código fuente y el fichero de ejemplo de esta entrada también los podréis encontrar en este enlace.

Ejemplo de aplicación MapReduce utilizando nuestra propia clase como key.
Al programar nuestra clase PersonaWritableComparable que implementa WritableComparable, en la fase del Shuffle and Sort se consigue que el Reducer reciba las key ordenadas y con sus valores correspondientes agrupados para poder operar con ellos.

Recibimos un fichero de texto cuya información es
Fecha [tab] Nombre Apellido1 Apellido2 Puntuación
Queremos como salida un listado de personas (con los nombres y apellidos) y la suma de todas sus puntuaciones.

Fichero de entrada score.txt:

01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez 14
01-11-2012 Maria Garcia Martinez 11
01-11-2012 Pablo Sanchez Rodriguez 9
01-11-2012 Angel Martin Hernandez 3
15-11-2012 Pepe Perez Gonzalez 22
15-11-2012 Maria Garcia Martinez 15
15-11-2012 John Smith 13
01-12-2012 Pepe Perez Gonzalez 25
01-12-2012 Ana Lopez Fernandez 15
01-12-2012 Pablo Sanchez Rodriguez 8
01-12-2012 Maria Garcia Martinez 32
15-12-2012 Maria Garcia Martinez 47
15-12-2012 Pepe Perez Gonzalez 13
15-12-2012 Angel Martin Hernandez 13
15-12-2012 John Smith 27
01-01-2013 Ana Lopez Fernandez 5
01-01-2013 Pablo Sanchez Rodriguez 2
01-01-2013 Pepe Perez Gonzalez 17
01-01-2013 Maria Garcia Martinez 3
01-01-2013 Angel Martin Hernandez 32
01-01-2013 John Smith 21


Nuestra propia clase PersonaWritableComparable:
 
public class PersonaWritableComparable 
  implements WritableComparable<PersonaWritableComparable>{

 Text nombre, primerApellido, segundoApellido;
 
 public void set(String nom, String prApell, String sgApell){
  nombre.set(nom);
  primerApellido.set(prApell);
  segundoApellido.set(sgApell);
 }
 
 public PersonaWritableComparable() {
  this.nombre = new Text();
  this.primerApellido = new Text();
  this.segundoApellido = new Text();
 }

 public PersonaWritableComparable(Text nombre, 
   Text primerApellido, Text segundoApellido) {
  this.nombre = nombre;
  this.primerApellido = primerApellido;
  this.segundoApellido = segundoApellido;
 }
 
 @Override
 public void readFields(DataInput arg0) throws IOException {
  this.nombre.readFields(arg0);
  this.primerApellido.readFields(arg0);
  this.segundoApellido.readFields(arg0);
  
 }

 @Override
 public void write(DataOutput arg0) throws IOException {
  this.nombre.write(arg0);
  this.primerApellido.write(arg0);
  this.segundoApellido.write(arg0);
 }

 @Override
 public int compareTo(PersonaWritableComparable o) {
  if(this.nombre.compareTo(o.nombre) != 0){
   return this.nombre.compareTo(o.nombre);
  }else if(this.primerApellido.compareTo(o.primerApellido) != 0){
   return this.primerApellido.compareTo(o.primerApellido);
  }else if(this.segundoApellido.compareTo(o.segundoApellido) != 0){
   return this.segundoApellido.compareTo(o.segundoApellido);
  }
  return 0;
 }

 @Override
 public boolean equals(Object obj) {
  if(obj instanceof PersonaWritableComparable){
   PersonaWritableComparable p = (PersonaWritableComparable) obj;
   return this.nombre.equals(p.nombre) && 
    this.primerApellido.equals(p.primerApellido) && 
    this.segundoApellido.equals(p.segundoApellido);
  }
  return false;
 }

 @Override
 public int hashCode() {
  return this.nombre.hashCode()*163 + 
    this.primerApellido.hashCode()*163 + 
    this.segundoApellido.hashCode()*163;
 }
 
 @Override
 public String toString() {
  return nombre.toString()+" "+primerApellido.toString()+" "
   +segundoApellido.toString();
 }
}


El Driver de la aplicación:
 
public class PersonaScoreDriver {
 public static void main(String[] args) throws Exception {
  if(args.length != 2){
   System.out.println("Ha ocurrido un error en la entrada");
   System.exit(-1);
  }
  
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  job.setJarByClass(PersonaScoreDriver.class);
  
  job.setJobName("Persona Score");
  
  job.setOutputKeyClass(PersonaWritableComparable.class);
  job.setOutputValueClass(IntWritable.class);

  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
  job.setMapperClass(PersonaScoreMapper.class);
  job.setReducerClass(PersonaScoreReducer.class);

  boolean success = job.waitForCompletion(true);
  System.exit(success ? 0:1);  
 }
}


La clase Mapper:
 
public class PersonaScoreMapper extends 
 Mapper<LongWritable, Text, 
 PersonaWritableComparable, IntWritable> {

 private IntWritable score = new IntWritable();
 PersonaWritableComparable persona = new PersonaWritableComparable();
 
 public void map(LongWritable key, Text values,
   Context context) throws IOException, InterruptedException {
  
  // El texto tiene este formato:
  // 01-11-2012 Maria Garcia Martinez 11
  // La fecha separada por tabulación, el resto con espacios
  String[] primerSplit = values.toString().split(" ");
  if(primerSplit.length == 2){
   String[] segundoSplit = primerSplit[1].split(" ");
   
   // Puede haber personas con un apellido o con dos
   if(segundoSplit.length == 3 || segundoSplit.length == 4){
    if(segundoSplit.length == 3){
     persona.set(segundoSplit[0], segundoSplit[1], "");
     score.set(Integer.valueOf(segundoSplit[2]));
    }else {
     persona.set(segundoSplit[0], segundoSplit[1], segundoSplit[2]);
     score.set(Integer.valueOf(segundoSplit[3]));
    }
    context.write(persona, score);
   } 
  }
 }
}


La clase Reducer
 
public class PersonaScoreReducer extends 
 Reducer<PersonaWritableComparable, IntWritable, 
 PersonaWritableComparable, IntWritable> {

 public void reduce(PersonaWritableComparable key, 
   Iterable<IntWritable> values,
   Context context) throws IOException, InterruptedException {
  
  int suma = 0;
  for (IntWritable value : values) {
   suma += value.get();
  }
  
  context.write(key, new IntWritable(suma));
 }
}

Ver también: Tipos de datos Hadoop e interfaces Writable y WritableComparable

miércoles, 13 de febrero de 2013

HDFS: Interfaz Java (FileSystem)

Como ya comenté en una entrada anterior (Hadoop Distributed File System (HDFS)), para acceder a HDFS lo podemos hacer de dos formas, por un lado y como vimos en esa misma entrada (con algunos ejemplos) utilizando un terminal y a través de los comandos propios de hadoop y por otro lado a través de la API de Java, que es la parte que vamos a ver ahora.

Para interactuar con HDFS con Java, se hace a través de la clase FileSystem.
El desarrollo es en Java y los ejecutamos desde línea de comando tras haber arrancado los demonios Hadoop (/bin/start-all.sh).

Lo primero que he hecho es crear una carpeta pruebas en mi sistema de ficheros HDFS:
      hadoop fs -mkdir pruebas

Algunos ejemplos que también podéis descargar en este enlace son:

  • Copiar un fichero o carpeta del sistema de ficheros local a HDFS

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LocalToHdfs {
 public static void main(String[] args) throws Exception{
  Configuration conf = new Configuration();
  FileSystem hdfs = FileSystem.get(conf);
  Path sourcePath = new Path(args[0]);
  Path destPath = new Path(args[1]);
  hdfs.copyFromLocalFile(sourcePath, destPath);
 }
}


Y este es el resultado en el que copio desde mi carpeta local training el fichero score.txt a mi carpeta pruebas en HDFS:

$ hadoop jar training/JavaHdfs.jar LocalToHdfs training/score.txt /user/elena/pruebas 
$ hadoop fs -ls pruebas 
Found 1 items -rw-r--r--  1 elena supergroup  363 2013-02-11 19:31 /user/elena/pruebas/score.txt


  • Crear un fichero HDFS

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CreateFileHdfs {

 public static void main(String[] args) throws Exception{
  String contentFile = "Este es el contenido del fichero\n";
  byte[] texto = contentFile.getBytes();
  Configuration conf = new Configuration();
  FileSystem hdfs = FileSystem.get(conf);
  Path path = new Path(args[0]+"/nombreFichero");
  FSDataOutputStream outputStream = hdfs.create(path);
  outputStream.write(texto, 0, texto.length);
 }
}


En el código Java tengo puesto el nombre del fichero y paso como argumento la carpeta donde se debe crear el fichero.

$ hadoop jar training/JavaHdfs.jar CreateFileHdfs pruebas 
$ hadoop fs -ls pruebas 
Found 2 items 
-rw-r--r--  1 elena supergroup  32 2013-02-11 19:34 /user/elena/pruebas/nombreFichero 
-rw-r--r--  1 elena supergroup  363 2013-02-11 19:31 /user/elena/pruebas/score.txt 
$ hadoop fs -cat pruebas/nombreFichero 
Este es el contenido del fichero


  • Leer el contenido de un fichero HDFS

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class ReadFileHdfs {

 public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  FileSystem hdfs = FileSystem.get(conf);
  Path path = new Path(args[0]+"/nombreFichero");
  FSDataInputStream inputStream = hdfs.open(path);
  IOUtils.copyBytes(inputStream, System.out, conf, true); 
 }
}

$ hadoop jar training/JavaHdfs.jar ReadFileHdfs pruebas 
Este es el contenido del fichero 


  • Borrar un fichero HDFS

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DeleteFileHdfs {

 public static void main(String[] args) throws Exception{
  Configuration conf = new Configuration();
  FileSystem hdfs = FileSystem.get(conf);
  Path path = new Path(args[0]+"/nombreFichero");
  hdfs.delete(path, false);
 }
}


$ hadoop jar training/JavaHdfs.jar DeleteFileHdfs pruebas 
$ hadoop fs -ls pruebas 
Found 1 items 
-rw-r--r--  1 elena supergroup  363 2013-02-11 19:31 /user/elena/pruebas/score.txt