Recuerda que puedes descargarte algunos de los ejemplos en la pestaña de Código Fuente

miércoles, 8 de mayo de 2013

Uso del Combiner para optimización de Jobs MapReduce

El Combiner es una función propia de Hadoop utilizada para optimizar los job MapReduce.

Cuando en un Job la salida del Mapper genera una gran cantidad de datos intermedios, éstos se tienen que transmitir por la red hacia los Reducer. Si la cantidad de datos es excesivamente grande aquí se puede producir un cuello de botella.

Las técnicas utilizadas para reducir la cantidad de datos y mejorar la eficiencia de los job MapReduce, se llama Local Aggregation.
Existen dos técnicas, el Combiner que vamos a ver en esta entrada y el In-Mapper Combining que veremos en la siguiente entrada.

Una buena solución para estos casos es la de implementar un Combiner, que se ejecuta a la salida de la fase Map y de forma local a este antes de enviar los datos a través de la red.

El Combiner implementa la misma interfaz que el Reducer e incluso muchas veces suele ser la misma clase que el Reducer.
Sin embargo hay que tener cuidado que la operación que se realiza en el Combiner sea asociativa y conmutativa. Por ejemplo la operación suma (es decir, el ejemplo del Wordcount) cumple con estos dos requisitos, pero por ejemplo la operación de cálculo de la media aritmética no es asociativa.

Para configurarlo se hace en el Driver a través de:
  job.setCombinerClass (MyCombiner.class);

¿Se puede implementar un Combiner distinto al Reducer?
Sí, pueden ser distintos. Hay que tener en cuenta que el Combiner va a seguir implementando la interfaz Reducer. Y también hay que tener cuidado de no poner dentro algún tipo de código "sensible" ya que el Combiner se puede ejecutar cero, una o más veces a la salida desde cualquier Mapper.

Si observamos los logs generados tras la ejecución de un Job del WordCount cuyo código he dejado publicado en este enlace.

Sin Combiner:

Map input records=1928
Map output records=187029
Combine input records=0
Combine output records=0
Reduce input records=187029
Reduce output records=22948


Con Combiner:

Map input records=1928
Map output records=187029
Combine input records=219738
Combine output records=55657
Reduce input records=22948
Reduce output records=22948   



Como he comentado anteriormente el Combiner se ejecuta a la salida del Mapper, antes de que los datos se transmitan por la red hacia el Reducer, la conclusión es que la cantidad de datos transmitida y por tanto el input del Reducer, es considerablemente menor si utilizamos ese Combiner.

Algo a tener en cuenta en el código publicado, es que he hecho un Combiner del WordCount, el código que contiene la clase WordCountCombiner.java es exactamente igual que el código de la clase WordCountReducer.java, en situaciones como esta no haría falta crear esa nueva clase, bastaría con definir en el Driver:

  job.setCombinerClass (WordCountReducer.class);

Así que creé una clase por separado para mostrar que es posible hacer el Combiner en una clase distinta (extendiendo de Reducer), recordando que no debe haber "código sensible" en ella ya que el Combiner puede llegar a ejecutarse varias veces o puede llegar a no ejecutarse.

viernes, 3 de mayo de 2013

ToolRunner

Cuando desarrollamos una aplicación Hadoop, quizás necesitemos en algún momento utilizar opciones de la línea de comandos Hadoop, para lo cual se utiliza la clase ToolRunner.

GenericOptionsParser es quien interpreta esas opciones de línea de comandos de Hadoop.
Normalmente no se usa esta clase, sino que se usa ToolRunner que la utiliza internamente y se suele usar en el Driver de un programa MapReduce

Así que si en algún momento queremos pasar opciones de configuración a través de la línea de comandos, en nuestro programa deberemos utilizar ToolRunner e invocando al método ToolRunner.run().

Para pasar opciones de configuración a través de línea de comando se usaría a través de la opción:
-D propiedad=valor

Supongamos que lanzamos la ejecución de un programa MapReduce a través de la línea de comandos de esta forma:

hadoop jar WordCount.jar WordCountDriver -D mapreduce.job.reduces=5 -D mapreduce.job.maps=10 directorioInput directorioOutput


Si no usáramos el ToolRunner tendríamos que controlar las propiedades introducidas haciendo algo parecido a esto:

public static void main(String[] args) throws Exception {

  Configuration conf = new Configuration();
  ...
  conf.set("prop1",args[0]);  
  conf.set("prop2", args[1]);
  ...
  Job job = new Job(conf);
  ...
  FileInputFormat.setInputPaths(job, new Path(args[2]));
  FileOutputFormat.setOutputPath(job, new Path(args[3]));
  ...
}


Mientras que si utilizamos el ToolRunner, la función run se encarga de parsear por si sólo los argumentos de configuración que hayamos introducido, siempre y cuando hayamos puesto -D delante de la propiedad.
Para desarrollar a través del ToolRunner la clase debe extender de Configured e implementar Tools. Resultando de la forma:


public class WordCountDriver extends Configured implements Tool{
 public int run(String[] args) throws Exception {
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  ...
  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
   ...
 }

 public static void main(String[] args) throws Exception {
       int exitCode = ToolRunner.run(new Configuration(), 
            new WordCountDriver(), args);
       System.exit(exitCode);
 }
}


Si os preguntáis cuál es la diferencia y las ventajas entre usar ToolRunner o llamar directamente a la función main con todas las configuraciones, realmente el funcionamiento es el mismo, pero utilizando el ToolRunner es mucho más limpio y no usarlo puede llevarnos a errores como asignar las propiedades después de haber instanciado el Job (que esa propiedad sería nula).

public static void main(String[] args) throws Exception {

  Configuration conf = new Configuration();
  Job job = new Job(conf);
  ...
  conf.set("prop1",args[0]);  
  ...
  // Aquí "prop1 sería nulo en el job
  job.getConfiguration().get("prop1"); 
}


Además de la opción -D que nos permite pasar parámetros a la ejecución, hay que saber que existen otras opciones de línea de comandos que nos permiten:
-files fich1, fich2, ... : Que copia los ficheros indicados del sistema de ficheros local al sistema de ficheros HDFS para utilizarlos en el programa MapReduce.
-libjars jar1, jar2, ... : Que copia las librerías jar especificadas del sistema de ficheros local a HDFS y lo añade al Classpath de la tarea MapReduce (muy útil para añadir dependencias Jar de los Job).
-archives archive1, archive2, ... : Que copia los archivos del sistema de ficheros local a HDFS y poniéndolos a disposición de los programas MapReduce.




domingo, 28 de abril de 2013

Speculative Execution

La meta del modelo MapReduce es dividir trabajos en tareas más pequeñas y ejecutarlas en paralelo para que el tiempo de ejecución total del trabajo sea menor que si se ejecutaran de forma secuencial.

Muchas veces, para grandes cantidades de datos, el número de divisiones es mayor que el número de nodos en el cluster, así que aunque las tareas se van ejecutan de forma paralela, otras tareas tienen que esperar a que las primeras finalicen para poder continuar.

Si por alguna causa una de las tareas se está ejecutando más lentamente de lo normal se podrían producir cuellos de botella y el tiempo dedicado para la ejecución total podría aumentar de forma considerable con respecto a si todo el clúster estuviera funcionando de manera correcta.

Hadoop no se va a encargar de buscar la causa de qué es lo que está retardando una tarea, sino que va a buscar soluciones para mitigar esto, y la Speculative Execution es la técnica que utiliza.

En el momento que JobTracker detecta que una tarea es más lenta de lo normal, va a lanzar una tarea especulativa (speculative task), pero sólo en el momento en el que el resto de las tareas del trabajo han finalizado, que la tarea haya estado funcionando al menos un minuto o que el tiempo medio de ejecución de esa tarea sea más alto que el del resto de tareas. Esto es, va a lanzar un duplicado de la tarea que está funcionando lentamente y las tareas se van a estar ejecutando al mismo tiempo.

Cuando una de las tareas finaliza su ejecución, el JobTracker se encargará de desechar el otro proceso (hace un kill de la tarea).

Si el defecto en la tarea se trata de un bug en el programa, éste va a suceder también en la tarea especulativa, así que tendría que ser el desarrollador quien corrigiera ese bug.

La Speculative Execution está activada por defecto, pero está permitido desactivarla en los ficheros de configuración.
En general es conveniente desactivar la speculative execution para las tareas Reduce porque, por un lado, siempre que se duplica una tarea Reduce ésta tiene que recuperar los datos intermedios de la red, lo que incrementaría el tráfico en la red.
Y por otro lado porque si la distribución de las key que reciben los Reducer no es equilibrada, es normal que una tarea Reduce tarde más que otra, por ejemplo, si pensamos en el WordCount, en inglés, la palabra "the" va a tener un array de valores mucho mayor que otras key, por lo que será normal que esta tarea Reduce tarde más.

Por el contrario, como regla general, se recomienda activar la Speculative Execution para las tareas Map. Una excepción podría ser para tareas que no son idempotentes