Recuerda que puedes descargarte algunos de los ejemplos en la pestaña de Código Fuente

miércoles, 13 de febrero de 2013

HDFS: Interfaz Java (FileSystem)

Como ya comenté en una entrada anterior (Hadoop Distributed File System (HDFS)), para acceder a HDFS lo podemos hacer de dos formas, por un lado y como vimos en esa misma entrada (con algunos ejemplos) utilizando un terminal y a través de los comandos propios de hadoop y por otro lado a través de la API de Java, que es la parte que vamos a ver ahora.

Para interactuar con HDFS con Java, se hace a través de la clase FileSystem.
El desarrollo es en Java y los ejecutamos desde línea de comando tras haber arrancado los demonios Hadoop (/bin/start-all.sh).

Lo primero que he hecho es crear una carpeta pruebas en mi sistema de ficheros HDFS:
      hadoop fs -mkdir pruebas

Algunos ejemplos que también podéis descargar en este enlace son:

  • Copiar un fichero o carpeta del sistema de ficheros local a HDFS

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LocalToHdfs {
 public static void main(String[] args) throws Exception{
  Configuration conf = new Configuration();
  FileSystem hdfs = FileSystem.get(conf);
  Path sourcePath = new Path(args[0]);
  Path destPath = new Path(args[1]);
  hdfs.copyFromLocalFile(sourcePath, destPath);
 }
}


Y este es el resultado en el que copio desde mi carpeta local training el fichero score.txt a mi carpeta pruebas en HDFS:

$ hadoop jar training/JavaHdfs.jar LocalToHdfs training/score.txt /user/elena/pruebas 
$ hadoop fs -ls pruebas 
Found 1 items -rw-r--r--  1 elena supergroup  363 2013-02-11 19:31 /user/elena/pruebas/score.txt


  • Crear un fichero HDFS

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CreateFileHdfs {

 public static void main(String[] args) throws Exception{
  String contentFile = "Este es el contenido del fichero\n";
  byte[] texto = contentFile.getBytes();
  Configuration conf = new Configuration();
  FileSystem hdfs = FileSystem.get(conf);
  Path path = new Path(args[0]+"/nombreFichero");
  FSDataOutputStream outputStream = hdfs.create(path);
  outputStream.write(texto, 0, texto.length);
 }
}


En el código Java tengo puesto el nombre del fichero y paso como argumento la carpeta donde se debe crear el fichero.

$ hadoop jar training/JavaHdfs.jar CreateFileHdfs pruebas 
$ hadoop fs -ls pruebas 
Found 2 items 
-rw-r--r--  1 elena supergroup  32 2013-02-11 19:34 /user/elena/pruebas/nombreFichero 
-rw-r--r--  1 elena supergroup  363 2013-02-11 19:31 /user/elena/pruebas/score.txt 
$ hadoop fs -cat pruebas/nombreFichero 
Este es el contenido del fichero


  • Leer el contenido de un fichero HDFS

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class ReadFileHdfs {

 public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  FileSystem hdfs = FileSystem.get(conf);
  Path path = new Path(args[0]+"/nombreFichero");
  FSDataInputStream inputStream = hdfs.open(path);
  IOUtils.copyBytes(inputStream, System.out, conf, true); 
 }
}

$ hadoop jar training/JavaHdfs.jar ReadFileHdfs pruebas 
Este es el contenido del fichero 


  • Borrar un fichero HDFS

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DeleteFileHdfs {

 public static void main(String[] args) throws Exception{
  Configuration conf = new Configuration();
  FileSystem hdfs = FileSystem.get(conf);
  Path path = new Path(args[0]+"/nombreFichero");
  hdfs.delete(path, false);
 }
}


$ hadoop jar training/JavaHdfs.jar DeleteFileHdfs pruebas 
$ hadoop fs -ls pruebas 
Found 1 items 
-rw-r--r--  1 elena supergroup  363 2013-02-11 19:31 /user/elena/pruebas/score.txt 

sábado, 9 de febrero de 2013

Alta Disponibilidad

En la última versión de Hadoop, la CDH4, se ha puesto en marcha la Alta Disponibilidad.

Hasta ahora, el NameNode era un punto de fallo crítico (SPOF en inglés) de nuestro sistema Hadoop. Si el NameNode falla, todos los clientes, incluído los Job MapReduce se inutilizan.
El administrador tendría que encargarse de crear un nuevo NameNode a partir de una réplica de los metadata y configurar los clientes y DataNodes para utilizar este nuevo NameNode, más todas las tareas que hay por detrás hasta que el NameNode está disponible para funcionar.
Hasta que no se hace todo eso, el sistema se encuentra bloqueado.

Un tiempo de recuperación largo puede ser un grabe problema en ciertas situaciones.

Como hemos visto, en un cluster Hadoop disponemos de un NameNode y de un Secondary Namenode, pero si la Alta Disponibilidad está activada, en vez de disponer de estos dos elementos dispondríamos de un Active NameNode y de un Standby NameNode que estarían haciendo las mismas funciones de principal y secundario y el Standby realizando las tareas que hemos visto que ejerce el Secondary.
Pero en este caso, si el Active NameNode falla, el Standby NameNode se activa automáticamente (para esto se necesita tener un cluster Zookeeper) y empezaría a funcionar como el NameNode ya que el Standby NameNode siempre tiene en memoria la misma información de metadata que el NameNode activo (en caso de alta disponibilidad, los DataNodes envían sus informes tanto al NameNode activo como al Standby).

lunes, 4 de febrero de 2013

Instalación de Hadoop en Mac OS

En este artículo describo la instalación de Hadoop en un ordenador MAC.
Cuando finalicemos estas configuraciones y arranquemos los demonios Hadoop, podremos trabajar en un clúster en modo pseudo-distribuído.

Esta instalación es bastante genérica y un compañero ha probado a hacer la misma instalación en Linux y ha funcionado a la perfección.
En el caso de Windows no voy a entrar porque las instalaciones de Hadoop en este entorno son demasiado complejas.

Remarcar también que esta instalación se ha hecho a medida para mi portátil personal, con sus limitaciones hardware. En ningún caso se pretende tener una configuración para un servidor u otro
entorno de producción.

Prerequisitos:

- Java 1.6 instalado (java -version)

Lo primero es descargar Hadoop desde su web: http://hadoop.apache.org.
En este momento la última versión estable es la 1.0.4, aquí tenéis el enlace a las releases:
http://hadoop.apache.org/releases.html

De toda la lista que aparece, descargo hadoop-1.0.4.tar.gz, este paquete contiene tanto los fuentes como los binarios.

Hay que elegir una ruta para la instalación, en este caso va a ser /usr/local/hadoop
Como mi usuario no está en modo administrador, no puedo crear las carpetas necesarias a través del finder, por eso abro un terminal y paso a modo administrador: 

sudo su - 

este comando pedirá la contraseña de tu usuario (en mi caso, elena)

Nos posicionamos en el directorio donde queremos crear el directorio, y lo creamos:

cd /usr/local
mkdir hadoop


Y ahora le damos a la carpeta hadoop permisos al usuario (en mi caso elena):

chown elena hadoop

Salimos del modo Administrador y volvemos como usuario normal (elena) a través del comando
exit

Ahora ya podemos hacer modificaciones en el directorio a través del Finder.

Copiamos el fichero descargado hadoop-1.0.4.tar.gz en la carpeta hadoop recién creada.

Otra vez a través de línea de comandos entramos en la carpeta hadoop y descomprimimos el fichero (también se puede hacer a través del Finder haciendo doble click sobre el fichero):

cd /usr/local/hadoop
tar xvfz hadoop-1.0.4.tar.gz


Ahora vamos a proceder a las diferentes configuraciones para hacer funcionar Hadoop, va a haber que modificar los ficheros:
  • core-site.xml
  • hdfs-site.xml
  • mapred-site.xml
  • hadoop-env.sh

Así que de nuevo a través del Finder accedemos a la carpeta /usr/local/hadoop/hadoo-1.0.4/conf, y antes de empezar a modificar estos 4 ficheros, creamos una copia de seguridad por precaución, llamándolos, por ejmplo: core-site.xml.orig, hdfs-site.xml.orig, mapred-site.xml.orig y hadoo-env.sh.orig

A continuación detallamos el contenido de dichos ficheros. Para cada propiedad de Hadoop, comentaré qué describe el valor asociado.

core-site.xml

<configuration>
    <property>
      <!-- Indicamos la URL donde se va a lanzar el servicio hdfs (namenode) -->
      <name>fs.default.name</name>
      <value>hdfs://localhost:8020</value>
    </property>
</configuration>

hdfs-site.xml


<configuration>
    <property>
      <!-- Como se trata de un clúster pseudodistribuído el factor de replicación de hdfs es 1 (por defecto si no se indica sería 3) -->
      <name>dfs.replication</name>
      <value>1</value>
    </property>
    <property>
      <!-- Deshabilitamos la seguridad en hdfs -->
      <name>dfs.permissions</name>
      <value>false</value>
    </property>
    <property>
      <!-- Indicamos el directorio donde se almacenarán los metadatos de hdfs (namenode). En esta configuración hemos localizado todos los datos que se generan en hadoop bajo una misma carpeta /data -->
      <name>dfs.name.dir</name>
      <value>/usr/local/hadoop/hadoop-1.0.4/data/dfs/nn</value>
    </property>
    <property>
      <!-- Indicamos el directorio bajo el cual se almacenarán los bloques de datos hdfs (datanode) -->
      <name>dfs.data.dir</name>
      <value>/usr/local/hadoop/hadoop-1.0.4/data/dfs/dn</value>
    </property>
    <property>
      <!-- Este parámetro permite salir más rápido del safemode. Indica el número de segundos que espera el namenode cuando haya detectado la localización de los bloques de los datanodes -->
      <name>dfs.safemode.extension</name>
      <value>0</value>
    </property>
    <property>
      <!-- Configuramos el tamaño de bloque en hdfs (estándar 128MB), como no pretendo hacer pruebas con demasiados datos, lo he configurado a 16MB -->
      <name>dfs.block.size</name>
      <value>16777216</value>
    </property>
</configuration>


mapred-site.xml

<configuration>
    <property>
      <!-- Indicamos la URL donde se va a lanzar el demonio del jobtracker -->
      <name>mapred.job.tracker</name>
      <value>localhost:8021</value>
    </property>
    <property>
      <!-- Configura el tamaño del HEAP de las tareas map o reduce. Por defecto es 200, pero reducimos a 100 por limitación de RAM en mi portátil -->
      <name>mapred.child.java.opts</name>
      <value>-Xmx100m</value>
    </property>
    <property>
      <!-- Directorio donde se almacenan datos temporales de los job mapreduce -->
      <name>mapred.local.dir</name>
      <value>/usr/local/hadoop/hadoop-1.0.4/data/mapred/local</value>
    </property>
    <property>
      <!-- Define el número mínimo de tareas map a ejecutar para cualquier job -->
      <name>mapred.map.tasks</name>
      <value>1</value>
    </property>
    <property>
      <!-- Define el número máximo de tareas map que se pueden ejecutar al mismo tiempo -->
      <name>mapred.tasktracker.map.tasks.maximum</name>
      <value>2</value>
    </property>
    <property>
      <!-- Define el número máximo de tareas reduce que se pueden ejecutar al mismo tiempo. Como suele haber menos reduce que map, lo bajo a 1 (por defecto es 2) -->
      <name>mapred.tasktracker.reduce.tasks.maximum</name>
      <value>1</value>
    </property>
    <property>
      <!-- Número de jobs ejecutados que permanecen en la memoria del jobtracker. Mejor un número pequeño para no despilfarrar memoria -->
      <name>mapred.job.tracker.retiredjobs.cache.size</name>
      <value>15</value>
    </property>
    <property>
      <!-- Tamaño en MegaBytes del buffer en las tareas map o reduce donde se almacenan los datos intermedios -->
      <name>io.sort.mb</name>
      <value>25</value>
    </property>
    <property>
      <!-- Número de veces que se replican los ficheros mapreduce en el distributed cache -->
      <name>mapred.submit.replication</name>
      <value>1</value>
    </property>
</configuration>


hadoop-env.sh

Descomentamos las líneas JAVA_HOME y HADOOP_HEAPSIZE y deberán quedar de la forma siguiente:
export JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Home
export HADOOP_HEAPSIZE=128


Una vez modificados los ficheros, pasamos a formatear el HDFS:

Creamos un directorio para guardar los logs en la carpeta raíz de hadoop, luego nos posicionamos en la carpeta bin y ejecutamos el comando format:

cd /usr/local/hadoop/hadoop-1.0.4/
mkdir -p logs data/dfs/nn data/dfs/dn data/mapred/local
cd /usr/local/hadoop/hadoop-1.0.4/bin

./hadoop namenode -format


Este es el resultado que debemos obtener:

elena:bin elena$ ./hadoop namenode -format
13/02/24 13:08:21 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = elena.local/192.168.1.129
STARTUP_MSG: args = [-format]
STARTUP_MSG: version = 1.0.4
STARTUP_MSG: build = https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.0 -r 1393290; compiled by 'hortonfo' on Wed Oct 3 05:13:58 UTC 2012
************************************************************/
Re-format filesystem in /usr/local/hadoop/hadoop-1.0.4/data/dfs/nn ? (Y or N) Y
13/02/24 13:08:28 INFO util.GSet: VM type = 64-bit
13/02/24 13:08:28 INFO util.GSet: 2% max memory = 2.51875 MB
13/02/24 13:08:28 INFO util.GSet: capacity = 2^18 = 262144 entries
13/02/24 13:08:28 INFO util.GSet: recommended=262144, actual=262144
13/02/24 13:08:29 INFO namenode.FSNamesystem: fsOwner=elena
13/02/24 13:08:29 INFO namenode.FSNamesystem: supergroup=supergroup
13/02/24 13:08:29 INFO namenode.FSNamesystem: isPermissionEnabled=false
13/02/24 13:08:29 INFO namenode.FSNamesystem: dfs.block.invalidate.limit=100
13/02/24 13:08:29 INFO namenode.FSNamesystem: isAccessTokenEnabled=false accessKeyUpdateInterval=0 min(s), accessTokenLifetime=0 min(s)
13/02/24 13:08:29 INFO namenode.NameNode: Caching file names occuring more than 10 times
13/02/24 13:08:29 INFO common.Storage: Image file of size 111 saved in 0 seconds.
13/02/24 13:08:29 INFO common.Storage: Storage directory /usr/local/hadoop/hadoop-1.0.4/data/dfs/nn has been successfully formatted.
13/02/24 13:08:29 INFO namenode.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at elena.local/192.168.1.129
************************************************************/


En la carpeta bin, vamos a editar el fichero start-dfs.sh  así que también creamos una copia de seguridad:

cp start-dfs.sh start-dfs.sh.orig

Lo que hay que modificar, es quitar las letras "s" finales de donde pone hadoop-daemons.sh al final del fichero.
El fichero origen contiene:

# note: datanodes will log connection errors until namenode starts
"$bin"/hadoop-daemon.sh --config $HADOOP_CONF_DIR start namenode $nameStartOpt
"$bin"/hadoop-daemons.sh --config $HADOOP_CONF_DIR start datanode $dataStartOpt
"$bin"/hadoop-daemons.sh --config $HADOOP_CONF_DIR --hosts masters start secondarynamenode

Y lo modificamos para que sea (es para ejecutar directamente los comandos de arranque Hadoop sin establecer una conexión ssh):

# note: datanodes will log connection errors until namenode starts
"$bin"/hadoop-daemon.sh --config $HADOOP_CONF_DIR start namenode $nameStartOpt
"$bin"/hadoop-daemon.sh --config $HADOOP_CONF_DIR start datanode $dataStartOpt
"$bin"/hadoop-daemon.sh --config $HADOOP_CONF_DIR --hosts masters start secondarynamenode

Sucede lo mismo en los ficheros start-mapred.sh, stop-dfs.sh y stop-mapred.sh, así que creamos las copias de seguridad y los editamos:

start-mapred.sh

cp start-mapred.sh start-mapred.sh.orig


Datos antes:
# start jobtracker first to minimize connection errors at startup
"$bin"/hadoops-daemon.sh --config $HADOOP_CONF_DIR start jobtracker
"$bin"/hadoops-daemons.sh --config $HADOOP_CONF_DIR start tasktracker


Datos después:

# start jobtracker first to minimize connection errors at startup
"$bin"/hadoop-daemon.sh --config $HADOOP_CONF_DIR start jobtracker
"$bin"/hadoop-daemon.sh --config $HADOOP_CONF_DIR start tasktracker


stop-dfs.sh

cp stop-dfs.sh stop-dfs.sh.orig

Datos antes:

"$bin"/hadoop-daemon.sh --config $HADOOP_CONF_DIR stop namenode
"$bin"/hadoop-daemons.sh --config $HADOOP_CONF_DIR stop datanode
"$bin"/hadoop-daemons.sh --config $HADOOP_CONF_DIR --hosts masters stop secondarynamenode

Datos después:
"$bin"/hadoop-daemon.sh --config $HADOOP_CONF_DIR stop namenode
"$bin"/hadoop-daemon.sh --config $HADOOP_CONF_DIR stop datanode
"$bin"/hadoop-daemon.sh --config $HADOOP_CONF_DIR --hosts masters stop secondarynamenode


stop-mapred.sh

cp stop-mapred.sh stop-mapred.sh.orig

Datos antes:

"$bin"/hadoop-daemon.sh --config $HADOOP_CONF_DIR stop jobtracker
"$bin"/hadoop-daemons.sh --config $HADOOP_CONF_DIR stop tasktracker


Datos después:

"$bin"/hadoop-daemon.sh --config $HADOOP_CONF_DIR stop jobtracker
"$bin"/hadoop-daemon.sh --config $HADOOP_CONF_DIR stop tasktracker



Una vez hechas todas las configuraciones vamos a ver si todo funciona correctamente.

Nos posicionamos en la carpeta raíz del hadoop:

cd /usr/local/hadoop/hadoop-1.0.4/

Y arrancamos todo con el comando:

bin/start-all.sh

Este comando arranca los 3 demonios de HDFS y los 2 de MapReduce.

Para comprobar que todo ha funcionado correctamente, podemos ver los ficheros de logs de cada demonio en la carpeta logs.
También podemos usar un navegador web y mirar las web UI de:
A tener en cuenta que en los 2 casos tenemos que ver 1 nodo activo

Para detener todo el cluster, se hace con el comando

bin/stop-all.sh


Hay que tener en cuenta que no hemos puesto el comando hadoop en el PATH del sistema, así que para ejecutarlo hay que posicionarse en la carpeta bin y ponerlo de la forma:

./hadoop fs