Atención si vais a hacer las pruebas en local ya que no funciona, el modo local sólo tiene un Reducer y por eso es mejor usar por lo menos el modo pseudo distribuido.
En este ejemplo, a partir del fichero scores.txt con la forma:
01-11-2012 Pepe Perez Gonzalez 21 01-11-2012 Ana Lopez Fernandez 14 15-02-2013 Angel Martin Hernandez 3 01-11-2012 Maria Garcia Martinez 11 01-11-2012 Pablo Sanchez Rodriguez 9 01-11-2012 Angel Martin Hernandez 3 15-01-2013 Pepe Perez Gonzalez 17 15-01-2013 Maria Garcia Martinez 3 ...
Queremos dividir los datos por año y enviar a cada Reducer las personas que han jugado en ese año. A través de un Partitioner vamos a indicar a qué Reducer va cada registro.
El Driver es parecido a todo lo que hemos visto hasta ahora, lo único que hemos definido que el InputFormat será un KeyValueTextInputFormat (ya que la fecha está separado por una tabulación del resto de la línea, así que este formato reconocerá la entrada).
Y luego añadimos en las configuraciones nuestra clase Partitioner y el número de tareas Reduce que queremos (nuestro fichero sólo tiene datos de 2 años (2012 y 2013), entonces serán 2 tareas Reducer).
public class PersonaScoreDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf); job.setJarByClass(PersonaScoreDriver.class); job.setJobName("Persona Score"); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setInputFormatClass(KeyValueTextInputFormat.class); //Establecemos el número de tareas Reduce job.setNumReduceTasks(2); job.setMapperClass(PersonaScoreMapper.class); job.setReducerClass(PersonaScoreReducer.class); //Indicamos cuál es nuestro partitioner job.setPartitionerClass(PersonaScorePartitioner.class); boolean success = job.waitForCompletion(true); System.exit(success ? 0:1); } }
En el Mapper lo único que hacemos es sacar el nombre y apellidos del value y emite un par key/value enviando como key la fecha y como value la persona.
public class PersonaScoreMapper extends Mapper<Text, Text, Text, Text> { Text persona = new Text(); @Override public void map(Text key, Text values, Context context) throws IOException, InterruptedException { String[] personaSplit = values.toString().split(" "); StringBuilder persBuilder = new StringBuilder(); // Puede haber personas con un apellido o con dos if(personaSplit.length == 3 || personaSplit.length == 4){ if(personaSplit.length == 3){ persBuilder.append(personaSplit[0]).append(" ") .append(personaSplit[1]); }else { persBuilder.append(personaSplit[0]).append(" ") .append(personaSplit[1]).append(" ") .append(personaSplit[2]); } persona.set(persBuilder.toString()); context.write(key, persona); } } }
La clase Reducer lo único que hace es recoger la key con su lista de values correspondientes, recorrer esa lista y emitir cada par key/value con la fecha y el nombre. Al haber realizado el Partiiioner, un mismo reducer procesará las keys de un mismo año.
public class PersonaScoreReducer extends Reducer<Text, Text, Text, Text> { @Override public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text value : values) { context.write(key, value); } } }
Por último el Partitioner, que lo que hace es devolver un entero indicando cuál es el Reducer al que irán los datos intermedios generados por el Mapper.
public class PersonaScorePartitioner extends Partitioner<Text, Text> { @Override public int getPartition(Text key, Text value, int numPartitions) { if(key.toString().endsWith("2012")){ return 0; }else{ return 1; } } }
Una vez visto el desarrollo y cómo quedaría el código sólo quedaría exportar las clases como jar (tal y como vimos en esta entrada) al directorio que tengamos preparado para la ejecución de Jobs en modo pseudo-distribuído y lo lanzaríamos (previamente habiendo puesto en HDFS el fichero scorePartMezcla).
También os recuerdo que las clases las podréis encontrar en la sección de Código Fuente
hadoop jar training/jars/EjemploPartitioner.jar PersonaScoreDriver pruebas/scorePartMezcla pruebas/resultados/ejemploPartitioner
Y este debería ser el resultado si listamos el contenido del directorio ejemploPartitioner:
elena:hadoop elena$ hadoop fs -ls pruebas/resultados/ejemploPartitioner
Found 4 items
-rw-r--r-- 1 elena supergroup 0 2013-04-02 19:34 /user/elena/pruebas/resultados/ejemploPartitioner/_SUCCESS
drwxr-xr-x - elena supergroup 0 2013-04-02 19:34 /user/elena/pruebas/resultados/ejemploPartitioner/_logs
-rw-r--r-- 1 elena supergroup 562 2013-04-02 19:34 /user/elena/pruebas/resultados/ejemploPartitioner/part-r-00000
-rw-r--r-- 1 elena supergroup 684 2013-04-02 19:34 /user/elena/pruebas/resultados/ejemploPartitioner/part-r-00001
Y en cada fichero quedaría el siguiente contenido.
En el part-r-00000 que correspondería al año 2012 y que le habíamos asignado el valor 0:
elena:hadoop elena$ hadoop-1.0.4/bin/hadoop fs -cat pruebas/resultados/ejemploPartitioner/part-r-00000
01-11-2012 Angel Martin Hernandez
01-11-2012 Maria Garcia Martinez
01-11-2012 Ana Lopez Fernandez
01-11-2012 Pablo Sanchez Rodriguez
01-11-2012 Pepe Perez Gonzalez
01-12-2012 Maria Garcia Martinez
01-12-2012 Pepe Perez Gonzalez
01-12-2012 Pablo Sanchez Rodriguez
01-12-2012 Ana Lopez Fernandez
15-11-2012 Pepe Perez Gonzalez
15-11-2012 Maria Garcia Martinez
15-11-2012 John Smith
15-11-2012 Cristina Ruiz Gomez
15-12-2012 John Smith
15-12-2012 Cristina Ruiz Gomez
15-12-2012 Maria Garcia Martinez
15-12-2012 Pepe Perez Gonzalez
15-12-2012 Angel Martin Hernandez
En el part-r-00001 que correspondería al año 2013 y que le habíamos asignado el valor 1:
elena:hadoop elena$ hadoop-1.0.4/bin/hadoop fs -cat pruebas/resultados/ejemploPartitioner/part-r-00001
01-01-2013 Ana Lopez Fernandez
01-01-2013 John Smith
01-01-2013 Pablo Sanchez Rodriguez
01-01-2013 Pepe Perez Gonzalez
01-01-2013 Maria Garcia Martinez
01-01-2013 Angel Martin Hernandez
01-02-2013 Ana Lopez Fernandez
01-02-2013 Cristina Ruiz Gomez
01-02-2013 Maria Garcia Martinez
01-02-2013 Pepe Perez Gonzalez
15-01-2013 Angel Martin Hernandez
15-01-2013 Maria Garcia Martinez
15-01-2013 Pepe Perez Gonzalez
15-01-2013 John Smith
15-01-2013 Pablo Sanchez Rodriguez
15-02-2013 Pepe Perez Gonzalez
15-02-2013 John Smith
15-02-2013 Pablo Sanchez Rodriguez
15-02-2013 Maria Garcia Martinez
15-02-2013 Ana Lopez Fernandez
15-02-2013 Cristina Ruiz Gomez
15-02-2013 Angel Martin Hernandez