Difference between revisions of "Hadoop: Contoh Program Sederhana"

From OnnoWiki
Jump to navigation Jump to search
(New page: Sumber: http://www.drdobbs.com/database/hadoop-writing-and-running-your-first-pr/240153197 Hadoop: Writing and Running Your First Project MapReduce on small datasets can be run e...)
 
Line 146: Line 146:
 
  ("dobbs", 42)
 
  ("dobbs", 42)
 
  ("doctor", 1214191)
 
  ("doctor", 1214191)
 +
 +
 +
MapReduce on small datasets can be run easily and without much coding or fiddling — provided you know what to do. Here's how.
 +
 +
Before we run our job, we need some driver code to wire up the mapper and reducer, which we do using the AggregateJob class, shown Listing Three (AggregateJob.java).
 +
Listing Three: AggregateJob.java.
 +
 +
package com.tom_e_white.drdobbs.mapreduce;
 +
 
 +
import org.apache.hadoop.conf.Configured;
 +
import org.apache.hadoop.fs.Path;
 +
import org.apache.hadoop.io.LongWritable;
 +
import org.apache.hadoop.io.Text;
 +
import org.apache.hadoop.mapreduce.Job;
 +
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 +
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 +
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
 +
import org.apache.hadoop.util.Tool;
 +
import org.apache.hadoop.util.ToolRunner;
 +
 
 +
public class AggregateJob extends Configured implements Tool {
 +
 
 +
  @Override
 +
  public int run(String[] args) throws Exception {
 +
    Job job = new Job(getConf());
 +
    job.setJarByClass(getClass());
 +
    job.setJobName(getClass().getSimpleName());
 +
 
 +
    FileInputFormat.addInputPath(job, new Path(args[0]));
 +
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
 +
 
 +
    job.setMapperClass(ProjectionMapper.class);
 +
    job.setCombinerClass(LongSumReducer.class);
 +
    job.setReducerClass(LongSumReducer.class);
 +
 
 +
    job.setOutputKeyClass(Text.class);
 +
    job.setOutputValueClass(LongWritable.class);
 +
 
 +
    return job.waitForCompletion(true) ? 0 : 1;
 +
  }
 +
 
 +
  public static void main(String[] args) throws Exception {
 +
    int rc = ToolRunner.run(new AggregateJob(), args);
 +
    System.exit(rc);
 +
  }
 +
}
 +
 +
AggregateJob extends Hadoop's Configured class and implements the Tool interface's run() method. Doing this allows us to pass in configuration options at runtime, which can be very handy, as we'll see shortly.
 +
 +
The code to launch a job is in the run() method, and it is managed by the Job class from the org.apache.hadoop.mapreduce package. The Job instance specifies various things about the job: a name for display purposes, the mapper and reducer classes that we discussed earlier, and the job output types, which have to match the mapper and reducer output types. (In case your map output types are different from your reduce output types, you should also call setMapOutputKeyClass() and setMapOutputValueClass().) There are a few other things that we set that are less obvious.
 +
 +
The call to setJarByClass() is needed because MapReduce is a distributed system, and Hadoop needs to know which JAR file to ship to the nodes in the cluster running the map and reduce tasks. By calling this method, we tell Hadoop to look for the local JAR that contains the AggregateJob class. As long as the JAR contains both AggregateJob and ProjectionMapper, then the job will run, as there are no other dependencies. (If you need to ship third-party libraries, you can use the -libjars command-line flag, which takes a comma-separated list of local JAR file names.)
 +
 +
We also need to tell the job what input data to process and where to place the output. We do this via the static APIs on FileInputFormat and FileOutputFormat, using positional command-line arguments to specify the file paths.
 +
 +
The last line of the run() method launches the job and waits for it to complete. While it is running, it prints the progress on the console.
 +
Running the Job
 +
 +
Let's run the program:
 +
 +
 +
% $HADOOP_HOME/bin/hadoop jar target/hadoop-drdobbs-1.0.0.jar \
 +
  com.tom_e_white.drdobbs.mapreduce.AggregateJob data output
 +
 +
The hadoop jar command is a convenient way to run Hadoop programs because it adds all the Hadoop JARs onto the client classpath. It takes the JAR file and the main class as arguments and passes the remaining arguments to the program. In this case, they are interpreted as the input and output directories. When the program has completed, we can look at the output. Note that both the input and output is on the local filesystem by default in Hadoop.
 +
 +
% cat output/part-r-00000
 +
dobbs 42
 +
doctor 1214191
 +
 +
The output filenames are named by the reducer task number that produced them. Because we only ran a single reducer (the default), we only have a single output file: part-r-00000. Happily, it contains the output that we expected: the tab-separated word counts.
 +
 +
==A Single-Node Cluster==
 +
 +
We don't need to make any changes to the code to run the same program against a Hadoop cluster. We do need to load the data into HDFS, though, and tell the program which cluster to run against, so let's see how to do that.
 +
 +
The first thing we need is to find a suitable cluster. If you don't have access to a Hadoop cluster, you can install a single-node cluster (by following the Apache Hadoop instructions or using a Hadoop VM, which several Hadoop vendors provide for free).
 +
 +
Instead of running against data on the local filesystem, we will run against HDFS. Let's copy the sample data from the local filesystem to HDFS using the hadoop fs command:
 +
 +
% hadoop fs -copyFromLocal data data
 +
 +
The -copyFromLocal subcommand takes two or more arguments: All but the first refer to files or directories on the local filesystem, and the last refers to a file or directory on HDFS. After issuing this command, the data resides in the /user/<username>/data directory in HDFS. Now we can run the program again:
 +
 +
 +
% hadoop jar target/hadoop-drdobbs-1.0.0.jar \
 +
com.tom_e_white.drdobbs.mapreduce.AggregateJob data output
 +
 +
This command assumes that the client Hadoop configuration files are set up appropriately to point to your cluster. Configuration files are found in the Hadoop installation's conf directory or in /etc/hadoop/conf. The two relevant settings here are:
 +
 +
fs.default.name to specify the HDFS namenode URL (in core-site.xml)
 +
mapred.job.tracker to specify the MapReduce jobtracker host and port (in mapred-site.xml)
 +
 +
It's possible to specify these settings on the command line, too, which is handy if you ever need to switch between clusters:
 +
 +
 +
% hadoop jar target/hadoop-drdobbs-1.0.0.jar \
 +
  com.tom_e_white.drdobbs.mapreduce.AggregateJob \
 +
  -D fs.default.name=hdfs://localhost:8020 \
 +
  -D mapred.job.tracker=localhost:8021 \
 +
  data output
 +
 +
Note that a space is required between the -D argument and the key-value pair (in contrast to specifying Java system properties).
 +
 +
The output is in HDFS, so we use the hadoop fs command again to view it:
 +
 +
% hadoop fs -cat output/part-r-00000
 +
dobbs 42
 +
doctor 1214191
 +
 +
==A Multi-Node Cluster==
 +
 +
Running on a multi-node cluster is no different than running on a single-node cluster, although it is wise to set the number of reducers to something higher than the default (one). On an otherwise unused cluster, the number of reducers can be set to the number of reduce slots in the whole cluster (that is, the number of task trackers × number of reduce task slots per task tracker). In practice, and particularly on a heavily used cluster, you should set the number of reducers to a smaller fraction of the cluster.
 +
 +
I ran the job over the full dataset using four reducers with the following command (the cluster settings were in core-site.xml and mapred-site.xml, so they were not needed on the command line):
 +
 +
 +
% hadoop jar target/hadoop-drdobbs-1.0.0.jar \
 +
  com.tom_e_white.drdobbs.mapreduce.AggregateJob \
 +
  -D mapred.reduce.tasks=4 \
 +
  data output
 +
% hadoop fs -cat output/part-r-00000 | head -5
 +
!  202142786
 +
""""    3312106937
 +
$0.00005    52
 +
$0.0003 111
 +
$0.0007 117
 +
 +
The output is sorted by key (within each reduce partition); so at the beginning of the file, we get 1-grams that start with punctuation. In the next installment of the series, which runs next week, we'll write some more complex queries using higher-level languages and take a look at real-world Hadoop clusters and applications.
 +
  
  
Line 157: Line 287:
  
 
* http://www.drdobbs.com/database/hadoop-writing-and-running-your-first-pr/240153197
 
* http://www.drdobbs.com/database/hadoop-writing-and-running-your-first-pr/240153197
 +
* http://www.drdobbs.com/database/hadoop-writing-and-running-your-first-pr/240153197?pgno=2

Revision as of 08:04, 6 November 2015

Sumber: http://www.drdobbs.com/database/hadoop-writing-and-running-your-first-pr/240153197




Hadoop: Writing and Running Your First Project


MapReduce on small datasets can be run easily and without much coding or fiddling — provided you know what to do. Here's how.

In the first part of this series on Apache Hadoop, I explained how MapReduce works at a conceptual level. In this installment, the second of three, I show how to write code that runs on Hadoop — starting with a MapReduce program in Java. Development Environment

To get started, we need Java (Oracle JDK 6 is required), Git, Maven, and Hadoop itself. Download the latest stable release of Apache Hadoop (1.0.4) from the releases page, then extract it to a suitable place. On my laptop:

% tar zxf hadoop-1.0.4.tar.gz
% export HADOOP_HOME=$(pwd)/hadoop-1.0.4
% $HADOOP_HOME/bin/hadoop version
Hadoop 1.0.4

In another directory, checkout the Git repository that accompanies this article:

% git clone git://github.com/tomwhite/hadoop-drdobbs.git
% cd hadoop-drdobbs
% mvn install

The repository contains a small amount of sample data for testing:

% cat data/*.tsv
dobbs	2007	20	18	15
dobbs	2008	22	20	12
doctor	2007	545525	366136	57313
doctor	2008	668666	446034	72694

The file contains a few lines from the Google Books Ngram Dataset, which I mentioned in the first part of the series. To recap, the first line says that the word "dobbs" in books from 2007 occurred 20 times overall, and these occurrences were found on 18 pages in 15 books.

Java MapReduce

Let's write the MapReduce job to find the total count for each word. We start with the map function, which is represented in Java by an instance of org.apache.hadoop.mapreduce.Mapper. The first thing we need to decide about our mapper is the types of the input key-value pairs and the output key-value pairs. The declaration of the Mapper class is:

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

Because we are processing text, we use TextInputFormat, which determines the input types for us as LongWritable and Text (both found in the org.apache.hadoop.io package). These Writable types are wrappers around standard Java types (long and String, in this case) optimized for efficiency of serialization. Authors of MapReduce programs can use the Writable types without worrying about serialization. The only time that you might be exposed to serialization is when writing a custom Writable type. In such cases, it is usually better to use a serialization library, such as Avro.

Going back to the input type, TextInputFormat presents the input to our mapper as (LongWritable, Text) pairs, like this:

(0,  "dobbs	2007	20	18	15")
(20, "dobbs	2008	22	20	12")
(40, "doctor	2007	545525	366136	57313")
(72, "doctor	2008	668666	446034	72694")

The key is the offset within the file, and the value is the content of the line. It is the job of the mapper to extract the word and the number of occurrences, and ignore everything else. Therefore, its output is (word, count) pairs, of type (Text, LongWritable). The signature of the mapper looks like this:

public class ProjectionMapper extends Mapper<LongWritable, Text, Text, LongWritable>

All that remains is for us to write the implementation of the map() method. The source for the whole mapper class appears in Listing One (ProjectionMapper.java).

Listing One: ProjectionMapper.java.


package com.tom_e_white.drdobbs.mapreduce;
  
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
 
import java.io.IOException;
 
public class ProjectionMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
  private Text word = new Text();
  private LongWritable count = new LongWritable();
 
  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // value is tab separated values: word, year, occurrences, #books, #pages
    // we project out (word, occurrences) so we can sum over all years
    String[] split = value.toString().split("\t+");
    word.set(split[0]);
    if (split.length > 2) {
      try {
        count.set(Long.parseLong(split[2]));
        context.write(word, count);
      } catch (NumberFormatException e) {
        // cannot parse - ignore
      }
    }
  }
}

There are a few things to note about this code. First, there are two instance variables, word and count, which are used to store the map output key and value. The map() method is called once per input record, so it pays to avoid unnecessary object creation. The body of map() is straightforward: It splits the tab-separated input line into fields, and uses the first field as the word, and the third as the count. The map output is written using the write method in Context. For simplicity, this code ignores lines with an occurrence field that is not a number, but there are other actions you could take, such as incrementing a MapReduce counter to track how many lines it affects (see the getCounter() method on Context for details).

Running through our tiny dataset, the map output looks like this:

("dobbs", 20)
("dobbs", 22)
("doctor", 545525)
("doctor", 668666)

As I discussed in the first part of the series, Hadoop transforms the map output so that the values are brought together for a given key, in a process called the shuffle. In our abstract representation, the input to the reduce step looks like this:

("dobbs", [20, 22])
("doctor", [545525, 668666])

All our reduce implementation has to do is sum the counts. We need an implementation of org.apache.hadoop.mapreduce.Reducer with the following signature:

public class LongSumReducer extends Reducer<
       Text, LongWritable, Text, LongWritable>

We could write the class ourselves, but we don't need to because Hadoop comes with an implementation, shown in Listing Two (LongSumReducer.java). Listing Two LongSumReducer.java (code from Apache Hadoop project).


package org.apache.hadoop.mapreduce.lib.reduce;
 
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;
 
public class LongSumReducer<KEY> extends Reducer<KEY, LongWritable,
                                                 KEY,LongWritable> {
 
  private LongWritable result = new LongWritable();
 
  public void reduce(KEY key, Iterable<LongWritable> values,
                     Context context) throws IOException, InterruptedException {
    long sum = 0;
    for (LongWritable val : values) {
      sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
  }
 
}

Notice that the reduce() method signature is different from map() because it has an iterator over the values, rather than a single value. This reflects the grouping that the framework performs on the values for a key. In LongSumReducer, the implementation is very simple: It sums the values, then writes the total out using the same key as the input.

The output of the reducer will be:

("dobbs", 42)
("doctor", 1214191)


MapReduce on small datasets can be run easily and without much coding or fiddling — provided you know what to do. Here's how.

Before we run our job, we need some driver code to wire up the mapper and reducer, which we do using the AggregateJob class, shown Listing Three (AggregateJob.java). Listing Three: AggregateJob.java.

package com.tom_e_white.drdobbs.mapreduce;
 
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
public class AggregateJob extends Configured implements Tool {
 
  @Override
  public int run(String[] args) throws Exception {
    Job job = new Job(getConf());
    job.setJarByClass(getClass());
    job.setJobName(getClass().getSimpleName());
 
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
 
    job.setMapperClass(ProjectionMapper.class);
    job.setCombinerClass(LongSumReducer.class);
    job.setReducerClass(LongSumReducer.class);
 
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
  
    return job.waitForCompletion(true) ? 0 : 1;
  }
 
  public static void main(String[] args) throws Exception {
    int rc = ToolRunner.run(new AggregateJob(), args);
    System.exit(rc);
  }
}

AggregateJob extends Hadoop's Configured class and implements the Tool interface's run() method. Doing this allows us to pass in configuration options at runtime, which can be very handy, as we'll see shortly.

The code to launch a job is in the run() method, and it is managed by the Job class from the org.apache.hadoop.mapreduce package. The Job instance specifies various things about the job: a name for display purposes, the mapper and reducer classes that we discussed earlier, and the job output types, which have to match the mapper and reducer output types. (In case your map output types are different from your reduce output types, you should also call setMapOutputKeyClass() and setMapOutputValueClass().) There are a few other things that we set that are less obvious.

The call to setJarByClass() is needed because MapReduce is a distributed system, and Hadoop needs to know which JAR file to ship to the nodes in the cluster running the map and reduce tasks. By calling this method, we tell Hadoop to look for the local JAR that contains the AggregateJob class. As long as the JAR contains both AggregateJob and ProjectionMapper, then the job will run, as there are no other dependencies. (If you need to ship third-party libraries, you can use the -libjars command-line flag, which takes a comma-separated list of local JAR file names.)

We also need to tell the job what input data to process and where to place the output. We do this via the static APIs on FileInputFormat and FileOutputFormat, using positional command-line arguments to specify the file paths.

The last line of the run() method launches the job and waits for it to complete. While it is running, it prints the progress on the console. Running the Job

Let's run the program:


% $HADOOP_HOME/bin/hadoop jar target/hadoop-drdobbs-1.0.0.jar \
  com.tom_e_white.drdobbs.mapreduce.AggregateJob data output

The hadoop jar command is a convenient way to run Hadoop programs because it adds all the Hadoop JARs onto the client classpath. It takes the JAR file and the main class as arguments and passes the remaining arguments to the program. In this case, they are interpreted as the input and output directories. When the program has completed, we can look at the output. Note that both the input and output is on the local filesystem by default in Hadoop.

% cat output/part-r-00000 
dobbs	42
doctor	1214191

The output filenames are named by the reducer task number that produced them. Because we only ran a single reducer (the default), we only have a single output file: part-r-00000. Happily, it contains the output that we expected: the tab-separated word counts.

A Single-Node Cluster

We don't need to make any changes to the code to run the same program against a Hadoop cluster. We do need to load the data into HDFS, though, and tell the program which cluster to run against, so let's see how to do that.

The first thing we need is to find a suitable cluster. If you don't have access to a Hadoop cluster, you can install a single-node cluster (by following the Apache Hadoop instructions or using a Hadoop VM, which several Hadoop vendors provide for free).

Instead of running against data on the local filesystem, we will run against HDFS. Let's copy the sample data from the local filesystem to HDFS using the hadoop fs command:

% hadoop fs -copyFromLocal data data

The -copyFromLocal subcommand takes two or more arguments: All but the first refer to files or directories on the local filesystem, and the last refers to a file or directory on HDFS. After issuing this command, the data resides in the /user/<username>/data directory in HDFS. Now we can run the program again:


% hadoop jar target/hadoop-drdobbs-1.0.0.jar \
com.tom_e_white.drdobbs.mapreduce.AggregateJob data output

This command assumes that the client Hadoop configuration files are set up appropriately to point to your cluster. Configuration files are found in the Hadoop installation's conf directory or in /etc/hadoop/conf. The two relevant settings here are:

fs.default.name to specify the HDFS namenode URL (in core-site.xml)
mapred.job.tracker to specify the MapReduce jobtracker host and port (in mapred-site.xml)

It's possible to specify these settings on the command line, too, which is handy if you ever need to switch between clusters:


% hadoop jar target/hadoop-drdobbs-1.0.0.jar \
  com.tom_e_white.drdobbs.mapreduce.AggregateJob \
  -D fs.default.name=hdfs://localhost:8020 \
  -D mapred.job.tracker=localhost:8021 \
  data output

Note that a space is required between the -D argument and the key-value pair (in contrast to specifying Java system properties).

The output is in HDFS, so we use the hadoop fs command again to view it:

% hadoop fs -cat output/part-r-00000 
dobbs	42
doctor	1214191

A Multi-Node Cluster

Running on a multi-node cluster is no different than running on a single-node cluster, although it is wise to set the number of reducers to something higher than the default (one). On an otherwise unused cluster, the number of reducers can be set to the number of reduce slots in the whole cluster (that is, the number of task trackers × number of reduce task slots per task tracker). In practice, and particularly on a heavily used cluster, you should set the number of reducers to a smaller fraction of the cluster.

I ran the job over the full dataset using four reducers with the following command (the cluster settings were in core-site.xml and mapred-site.xml, so they were not needed on the command line):


% hadoop jar target/hadoop-drdobbs-1.0.0.jar \
  com.tom_e_white.drdobbs.mapreduce.AggregateJob \
  -D mapred.reduce.tasks=4 \
  data output
% hadoop fs -cat output/part-r-00000 | head -5
!   202142786
""""    3312106937
$0.00005    52
$0.0003 111
$0.0007 117

The output is sorted by key (within each reduce partition); so at the beginning of the file, we get 1-grams that start with punctuation. In the next installment of the series, which runs next week, we'll write some more complex queries using higher-level languages and take a look at real-world Hadoop clusters and applications.





Referensi