레이블이 예제인 게시물을 표시합니다. 모든 게시물 표시
레이블이 예제인 게시물을 표시합니다. 모든 게시물 표시

2017년 11월 2일 목요일

html 파일 읽어서 파싱 후 원하는 데이터만 파일로 출력하는 MapReduce 프로그램

참고

HTML 파싱
https://github.com/shsdev/hocr-parser-hadoopjob/blob/master/src/main/java/eu/scape_project/tb/lsdr/hocrparser/HocrParser.java

Jsoup
http://partnerjun.tistory.com/42

Hadoop InputFormat
하둡 완벽 가이드 4판 - 파일의 전체 내용을 하나의 레코드로 처리하기
https://github.com/tomwhite/hadoop-book/blob/master/ch08-mr-types/src/main/java/WholeFileInputFormat.java

동기

하둡을 이용해서 네이버 블로그(모바일 버전)를 다운 받아서 해당 페이지에서 필요한 내용을 파싱하고 싶었다. 그래서 맵리듀스로 html 파싱이 가능한지 찾아봤는데, 아주 조금의 예제들이 나왔다. 그런데 map 함수에서 뭔가를 다 처리하고 있어서 이쁘게 보이지 않아서 custom InputFormat을 만들어서 사용하는 법을 알아봤다. 다행히 하둡 완벽 가이드에 적절한 예제가 있어서 두 가지를 조합에서 간단한 예제 프로그램을 만들었다.

준비물

모바일 네이버 블로그 게시물 html 파일
http://m.blog.naver.com/Recommendation.nhn 에 들어가서 맘에 드는 게시물의 URL을 복사한다. 복사한 URL로 그대로 wget으로 받으려 하면 엉뚱한 페이지가 다운 받아 진다. 그래서 URL은 https://m.blog.naver.com/PostView.nhn?blogId=xxxx&logNo=xxxx 를 치고 들어가면 나오는 https://m.blog.naver.com/id/post_no 를 wget으로 다운 받으면 된다.
적당히 5 ~ 6 개 정도 다운 받아서 디렉터리 통째로 HDFS에 올린다.

프로그램 작성

pom.xml
<dependencies>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>

        <dependency>
            <groupId>org.jsoup</groupId>
            <artifactId>jsoup</artifactId>
            <version>1.10.3</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>
                            jar-with-dependencies
                        </descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
                        <phase>package</phase> <!-- bind to the packaging phase -->
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

HtmlFileInputFormat.java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
 * HTML 파일을 통째로 읽는 FileInputFormat
 */
public class HtmlFileInputFormat extends FileInputFormat<NullWritable, Text> {

    /* 파일을 통째로 읽어야 되기 때문에 split 되지 않게 함 */
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    public RecordReader<NullWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        HtmlFileRecordReader reader = new HtmlFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}

HtmlFileRecordReader.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class HtmlFileRecordReader extends RecordReader <NullWritable, Text> {

    private FileSplit fileSplit;
    private Configuration conf;
    private Text value = new Text();
    private boolean processed = false;

    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    public boolean nextKeyValue() throws IOException, InterruptedException {
        if(!processed) {
            byte [] contents = new byte[(int) fileSplit.getLength()];
            Path file =  fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                InputStream inputStream = new ByteArrayInputStream(contents);
                Document doc = Jsoup.parse(inputStream, "UTF-8", "http://home.yoursite.com");
                value.set(doc.html());
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1.0f : 0.0f;
    }

    public void close() throws IOException {

    }
}

JobBuilder.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;

import java.io.IOException;

/**
 * Hadoop Job 생성
 * 1. 파라미터 개수 체크
 * 2. 설정, Job 이름 받아서 Job 생성
 * 3. 입/출력 데이터 위치 지정
 * 4. 출력 위치에 데이터가 있으면 삭제
 */
public class JobBuilder {
    public static Job parseInputAndOutput(Tool tool, Configuration conf, String[] args, String jobName) throws IOException {

        /* 파라미터 개수 체크 */
        if (args.length != 2) {
            printUsage(tool, "<input> <output>");
            return null;
        }

        /* 설정, Job 이름 받아서 Job 생성 */
        Job job = Job.getInstance(conf, jobName);
        job.setJarByClass(tool.getClass());

        /* 입/출력 데이터 위치 지정 */
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        /* 출력 위치에 데이터가 있으면 삭제 */
        outputPath.getFileSystem(conf).delete(outputPath,true);

        return job;
    }

    /* 사용법 출력 */
    public static void printUsage(Tool tool, String extraArgsUsage) {
        System.err.printf("Usage: %s [genericOptions] %s\n\n", tool.getClass().getSimpleName(), extraArgsUsage);
        GenericOptionsParser.printGenericCommandUsage(System.err);
    }
}


HtmlParserJob.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * HTML 파일을 읽어서 DOM 파싱 하는 Hadoop Job
 */
public class HtmlParserJob extends Configured implements Tool {

    public int run(String[] args) throws Exception {

        /* 설정 가져와서 Job 생성 */
        Configuration conf = getConf();
        String jobName = new String("htmlparser");
        Job job = JobBuilder.parseInputAndOutput(this, conf, args, jobName);
        if (job == null) {
            return -1;
        }

        /* 입출력 포맷 클래스 지정 */
        job.setInputFormatClass(HtmlFileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        /* 출력 Key/Value 클래스 지정 */
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        /* Mapper / Reducer 클래스 지정 */
        job.setMapperClass(HtmlParserMapper.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new HtmlParserJob(), args);
        System.exit(res);
    }

}


HtmlParserMapper.java
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;

import java.io.IOException;

/**
 * HTML 파일 받아서 DOM을 파싱해서 원하는 데이터를 뽑아 내는 Mapper
 * http://m.blog.naver.com/id/post_no 의 데이터를 받아와서 파싱
 * Input Key (NullWritable) : 없음
 * Input Value (Text) : HTML 파일 전체
 * Output Key (Text) : 블로그 URL
 * Output Value (Text) : 태그 리스트 (태그 태그 태그 ...)
 */
public class HtmlParserMapper extends Mapper<NullWritable, Text, Text, Text> {

    @Override
    protected void map(NullWritable key, Text value, Context context) throws IOException, InterruptedException {

        /* Jsoup 문서 생성 */
        Document doc = Jsoup.parse(value.toString());

        /**
         * 블로그 URL 파싱
         * <meta property="og:url" content="http://blog.naver.com/id/post_no">
         * 형태의 데이터에서 content의 데이터를 파싱해 온다.
         */
        Element ogUrl = doc.select("meta[property=og:url]").first();
        String ogUrlContent = ogUrl.attr("content");
        Text textKey = new Text(ogUrlContent);

        /**
         * 태그 파싱
         * <div class="post_tag">
         *     <ul>
         *         <li>
         *             <a>
         *                 <span>태그</span>
         *                 <span>태그</span>
         *                 <span>태그</span>
         *             </a>
         *         </li>
         *     </ul>
         * </div>
         * span 태그 사이의 텍스트 파싱
         */
        Elements postTags = doc.getElementsByClass("post_tag").first().select("span");

        String postTagValues = new String();

        /* span 리스트를 돌면서 텍스트를 뽑아내서 문자열로 이어 붙임 */
        for(Element postTag : postTags) {
            postTagValues += postTag.ownText() + " ";
        }

        Text textValue = new Text(postTagValues);

        context.write(textKey, textValue);
    }

}

결과물
http://blog.naver.com/xxxxxx/xxxxxx      #태그1
http://blog.naver.com/xxxxxx/xxxxxx      #태그1 #태그2 #태그3 #태그4 #태그5
http://blog.naver.com/xxxxxx/xxxxxx   #태그1 #태그2
http://blog.naver.com/xxxxxx/xxxxxx      #태그1
http://blog.naver.com/xxxxxx/xxxxxx     #태그1 #태그2 #태그3
http://blog.naver.com/xxxxxx/xxxxxx     #태그1

이런식으로 블로그 URL과 태그 목록이 나온다.

2017년 10월 27일 금요일

Hadoop 출력을 Elastic Search에 저장하기

참고 사이트

http://wpcertification.blogspot.kr/2014/05/using-elasticsearch-to-store-output-of.html

우선 Elastic Search를 설치한다.
http://modoleesi.blogspot.kr/2017/10/elastic-search.html

연동을 위해서는 Elastic에서 제공하는 ES-Hadoop 라이브러리가 있다.
https://www.elastic.co/kr/products/hadoop

라이브러리 다운받고 압축을 푼다.
압축을 풀고 dist 디렉터리에 들어가면 파일이 엄청 많이 있는데, 그 중에서 elasticsearch-hadoop-mr-5.6.3.jar 를 사용할 것이다.

해당 파일을 lib 파일을 저장할 곳에 복사한다.
나는 $ES_HOME/lib/에 저장을 하고 있다.
cp elasticsearch-hadoop-mr-5.6.3.jar $ES_HOME/lib/

환경변수

vi $HOME/.bashrc

#ELASTIC-HADOOP
export ES_HADOOP_MR_LIB=$ES_HOME/lib/elasticsearch-hadoop-mr-5.6.3.jar
export LIBJARS=$ES_HADOOP_MR_LIB
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$ES_HADOOP_MR_LIB

source $HOME/.bashrc

hadoop classpath
명령을 쳐서 정상적으로 추가됐는지 확인한다.

그리고 이제 wordcount 예제를 수정해서 결과를 ElasticSearch에 저장해보자.
EMR 최신버전(5.9)가 Hadoop 2.7.3 버전을 지원해서 지금부터는 아마 2.7.3 으로 작성할 것 같다.

pom.xml
<dependencies>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-hadoop-mr</artifactId>
            <version>5.6.3</version>
        </dependency>

    </dependencies>

EsHadoopJob.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

public class EsHadoopJob extends Configured implements Tool {

    public int run(String[] args) throws Exception {

        // Verify the number of parameters
        if (args.length != 2) {
            System.err.printf("Usage : %s [generic options] <input> <output>\n", getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }

        // Create a configuration
        Configuration conf = getConf();

        // Configuration for ES-Haoop connector
        conf.set("es.nodes","localhost:9200");
        conf.set("es.resource", "hadoop/wordcount");

        // Create a job from the default configuration that will use the wordcount class
        Job job = Job.getInstance(conf, "wordcount");

       // for ES-HADOOP
        job.setSpeculativeExecution(false);

        // Configure the job: name, mapper, reducer, and combiner
        job.setJarByClass(EsHadoopJob.class);
        job.setMapperClass(EsHadoopMapper.class);
       //  job.setCombinerClass(EsHadoopReducer.class); 
        job.setReducerClass(EsHadoopReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Configure the input/output format and key, value class
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(MapWritable.class);

        // OutputFormatClass for ES-Haoop connector
        job.setOutputFormatClass(EsOutputFormat.class);

        // Define our input path as the first command line argument and our output path as the second
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        // Create File Input/Output formats for these paths (in the job)
        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Deleting the context path automatically from hdfs so tha we don't have delete it explicitly
        outputPath.getFileSystem(conf).delete(outputPath,true);

        // Run the job
        return (job.waitForCompletion(true) ? 0 : 1);
    }


    public static void main(String[] args) throws Exception {
        // Start the WordCount MapReduce application
        int res = ToolRunner.run(new Configuration(), new EsHadoopJob(), args);
        System.exit(res);
    }
}


EsHadoopMapper.java -> 바꿀 것 없음

EsHadoopReducer.java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

public class EsHadoopReducer extends Reducer<Text, IntWritable, Text, MapWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }

       private MapWritable result = new MapWritable();
        result.put(key, new IntWritable(sum));
        context.write(key, result);
    }
}


Elastic Search Index 생성

index 리스트를 조회해 보고, 기존에 index가 있을 경우 삭제하고, 새로 생성하며 field의 limit을 4000으로 설정한다. (설정 안 하면 에러가 발생할 수 있어서 우선 설정한다. 에러는 밑에 정리되어 있다.)
curl -XGET 'localhost:9200/_cat/indices?v&pretty'

curl -XDELETE http://localhost:9200/hadoop

curl -XPUT 'localhost:9200/hadoop?pretty'

curl -XPUT 'localhost:9200/hadoop/_settings' -d '
{
    "index.mapping.total_fields.limit": 4000
}'

빌드 후 실행

mvn clean install

hadoop jar es-hadoop-1.0-SNAPSHOT.jar EsHadoopJob -libjars elasticsearch-hadoop-mr-5.6.3.jar input output

데이터 확인

http://localhost:9200/hadoop/_search?pretty=true&q=*:*&size=999

이렇게 브라우저에 입력하면 인덱스에 입력된 모든 데이터 중 999개를 출력해준다.

발생했던 에러
  1. es.resource 를 지정하지 않았다는 에러
    : 지정하는 위치가 문제. job 생성시 Job.getInstance(conf, "wordcount"); 이렇게 conf를 사용하는데, conf 값 지정을 그 밑에서 하고 있었음. 그래서 위로 올렸더니 정상 동작
  2. classpath에 넣어 놓은 라이브러리를 찾지 못한다고 (-libjars 명령 안 먹힘)
    : job 생성 시  conf를 new Configuration()을 했었는데, 그걸 getConf()로 바꿈
  3. wrong value class: class org.apache.hadoop.io.MapWritable is not class org.apache.hadoop.io.IntWritable
    : 이거는 Combiner 클래스를 그대로 사용해서 문제가 발생. combiner 클래스의 output이 mapper의 output과 같아야 하는데 combiner를 reducer와 같게 지정해 놔서 IntWritable이 나와야 될 자리에 MapWritable이 나와서 발생한 에러
  4. failed to execute bulk item (index) BulkShardRequest [[hadoop][1]] containing [232] requests org.elasticsearch.index.mapper.MapperParsingException: failed to parse -> elastic search 디렉터리에 log 디렉터리의 elasticsearch.log에 자세히 적혀 있음
    object field starting or ending with a [.] makes object resolution ambiguous: [survive...that]
    : field 이름이 .으로 시작하고 끝나는데 field 이름을 survive...that으로 하려니 ... 에러가 발생
    이건 wordcount에서 survice...that을 한 단어로 봐서 생긴일 그래서 그냥 원본 파일에서 ... 제거 함. 우선은 테스트 해 보는게 목적이니깐. 나중에는 field 명에 . 을 넣는일은 없을테니깐
  5. Limit of total fields [1000] in index [hadoop] has been exceeded
    : 필드 생성 개수 초과
    index 삭제하고 다시 만들어서 한계치를 올려준다.
    curl -XDELETE http://localhost:9200/hadoop
    curl -XPUT 'localhost:9200/hadoop?pretty'
    curl -XPUT 'localhost:9200/hadoop/_settings' -d '
    {
        "index.mapping.total_fields.limit": 4000
    }'
    

2017년 10월 25일 수요일

MapReduce 프로그래밍 예제 : 날씨 데이터


위 동영상을 참고하면서 예제를 따라 했다. 34:27 부터 해당 예제 설명이 나온다.
날씨 데이터를 다운 받아서 최저 / 최고 기온을 추출한 후 Hot / Cold와 날짜 그리고 기온을 최종 출력하는 프로그램이다. map 함수에서 파싱을 하고 reduce 함수는 그냥 아무런 역할을 안한다. (동영상에서 reduce 함수 코드를 확인하지 못해서 그냥 임의로 작성한 것이다. 아무런 역할을 안하는 reduce는 identity reducer를 이용하면 된다고 본 것 같은데 확인봐야 될 듯...)

데이터 다운로드 위치 : ftp://ftp.ncdc.noaa.gov/pub/data/uscrn/products/daily01/

개발환경 세팅 참고 : http://modoleesi.blogspot.kr/2017/09/blog-post_28.html

input 예시

23907 20150101  2.423  -98.08   30.62     2.2    -0.6     0.8     0.9     7.0     1.47 C     3.7     1.1     2.5    99.9    85.4    97.2   0.369   0.308 -99.000 -99.000 -99.000     7.0     8.1 -9999.0 -9999.0 -9999.0

pom.xml

<dependencies>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.4</version>
        </dependency>

    </dependencies>

MaxTempJob.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MaxTempJob extends Configured implements Tool {

    public int run(String[] args) throws Exception {

        // Verify the number of parameters
        if (args.length != 2) {
            System.err.printf("Usage : %s [generic options] <input> <output>\n", getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }

        // Create a configuration
        Configuration conf = getConf();

        // Create a job from the default configuration that will use the MaxTempJob class
        Job job = Job.getInstance(conf, "weather example");

        // Configure the job: name, mapper, reducer, and combiner
        job.setJarByClass(MaxTempJob.class);
        job.setMapperClass(MaxTempMapper.class);
        job.setReducerClass(MaxTempReducer.class);
        job.setCombinerClass(MaxTempReducer.class);

        // Configure the input/output format and key, value class
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Define our input path as the first command line argument and our output path as the second
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        // Create File Input/Output formats for these paths (in the job)
        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Deleting the context path automatically from hdfs so tha we don't have delete it explicitly
        outputPath.getFileSystem(conf).delete(outputPath,true);

        // Run the job
        job.waitForCompletion(true);

        return 0;
    }

    public static void main(String[] args) throws Exception
    {
        // Start the WordCount MapReduce application
        int res = ToolRunner.run( new Configuration(), new MaxTempJob(), args );
        System.exit( res );
    }
}

MaxTempMapper.java

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


import java.io.IOException;

public class MaxTempMapper extends Mapper<LongWritable, Text, Text, Text> {

    public static final int MISSING = 9999;

   @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();

        if(!(line.length() == 0)) {
            String date = line.substring(6, 14);

            float maxTemp = Float.parseFloat(line.substring(39, 45).trim());
            float minTemp = Float.parseFloat(line.substring(47, 52).trim());

            if(maxTemp > 35.0 && maxTemp != MISSING) {
                context.write(new Text("Hot Day " + date), new Text(String.valueOf(maxTemp)));
            }

            if(minTemp > 10.0 && minTemp != MISSING) {
                context.write(new Text("Cold Day " + date), new Text(String.valueOf(minTemp)));
            }
        }
    }
}

MaxTempReducer.java

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

public class MaxTempReducer extends Reducer<Text, Text, Text, Text> {

   @Override
    public void reduce(Text key, Iterator<Text> values, Context context) throws IOException, InterruptedException {

        while(values.hasNext()) {
            Text value = values.next();
            context.write(key, value);
        }
    }
}


output 예시

Cold Day 20150128 13.0
Cold Day 20150208 12.0
Cold Day 20150209 13.0
Cold Day 20150210 12.0
...
Hot Day 20150907 36.5
Hot Day 20150908 35.5

2017년 9월 29일 금요일

MapReduce 프로그램 개발환경 세팅

Maven 설치
sudo apt-get install maven

IntelliJ IDEA 설치
http://home.zany.kr:9003/board/bView.asp?bCode=11&aCode=13879&cBlock=0&cPageNo=1&sType=0&sString=

  1. 다운로드
    https://www.jetbrains.com/idea/download/download-thanks.html?platform=linux&code=IIC
    wget https://download.jetbrains.com/idea/ideaIC-2017.2.5.tar.gz
  2. 압축 해제
    tar zxvf i
    deaIC-2017.2.5.tar.gz
  3. 실행
    bin/idea.sh 실행한 후 기본 세팅을 하면서 icon 만드는 옵션이 있다.


WordCount 예제 실행
http://115.137.34.186/hadoop_wordcount_example/


해당 예제에서는 main 함수가 들어 있는 class를 그냥 구현했는데, hadoop generic option(-conf, -D -fs, -jt, -files, -libjars, -archives)를 사용하려면 ToolRunner를 사용해야 한다.


WordCountJob.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountJob extends Configured implements Tool {

    public int run(String[] args) throws Exception {

        // Verify the number of parameters
        if (args.length != 2) {
            System.err.printf("Usage : %s [generic options] <input> <output>\n", getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }

        // Create a configuration
        Configuration conf = getConf();

        // Create a job from the default configuration that will use the MaxTempJob class
        Job job = Job.getInstance(conf, "wordcount");

        // Configure the job: name, mapper, reducer, and combiner
        job.setJarByClass(WordCountJob.class);
        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);

        // Configure the input/output format and key, value class
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //job.setInputFormatClass(TextInputFormat.class);
        //job.setOutputFormatClass(TextOutputFormat.class);

        // Define our input path as the first command line argument and our output path as the second
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        // Create File Input/Output formats for these paths (in the job)
        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Deleting the context path automatically from hdfs so tha we don't have delete it explicitly
        outputPath.getFileSystem(conf).delete(outputPath,true);

        // Run the job
        return (job.waitForCompletion(true) ? 0 : 1);
    }


    public static void main(String[] args) throws Exception {
        // Start the WordCount MapReduce application
        int res = ToolRunner.run(new Configuration(), new WordCountJob(), args);
        System.exit(res);
    }
}


WordCountMapper.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

   @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}

WordCountReducer.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

   @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}