Recuerda que puedes descargarte algunos de los ejemplos en la pestaña de Código Fuente
Mostrando entradas con la etiqueta Tipos de Ficheros. Mostrar todas las entradas
Mostrando entradas con la etiqueta Tipos de Ficheros. Mostrar todas las entradas

martes, 14 de mayo de 2013

Map Files

Los Map Files son un tipo de ficheros Sequence Files que contienen un index que permite hacer búsquedas por key. Realmente es un directorio que contiene dos sequence files, uno con el índice, y el otro con los datos.
Se puede pensar en este tipo de ficheros como si fueran una especie de java.util.Map.
Si listamos la carpeta de salida donde se generan estos dos ficheros:


$ hadoop-1.0.4/bin/hadoop fs -ls pruebas/poemamapfile
Found 2 items
-rw-r--r--   1 elena supergroup        780 2013-04-22 19:42 /user/elena/pruebas/poemamapfile/data
-rw-r--r--   1 elena supergroup        203 2013-04-22 19:42 /user/elena/pruebas/poemamapfile/index
$


El índice no guarda todos los registros, sino que guarda el valor cada cierto número de registros, por defecto 128. Es decir, el fichero de índice va a tener un formato parecido a este:

1          128
129      6079
257      12054
385      18030
513      ...


Es decir, el número de registro y el offset de éste. Como vemos, el índice contiene registros cada 128.
Si quisiéramos cambiar este intervalo, que en vez los 128 por defecto, poner otro número, usaríamos:

writer.setIndexInterval(nIntervalos);

Siendo nIntervalos cada cuántos registros queremos que se indexen. Si nIntervalos  = 5, la visualización del índice sería:

1        128
6        1215
11      2410
16      3606
21      ...



Crear un MapFile


public class CreateMapFile {

 private static final String[] POEMA = { 
  "El ciego sol se estrella",
  "en las duras aristas de las armas,",
  "llaga de luz los petos y espaldares",
  "y flamea en las puntas de las lanzas.",
  "El ciego sol, la sed y la fatiga",
  "Por la terrible estepa castellana,",
  "al destierro, con doce de los suyos",
  "-polvo, sudor y hierro- el Cid cabalga.",
  "Cerrado está el mesón a piedra y lodo.",
  "Nadie responde... Al pomo de la espada",
  "y al cuento de las picas el postigo",
  "va a ceder ¡Quema el sol, el aire abrasa!"};
 
 private static final String rutaDestino = new String ("pruebas/poemamapfile");

 public static void main(String[] args) throws IOException {
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get( conf);
  //Path path = new Path(rutaDestino);

  IntWritable key = new IntWritable();
  Text value = new Text();
  
  MapFile.Writer writer = new MapFile.Writer(conf, fs, 
   rutaDestino, key.getClass(), value.getClass());

  //Indico un intervalo de índice diferente a 128 (por defecto)
  writer.setIndexInterval(5);
   
  //No usaré posiciones consecutivas.
  int pos = 1;
  for (int i = 0; i < POEMA.length; i++) { 
   // La key es el número de línea
   key.set(pos); 
   // El value es la línea del poema correspondiente
   value.set(POEMA[i]); 
   // Escribimos el par en el sequenceFile 
   
   writer.append(key, value);
   pos += 3;
  }
  writer.close();
 }

}


Leer un MapFile


public class ReadMapFile {

 private static final String rutaDestino = new String ("pruebas/poemamapfile");
 
 public static void main(String[] args) throws IOException, InstantiationException, IllegalAccessException {
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);

  MapFile.Reader reader = new MapFile.Reader(fs, rutaDestino, conf);
  
  IntWritable key = (IntWritable) reader.getKeyClass().newInstance();
  Text value = (Text) reader.getValueClass().newInstance();
  
  StringBuilder strBuilder;
  while(reader.next(key, value)){
   strBuilder = new StringBuilder(key.toString()).append(" - ").append(value.toString());
   System.out.println(strBuilder);
  }

  // Posiciono el reader en una key dada y conocida, si no existe la key daría un error
  value = (Text)reader.get(new IntWritable(7), new Text());
  strBuilder = new StringBuilder(" Probando get() ").append(value.toString());
  System.out.println(strBuilder);
  
  // Busco una key a partir de un valor, que si no existe, me daría el valor
  // siguiente más próximo y me posiciona el reader en ese lugar.
  key = (IntWritable) reader.getClosest(new IntWritable(11), new Text());
  strBuilder = new StringBuilder(" Probando getClosest() ").append(key.toString());
  System.out.println(strBuilder);

  
  reader.close();
 }

}


En la creación las key deben estar ordenadas, si intentamos crear un mapfile con un orden aleatorio, dará error de tipo:
Exception in thread "main" java.io.IOException: key out of order: n after m


Al igual que en los Sequence Files el MapFile.Reader nos da opciones para posicionarnos en el reader.
A través del método get(), dada una key conocida, aunque si esa key no existe al intentar acceder al Objeto devuelto nos dará error.
Y a través del método getClosest(), nos posicionaremos en el valor más próximo después de la key introducida.

sábado, 6 de abril de 2013

Output Formats

Los Output Formats son muy parecidos a los tipos vistos en la entrada anterior de los Input Formats.
Pero esta vez la interfaz OutputFormat va a determinar cómo será la salida del Job que vamos a ejecutar.
Para establecer el output format se configura en el Driver a través de:

job.setOutputFormatClass(Tipo.class);

La clase base de salida en Hadoop es FileOutputFormat (que hereda de OutputFormat), y a partir de aquí existen diferentes tipos para poder implementar esa salida, estos son algunos:
  • TextOutputFormat
  • SequenceFileOutputFormat
    • SequenceFileAsBinaryOutputFormat
  • MultipleOutputFormat
El tipo por defecto de salida es el TextOutputFormat, que escribe cada registro (un par key/value) en líneas de texto separadas, y los tipos de los pares key/value pueden ser de cualquier tipo, siempre y cuando implementen el método toString().

También podría ser posible eliminar la key o el value de la salida a través del tipo NullWritable o los dos, que sería mejor definir la salida del Job con el tipo NullOutputFormat.
Si queremos que la salida sea nula, en el Driver definiríamos:
job.setOutputFormatClass(NullOutputFormat.class);

Si sólo queremos eliminar la key o el value se haría con:
job.setOutputKeyClass(NullWritable.class);


o con:
job.setOutputValueClass(NullWritable.class);



Como en los Input Formats, el output también dispone de salidas binarias como el SequenceFileOutputFormat, que como indica, escribe ficheros de tipo Sequence Files (ficheros binarios en los que se escriben pares key/value) y su subclase SequenceFileAsBinaryOutputFormat, que escribe sequence files en los que las key y values están codificados en binario.

En los tipos FileOutputFormat, por defecto se escribe un fichero por cada reducer, que normalmente está predeterminado a uno, pero se puede cambiar ese número de reducers. El nombre de cada fichero es de la forma part-r-00000, part-r-00001..., siendo la última parte el número de reducer. Así que una de las formas de dividir las salidas puede ser usando varios reducers, pero esta no es la solución que nos interesa ver aquí.
La solución sería utilizando la clase MultipleOutputs, se crearía un objeto de este tipo en el reducer y en vez de llamar al write del Context, se haría al write del MultipleOutputs.
También la ventaja de este tipo de Output es que puedes definir el nombre que deseas darle al fichero name-r-00000

Ejemplo de MultipleOutputs:

A partir de nuestro fichero score.txt queremos un programa que separe a los jugadores y sus puntuaciones por fecha agrupados en ficheros separados.
Recordamos que el fichero es de este tipo
 
01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez 14
15-11-2012 John Smith 13
01-12-2012 Pepe Perez Gonzalez 25
...


El Driver es como lo configuramos normalmente, no tiene ninguna configuración especial para hacer el MultipleOutput:
 
public class TestMultipleOutputDriver {
 public static void main(String[] args) throws Exception {
  
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  job.setJarByClass(TestMultipleOutputDriver.class);
  
  job.setJobName("Word Count");
  
  job.setMapperClass(TestMultipleOutputMapper.class);
  job.setReducerClass(TestMultipleOutputReducer.class);
  
  job.setInputFormatClass(KeyValueTextInputFormat.class);

  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  boolean success = job.waitForCompletion(true);
  System.exit(success ? 0:1); 
 }
}



En el Mapper lo único que hacemos es emitir el par key/value que recibimos, ya que no estamos haciendo ningún tratamiento de los datos como tal para este ejemplo.
 
public class TestMultipleOutputMapper extends Mapper<Text, Text, Text, Text> {
 public void map(Text key, Text values, Context context) 
   throws IOException, InterruptedException{
  
  context.write(key, values);
 }
}


En el Reducer creamos un objeto de tipo MultipleOutputs que vamos a inicializar en el método setup y es el que vamos a utilizar para escribir la salida.
name será el prefijo del fichero.
 
public class TestMultipleOutputReducer extends Reducer<Text, Text, Text, Text> {

 private MultipleOutputs<Text, Text> multipleOut;
 
 @Override
 protected void cleanup(Context context) throws IOException,
   InterruptedException {
  multipleOut.close();
 }
 @Override
 protected void setup(Context context) throws IOException,
   InterruptedException {
  multipleOut = new MultipleOutputs<Text, Text>(context);
 }
 @Override
 public void reduce(Text key, Iterable<Text> values, 
           Context context) 
     throws IOException, InterruptedException {
  String name = key.toString().replace("-", "");
  for(Text value:values){
   multipleOut.write( key, value, name);
  //Podría añadir más salidas según mis necesidades
  // a través de cláusulas if, o porque un par key/value
  // traiga diversas informaciones que quiero subdividir
  // en diferentes ficheros
  // if(caso1) multipleOut.write( key, value, name2);
  // multipleOut.write( key, value, name3);
  }
 }
}


En este ejemplo con el MultipleOutputs te obliga que aunque quieras que las salidas sean en distintos ficheros, los pares key/value que emites sean todos del mismo tipo del que has definido la clase MultipleOutputs<Text, Text>, es decir, la key debe ser de tipo Text, y el valor también debe ser de tipo Text.
También es posible emitir múltiples salidas en ficheros diferentes y que cada salida sea con tipos distintos para cada fichero.

En el siguiente ejemplo recibo como entrada un fichero con un listado de papers, en los que cada línea contiene la publicación del paper, los autores y el título del paper de la forma:

paper-id:::author1::author2::...::authorN:::title

journals/cl/SantoNR90:::Michele Di Santo::Libero Nigro::Wilma Russo:::Programmer-Defined Control Abstractions in Modula-2.



Quiero 3 ficheros en la salida de este algoritmo:
- Un fichero paper que contenga: String paper-id, String Title
- Un fichero autor que contenga: Int autor-id (se crea en el algoritmo), String nombre autor
- Un fichero paper/autor que los relacione: String paper-id, Int autor-id

Como vemos necesitamos que una salida sea <Text, Text>, otra salida sea <IntWritable, Text> y la última salida sea <Text, IntWritable>.
Esto se haría añadiendo en el Driver las siguientes líneas, donde se asigna un ID a la salida y de qué tipos son esas salidas:
(Podréis encontrar el código fuente completo en este enlace)
 
MultipleOutputs.addNamedOutput(job, "Autor",  TextOutputFormat.class, 
  IntWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "Paper",  TextOutputFormat.class, 
  Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "PaperAutor",  TextOutputFormat.class, 
  Text.class, IntWritable.class);


Posteriormente, en el Reducer se haría de la forma:

 
for (PaperWritable value : values) {
  
   //Output tabla Autor
   multipleOut.write("Autor", new IntWritable(contador), key);
   
   //Output tabla Paper
   multipleOut.write("Paper", value.getIdPaper(), 
    value.getTituloPaper());
   
   //Output tabla paper/autor
   multipleOut.write("PaperAutor", value.getIdPaper(), 
    new IntWritable(contador));
   
   contador ++;
  }



martes, 2 de abril de 2013

Input Formats

Los InputFormat son los formatos que definen los tipos de datos de entrada que recibe las función map en un programa MapReduce.

La clase base en la que están basados los InputFormat es FileInputFormat, que provee la definición de qué ficheros se incluyen como input en un Job y una implementación para dividir en partes los ficheros de entrada.

Los InputFormat pueden clasificar según el tipo de datos que van a recibir en:
  • Texto
  • Binarios
  • Múltiples
  • Databases

(Podréis encontrar los ejemplos con el código completo en la pestaña de Código Fuente)

InputFormat de tipo texto:

Hadoop se destaca por su capacidad de procesar ficheros de texto no estructurado y dispone de varios tipos según cómo están constituídos los ficheros de datos.

TextInputFormat

Es el formato por defecto de MapReduce, si no se indica nada en el Driver a la hora de programar, será el tipo que considera.

Cada registro es una línea de la entrada, la key será de tipo LongWritable indicando el offset de la línea (el offset es el número de bytes, no el número de la línea) y el value será el contenido de la línea, una cadena de tipo Text.
En este tipo de ficheros la key no suele tener ninguna utilidad a la hora de desarrollar los algoritmos.

Como ejemplo de este formato podemos ver el ejercicio WordCount publicado anteriormente, que como digo, en el Driver no se indica el formato porque toma el TextInputFormat por defecto.

KeyValueTextInputFormat

Muchas veces la línea de texto que recibimos a la entrada suele contener el par key/value que nos servirá para el algoritmo separados por un separador, normalmente una tabulación. Así que, el recibir como key el offset no nos es de ninguna utilidad.

Este tipo nos va a ayudar a recibir como key la primera parte y como value el resto de la línea después de la tabulación.

Como ejemplo tomamos de nuevo el ejercicio WordCount ya publicado. El programa va a recibir un fichero con este formato
 
01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez 14

En el que la fecha y el nombre están separados por una tabulación. En principio vamos a hacer algo simple, que es contar cuántas veces aparece cada fecha.

Esta vez en el Driver añadimos esta línea:
 
job.setInputFormatClass(KeyValueTextInputFormat.class);

La clase Mapper será:
 
public class TestKeyValueMapper 
     extends Mapper<Text, Text, Text, IntWritable> {
 private final static IntWritable cuenta = new IntWritable(1);
 public void map(Text key, Text values, Context context) 
   throws IOException, InterruptedException{
   context.write(key, cuenta);
 }
}

La clase Reducer del WordCount se puede quedar como está.


NLineInputFormat

En todos los casos anteriores y como ya hemos visto, el tamaño de un InputSplit corresponde al tamaño de un bloque HDFS y por lo tanto puede contenter un número indefinido de líneas de entrada.

El formato NLineInputFormat permite definir InputSplits con un número determinado de líneas de entrada (si no se indica nada, por defecto está a 1).

Con este formato podemos definir que en la división se envíe más de un par key/value y que nuestro Mapper reciba N pares key/value.
Los grupos de líneas que recibe el Mapper tendrán un formato par key/value de la forma TextInputFormat, es decir, con el offset como key y el resto de la línea como value.

Hay que tener cuidado, no quiere decir que se reciban 2 líneas en la función map, si no que la misma función map se va a ejecutar N veces recibiendo ese número de registros par key/value.


En el Driver añadimos esta línea:
 
job.setInputFormatClass(NLineInputFormat.class);
//Definimos el número de pares key/value
NLineInputFormat.setNumLinesPerSplit(job, 3);

La clase Mapper, como ya he comentado, estará declarado con el formato "estándar" (TextInputFormat), pero podemos operar en la función de tal forma que sabemos que va a ser llamada N veces, podemos concatenar textos, hacer operaciones, etc con los N pares que va a recibir.

Así que si, por ejemplo, hemos puesto nuestro número de líneas a 2 y tenemos esta entrada:
 
01-11-2012 Pepe Perez Gonzalez 21 
01-11-2012 Ana Lopez Fernandez 14
01-11-2012 Maria Garcia Martinez 11 
01-11-2012 Pablo Sanchez Rodriguez 9

Podemos hacer que la salida sea la concatenación de las entradas quedando:

01-11-2012 Pepe Perez Gonzalez 21 + 01-11-2012 Ana Lopez Fernandez 14
01-11-2012 Maria Garcia Martinez 11 + 01-11-2012 Pablo Sanchez Rodriguez 9


InputFormat de tipo binario:

Aunque MapReduce se destaca por su tratamiento de textos no estructurados, no es exclusivo a este tipo de ficheros, también es capaz de tratar ficheros binarios.

SequenceFileInputFormat

Los sequence files de Hadoop almacenan secuencias de datos binarios en forma de pares key/value. Son splittable (con sus puntos de sincronización sync), soportan compresión y se pueden almacenar múltiples tipos de datos.

Este formato de entrada, las key y value están determinados por el mismo sequence file y al desarrollar el programa hay que asegurarse que escogemos los tipos que corresponden.

Para probar un ejemplo sería conveniente haber leído el artículo de Secuences Files y haber hecho el ejercicio Crear un SequenceFile y así disponer de un fichero de este tipo en nuestro HDFS.
Así que deberíamos disponer de un fichero en HDFS pruebas/poemasequencefile en el que la key es un número (número de línea) y el value es una línea de texto (los versos del poema).

En el Driver se declara el input format:
 
job.setInputFormatClass(SequenceFileInputFormat.class);

Y las declaraciones del Mapper será con los tipos del key/value que sabemos que tiene el fichero (entero y texto) y lo único que haremos será emitir estos pares key/value:
 
public class TestSeqFileMapper 
      extends Mapper<IntWritable, Text, IntWritable, Text> {
 public void map(IntWritable key, Text values, Context context) 
   throws IOException, InterruptedException{
   context.write(key, values);
 }
}

En este ejemplo no he considerado la tarea reducer poniéndola a 0.

SequenceFileAsTextInputFormat

Es una variante del anterior que convierte los key/value en objetos de tipo Text.

Estableciendo el tipo en el Driver:
 
job.setInputFormatClass(SequenceFileAsTextInputFormat.class);

Y simplemente, el Mapper se declara poniendo los objetos de tipo Text:
 
public class TestSeqFileAsTextMapper 
      extends Mapper<Text, Text, Text, Text> {
 public void map(Text key, Text values, Context context) 
   throws IOException, InterruptedException{
   context.write(key, values);
 }
}


SequenceFileAsBinaryInputFormat

Otra variante más del SequenceFileInputFormat que recupera las key y los value en forma de objetos binarios (BytesWritable).

Y en el Driver lo configuramos:
 
job.setInputFormatClass(SequenceFileAsBinaryInputFormat.class);

Y esta vez el Mapper se declara poniendo los objetos de tipo BytesWritable:
public class TestSeqFileAsBinaryMapper 
      extends Mapper<BytesWritable, BytesWritable, Text, Text> {

 public void map(BytesWritable key,BytesWritable values,Context context)
   throws IOException, InterruptedException{
  ...
 }
}



InputFormat Múltiple:


MultipleInputFormat

Este tipo de Inputs sirven para cuando necesitas que haya diferentes fuentes de datos, e incluso que cada una de estos Input sea de tipo distinto.
Como es lógico, si cada entrada es de un formato diferente, va a necesitar una tarea Map distinta para cada uno de ellos, así que al declarar el formato MultipleInputFormat vas a poder definir para cada fichero de entrada, de qué tipo es y a qué Mapper se debe dirigir.

MultipleInputFormat puede ser muy útil cuando necesitas unificar información cuyo origen y formato es diferente.

Para utilizar este formato, en el Driver sobrarían las líneas:
 
job.setMapperClass(MiMapper.class);
FileInputFormat.setInputPaths(job, new Path("path"));

y añadiríamos las líneas:

 
MultipleInputs.addInputPath(job, new Path("path1_2"), 
      KeyValueTextInputFormat.class, TestKeyValueMapper.class);
MultipleInputs.addInputPath(job, new Path("path_2"), 
      SequenceFileInputFormat.class, TestSeqFileMapper.class);



viernes, 22 de marzo de 2013

Sequence Files

Los SequenceFiles son unos tipos de ficheros de datos propios de Hadoop almacenados en forma de pares key/value y codificados de forma binaria.
Proporcionan una estructura de datos persistente para pares key/value de forma binaria.

En el mismo fichero se almacenan los metadatos que contienen la información de ese fichero (tipo de los datos, nombre, fecha,...).

Este tipo de ficheros se usa muy a menudo en Jobs MapReduce, sobre todo cuando la salida de un Job es la entrada de otro.
También, podemos imaginarlos como ficheros de log donde cada registro es una nueva línea.

Son un tipo de fichero muy adecuados para MapReduce porque además son splittable (que se pueden fragmentar).
Pueden almacenar distintos tipos de datos gracias a que usan una gran variedad de frameworks de serialización.

Soportan compresión, que puede ser de 3 tipos:
  • Uncompressed (No comprimido)
  • Record Compressed (Compresión a nivel de cada par key/value)
  • Block Compressed (Se comprime por bloques)
Sea cual sea el tipo de compresión que utilice, la estructura del encabezado (header) va a ser el mismo, sólo que éste contendrá la información necesaria para su posterior lectura.

Problemas: 
  • Sólo se puede acceder a ellos a través de la AP Java de Hadoop.
  • Si la definición de la key o value cambian, el fichero no se podrá leer.

Para entender de forma visual cómo es la estructura de un SequenceFile, empezamos viendo cómo sería el Header:

Estructura del Header

Esta sería la estructura de un SequenceFile Uncompressed o Record Compressed:

Estructura de un SequenceFile

Así sería la estructura de un registro cuando no está comprimido (Uncompressed):

Registro no comprimido

Y así sería cuando está comprimido por registro (Record Compressed):

Estructura de la compresión por registro


Aunque es la misma estructura de SequenceFile, no pueden existir formatos de compresión distintos dentro de un mismo fichero.

A continuación es el formato de un SequenceFile Block Compressed:

Estructura de un SequenceFile comprimido por bloques


Y por último el formato del bloque:

Estructura del bloque comprimido




Una propiedad de los SequenceFiles es que en la creación introducen puntos de sincronización (sync points).
Estos puntos se pueden utilizar cuando el reader se pierde, por ejemplo, si nos hemos desplazado a buscar en una posición cualquiera en nuestro flujo de datos. Aún más importante, estos puntos de sincronización sirven para definir los InputSplit de los Jobs MapReduce.
Estos sync points se crean automáticamente introduciéndolos cada cierto número de registros cuando realizamos el SequenceFile.Writer.
Se pueden localizar durante la lectura del SequenceFile con el SecuenceFile.Reader a través de:

      reader.syncSeen();

Otra opción que nos dan los SequenceFiles es, como hemos dicho antes, buscar una posición dada en este tipo de ficheros.
Si sabemos la posición exacta a la que queremos acceder, utilizaríamos el método:

      reader.seek(posicion);
      reader.next(key, value);


Si ponemos una posición que no existe daría un error IOException.


Si no conocemos la posición exacta, podemos utilizar:

      reader.sync (posicion)

El reader se posicionará en el siguiente sync point que encuentre después de posicion.
Puede ser el caso que el fichero sea muy pequeño y no exista ningún sync point, lo que sucederá entonces es que el reader se posicionará al final del fichero.