2017년 10월 27일 금요일

Hadoop 출력을 Elastic Search에 저장하기

참고 사이트

http://wpcertification.blogspot.kr/2014/05/using-elasticsearch-to-store-output-of.html

우선 Elastic Search를 설치한다.
http://modoleesi.blogspot.kr/2017/10/elastic-search.html

연동을 위해서는 Elastic에서 제공하는 ES-Hadoop 라이브러리가 있다.
https://www.elastic.co/kr/products/hadoop

라이브러리 다운받고 압축을 푼다.
압축을 풀고 dist 디렉터리에 들어가면 파일이 엄청 많이 있는데, 그 중에서 elasticsearch-hadoop-mr-5.6.3.jar 를 사용할 것이다.

해당 파일을 lib 파일을 저장할 곳에 복사한다.
나는 $ES_HOME/lib/에 저장을 하고 있다.
cp elasticsearch-hadoop-mr-5.6.3.jar $ES_HOME/lib/

환경변수

vi $HOME/.bashrc

#ELASTIC-HADOOP
export ES_HADOOP_MR_LIB=$ES_HOME/lib/elasticsearch-hadoop-mr-5.6.3.jar
export LIBJARS=$ES_HADOOP_MR_LIB
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$ES_HADOOP_MR_LIB

source $HOME/.bashrc

hadoop classpath
명령을 쳐서 정상적으로 추가됐는지 확인한다.

그리고 이제 wordcount 예제를 수정해서 결과를 ElasticSearch에 저장해보자.
EMR 최신버전(5.9)가 Hadoop 2.7.3 버전을 지원해서 지금부터는 아마 2.7.3 으로 작성할 것 같다.

pom.xml
<dependencies>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-hadoop-mr</artifactId>
            <version>5.6.3</version>
        </dependency>

    </dependencies>

EsHadoopJob.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

public class EsHadoopJob extends Configured implements Tool {

    public int run(String[] args) throws Exception {

        // Verify the number of parameters
        if (args.length != 2) {
            System.err.printf("Usage : %s [generic options] <input> <output>\n", getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }

        // Create a configuration
        Configuration conf = getConf();

        // Configuration for ES-Haoop connector
        conf.set("es.nodes","localhost:9200");
        conf.set("es.resource", "hadoop/wordcount");

        // Create a job from the default configuration that will use the wordcount class
        Job job = Job.getInstance(conf, "wordcount");

       // for ES-HADOOP
        job.setSpeculativeExecution(false);

        // Configure the job: name, mapper, reducer, and combiner
        job.setJarByClass(EsHadoopJob.class);
        job.setMapperClass(EsHadoopMapper.class);
       //  job.setCombinerClass(EsHadoopReducer.class); 
        job.setReducerClass(EsHadoopReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Configure the input/output format and key, value class
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(MapWritable.class);

        // OutputFormatClass for ES-Haoop connector
        job.setOutputFormatClass(EsOutputFormat.class);

        // Define our input path as the first command line argument and our output path as the second
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        // Create File Input/Output formats for these paths (in the job)
        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Deleting the context path automatically from hdfs so tha we don't have delete it explicitly
        outputPath.getFileSystem(conf).delete(outputPath,true);

        // Run the job
        return (job.waitForCompletion(true) ? 0 : 1);
    }


    public static void main(String[] args) throws Exception {
        // Start the WordCount MapReduce application
        int res = ToolRunner.run(new Configuration(), new EsHadoopJob(), args);
        System.exit(res);
    }
}


EsHadoopMapper.java -> 바꿀 것 없음

EsHadoopReducer.java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

public class EsHadoopReducer extends Reducer<Text, IntWritable, Text, MapWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }

       private MapWritable result = new MapWritable();
        result.put(key, new IntWritable(sum));
        context.write(key, result);
    }
}


Elastic Search Index 생성

index 리스트를 조회해 보고, 기존에 index가 있을 경우 삭제하고, 새로 생성하며 field의 limit을 4000으로 설정한다. (설정 안 하면 에러가 발생할 수 있어서 우선 설정한다. 에러는 밑에 정리되어 있다.)
curl -XGET 'localhost:9200/_cat/indices?v&pretty'

curl -XDELETE http://localhost:9200/hadoop

curl -XPUT 'localhost:9200/hadoop?pretty'

curl -XPUT 'localhost:9200/hadoop/_settings' -d '
{
    "index.mapping.total_fields.limit": 4000
}'

빌드 후 실행

mvn clean install

hadoop jar es-hadoop-1.0-SNAPSHOT.jar EsHadoopJob -libjars elasticsearch-hadoop-mr-5.6.3.jar input output

데이터 확인

http://localhost:9200/hadoop/_search?pretty=true&q=*:*&size=999

이렇게 브라우저에 입력하면 인덱스에 입력된 모든 데이터 중 999개를 출력해준다.

발생했던 에러
  1. es.resource 를 지정하지 않았다는 에러
    : 지정하는 위치가 문제. job 생성시 Job.getInstance(conf, "wordcount"); 이렇게 conf를 사용하는데, conf 값 지정을 그 밑에서 하고 있었음. 그래서 위로 올렸더니 정상 동작
  2. classpath에 넣어 놓은 라이브러리를 찾지 못한다고 (-libjars 명령 안 먹힘)
    : job 생성 시  conf를 new Configuration()을 했었는데, 그걸 getConf()로 바꿈
  3. wrong value class: class org.apache.hadoop.io.MapWritable is not class org.apache.hadoop.io.IntWritable
    : 이거는 Combiner 클래스를 그대로 사용해서 문제가 발생. combiner 클래스의 output이 mapper의 output과 같아야 하는데 combiner를 reducer와 같게 지정해 놔서 IntWritable이 나와야 될 자리에 MapWritable이 나와서 발생한 에러
  4. failed to execute bulk item (index) BulkShardRequest [[hadoop][1]] containing [232] requests org.elasticsearch.index.mapper.MapperParsingException: failed to parse -> elastic search 디렉터리에 log 디렉터리의 elasticsearch.log에 자세히 적혀 있음
    object field starting or ending with a [.] makes object resolution ambiguous: [survive...that]
    : field 이름이 .으로 시작하고 끝나는데 field 이름을 survive...that으로 하려니 ... 에러가 발생
    이건 wordcount에서 survice...that을 한 단어로 봐서 생긴일 그래서 그냥 원본 파일에서 ... 제거 함. 우선은 테스트 해 보는게 목적이니깐. 나중에는 field 명에 . 을 넣는일은 없을테니깐
  5. Limit of total fields [1000] in index [hadoop] has been exceeded
    : 필드 생성 개수 초과
    index 삭제하고 다시 만들어서 한계치를 올려준다.
    curl -XDELETE http://localhost:9200/hadoop
    curl -XPUT 'localhost:9200/hadoop?pretty'
    curl -XPUT 'localhost:9200/hadoop/_settings' -d '
    {
        "index.mapping.total_fields.limit": 4000
    }'
    

댓글 없음:

댓글 쓰기