2017년 9월 29일 금요일

MapReduce 프로그램 개발환경 세팅

Maven 설치
sudo apt-get install maven

IntelliJ IDEA 설치
http://home.zany.kr:9003/board/bView.asp?bCode=11&aCode=13879&cBlock=0&cPageNo=1&sType=0&sString=

  1. 다운로드
    https://www.jetbrains.com/idea/download/download-thanks.html?platform=linux&code=IIC
    wget https://download.jetbrains.com/idea/ideaIC-2017.2.5.tar.gz
  2. 압축 해제
    tar zxvf ideaIC-2017.2.5.tar.gz
  3. 실행
    bin/idea.sh 실행한 후 기본 세팅을 하면서 icon 만드는 옵션이 있다.


WordCount 예제 실행
http://115.137.34.186/hadoop_wordcount_example/


해당 예제에서는 main 함수가 들어 있는 class를 그냥 구현했는데, Hadoop generic option(-conf, -D, -fs, -jt, -files, -libjars, -archives)을 사용하려면 ToolRunner를 사용해야 한다.


WordCountJob.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountJob extends Configured implements Tool {

    public int run(String[] args) throws Exception {

        // Verify the number of parameters
        if (args.length != 2) {
            System.err.printf("Usage : %s [generic options] <input> <output>\n", getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }

        // Create a configuration
        Configuration conf = getConf();

        // Create a job from the default configuration that will use the MaxTempJob class
        Job job = Job.getInstance(conf, "wordcount");

        // Configure the job: name, mapper, reducer, and combiner
        job.setJarByClass(WordCountJob.class);
        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);

        // Configure the input/output format and key, value class
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //job.setInputFormatClass(TextInputFormat.class);
        //job.setOutputFormatClass(TextOutputFormat.class);

        // Define our input path as the first command line argument and our output path as the second
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        // Create File Input/Output formats for these paths (in the job)
        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Deleting the context path automatically from hdfs so tha we don't have delete it explicitly
        outputPath.getFileSystem(conf).delete(outputPath,true);

        // Run the job
        return (job.waitForCompletion(true) ? 0 : 1);
    }


    public static void main(String[] args) throws Exception {
        // Start the WordCount MapReduce application
        int res = ToolRunner.run(new Configuration(), new WordCountJob(), args);
        System.exit(res);
    }
}


WordCountMapper.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * Mapper that breaks each input value into tokens and emits a {@code (token, 1)}
 * pair for every token found.
 */
public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {

    /** Count of one, written alongside every token. */
    private static final IntWritable ONE = new IntWritable(1);

    /** Output key instance, overwritten with each token before it is written. */
    private final Text token = new Text();

    /**
     * Tokenizes {@code value} with {@link StringTokenizer}'s default delimiters and
     * writes each token to the context with a count of one.
     *
     * @param key     input key; not used
     * @param value   text to tokenize
     * @param context destination for the emitted pairs
     */
    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        for (StringTokenizer tokens = new StringTokenizer(value.toString()); tokens.hasMoreTokens(); ) {
            token.set(tokens.nextToken());
            context.write(token, ONE);
        }
    }
}

WordCountReducer.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * Reducer that adds up all the integer values received for a key and emits the
 * key together with that sum.
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /** Output value instance, overwritten with the sum before each write. */
    private final IntWritable total = new IntWritable();

    /**
     * Sums {@code values} and writes {@code (key, sum)} to the context.
     *
     * @param key     key whose values are being summed
     * @param values  counts associated with {@code key}
     * @param context destination for the emitted pair
     */
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        Iterator<IntWritable> it = values.iterator();
        while (it.hasNext()) {
            count += it.next().get();
        }
        total.set(count);
        context.write(key, total);
    }
}

댓글 없음:

댓글 쓰기