Recuerda que puedes descargarte algunos de los ejemplos en la pestaña de Código Fuente
Mostrando entradas con la etiqueta Técnicas. Mostrar todas las entradas
Mostrando entradas con la etiqueta Técnicas. Mostrar todas las entradas

miércoles, 10 de julio de 2013

Técnicas de Debugging y Logging

Debugging

Depurar los programas MapReduce es muy complicado, ya que cada instancia de un Mapper se ejecuta en una tarea diferente y quizás en máquinas diferentes, así que adjuntar una técnica de debug en el proceso no es tarea fácil.

El problema es que con cantidades tan grandes de datos puede haber una entrada inesperada o errónea y es un evento que hay que tener en cuenta y controlar.
Porque piénsalo de esta forma, estamos realizando un proyecto de Big Data, eso quiere decir que estamos tratando tales cantidades de datos que los procesos pueden tardar horas o incluso días, imaginemos que a mitad de ese proceso que tarda días hay una entrada de un dato inesperado y el programa falla por no haber tomado ninguna medida.
La conclusión es que al desarrollador sólo le queda tomar una serie de estrategias para evitar este tipo errores:
  • Código defensivo: siempre suponer que un dato puede ser incorrecto o esperar que las cosas vayan mal. Habría que añadir control de excepciones o código que controle los posibles fallos.
  • Empieza un desarrollo pequeño y ve agrandándolo poco a poco
  • Escribe pruebas unitarias, utilizando, por ejemplo, MRunit.
  • Prueba primero en local con cantidades reducidas de datos, luego pasa a modo pseudodistribuído (asegurándote que el entorno es similar al del clúster) y finalmente prueba en todo el clúster.
  • Utiliza Counters, por ejemplo, para conocer el número de entradas inválidas.

Para hacer pruebas en local no se usa HDFS, se usa el sistema de ficheros local y sólo se ejecuta un proceso llamado LocalJobRunner.


Logging

Como logging lo más efectivo es usar librerías como el log4java, ya que éste se puede configurar en niveles y desactivarlo en situaciones de producción.

Como buenas prácticas nunca se debería usar println como estrategia de loggin cuando hacemos nuestras primeras pruebas, ya que posteriormente deberíamos borrarlo en todo el código. Si dejáramos estas líneas y pasáramos el código al clúster en modo producción, podría afectar considerablemente a los tiempos de ejecución, piensa que son procesos que necesitamos que sean lo más rápidos posibles y el realizar un println supone un consumo de tiempo ejecutando el comando y realizando la escritura.
En el clúster deben estar desactivadas todas las escrituras de logging innecesarias (no perjudica el rendimiento el hacer loggin en los métodos setup y cleanup).







sábado, 29 de junio de 2013

Counters

Los Counters se utilizan para que los mappers o los reducers pasen valores al Driver de forma que éstos se van sumando. Puede servir para analizar ciertos eventos que suceden durante la ejecución de jobs MapReduce, como registros erróneos, o contar el número de veces que aparece cierto tipo de datos.

Cuando todas las tareas Map o Reduce han finalizado se suman todos los valores del contador. Nunca tenemos que fiarnos de los valores intermedios que vayan generando las tareas si estamos mirando la evolución del job a través de la página de administración, ya que a causa de la speculative execution unos contadores se pueden sumar temporalmente de forma repetida.

El contador se utiliza añadiendo en el mapper o reducer la línea

context.getCounter(group, name).increment(cantidad);


El valor del contador se recupera en el driver a través de:

long tipoGrupo = job.getCounters().findCounter(group, name).getValue();

Además de recuperar los resultados en el driver, éstos también se pueden ver a través de la página web del jobtracker (en el puerto 50030) y a través del shell si se ha ejecutado el job map/reduce desde la línea de comando.


Vemos mejor el Counter en un ejemplo en el cual, a partir de un fichero scores.txt en el que tenemos información sobre jugadores, fechas del juego y sus puntuaciones, en el programa, además de contar el número de palabras que hay en el fichero, queremos saber cuántas veces aparecen algunos de los jugadores (Ana, Pepe, Maria y Pablo) y también, para estos jugadores, cuándo se ha hecho un registro incorrecto y no tienen asociada una puntuación (en el ejemplo siguiente Ana, María y Pepe aparecen y sumarían 1 cada uno, y además Ana sumaría un error):

01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez
01-11-2012 Maria Garcia Martinez 11
...


En el ejemplo también aprovecho para aplicar el paso de parámetros que habíamos visto en una entrada anterior.


public class WordCounterMapper extends 
  Mapper <LongWritable, Text, Text, IntWritable> {

  private final static IntWritable cuenta = new IntWritable(1);
  private Text palabra = new Text();
  private String GRUPO_JUGADOR;
  private String GRUPO_ERROR;
 
  @Override
  protected void setup(Context context) throws IOException,
   InterruptedException {
 Configuration conf = context.getConfiguration();
  
 GRUPO_JUGADOR = conf.getStrings("grupos")[0];
 GRUPO_ERROR = conf.getStrings("grupos")[1];
  }


  @Override
  public void map(LongWritable key, Text values, Context context) 
     throws IOException, InterruptedException{
 
  String linea = values.toString();
  String[] elems = linea.split("\t");
    
  for(String word : elems){
     if (word.length() > 0){
      String player = "";
      if(word.contains("Ana")){
       player = "Ana";
      }else if(word.contains("Pepe")){
       player = "Pepe";
      }else if(word.contains("Maria")){
       player = "Maria";
      }else if(word.contains("Pablo")){
       player = "Pablo";
      }
      
      if(!"".equals(player)){
       context.getCounter(GRUPO_JUGADOR, player).increment(1);
       if(elems.length < 3){
        context.getCounter(GRUPO_ERROR, player).increment(1);
       }
      }
      
      palabra.set(word);
      context.write(palabra, cuenta);
      }
  }

  }
}

public class WordCounterDriver extends Configured implements Tool{

 public int run(String[] args) throws Exception {

  Configuration conf = new Configuration();
  conf.setStrings("grupos", GRUPO_JUGADOR, GRUPO_ERROR);
  
  Job job = new Job(conf);
  job.setJarByClass(WordCounterDriver.class);
  
  job.setJobName("Word Count");
  
  job.setMapperClass(WordCounterMapper.class);
  job.setReducerClass(WordCounterReducer.class);

  FileInputFormat.setInputPaths(job, new Path("pruebas/score.txt"));
  FileOutputFormat.setOutputPath(job, new Path("pruebas/out"));
  
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);
  
  boolean success = job.waitForCompletion(true);
  
  long tipoAna = job.getCounters().findCounter(GRUPO_JUGADOR, "Ana").getValue();
  long tipoPepe = job.getCounters().findCounter(GRUPO_JUGADOR, "Pepe").getValue();
  long tipoMaria = job.getCounters().findCounter(GRUPO_JUGADOR, "Maria").getValue();
  long tipoPablo = job.getCounters().findCounter(GRUPO_JUGADOR, "Pablo").getValue();
  
  long tipoErrorAna = job.getCounters().findCounter(GRUPO_ERROR, "Ana").getValue();
  long tipoErrorPepe = job.getCounters().findCounter(GRUPO_ERROR, "Pepe").getValue();
  long tipoErrorMaria = job.getCounters().findCounter(GRUPO_ERROR, "Maria").getValue();
  long tipoErrorPablo = job.getCounters().findCounter(GRUPO_ERROR, "Pablo").getValue();
  
  System.out.println("Ana:   "+tipoAna+" - Errores: "+tipoErrorAna);
  System.out.println("Pepe:  "+tipoPepe+" - Errores: "+tipoErrorPepe);
  System.out.println("Maria: "+tipoMaria+" - Errores: "+tipoErrorMaria);
  System.out.println("Pablo: "+tipoPablo+" - Errores: "+tipoErrorPablo);
  
  return (success ? 0:1); 
 }
  
 public static void main(String[] args) throws Exception {
     int exitCode = ToolRunner.run(new Configuration(), new WordCounterDriver(), args);
     System.exit(exitCode);
 }

A tener en cuenta, no es necesario que el contador vaya sumando de 1 en 1, se pueden poner otros valores e incluso se puede hacer decrementar este valor poniendo un número negativo.

domingo, 16 de junio de 2013

Sqoop: Importar o exportar datos de HDFS a BBDD relacionales

Sqoop es el primer software del ecosistema de Hadoop que voy a ver.


Una de las grandes ventajas de Hadoop es que puede trabajar con datos cuya procedencia pueden ser distintos tipos de formas. Pero para acceder a estos datos Hadoop suele necesitar APIs externas para tratarlos y el que Hadoop acceda a la fuente original no suele ser muy eficiente, así que se han buscado formas para importar estos datos al sistema de ficheros HDFS.

Una de las formas en las que nos solemos encontrar los datos es en sistemas de bases de datos relacionales (RDBMS) y para extraer esta información se utiliza Apache Sqoop.

Sqoop es un software de open source que permite extraer datos de sistemas de almacenamiento de datos estructurados e importarlos en Hadoop para su posterior procesamiento. Permite exportar estos datos a formato texto, Sequence Files o a Avro. También dispone de opciones para en vez de escribir en el sistema de ficheros HDFS, escribir los datos en Hive.
También permite exportar datos de HDFS e importarlos a una BBDD relacional.

Con Sqoop puedes importar todas las tablas, una o incluso sólo porciones de ellas. Se puede configurar de tal forma que haga una importación inicial y posteriormente haga actualizaciones incrementales.

Utiliza un job MapReduce para hacer esa importación, dispone de 4 Mappers por defecto (es configurable) y los datos los importa a ficheros de texto o a Sequence Files.

Instalación de Sqoop

Esta instalación es sólo para la versión 1.x de Sqoop, y no para la 2.

Prerequisitos:

- Java 1.6
- Hadoop 0.20x o más instalado.
- Un RDBMS, en mi caso dispongo de MAMP con MySQL

Para instalarlo primero tenemos que descargar la última versión del software (en este momento la 1.4.3) desde este enlace.
Y lo descomprimimos en nuestra carpeta donde estamos trabajando con hadoop, en mi caso /usr/local/hadoop/sqoop-1.4.3.bin__hadoop-1.0.0

Una vez descomprimido nos posicionamos a través del terminal en la carpeta sqoop-1.4.3.bin__hadoop-1.0.0 y hay que compilarlo introduciendo el comando

ant -Dhadoopversion=20

-Dhadoopversion=20 se indica ya que al instalar Hadoop éste se instala con las versiones CDH3 y CDH4 -la vieja y la nueva-, a través de este comando indicamos que nos descargamos y estamos configurando sqoop para la versión CDH4, si no lo indicamos configuraríamos sqoop para la versión CDH3 y a la hora de realizar pruebas nos daría error.

Configuramos el PATH con el sqoop

export SQOOP_HOME=/usr/local/hadoop/sqoop-1.4.3.bin__hadoop-1.0.0
export PATH=$SQOOP_HOME/bin:$PATH


Y por último comprobamos que Sqoop funciona introduciendo el comando

sqoop help


Si no hemos incluído el HADOOP_HOME en el PATH del sistema, tendremos que configurarlo a través del fichero de configuración.
En la carpeta  /usr/local/hadoop/sqoop-1.4.3.bin__hadoop-1.0.0/conf/ nos encontramos con el fichero sqoop-env-template.sh, habrá que renombrarlo a sqoop-env.sh y editarlo quedando de la forma:

#Set path to where bin/hadoop is available
export HADOOP_COMMON_HOME=/usr/local/hadoop/hadoop-1.0.4
#Set path to where hadoop-*-core.jar is available
export HADOOP_MAPRED_HOME=/usr/local/hadoop/hadoop-1.0.4



Ahora, para poder conectar a la base de datos de MySQL necesitamos descargar el Driver en este enlace, descomprimimos el fichero y ponemos la librería mysql-connector-java-5.1.24-bin.jar en la carpeta /usr/local/hadoop/sqoop-1.4.3.bin__hadoop-1.0.0/lib.

Para comprobar que conecta ejecutamos el comando de listar para ver qué esquemas tenemos en la base de datos

sqoop list-databases --connect jdbc:mysql://localhost --username root --password root



Para probar que sqoop funciona importaremos una tabla.
He creado un esquema llamado 'HADOOP' y en él una tabla llamada 'score', en ella he introducido los datos de puntuaciones de juegos que he utilizado de ejemplo en otros artículos (os dejo el script de creación de la tabla y la inserción de los datos).
A continuación (anteriormente tenemos que haber arrancado los demonios hadoop) introducimos el comando:

sqoop import --connect jdbc:mysql://localhost/HADOOP --table score --username root --password root


Veremos cómo se van a ejecutar las tareas MapReduce y si ejecutamos el comando de listar el sistema de ficheros HDFS (hadoop fs -ls) veremos que se ha creado una carpeta score que contendrá:


elena$ hadoop fs -ls score
Found 6 items
-rw-r--r--   1 elena supergroup          0 2013-04-10 19:08 /user/elena/score/_SUCCESS
drwxr-xr-x   - elena supergroup          0 2013-04-10 19:06 /user/elena/score/_logs
-rw-r--r--   1 elena supergroup        333 2013-04-10 19:08 /user/elena/score/part-m-00000
-rw-r--r--   1 elena supergroup        344 2013-04-10 19:08 /user/elena/score/part-m-00001
-rw-r--r--   1 elena supergroup        306 2013-04-10 19:08 /user/elena/score/part-m-00002
-rw-r--r--   1 elena supergroup        342 2013-04-10 19:08 /user/elena/score/part-m-00003



Como comenté anteriormente, Sqoop por defecto está configurado con 4 mappers, es por esa razón que ha creado los 4 ficheros part. Si queremos que sólo se ejecute uno para tener una única salida ejecutaríamos el comando sqoop import añadiendo el atributo -m 1

elena$ sqoop import --connect jdbc:mysql://localhost/HADOOP --table score --username root --password root -m 1

elena$ hadoop fs -ls score
Found 3 items
-rw-r--r--   1 elena supergroup          0 2013-04-10 19:13 /user/elena/score/_SUCCESS
drwxr-xr-x   - elena supergroup          0 2013-04-10 19:13 /user/elena/score/_logs
-rw-r--r--   1 elena supergroup       1325 2013-04-10 19:13 /user/elena/score/part-m-00000


Si listamos estarán todos los datos que teníamos en nuestra base de datos MySQL:

elena$ hadoop fs -cat score/part-m-00000
1,2012-11-01,Pepe Perez Gonzalez,21
2,2012-11-01,Ana Lopez Fernandez,14
3,2012-11-01,Maria Garcia Martinez,11
4,2012-11-01,Pablo Sanchez Rodriguez,9
5,2012-11-01,Angel Martin Hernandez,3
6,2012-11-15,Pepe Perez Gonzalez,22
7,2012-11-15,Maria Garcia Martinez,15
8,2012-11-15,Cristina Ruiz Gomez,23
9,2012-12-01,Pepe Perez Gonzalez,25
10,2012-12-01,Ana Lopez Fernandez,15
11,2012-12-01,Pablo Sanchez Rodriguez,8
...









martes, 14 de mayo de 2013

Map Files

Los Map Files son un tipo de ficheros Sequence Files que contienen un index que permite hacer búsquedas por key. Realmente es un directorio que contiene dos sequence files, uno con el índice, y el otro con los datos.
Se puede pensar en este tipo de ficheros como si fueran una especie de java.util.Map.
Si listamos la carpeta de salida donde se generan estos dos ficheros:


$ hadoop-1.0.4/bin/hadoop fs -ls pruebas/poemamapfile
Found 2 items
-rw-r--r--   1 elena supergroup        780 2013-04-22 19:42 /user/elena/pruebas/poemamapfile/data
-rw-r--r--   1 elena supergroup        203 2013-04-22 19:42 /user/elena/pruebas/poemamapfile/index
$


El índice no guarda todos los registros, sino que guarda el valor cada cierto número de registros, por defecto 128. Es decir, el fichero de índice va a tener un formato parecido a este:

1          128
129      6079
257      12054
385      18030
513      ...


Es decir, el número de registro y el offset de éste. Como vemos, el índice contiene registros cada 128.
Si quisiéramos cambiar este intervalo, que en vez los 128 por defecto, poner otro número, usaríamos:

writer.setIndexInterval(nIntervalos);

Siendo nIntervalos cada cuántos registros queremos que se indexen. Si nIntervalos  = 5, la visualización del índice sería:

1        128
6        1215
11      2410
16      3606
21      ...



Crear un MapFile


public class CreateMapFile {

 private static final String[] POEMA = { 
  "El ciego sol se estrella",
  "en las duras aristas de las armas,",
  "llaga de luz los petos y espaldares",
  "y flamea en las puntas de las lanzas.",
  "El ciego sol, la sed y la fatiga",
  "Por la terrible estepa castellana,",
  "al destierro, con doce de los suyos",
  "-polvo, sudor y hierro- el Cid cabalga.",
  "Cerrado está el mesón a piedra y lodo.",
  "Nadie responde... Al pomo de la espada",
  "y al cuento de las picas el postigo",
  "va a ceder ¡Quema el sol, el aire abrasa!"};
 
 private static final String rutaDestino = new String ("pruebas/poemamapfile");

 public static void main(String[] args) throws IOException {
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get( conf);
  //Path path = new Path(rutaDestino);

  IntWritable key = new IntWritable();
  Text value = new Text();
  
  MapFile.Writer writer = new MapFile.Writer(conf, fs, 
   rutaDestino, key.getClass(), value.getClass());

  //Indico un intervalo de índice diferente a 128 (por defecto)
  writer.setIndexInterval(5);
   
  //No usaré posiciones consecutivas.
  int pos = 1;
  for (int i = 0; i < POEMA.length; i++) { 
   // La key es el número de línea
   key.set(pos); 
   // El value es la línea del poema correspondiente
   value.set(POEMA[i]); 
   // Escribimos el par en el sequenceFile 
   
   writer.append(key, value);
   pos += 3;
  }
  writer.close();
 }

}


Leer un MapFile


public class ReadMapFile {

 private static final String rutaDestino = new String ("pruebas/poemamapfile");
 
 public static void main(String[] args) throws IOException, InstantiationException, IllegalAccessException {
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);

  MapFile.Reader reader = new MapFile.Reader(fs, rutaDestino, conf);
  
  IntWritable key = (IntWritable) reader.getKeyClass().newInstance();
  Text value = (Text) reader.getValueClass().newInstance();
  
  StringBuilder strBuilder;
  while(reader.next(key, value)){
   strBuilder = new StringBuilder(key.toString()).append(" - ").append(value.toString());
   System.out.println(strBuilder);
  }

  // Posiciono el reader en una key dada y conocida, si no existe la key daría un error
  value = (Text)reader.get(new IntWritable(7), new Text());
  strBuilder = new StringBuilder(" Probando get() ").append(value.toString());
  System.out.println(strBuilder);
  
  // Busco una key a partir de un valor, que si no existe, me daría el valor
  // siguiente más próximo y me posiciona el reader en ese lugar.
  key = (IntWritable) reader.getClosest(new IntWritable(11), new Text());
  strBuilder = new StringBuilder(" Probando getClosest() ").append(key.toString());
  System.out.println(strBuilder);

  
  reader.close();
 }

}


En la creación las key deben estar ordenadas, si intentamos crear un mapfile con un orden aleatorio, dará error de tipo:
Exception in thread "main" java.io.IOException: key out of order: n after m


Al igual que en los Sequence Files el MapFile.Reader nos da opciones para posicionarnos en el reader.
A través del método get(), dada una key conocida, aunque si esa key no existe al intentar acceder al Objeto devuelto nos dará error.
Y a través del método getClosest(), nos posicionaremos en el valor más próximo después de la key introducida.

lunes, 22 de abril de 2013

Paso de Parámetros en Hadoop

Algo tan simple como un paso de parámetros entre el Driver y el Mapper o Reducer puede llevarnos a un error ya que como desarrolladores estamos acostumbrados a que pueda haber una cierta "comunicación" entre las clases.

En Hadoop esto no es correcto, y a estas alturas deberíamos tener claro que este tipo de aplicaciones se ejecuta de forma distribuída, que cada tarea se ejecuta en una JVM diferente e incluso en distintos nodos del clúster.

Voy a dejaros dos ejemplos, uno con la forma correcta y otro con la forma incorrecta por si queréis ver qué pasa, ya que forma la incorrecta no es que dé un error, el Job va a ejecutarse sin problemas, pero el parámetro no se pasará.

También hay que tener cuidado, ya que si se ejecuta en modo local, el paso de parámetros SÍ que funcionará. Sin embargo, no funcionará en el momento que llevemos nuestro programa al modo distribuído o pseudo-distribuído.

El ejemplo es muy simple y ni si quiera vamos a usar la función Reducer. A partir del fichero score.txt que ya he usado para otros ejemplos de la forma:

01-11-2012 Pepe Perez Gonzalez 21
01-11-2012 Ana Lopez Fernandez 14
15-02-2013 Angel Martin Hernandez 3
01-11-2012 Maria Garcia Martinez 11
01-11-2012 Pablo Sanchez Rodriguez 9
01-11-2012 Angel Martin Hernandez 3
15-01-2013 Pepe Perez Gonzalez 17
15-01-2013 Maria Garcia Martinez 3
...

Sólo queremos como salida la lista de personas con una puntuación mayor de 25, y ese valor lo queremos pasar desde el Driver.

Forma Incorrecta

 
public class PersonaScoreDriver {
 
 private static int score;
 
 public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  //Indicamos el parámetro a pasar
  
  score = 25;
    
  Job job = new Job(conf);
  job.setJarByClass(PersonaScoreDriver.class);
  job.setJobName("Persona Score");
  
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
  job.setInputFormatClass(KeyValueTextInputFormat.class);
  
  job.setMapperClass(MyMapper.class);
  job.setNumReduceTasks(0);

  boolean success = job.waitForCompletion(true);
  System.exit(success ? 0:1);  
 }
 
 private static class MyMapper extends Mapper{
  @Override
  public void map(Text key, Text value,
    Context context) throws IOException, InterruptedException {
   
   int s = score;
   String[] personaSplit = value.toString().split(" ");
   
   if(personaSplit.length == 4){
    int mScore = Integer.valueOf(personaSplit[3]);
    
    if(mScore >= s)
     context.write(key, value);
   }
  }
 }
}


Forma Correcta

 
public class PersonaScoreDriver {
 
 private static class MyMapper extends Mapper{
  private int score;
  
  @Override
  protected void setup(Context context) throws IOException,
    InterruptedException {
   Configuration conf = context.getConfiguration();
   //Indicas cuál es el id del parámetro a recoger y el valor
   // por defecto.
   score = conf.getInt("score", 0);
  }
  
  @Override
  public void map(Text key, Text value,
    Context context) throws IOException, InterruptedException {
   
   int s = score;
   String[] personaSplit = value.toString().split(" ");
   
   if(personaSplit.length == 4){
    int mScore = Integer.valueOf(personaSplit[3]);
    
    if(mScore >= s)
     context.write(key, value);
   }
  }
 }
 
 public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  //Indicamos el parámetro a pasar
  conf.setInt("score", 25);
    
  Job job = new Job(conf);
  job.setJarByClass(PersonaScoreDriver.class);
  job.setJobName("Persona Score");
  
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
  job.setInputFormatClass(KeyValueTextInputFormat.class);
  
  job.setMapperClass(MyMapper.class);
  job.setNumReduceTasks(0);

  boolean success = job.waitForCompletion(true);
  System.exit(success ? 0:1);  
 }
}


Hay que tener en cuenta que esta forma de pasar información entre el Driver y las tareas (map o reduce) es muy útil siempre cuando esta información no ocupa demasiada memoria ya que esta información se copia en varios sitios: en la memoria del Driver, en la memoria del TaskTracker y en la memoria de la JVM que ejecuta la tarea.
Por lo tanto, si se trata de una información grande (por ejemplo, varios TB), conviene usar otro método (por ejemplo: almacenar esta información en un fichero HDFS y acceder a esta información vía el método setup(), o bien usar Distributed Cache).

Recordad que este código también lo podréis descargar desde este enlace.