2017년 11월 2일 목요일

html 파일 읽어서 파싱 후 원하는 데이터만 파일로 출력하는 MapReduce 프로그램

참고

HTML 파싱
https://github.com/shsdev/hocr-parser-hadoopjob/blob/master/src/main/java/eu/scape_project/tb/lsdr/hocrparser/HocrParser.java

Jsoup
http://partnerjun.tistory.com/42

Hadoop InputFormat
하둡 완벽 가이드 4판 - 파일의 전체 내용을 하나의 레코드로 처리하기
https://github.com/tomwhite/hadoop-book/blob/master/ch08-mr-types/src/main/java/WholeFileInputFormat.java

동기

하둡을 이용해서 네이버 블로그(모바일 버전)를 다운 받아서 해당 페이지에서 필요한 내용을 파싱하고 싶었다. 그래서 맵리듀스로 html 파싱이 가능한지 찾아봤는데, 아주 조금의 예제들이 나왔다. 그런데 map 함수에서 뭔가를 다 처리하고 있어서 이쁘게 보이지 않아서 custom InputFormat을 만들어서 사용하는 법을 알아봤다. 다행히 하둡 완벽 가이드에 적절한 예제가 있어서 두 가지를 조합에서 간단한 예제 프로그램을 만들었다.

준비물

모바일 네이버 블로그 게시물 html 파일
http://m.blog.naver.com/Recommendation.nhn 에 들어가서 맘에 드는 게시물의 URL을 복사한다. 복사한 URL로 그대로 wget으로 받으려 하면 엉뚱한 페이지가 다운 받아 진다. 그래서 URL은 https://m.blog.naver.com/PostView.nhn?blogId=xxxx&logNo=xxxx 를 치고 들어가면 나오는 https://m.blog.naver.com/id/post_no 를 wget으로 다운 받으면 된다.
적당히 5 ~ 6 개 정도 다운 받아서 디렉터리 통째로 HDFS에 올린다.

프로그램 작성

pom.xml
<dependencies>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>

        <dependency>
            <groupId>org.jsoup</groupId>
            <artifactId>jsoup</artifactId>
            <version>1.10.3</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>
                            jar-with-dependencies
                        </descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
                        <phase>package</phase> <!-- bind to the packaging phase -->
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

HtmlFileInputFormat.java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
 * HTML 파일을 통째로 읽는 FileInputFormat
 */
public class HtmlFileInputFormat extends FileInputFormat<NullWritable, Text> {

    /* 파일을 통째로 읽어야 되기 때문에 split 되지 않게 함 */
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    public RecordReader<NullWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        HtmlFileRecordReader reader = new HtmlFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}

HtmlFileRecordReader.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class HtmlFileRecordReader extends RecordReader <NullWritable, Text> {

    private FileSplit fileSplit;
    private Configuration conf;
    private Text value = new Text();
    private boolean processed = false;

    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    public boolean nextKeyValue() throws IOException, InterruptedException {
        if(!processed) {
            byte [] contents = new byte[(int) fileSplit.getLength()];
            Path file =  fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                InputStream inputStream = new ByteArrayInputStream(contents);
                Document doc = Jsoup.parse(inputStream, "UTF-8", "http://home.yoursite.com");
                value.set(doc.html());
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1.0f : 0.0f;
    }

    public void close() throws IOException {

    }
}

JobBuilder.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;

import java.io.IOException;

/**
 * Hadoop Job 생성
 * 1. 파라미터 개수 체크
 * 2. 설정, Job 이름 받아서 Job 생성
 * 3. 입/출력 데이터 위치 지정
 * 4. 출력 위치에 데이터가 있으면 삭제
 */
public class JobBuilder {
    public static Job parseInputAndOutput(Tool tool, Configuration conf, String[] args, String jobName) throws IOException {

        /* 파라미터 개수 체크 */
        if (args.length != 2) {
            printUsage(tool, "<input> <output>");
            return null;
        }

        /* 설정, Job 이름 받아서 Job 생성 */
        Job job = Job.getInstance(conf, jobName);
        job.setJarByClass(tool.getClass());

        /* 입/출력 데이터 위치 지정 */
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        /* 출력 위치에 데이터가 있으면 삭제 */
        outputPath.getFileSystem(conf).delete(outputPath,true);

        return job;
    }

    /* 사용법 출력 */
    public static void printUsage(Tool tool, String extraArgsUsage) {
        System.err.printf("Usage: %s [genericOptions] %s\n\n", tool.getClass().getSimpleName(), extraArgsUsage);
        GenericOptionsParser.printGenericCommandUsage(System.err);
    }
}


HtmlParserJob.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * HTML 파일을 읽어서 DOM 파싱 하는 Hadoop Job
 */
public class HtmlParserJob extends Configured implements Tool {

    public int run(String[] args) throws Exception {

        /* 설정 가져와서 Job 생성 */
        Configuration conf = getConf();
        String jobName = new String("htmlparser");
        Job job = JobBuilder.parseInputAndOutput(this, conf, args, jobName);
        if (job == null) {
            return -1;
        }

        /* 입출력 포맷 클래스 지정 */
        job.setInputFormatClass(HtmlFileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        /* 출력 Key/Value 클래스 지정 */
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        /* Mapper / Reducer 클래스 지정 */
        job.setMapperClass(HtmlParserMapper.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new HtmlParserJob(), args);
        System.exit(res);
    }

}


HtmlParserMapper.java
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;

import java.io.IOException;

/**
 * HTML 파일 받아서 DOM을 파싱해서 원하는 데이터를 뽑아 내는 Mapper
 * http://m.blog.naver.com/id/post_no 의 데이터를 받아와서 파싱
 * Input Key (NullWritable) : 없음
 * Input Value (Text) : HTML 파일 전체
 * Output Key (Text) : 블로그 URL
 * Output Value (Text) : 태그 리스트 (태그 태그 태그 ...)
 */
public class HtmlParserMapper extends Mapper<NullWritable, Text, Text, Text> {

    @Override
    protected void map(NullWritable key, Text value, Context context) throws IOException, InterruptedException {

        /* Jsoup 문서 생성 */
        Document doc = Jsoup.parse(value.toString());

        /**
         * 블로그 URL 파싱
         * <meta property="og:url" content="http://blog.naver.com/id/post_no">
         * 형태의 데이터에서 content의 데이터를 파싱해 온다.
         */
        Element ogUrl = doc.select("meta[property=og:url]").first();
        String ogUrlContent = ogUrl.attr("content");
        Text textKey = new Text(ogUrlContent);

        /**
         * 태그 파싱
         * <div class="post_tag">
         *     <ul>
         *         <li>
         *             <a>
         *                 <span>태그</span>
         *                 <span>태그</span>
         *                 <span>태그</span>
         *             </a>
         *         </li>
         *     </ul>
         * </div>
         * span 태그 사이의 텍스트 파싱
         */
        Elements postTags = doc.getElementsByClass("post_tag").first().select("span");

        String postTagValues = new String();

        /* span 리스트를 돌면서 텍스트를 뽑아내서 문자열로 이어 붙임 */
        for(Element postTag : postTags) {
            postTagValues += postTag.ownText() + " ";
        }

        Text textValue = new Text(postTagValues);

        context.write(textKey, textValue);
    }

}

결과물
http://blog.naver.com/xxxxxx/xxxxxx      #태그1
http://blog.naver.com/xxxxxx/xxxxxx      #태그1 #태그2 #태그3 #태그4 #태그5
http://blog.naver.com/xxxxxx/xxxxxx   #태그1 #태그2
http://blog.naver.com/xxxxxx/xxxxxx      #태그1
http://blog.naver.com/xxxxxx/xxxxxx     #태그1 #태그2 #태그3
http://blog.naver.com/xxxxxx/xxxxxx     #태그1

이런식으로 블로그 URL과 태그 목록이 나온다.

댓글 없음:

댓글 쓰기