참고
HTML 파싱
https://github.com/shsdev/hocr-parser-hadoopjob/blob/master/src/main/java/eu/scape_project/tb/lsdr/hocrparser/HocrParser.javaJsoup
http://partnerjun.tistory.com/42
Hadoop InputFormat
하둡 완벽 가이드 4판 - 파일의 전체 내용을 하나의 레코드로 처리하기
https://github.com/tomwhite/hadoop-book/blob/master/ch08-mr-types/src/main/java/WholeFileInputFormat.java
동기
하둡을 이용해서 네이버 블로그(모바일 버전)를 다운 받아서 해당 페이지에서 필요한 내용을 파싱하고 싶었다. 그래서 맵리듀스로 html 파싱이 가능한지 찾아봤는데, 아주 조금의 예제들이 나왔다. 그런데 map 함수에서 뭔가를 다 처리하고 있어서 이쁘게 보이지 않아서 custom InputFormat을 만들어서 사용하는 법을 알아봤다. 다행히 하둡 완벽 가이드에 적절한 예제가 있어서 두 가지를 조합에서 간단한 예제 프로그램을 만들었다.준비물
모바일 네이버 블로그 게시물 html 파일http://m.blog.naver.com/Recommendation.nhn 에 들어가서 맘에 드는 게시물의 URL을 복사한다. 복사한 URL로 그대로 wget으로 받으려 하면 엉뚱한 페이지가 다운 받아 진다. 그래서 URL은 https://m.blog.naver.com/PostView.nhn?blogId=xxxx&logNo=xxxx 를 치고 들어가면 나오는 https://m.blog.naver.com/id/post_no 를 wget으로 다운 받으면 된다.
적당히 5 ~ 6 개 정도 다운 받아서 디렉터리 통째로 HDFS에 올린다.
프로그램 작성
pom.xml
HtmlFileInputFormat.java
HtmlFileRecordReader.java
JobBuilder.java
HtmlParserJob.java
HtmlParserMapper.java
<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>org.jsoup</groupId> <artifactId>jsoup</artifactId> <version>1.10.3</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>3.1.0</version> <configuration> <descriptorRefs> <descriptorRef> jar-with-dependencies </descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <!-- this is used for inheritance merges --> <phase>package</phase> <!-- bind to the packaging phase --> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
HtmlFileInputFormat.java
import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import java.io.IOException; /** * HTML 파일을 통째로 읽는 FileInputFormat */ public class HtmlFileInputFormat extends FileInputFormat<NullWritable, Text> { /* 파일을 통째로 읽어야 되기 때문에 split 되지 않게 함 */ protected boolean isSplitable(JobContext context, Path file) { return false; } public RecordReader<NullWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { HtmlFileRecordReader reader = new HtmlFileRecordReader(); reader.initialize(split, context); return reader; } }
HtmlFileRecordReader.java
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.jsoup.Jsoup; import org.jsoup.nodes.Document; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; public class HtmlFileRecordReader extends RecordReader <NullWritable, Text> { private FileSplit fileSplit; private Configuration conf; private Text value = new Text(); private boolean processed = false; public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { this.fileSplit = (FileSplit) split; this.conf = context.getConfiguration(); } public boolean nextKeyValue() throws IOException, InterruptedException { if(!processed) { byte [] contents = new byte[(int) fileSplit.getLength()]; Path file = fileSplit.getPath(); FileSystem fs = file.getFileSystem(conf); FSDataInputStream in = null; try { in = fs.open(file); IOUtils.readFully(in, contents, 0, contents.length); InputStream inputStream = new ByteArrayInputStream(contents); Document doc = Jsoup.parse(inputStream, "UTF-8", "http://home.yoursite.com"); value.set(doc.html()); } finally { IOUtils.closeStream(in); } processed = true; return true; } return false; } public NullWritable getCurrentKey() throws IOException, InterruptedException { return NullWritable.get(); } public Text getCurrentValue() throws IOException, InterruptedException { return value; } public float getProgress() throws IOException, InterruptedException { return processed ? 1.0f : 0.0f; } public void close() throws IOException { } }
JobBuilder.java
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import java.io.IOException; /** * Hadoop Job 생성 * 1. 파라미터 개수 체크 * 2. 설정, Job 이름 받아서 Job 생성 * 3. 입/출력 데이터 위치 지정 * 4. 출력 위치에 데이터가 있으면 삭제 */ public class JobBuilder { public static Job parseInputAndOutput(Tool tool, Configuration conf, String[] args, String jobName) throws IOException { /* 파라미터 개수 체크 */ if (args.length != 2) { printUsage(tool, "<input> <output>"); return null; } /* 설정, Job 이름 받아서 Job 생성 */ Job job = Job.getInstance(conf, jobName); job.setJarByClass(tool.getClass()); /* 입/출력 데이터 위치 지정 */ Path inputPath = new Path(args[0]); Path outputPath = new Path(args[1]); FileInputFormat.addInputPath(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); /* 출력 위치에 데이터가 있으면 삭제 */ outputPath.getFileSystem(conf).delete(outputPath,true); return job; } /* 사용법 출력 */ public static void printUsage(Tool tool, String extraArgsUsage) { System.err.printf("Usage: %s [genericOptions] %s\n\n", tool.getClass().getSimpleName(), extraArgsUsage); GenericOptionsParser.printGenericCommandUsage(System.err); } }
HtmlParserJob.java
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * HTML 파일을 읽어서 DOM 파싱 하는 Hadoop Job */ public class HtmlParserJob extends Configured implements Tool { public int run(String[] args) throws Exception { /* 설정 가져와서 Job 생성 */ Configuration conf = getConf(); String jobName = new String("htmlparser"); Job job = JobBuilder.parseInputAndOutput(this, conf, args, jobName); if (job == null) { return -1; } /* 입출력 포맷 클래스 지정 */ job.setInputFormatClass(HtmlFileInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); /* 출력 Key/Value 클래스 지정 */ job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); /* Mapper / Reducer 클래스 지정 */ job.setMapperClass(HtmlParserMapper.class); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new HtmlParserJob(), args); System.exit(res); } }
HtmlParserMapper.java
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.jsoup.Jsoup; import org.jsoup.nodes.Document; import org.jsoup.nodes.Element; import org.jsoup.select.Elements; import java.io.IOException; /** * HTML 파일 받아서 DOM을 파싱해서 원하는 데이터를 뽑아 내는 Mapper * http://m.blog.naver.com/id/post_no 의 데이터를 받아와서 파싱 * Input Key (NullWritable) : 없음 * Input Value (Text) : HTML 파일 전체 * Output Key (Text) : 블로그 URL * Output Value (Text) : 태그 리스트 (태그 태그 태그 ...) */ public class HtmlParserMapper extends Mapper<NullWritable, Text, Text, Text> { @Override protected void map(NullWritable key, Text value, Context context) throws IOException, InterruptedException { /* Jsoup 문서 생성 */ Document doc = Jsoup.parse(value.toString()); /** * 블로그 URL 파싱 * <meta property="og:url" content="http://blog.naver.com/id/post_no"> * 형태의 데이터에서 content의 데이터를 파싱해 온다. */ Element ogUrl = doc.select("meta[property=og:url]").first(); String ogUrlContent = ogUrl.attr("content"); Text textKey = new Text(ogUrlContent); /** * 태그 파싱 * <div class="post_tag"> * <ul> * <li> * <a> * <span>태그</span> * <span>태그</span> * <span>태그</span> * </a> * </li> * </ul> * </div> * span 태그 사이의 텍스트 파싱 */ Elements postTags = doc.getElementsByClass("post_tag").first().select("span"); String postTagValues = new String(); /* span 리스트를 돌면서 텍스트를 뽑아내서 문자열로 이어 붙임 */ for(Element postTag : postTags) { postTagValues += postTag.ownText() + " "; } Text textValue = new Text(postTagValues); context.write(textKey, textValue); } }
결과물
http://blog.naver.com/xxxxxx/xxxxxx #태그1 http://blog.naver.com/xxxxxx/xxxxxx #태그1 #태그2 #태그3 #태그4 #태그5 http://blog.naver.com/xxxxxx/xxxxxx #태그1 #태그2 http://blog.naver.com/xxxxxx/xxxxxx #태그1 http://blog.naver.com/xxxxxx/xxxxxx #태그1 #태그2 #태그3 http://blog.naver.com/xxxxxx/xxxxxx #태그1
이런식으로 블로그 URL과 태그 목록이 나온다.
댓글 없음:
댓글 쓰기