En Hadoop esto no es correcto, y a estas alturas deberíamos tener claro que este tipo de aplicaciones se ejecuta de forma distribuída, que cada tarea se ejecuta en una JVM diferente e incluso en distintos nodos del clúster.
Voy a dejaros dos ejemplos, uno con la forma correcta y otro con la forma incorrecta por si queréis ver qué pasa, ya que forma la incorrecta no es que dé un error, el Job va a ejecutarse sin problemas, pero el parámetro no se pasará.
También hay que tener cuidado, ya que si se ejecuta en modo local, el paso de parámetros SÍ que funcionará. Sin embargo, no funcionará en el momento que llevemos nuestro programa al modo distribuído o pseudo-distribuído.
El ejemplo es muy simple y ni si quiera vamos a usar la función Reducer. A partir del fichero score.txt que ya he usado para otros ejemplos de la forma:
01-11-2012 Pepe Perez Gonzalez 21 01-11-2012 Ana Lopez Fernandez 14 15-02-2013 Angel Martin Hernandez 3 01-11-2012 Maria Garcia Martinez 11 01-11-2012 Pablo Sanchez Rodriguez 9 01-11-2012 Angel Martin Hernandez 3 15-01-2013 Pepe Perez Gonzalez 17 15-01-2013 Maria Garcia Martinez 3 ...
Sólo queremos como salida la lista de personas con una puntuación mayor de 25, y ese valor lo queremos pasar desde el Driver.
Forma Incorrecta
public class PersonaScoreDriver { private static int score; public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); //Indicamos el parámetro a pasar score = 25; Job job = new Job(conf); job.setJarByClass(PersonaScoreDriver.class); job.setJobName("Persona Score"); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setInputFormatClass(KeyValueTextInputFormat.class); job.setMapperClass(MyMapper.class); job.setNumReduceTasks(0); boolean success = job.waitForCompletion(true); System.exit(success ? 0:1); } private static class MyMapper extends Mapper{ @Override public void map(Text key, Text value, Context context) throws IOException, InterruptedException { int s = score; String[] personaSplit = value.toString().split(" "); if(personaSplit.length == 4){ int mScore = Integer.valueOf(personaSplit[3]); if(mScore >= s) context.write(key, value); } } } }
Forma Correcta
public class PersonaScoreDriver { private static class MyMapper extends Mapper{ private int score; @Override protected void setup(Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); //Indicas cuál es el id del parámetro a recoger y el valor // por defecto. score = conf.getInt("score", 0); } @Override public void map(Text key, Text value, Context context) throws IOException, InterruptedException { int s = score; String[] personaSplit = value.toString().split(" "); if(personaSplit.length == 4){ int mScore = Integer.valueOf(personaSplit[3]); if(mScore >= s) context.write(key, value); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); //Indicamos el parámetro a pasar conf.setInt("score", 25); Job job = new Job(conf); job.setJarByClass(PersonaScoreDriver.class); job.setJobName("Persona Score"); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setInputFormatClass(KeyValueTextInputFormat.class); job.setMapperClass(MyMapper.class); job.setNumReduceTasks(0); boolean success = job.waitForCompletion(true); System.exit(success ? 0:1); } }
Hay que tener en cuenta que esta forma de pasar información entre el Driver y las tareas (map o reduce) es muy útil siempre cuando esta información no ocupa demasiada memoria ya que esta información se copia en varios sitios: en la memoria del Driver, en la memoria del TaskTracker y en la memoria de la JVM que ejecuta la tarea.
Por lo tanto, si se trata de una información grande (por ejemplo, varios TB), conviene usar otro método (por ejemplo: almacenar esta información en un fichero HDFS y acceder a esta información vía el método setup(), o bien usar Distributed Cache).
Recordad que este código también lo podréis descargar desde este enlace.