참고 사이트
http://wpcertification.blogspot.kr/2014/05/using-elasticsearch-to-store-output-of.html우선 Elastic Search를 설치한다.
http://modoleesi.blogspot.kr/2017/10/elastic-search.html
연동을 위해서는 Elastic에서 제공하는 ES-Hadoop 라이브러리가 있다.
https://www.elastic.co/kr/products/hadoop
라이브러리 다운받고 압축을 푼다.
압축을 풀고 dist 디렉터리에 들어가면 파일이 엄청 많이 있는데, 그 중에서 elasticsearch-hadoop-mr-5.6.3.jar 를 사용할 것이다.
해당 파일을 lib 파일을 저장할 곳에 복사한다.
나는 $ES_HOME/lib/에 저장을 하고 있다.
cp elasticsearch-hadoop-mr-5.6.3.jar $ES_HOME/lib/
환경변수
vi $HOME/.bashrc#ELASTIC-HADOOP export ES_HADOOP_MR_LIB=$ES_HOME/lib/elasticsearch-hadoop-mr-5.6.3.jar export LIBJARS=$ES_HADOOP_MR_LIB export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$ES_HADOOP_MR_LIB
source $HOME/.bashrc
hadoop classpath
명령을 쳐서 정상적으로 추가됐는지 확인한다.
그리고 이제 wordcount 예제를 수정해서 결과를 ElasticSearch에 저장해보자.
EMR 최신버전(5.9)가 Hadoop 2.7.3 버전을 지원해서 지금부터는 아마 2.7.3 으로 작성할 것 같다.
pom.xml
<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-hadoop-mr</artifactId> <version>5.6.3</version> </dependency> </dependencies>
EsHadoopJob.java
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.elasticsearch.hadoop.mr.EsOutputFormat; public class EsHadoopJob extends Configured implements Tool { public int run(String[] args) throws Exception { // Verify the number of parameters if (args.length != 2) { System.err.printf("Usage : %s [generic options] <input> <output>\n", getClass().getSimpleName()); ToolRunner.printGenericCommandUsage(System.err); return -1; } // Create a configuration Configuration conf = getConf(); // Configuration for ES-Haoop connector conf.set("es.nodes","localhost:9200"); conf.set("es.resource", "hadoop/wordcount"); // Create a job from the default configuration that will use the wordcount class Job job = Job.getInstance(conf, "wordcount"); // for ES-HADOOP job.setSpeculativeExecution(false); // Configure the job: name, mapper, reducer, and combiner job.setJarByClass(EsHadoopJob.class); job.setMapperClass(EsHadoopMapper.class); // job.setCombinerClass(EsHadoopReducer.class); job.setReducerClass(EsHadoopReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // Configure the input/output format and key, value class job.setOutputKeyClass(Text.class); job.setOutputValueClass(MapWritable.class); // OutputFormatClass for ES-Haoop connector job.setOutputFormatClass(EsOutputFormat.class); // Define our input path as the first command line argument and our output path as the second Path inputPath = new Path(args[0]); Path outputPath = new Path(args[1]); // Create File Input/Output formats for these paths (in the job) FileInputFormat.addInputPath(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); // Deleting the context path automatically from hdfs so tha we don't have delete it explicitly outputPath.getFileSystem(conf).delete(outputPath,true); // Run the job return (job.waitForCompletion(true) ? 0 : 1); } public static void main(String[] args) throws Exception { // Start the WordCount MapReduce application int res = ToolRunner.run(new Configuration(), new EsHadoopJob(), args); System.exit(res); } }
EsHadoopMapper.java -> 바꿀 것 없음
EsHadoopReducer.java
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.Iterator; public class EsHadoopReducer extends Reducer<Text, IntWritable, Text, MapWritable> { @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } private MapWritable result = new MapWritable(); result.put(key, new IntWritable(sum)); context.write(key, result); } }
Elastic Search Index 생성
index 리스트를 조회해 보고, 기존에 index가 있을 경우 삭제하고, 새로 생성하며 field의 limit을 4000으로 설정한다. (설정 안 하면 에러가 발생할 수 있어서 우선 설정한다. 에러는 밑에 정리되어 있다.)curl -XGET 'localhost:9200/_cat/indices?v&pretty'
curl -XDELETE http://localhost:9200/hadoop
curl -XPUT 'localhost:9200/hadoop?pretty' curl -XPUT 'localhost:9200/hadoop/_settings' -d ' { "index.mapping.total_fields.limit": 4000 }'
빌드 후 실행
mvn clean install
hadoop jar es-hadoop-1.0-SNAPSHOT.jar EsHadoopJob -libjars elasticsearch-hadoop-mr-5.6.3.jar input output
데이터 확인
http://localhost:9200/hadoop/_search?pretty=true&q=*:*&size=999이렇게 브라우저에 입력하면 인덱스에 입력된 모든 데이터 중 999개를 출력해준다.
발생했던 에러
- es.resource 를 지정하지 않았다는 에러
: 지정하는 위치가 문제. job 생성시 Job.getInstance(conf, "wordcount"); 이렇게 conf를 사용하는데, conf 값 지정을 그 밑에서 하고 있었음. 그래서 위로 올렸더니 정상 동작 - classpath에 넣어 놓은 라이브러리를 찾지 못한다고 (-libjars 명령 안 먹힘)
: job 생성 시 conf를 new Configuration()을 했었는데, 그걸 getConf()로 바꿈 - wrong value class: class org.apache.hadoop.io.MapWritable is not class org.apache.hadoop.io.IntWritable
: 이거는 Combiner 클래스를 그대로 사용해서 문제가 발생. combiner 클래스의 output이 mapper의 output과 같아야 하는데 combiner를 reducer와 같게 지정해 놔서 IntWritable이 나와야 될 자리에 MapWritable이 나와서 발생한 에러 - failed to execute bulk item (index) BulkShardRequest [[hadoop][1]] containing [232] requests org.elasticsearch.index.mapper.MapperParsingException: failed to parse -> elastic search 디렉터리에 log 디렉터리의 elasticsearch.log에 자세히 적혀 있음
object field starting or ending with a [.] makes object resolution ambiguous: [survive...that]
: field 이름이 .으로 시작하고 끝나는데 field 이름을 survive...that으로 하려니 ... 에러가 발생
이건 wordcount에서 survice...that을 한 단어로 봐서 생긴일 그래서 그냥 원본 파일에서 ... 제거 함. 우선은 테스트 해 보는게 목적이니깐. 나중에는 field 명에 . 을 넣는일은 없을테니깐 - Limit of total fields [1000] in index [hadoop] has been exceeded
: 필드 생성 개수 초과
index 삭제하고 다시 만들어서 한계치를 올려준다.
curl -XDELETE http://localhost:9200/hadoop curl -XPUT 'localhost:9200/hadoop?pretty' curl -XPUT 'localhost:9200/hadoop/_settings' -d ' { "index.mapping.total_fields.limit": 4000 }'
댓글 없음:
댓글 쓰기