Wednesday, December 31, 2014

Kafka Round Robin Partitioner

In this blog we will cover how to write a custom partitioner in Kafka that distributes data uniformly across a topic's partitions.

Suppose we have a topic, TopicA, in Kafka with 5 partitions and a replication factor of 3, and we want to distribute data uniformly so that every partition holds about the same amount of data.
Kafka uses its default partitioning mechanism to distribute data between partitions, but with the default mechanism some partitions can end up larger than others. Suppose we have inserted 40 GB of data into Kafka; the data size of each partition may then look like:
Partition0 -> 10 GB
Partition1 -> 8 GB
Partition2 -> 6 GB
Partition3 -> 9 GB
Partition4 -> 7 GB

Hence, to distribute the data uniformly, we need to write a custom round-robin partitioner class.
In Kafka, we can write a custom partitioner by implementing the kafka.producer.Partitioner interface. The interface declares a method called partition, which takes two arguments: the key that the producer supplies with each message, and the number of partitions in the topic. The partition method contains the logic that calculates the destination partition and returns its number.

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

/**
 * Round robin partitioner using a simple thread safe AtomicInteger
 */
public class RoundRobinPartitioner implements Partitioner {
    private static final Logger log = Logger.getLogger(RoundRobinPartitioner.class);

    final AtomicInteger counter = new AtomicInteger(0);

    public RoundRobinPartitioner(VerifiableProperties props) {
        log.trace("Instantiated the Round Robin Partitioner class");
    }
    /**
     * Ignores the key and returns the next partition number in round-robin order
     */
    public int partition(Object key, int partitions) {
        // Masking the sign bit keeps the result non-negative after the counter
        // overflows, so no manual (and non-atomic) reset is needed.
        return (counter.getAndIncrement() & Integer.MAX_VALUE) % partitions;
    }
}

Now, while writing a producer, we can set the “partitioner.class” property in the Properties used to create the kafka.producer.ProducerConfig instance.

Properties props = new Properties();
props.put("partitioner.class", "com.learnining.kafka.RoundRobinPartitioner");
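For context, a fuller property set might look like the sketch below. It only builds the java.util.Properties object, so it runs without a broker. The broker address localhost:9092 and the class and method names are assumptions for illustration; the property names are those of the older Scala producer (kafka.producer.ProducerConfig) that this post uses.

```java
import java.util.Properties;

class ProducerPropsSketch {
    // Builds producer settings; the result would be passed to
    // new kafka.producer.ProducerConfig(props) in a real producer.
    static Properties roundRobinProducerProps(String brokerList) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);                    // assumed broker address
        props.put("serializer.class", "kafka.serializer.StringEncoder");  // plain string messages
        props.put("partitioner.class", "com.learnining.kafka.RoundRobinPartitioner");
        return props;
    }

    public static void main(String[] args) {
        Properties props = roundRobinProducerProps("localhost:9092");
        System.out.println(props.getProperty("partitioner.class"));
    }
}
```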

After switching to the round robin partitioner class, the disk usage of each partition will look like:
Partition0 -> 8 GB
Partition1 -> 8 GB
Partition2 -> 8 GB
Partition3 -> 8 GB
Partition4 -> 8 GB
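The even split can be checked without a Kafka cluster. The following is a minimal simulation, not the producer itself: it reuses the counter-modulo idea, sends 40 equally sized hypothetical messages to 5 partitions, and counts where they land. The class and method names are made up for this sketch, and the sizes only come out equal if the messages are roughly the same size.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinSimulation {
    private final AtomicInteger counter = new AtomicInteger(0);

    // Ignore the key and cycle through the partitions; the mask keeps the
    // value non-negative if the counter ever overflows.
    int nextPartition(int partitions) {
        return (counter.getAndIncrement() & Integer.MAX_VALUE) % partitions;
    }

    // Sends `messages` messages and counts how many land on each partition.
    static int[] distribute(int messages, int partitions) {
        RoundRobinSimulation rr = new RoundRobinSimulation();
        int[] perPartition = new int[partitions];
        for (int i = 0; i < messages; i++) {
            perPartition[rr.nextPartition(partitions)]++;
        }
        return perPartition;
    }

    public static void main(String[] args) {
        int[] counts = distribute(40, 5);
        for (int p = 0; p < counts.length; p++) {
            System.out.println("Partition" + p + " -> " + counts[p] + " messages");
        }
    }
}
```

With 40 messages and 5 partitions, every partition receives 8 messages.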



Saturday, December 7, 2013

I’ve written a book “Instant Apache Sqoop”

I recently finished writing a book, Instant Apache Sqoop. Apache Sqoop is a tool designed for moving data between the Hadoop ecosystem and structured data stores such as relational databases.
Instant Apache Sqoop covers the following topics:
  • Working with the import process
  • Incremental import
  • Populating the HBase table
  • Importing data into HBase
  • Populating the Hive table
  • Importing data into Hive
  • Working with the export process
  • Exporting data from Hive
  • Using Sqoop connectors
Instant Apache Sqoop is a practical, hands-on guide that provides you with a number of clear, step-by-step exercises that will help you to take advantage of the real power of Apache Sqoop and give you a good grounding in the knowledge required to transfer data between RDBMS and the Hadoop ecosystem.


Tuesday, January 10, 2012

Sqoop export and import commands

Sqoop Import Examples:
Sqoop Import :- Import data from a relational database management system (RDBMS) such as MySQL or Oracle into the Hadoop Distributed File System (HDFS) and its subprojects (Hive, HBase).


Import the data (MySQL table) to HBase:

Case 1: The table has a primary key; import all columns of the MySQL table into the HBase table.

$ bin/sqoop import --connect jdbc:mysql://localhost/db1 --username root --password root --table tableName --hbase-table hbase_tableName  --column-family hbase_table_col1 --hbase-create-table

Case 2: The table has a primary key; import only a few columns of the MySQL table into the HBase table.

$ bin/sqoop import --connect jdbc:mysql://localhost/db1 --username root --password root --table tableName --hbase-table hbase_tableName --columns column1,column2 --column-family hbase_table_col1 --hbase-create-table

Note: The column names specified in the --columns attribute must include the primary key column.

Case 3: The table doesn't have a primary key, so choose one column as the hbase-row-key. Import all columns of the MySQL table into the HBase table.

$ bin/sqoop import --connect jdbc:mysql://localhost/db1 --username root --password root --table tableName --hbase-table hbase_tableName --column-family hbase_table_col1 --hbase-row-key column1 --hbase-create-table

Case 4: The table doesn't have a primary key, so choose one column as the hbase-row-key. Import only a few columns of the MySQL table into the HBase table.

$ bin/sqoop import --connect jdbc:mysql://localhost/db1 --username root --password root --table tableName --hbase-table hbase_tableName --columns column1,column2 --column-family hbase_table_col --hbase-row-key column1 --hbase-create-table 

Note: The column named in the --hbase-row-key attribute must be in the --columns list. Otherwise the command executes successfully but no records are inserted into HBase.


Note: The value of the primary key column, or of the column specified in the --hbase-row-key attribute, becomes the HBase row key. If the MySQL table doesn't have a primary key, or the column specified in --hbase-row-key doesn't hold unique values, some records will be lost.

Example: Consider a MySQL table test_table with two columns, name and address. The table has no primary key or unique key column.

Records of test_table:
________________
name    address
----------------
abc    123
sqw    345
abc    125
sdf    1234
aql    23dw


Run the following command to import test_table data into HBase:

$ bin/sqoop import --connect jdbc:mysql://localhost/db1 --username root --password root --table test_table --hbase-table hbase_test_table --column-family test_table_col1 --hbase-row-key name --hbase-create-table

Only 4 records are visible in the HBase table instead of 5. In the above example, two rows have the same value 'abc' in the name column, and the value of this column is used as the HBase row key. When the first record with name 'abc' arrives, it is inserted into the HBase table. When the next record with the same name 'abc' arrives, it overwrites the values of the previous record.

The same problem also occurs if the table has a composite primary key, because only one column of the composite key is used as the HBase row key.
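The overwrite can be reproduced in a few lines of Java. In this sketch a plain Map stands in for the HBase table, which is a simplification: it models only the fact that a row key identifies exactly one row. The records are the five rows of test_table shown above, and the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RowKeyCollisionSketch {
    // The five (name, address) records of test_table.
    static final String[][] TEST_TABLE = {
        {"abc", "123"}, {"sqw", "345"}, {"abc", "125"}, {"sdf", "1234"}, {"aql", "23dw"}
    };

    // Stores each record under its row key (the name column);
    // a repeated key replaces the value stored earlier.
    static Map<String, String> importRows(String[][] records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] record : records) {
            table.put(record[0], record[1]);
        }
        return table;
    }

    public static void main(String[] args) {
        Map<String, String> table = importRows(TEST_TABLE);
        System.out.println(table.size() + " rows: " + table);
    }
}
```

Five records go in, but only four rows come out, and the row for 'abc' holds the address of the later record (125).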

Import the data (MySQL table) to Hive

Case 1: Import a MySQL table into Hive if the table has a primary key.

$ bin/sqoop-import  --connect jdbc:mysql://localhost:3306/db1 --username root --password password --table tableName  --hive-table tableName --create-hive-table --hive-import --hive-home path/to/hive_home

Case 2: Import a MySQL table into Hive if the table doesn't have a primary key.

$ bin/sqoop-import  --connect jdbc:mysql://localhost:3306/db1 --username root --password password --table tableName  --hive-table tableName --create-hive-table --hive-import --hive-home path/to/hive_home --split-by column_name

or

$ bin/sqoop-import  --connect jdbc:mysql://localhost:3306/db1 --username root --password password --table tableName  --hive-table tableName --create-hive-table --hive-import --hive-home path/to/hive_home -m 1



Import the data (MySQL table) to HDFS


Case 1: Import a MySQL table into HDFS if the table has a primary key.

$ bin/sqoop import --connect jdbc:mysql://localhost:3306/db1 --username root --password password --table tableName --target-dir /user/ankit/tableName

Case 2: Import a MySQL table into HDFS if the table doesn't have a primary key.

$ bin/sqoop import --connect jdbc:mysql://localhost:3306/db1 --username root --password password --table tableName --target-dir /user/ankit/tableName  -m 1



Sqoop Export Examples:

Sqoop Export: Export data from HDFS and its subprojects (Hive, HBase) back into an RDBMS.

Export Hive table back to an RDBMS:

By default, Hive stores data using ^A as the field delimiter and \n as the row delimiter.

$ bin/sqoop export --connect jdbc:mysql://localhost/test_db --table tableName  --export-dir /user/hive/warehouse/tableName --username root --password password -m 1 --input-fields-terminated-by '\001'

where '\001' is the octal representation of ^A.
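To make the delimiter concrete, here is a small Java sketch showing that '\001' is the character with code 1 (Ctrl-A) and how a line from a Hive text file splits on it. The sample row and the class name are hypothetical.

```java
class HiveDelimiterSketch {
    // Hive's default field delimiter, Ctrl-A: octal 001, character code 1.
    static final char FIELD_DELIMITER = '\001';

    // Splits one line of a Hive text file into its fields,
    // keeping trailing empty fields.
    static String[] splitFields(String line) {
        return line.split(String.valueOf(FIELD_DELIMITER), -1);
    }

    public static void main(String[] args) {
        String line = "abc" + FIELD_DELIMITER + "123";   // hypothetical two-column row
        String[] fields = splitFields(line);
        System.out.println("code=" + (int) FIELD_DELIMITER + " fields=" + fields.length);
    }
}
```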


Sunday, January 8, 2012

Flume-Solr Integration

My previous post contains the installation steps for Flume in pseudo and distributed mode. A few days ago I did a small POC to integrate Flume with Solr, for which I created a new sink. This sink is usually used with the regexAll decorator, which performs a light transformation of event data into attributes. These attributes are converted into a Solr document and committed to Solr.

I have used flume-0.9.3 and apache-solr-3.1.0 for this POC.

The RegexAllExtractor decorator prepares events whose attributes are ready to be written into Solr. Implementing the RegexAllExtractor decorator is very simple.


package com.cloudera.flume.core.extractors;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.ArrayList;
import java.util.List;

import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
import com.cloudera.flume.conf.SinkFactory.SinkDecoBuilder;
import com.cloudera.flume.core.Attributes;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSinkDecorator;
import com.cloudera.util.Pair;
import com.google.common.base.Preconditions;

/**
 * This takes a regex and any number of attribute names to assign to each
 * sub pattern in pattern order
 *
 * Example 1:
 *   regexAll("(\d+):(\d+):(\d+)", "s1, s2, s3")
 *
 *   "123:456:789" -> {s1:123, s2:456, s3:789}
 *
 * Example 2:
 *   regexAll("(\d+):(\d+):(\d+)", "s1, s2")
 *
 *   "123:456:789" -> {s1:123, s2:456}
 *
 * Example 3:
 *   regexAll("(\d+):(\d+):(\d+)", "s1, , s2")
 *
 *   "123:456:789" -> {s1:123, s2:789}
 */
public class RegexAllExtractor extends EventSinkDecorator<EventSink> {
  
  final Pattern pat;
  final List<String> names;

  /**
   * This will not throw an exception
   */
  public RegexAllExtractor(EventSink snk, Pattern pat, List<String> names) {
    super(snk);
    this.pat = pat;
    this.names = names;
  }

  /**
   * Convenience constructor that may throw a PatternSyntaxException (runtime
   * exn).
   */
  public RegexAllExtractor(EventSink snk, String regex, List<String> names) {
    this(snk, Pattern.compile(regex), names);
  }

  @Override
  public void append(Event e) throws IOException, InterruptedException {
    String s = new String(e.getBody());
    Matcher m = pat.matcher(s);
    String val = "";
    Integer grpCnt = m.groupCount();

    if(m.find()){
      for(int grp = 1; grp <= grpCnt; grp++){
        val = "";
        try {
          val = m.group(grp);
        } catch (IndexOutOfBoundsException ioobe) {
          val = "";
        }

        //Try/Catch so that we don't require there be the same number of names as patterns.
        try {
          //Ignore blank names. These are most likely sub patterns we don't care about keeping.
          if (!"".equals(names.get(grp-1))) {
            Attributes.setString(e, names.get(grp-1), val);
          }
        } catch (IndexOutOfBoundsException ioobe) {
          break;
        }
      }
    }
    super.append(e);
  }

  public static SinkDecoBuilder builder() {
    return new SinkDecoBuilder() {
      @Override
      public EventSinkDecorator<EventSink> build(Context context, String... argv) {
        Preconditions.checkArgument(argv.length == 2, "usage: regexAll(\"regex\", \"col1,col2\")");
        String regex = argv[0];
        Pattern pat = Pattern.compile(regex);
        String name = argv[1];
        ArrayList<String> nameList = new ArrayList<String>();
        String[] names = name.split("\\,");
        for(int i=0; i<names.length; ++i){
         // Trim so that "s1, s2" and blank names behave as in the Javadoc examples.
         nameList.add(names[i].trim());
        }

        EventSinkDecorator<EventSink> snk = new RegexAllExtractor(null, pat, nameList);
        return snk;

      }
    };
  }
  
  /**
  * This is a special function used by the SourceFactory to pull in this class
  * as a plugin decorator.
  */
      public static List<Pair<String, SinkDecoBuilder>> getDecoratorBuilders() {
          List<Pair<String, SinkDecoBuilder>> builders =
                  new ArrayList<Pair<String, SinkDecoBuilder>>();
          builders.add(new Pair<String, SinkDecoBuilder>("regexAll",
                  builder()));
          return builders;
   }
}
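The group-to-name mapping done in append can be tried outside Flume. The sketch below is a standalone approximation that returns a Map instead of setting event attributes; the class and method names are hypothetical, and the names are assumed to be already split and trimmed.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class RegexAllSketch {
    // Maps capture group i to names.get(i - 1); blank names and groups
    // beyond the end of the name list are skipped.
    static Map<String, String> extract(String regex, List<String> names, String body) {
        Map<String, String> attrs = new LinkedHashMap<>();
        Matcher m = Pattern.compile(regex).matcher(body);
        if (m.find()) {
            int limit = Math.min(m.groupCount(), names.size());
            for (int grp = 1; grp <= limit; grp++) {
                String name = names.get(grp - 1);
                if (!name.isEmpty()) {
                    attrs.put(name, m.group(grp));
                }
            }
        }
        return attrs;
    }

    public static void main(String[] args) {
        // Example 3 from the Javadoc: the middle group has a blank name and is dropped.
        System.out.println(extract("(\\d+):(\\d+):(\\d+)",
                Arrays.asList("s1", "", "s2"), "123:456:789"));
    }
}
```

For the input "123:456:789" this yields s1=123 and s2=789, matching Example 3.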

The Flume-Solr sink commits a single record to a Solr server for each Flume event. The sink has one input attribute, url, which is the URL of the output Solr server. Yes, that means one sink can currently be configured to output to just one Solr server. Implementing the SolrEventSink sink is very simple.

package com.cloudera.flume.handlers.solr;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;

import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import com.cloudera.util.*;
import org.slf4j.LoggerFactory;

/**
* This class will convert attributes added into Flume event from RegexAllExtractor class into solr document and 
* store into solr.
*/
public class SolrEventSink extends EventSink.Base {

 static final Logger log = LoggerFactory.getLogger(SolrEventSink.class);
 private CommonsHttpSolrServer server;
 private String solrURL;

  public SolrEventSink(String url) {
    Preconditions.checkNotNull(url);
    this.solrURL = url;
    log.info("solr server url: " + solrURL);
    try {
      this.server = new CommonsHttpSolrServer(solrURL);
    } catch (MalformedURLException e) {
      log.error("Invalid Solr URL: " + solrURL, e);
    }
  }
  
  @Override
  public void append(Event e) throws IOException {
   
/* if(e.getAttrs().get((Object)"UnUsedField")==null)
 {
*/  // generate solr document.
  SolrInputDocument document = new SolrInputDocument();
  for (Entry<String, byte[]> a : e.getAttrs().entrySet()) {
   if(!a.getKey().equals("AckType") && !a.getKey().equals("AckTag") && !a.getKey().equals("AckChecksum") && !a.getKey().equals("tailSrcFile"))
    document.addField(a.getKey(), Bytes.toString(a.getValue()));  
  }
    try {
     // Add document into solr.
     if(document.size()!=0){
      server.add(document);
      // Commit the data into solr.
      server.commit();
     }
 } catch (SolrServerException e1) {
  log.error("SolrServerException : Exception communicating to the Solr Server instance");
 }catch (SolrException exception) {
  log.error("Solr Exception: "+ exception);
 }catch (Exception exception) {
  log.error("Exception : "+ exception);
 }
 //}
  }

  public static SinkBuilder builder() {
    return new SinkBuilder() {

      @Override
      public EventSink build(Context context, String... argv) {
        Preconditions.checkArgument(argv.length == 1,
            "usage: SolrEventSink(serverURL)");

        return new SolrEventSink(argv[0]);
      }

    };
  }
  
  /**
  * This is a special function used by the SourceFactory to pull in this class
  * as a plugin sink.
  */
    public static List<Pair<String, SinkBuilder>> getSinkBuilders() {
      List<Pair<String, SinkBuilder>> builders =
        new ArrayList<Pair<String, SinkBuilder>>();
      builders.add(new Pair<String, SinkBuilder>("solrEventSink", builder()));
      return builders;
    }
  
}

You need to add the flume-plugin-solrEventSink and flume-plugin-regexAllExtractor jars to Flume's lib directory. You can compile them from the Flume sources and the Solr jars. You then also need to add the following jars to Flume's lib directory.

Jar names :
    apache-solr-solrj-3.1.0.jar
    commons-codec-1.3.jar
    commons-fileupload-1.2.2.jar
    commons-httpclient-3.1.jar
    commons-io-1.4.jar
    commons-logging-1.0.4.jar
    geronimo-stax-api_1.0_spec-1.0.1.jar
    hbase-0.90.1-CDH3B4.jar
    lucene-analyzers-2.9.1.jar
    lucene-core-2.9.1.jar
    lucene-highlighter-2.9.1.jar
    lucene-memory-2.9.1.jar
    lucene-misc-2.9.1.jar
    lucene-queries-2.9.1.jar
    lucene-snowball-2.9.1.jar
    lucene-spellchecker-2.9.1.jar
    servlet-api-2.5.jar
    slf4j-api-1.6.1.jar
    slf4j-simple-1.6.1.jar
    solr-commons-csv-1.4.0.jar
    solr-core-1.4.0.jar
    stax-api-1.0.1.jar
    wstx-asl-3.2.7.jar

The last step is to add the plugins to the Flume configuration file (flume-conf.xml).

<configuration>
<property>
<name>flume.plugin.classes</name>
 <value>com.cloudera.flume.handlers.solr.SolrEventSink,com.cloudera.flume.core.extractors.RegexAllExtractor</value>
</property>  
.
.
</configuration>

Run Solr server:
java -Dsolr.solr.home=path/to/solr_home/solr/  -Djetty.port=12000 -jar start.jar

Note: The Solr schema.xml file must contain field entries for the column names specified in the regexAll decorator configuration. Otherwise an invalid column name error occurs and the document is not saved in Solr.

Configuration of Flume agent:
source : tail("path/to/log/file")
sink :  agentSink("collector_machine_name",35853)

Configuration of Flume collector:
source : collectorSource(35853)
sink : {regexAll("regex","column_names") => solrEventSink("solr_server_url")}


Example:-

Run Solr server :
    $ cd SOLR_HOME
    $ java -Dsolr.solr.home=path/to/SOLR_HOME/solr/  -Djetty.port=12000 -jar start.jar

Create a file test.log and add following lines:
abc,bcd,cde
abc,swd,qwe
bcd,wer,asd
ghj,rty,ghj

Run Flume master:

    $ cd FLUME_HOME
    $ bin/flume master
Run Flume agent:
    $ cd FLUME_HOME
    $ bin/flume node -n test_agent
Run Flume collector:
    $ cd FLUME_HOME
    $ bin/flume node -n test_collector

Configuration of Flume agent:
    source : tail("/path/to/test.log")
    sink :  agentSink("localhost",35853)

Configuration of Flume collector:
    source : collectorSource(35853)
    sink : {regexAll("([^,]*),([^,]*),([^,]*)","id,cat,name")=>solrEventSink("http://hadoop-namenode:12000/solr")}


Note: The Solr schema.xml must contain field entries for id, cat and name. Otherwise an invalid column name error occurs and the document is not saved in Solr.



Thursday, January 5, 2012

Errors that occurred in Hadoop and its sub-projects

1. OOZIE job failed:

Error message : ERROR is considered as FAILED for SLA
   
Cause 1: Not able to find the Hadoop namenode (master) or jobtracker machine.
Suppose you are running Oozie, the Hadoop master and the jobtracker on one machine, and the datanode and tasktracker on another machine.

Your job.properties file contains following lines:
        nameNode=hdfs://localhost:9000
        jobTracker=localhost:9001
   
In the above case, an FS action works fine because no map-reduce operation is performed for an FS action. But if you run a map-reduce action, the tasktracker will look for the hadoop-master on its own localhost, because we used localhost:9000 in the job.properties file.
   
Solution: Use the IP of the hadoop-namenode and jobtracker machine in the job.properties file instead of localhost.
   
Cause 2: Oozie is not able to find the MySQL server.
Suppose I am using MySQL as the metastore for Hive.
The Hive hive-default.xml file has the following lines:
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
       
Solution: Use the IP of the MySQL machine instead of localhost.


2. Zookeeper server not running:
Error message: Could not find my address: zk-serevr1 in list of ZooKeeper quorum servers
   
Causes :
HBase tries to start a ZK server on some machine but that machine isn't able to find itself in the hbase.zookeeper.quorum configuration. This is a name lookup problem. 

Solution:   
Use the hostname presented in the error message instead of the value you used (zk-server1). If you have a DNS server, you can set hbase.zookeeper.dns.interface and hbase.zookeeper.dns.nameserver in hbase-site.xml to make sure it resolves to the correct FQDN.

3. Hadoop-datanode job failed or datanode not running: java.io.IOException: File ../mapred/system/jobtracker.info could only be replicated to 0 nodes, instead of 1
   
Cause 1: No datanode is running. Make sure at least one datanode is running.

Cause 2: The namespaceID of the master and slave machines is not the same.
If you see the error java.io.IOException: Incompatible namespaceIDs in the logs of a datanode, chances are you are affected by bug HADOOP-1212 (well, I’ve been affected by it at least).
           
Solution :               
If the namespaceID of the master and slave machines differ, replace the namespaceID on the slave machines with the master's namespaceID.
- The dfs/name/current/VERSION file contains the namespaceID of the master machine
- The dfs/data/current/VERSION file contains the namespaceID of the slave machine
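The comparison can be sketched in Java as follows. The VERSION files are key=value text, so java.util.Properties can parse them; here the file contents are passed in as strings, and the IDs and class name are hypothetical. In practice you would read the text from the two VERSION files listed above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

class NamespaceIdCheck {
    // Reads namespaceID from the text of a VERSION file (key=value lines).
    static String namespaceId(String versionFileText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(versionFileText));
        } catch (IOException e) {
            throw new IllegalStateException("could not parse VERSION text", e);
        }
        return props.getProperty("namespaceID");
    }

    // True only when both files carry the same, non-missing namespaceID.
    static boolean sameNamespace(String nameNodeVersion, String dataNodeVersion) {
        String master = namespaceId(nameNodeVersion);
        return master != null && master.equals(namespaceId(dataNodeVersion));
    }

    public static void main(String[] args) {
        // Hypothetical file contents; real files hold more keys.
        String nameNode = "namespaceID=123456789\nstorageType=NAME_NODE\n";
        String dataNode = "namespaceID=987654321\nstorageType=DATA_NODE\n";
        System.out.println("namespaceIDs match: " + sameNamespace(nameNode, dataNode));
    }
}
```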
        
Cause 3: Datanode instance running out of space.
Solution : Free some space.

Cause 4: You may also get this message because of permissions. The JobTracker may not be able to create jobtracker.info on startup.

4.    Sqoop export command failed:
Error message:
attempt_201101151840_1006_m_000001_0, Status : FAILED
java.util.NoSuchElementException
at java.util.AbstractList$Itr.next(AbstractList.java:350)
at impressions_by_zip.__loadFromFields(impressions_by_zip.java:159)
at impressions_by_zip.parse(impressions_by_zip.java:108)

   
Cause: The given field separator is not valid.
Solution: Specify the correct field delimiter in the sqoop export command.
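Why a wrong delimiter surfaces as a NoSuchElementException can be seen in a small sketch. It is loosely modeled on the stack trace above, where a generated parser pulls fields from an iterator; it is not Sqoop's actual generated code. The row, the class name and the method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.regex.Pattern;

class WrongDelimiterSketch {
    // Splits a record on the given delimiter and reads exactly
    // expectedFields values from an iterator over the pieces.
    static String[] parse(String line, String delimiter, int expectedFields) {
        Iterator<String> it =
                Arrays.asList(line.split(Pattern.quote(delimiter), -1)).iterator();
        String[] fields = new String[expectedFields];
        for (int i = 0; i < expectedFields; i++) {
            fields[i] = it.next(); // throws NoSuchElementException when fields run out
        }
        return fields;
    }

    // Reports whether parsing fails because too few fields were found.
    static boolean failsWith(String line, String delimiter, int expectedFields) {
        try {
            parse(line, delimiter, expectedFields);
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        String row = "90210" + '\001' + "42";   // hypothetical two-field Hive row
        System.out.println("comma delimiter fails:  " + failsWith(row, ",", 2));
        System.out.println("Ctrl-A delimiter fails: " + failsWith(row, String.valueOf('\001'), 2));
    }
}
```

With a comma as the delimiter the whole row is treated as one field, so reading the second field fails; with Ctrl-A both fields are found.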

5. HBase regionserver not running :

Error message: 2012-01-02 13:48:49,973 FATAL org.apache.hadoop.hbase.regionserver.HRegionServer: Master rejected startup because clock is out of sync
org.apache.hadoop.hbase.ClockOutOfSyncException: org.apache.hadoop.hbase.ClockOutOfSyncException: Server hadoop-datanode2,60020,1325492317440 has been rejected; Reported time is too far out of sync with master.  Time difference of 206141ms > max allowed of 30000ms

Solution: The clocks of the regionservers are not in sync with the master machine. Synchronize the clocks of the HBase master and regionserver machines.
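The check behind this error amounts to comparing a time difference with a limit. The following sketch uses the numbers from the error message (a 206141 ms difference against a 30000 ms limit); the timestamp, class name and method name are hypothetical, and this is not HBase's own code.

```java
class ClockSkewCheck {
    // Limit quoted in the error message, in milliseconds.
    static final long MAX_SKEW_MS = 30000L;

    // True when the two clocks differ by no more than maxSkewMs.
    static boolean withinSkew(long masterTimeMs, long regionServerTimeMs, long maxSkewMs) {
        return Math.abs(masterTimeMs - regionServerTimeMs) <= maxSkewMs;
    }

    public static void main(String[] args) {
        long masterTime = 1325492000000L;              // hypothetical master clock reading
        long regionServerTime = masterTime + 206141L;  // difference reported in the log
        System.out.println("accepted: " + withinSkew(masterTime, regionServerTime, MAX_SKEW_MS));
    }
}
```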

Sunday, May 8, 2011

Installing Flume in the cluster - A complete step by step tutorial

Flume Cluster Setup :




In the previous post, you used Flume on a single machine with the default configuration settings. With the default settings, nodes automatically search for a Master on localhost on a standard port. In order for the Flume nodes to find the Master in a fully distributed setup, you must specify site-specific static configuration settings.

Before we start:-
Before we start configuring Flume, you need to have a running Hadoop cluster, which will be the centralized storage for Flume. Please refer to the post Installing Hadoop in the cluster - A complete step-by-step tutorial before continuing.

Installation steps:-

Perform following steps on Master Machine.
1. Download flume-0.9.1.tar.gz from https://github.com/cloudera/flume/downloads and extract it to some path on your computer. From here on, I refer to the Flume installation root as $FLUME_INSTALL_DIR.

2. Edit the file /etc/hosts on the master machine (and also on the agent and collector machines) and add the following lines.

192.168.41.67 flume-master
192.168.41.53 flume-collector
hadoop-namenode-machine-IP hadoop-namenode

3. Open the file $FLUME_INSTALL_DIR/conf/flume-site.xml and edit the following properties.

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>flume.master.servers</name>
<value>flume-master</value>
</property>
<property>
<name>flume.collector.event.host</name>
<value>flume-collector</value>
<description>This is the host name of the default "remote" collector.
</description>
</property>
<property>
<name>flume.collector.port</name>
<value>35853</value>
<description>This is the default TCP port that the collector listens to in order to receive events it is collecting.
</description>
</property>
</configuration>

4. Repeat steps 1 to 3 on the collector and agent machines.
Note: - The Agent Flume nodes are co-located on machines with the service that is producing logs.

Start flume processes:-

1. Start Flume master:- The Master can be manually started by executing the following command on Master Machine.
        1.1 $FLUME_INSTALL_DIR/bin/flume master
1.2 After the Master is started, you can access it by pointing a web browser to http://flume-master:35871/. This web page displays the status of all Flume nodes that have contacted the Master, and shows each node’s currently assigned configuration. When you start this up without Flume nodes running, the status and configuration tables will be empty.

2. Start Flume collector:- The Collector can be manually started by executing the following command on Collector Machine.
         2.1 $FLUME_INSTALL_DIR/bin/flume node -n flume-collector

2.2 To check whether a Flume node (collector) is up, point your browser to the Flume Node status page at http://flume-collector:35862/. Each node displays its own data in a single table that includes diagnostics and metrics data about the node, its data flows, and the system metrics about the machine it is running on. If you have multiple instances of the flume node program running on a machine, it will automatically increment the port number and attempt to bind to the next port (35863, 35864, etc.) and log the eventually selected port.

2.3 If the node is up, you should also refresh the Master’s status page (http://flume-master:35871) to make sure that the node has contacted the Master. You brought up one node whose name is flume-collector, so you should have one node listed in the Master’s node status table.

3. Start Flume agent:- The Agent can be manually started by executing the following command on Agent Machine (agent Flume nodes are co-located on machines with the service that is producing logs.)
      3.1 $FLUME_INSTALL_DIR/bin/flume node -n flume-agent

      3.2 Perform step 2.3 again.

Note: - Similarly, you can start other Flume agents by executing the following commands:-
Start second agent:- $FLUME_INSTALL_DIR/bin/flume node -n flume-agent1
Start third agent:- $FLUME_INSTALL_DIR/bin/flume node -n flume-agent2

Configuring Flume nodes via master:-

1. Configuration of Flume Collector: - On the Master’s web page click on the config link. Enter the following values into the "Configure a node" form, and then click Submit.
Node name: flume-collector
Source: collectorSource(35853)
Sink: collectorSink("hdfs://hadoop-namenode:9000/user/flume/logs/%H00","%{host}-")
Note: - The collector writes to an HDFS cluster (assuming the HDFS namenode machine is called hadoop-namenode)

2. Configuration of Flume Agent:- On the Master’s web page, click on the config link. Enter the following values into the "Configure a node" form, and then click Submit.
Node name: flume-agent
Source: tail("path/to/logfile")
Ex:- tail("/home/$USER/logAnalytics/dot.log")
Sink: agentSink("flume-collector",35853)

Note: - Use same configuration for each Flume Agent.

Friday, May 6, 2011

Installing Flume in the pseudo mode - A complete step by step tutorial




Flume is a distributed, reliable, and available service for efficiently moving large amounts of data soon after the data is produced.

The primary use case for Flume is as a logging system that gathers a set of log files on every machine in a cluster and aggregates them to a centralized persistent store such as the Hadoop Distributed File System (HDFS).

Installation in pseudo-distributed mode:-


In pseudo-distributed mode, several Flume processes run on a single machine. There are two kinds of processes in the system:
1. Flume Master: - The Flume Master is the central management point; it controls the Flume node data flows and monitors Flume nodes.
2. Flume Node: - The Flume nodes are divided into two categories:-
2.1 Flume Agent: - The agent Flume nodes are co-located on machines with the service that is producing logs.
2.2 Flume collector: - The collector listens for data from multiple agents, aggregates logs, and then eventually writes the data to HDFS.

Fig: - Flume processes and their configuration.

Before we start:-

Before we start configuring Flume, you need to have a running Hadoop cluster, which will be the centralized storage for Flume. Please refer to the post Installing Hadoop in the cluster - A complete step by step tutorial before continuing.


Installation steps:-

1. Download flume-0.9.1.tar.gz from https://github.com/cloudera/flume/downloads and extract it to some path on your computer. From here on, I refer to the Flume installation root as $Flume_INSTALL_DIR.



2. The Master can be manually started by executing the following command:


2.1 $Flume_INSTALL_DIR/bin/flume master


2.2 After the Master is started, you can access it by pointing a web browser to http://localhost:35871/. This web page displays the status of all Flume nodes that have contacted the Master, and shows each node’s currently assigned configuration. When you start this up without Flume nodes running, the status and configuration tables will be empty.


3. The flume collector can be manually started by executing the following command in another terminal.


3.1 $Flume_INSTALL_DIR/bin/flume node -n flume-collector


3.2 To check whether a Flume node is up, point your browser to the Flume Node status page at http://localhost:35862/. Each node displays its own data in a single table that includes diagnostics and metrics data about the node, its data flows, and the system metrics about the machine it is running on. If you have multiple instances of the flume node program running on a machine, it will automatically increment the port number and attempt to bind to the next port (35863, 35864, etc.) and log the eventually selected port.


3.3 If the node is up, you should also refresh the Master’s status page (http://localhost:35871) to make sure that the node has contacted the Master. You brought up one node whose name is flume-collector, so you should have one node listed in the Master’s node status table.


4. Configuring a collector via master:-

4.1 On the Master’s web page click on the config link. Enter the following values into the "Configure a node" form, and then click Submit.

Node name: flume-collector

Source: collectorSource(35853)

Sink: collectorSink("hdfs://hadoop-namenode:9000/user/flume/logs/%H00","%{host}-")

Note: - The collector writes to an HDFS cluster (assuming the HDFS namenode machine is called hadoop-namenode).


5. The Flume agent node can be manually started by executing the following command in another terminal.

5.1 $Flume_INSTALL_DIR/bin/flume node -n flume-agent

5.2 Perform step 3.2 and 3.3 again.


6. Configuring an agent via master:-

6.1 On the Master’s web page, click on the config link. Enter the following values into the "Configure a node" form, and then click Submit.

Node name: flume-agent

Source: tail("path/to/logfile")
Ex:- tail("/home/impetus/logAnalytics/dot.log")

Sink: agentSink("localhost",35853)


7. To check whether the data has been stored in HDFS, point your browser to http://localhost:50070/.