2017년 10월 25일 수요일

MapReduce 프로그래밍 예제 : 날씨 데이터


위 동영상을 참고하면서 예제를 따라 했다. 34:27 부터 해당 예제 설명이 나온다.
날씨 데이터를 다운 받아서 최저 / 최고 기온을 추출한 후 Hot / Cold와 날짜 그리고 기온을 최종 출력하는 프로그램이다. map 함수에서 파싱을 하고 reduce 함수는 그냥 아무런 역할을 안한다. (동영상에서 reduce 함수 코드를 확인하지 못해서 그냥 임의로 작성한 것이다. 아무런 역할을 안하는 reduce는 identity reducer를 이용하면 된다고 본 것 같은데 확인봐야 될 듯...)

데이터 다운로드 위치 : ftp://ftp.ncdc.noaa.gov/pub/data/uscrn/products/daily01/

개발환경 세팅 참고 : http://modoleesi.blogspot.kr/2017/09/blog-post_28.html

input 예시

23907 20150101  2.423  -98.08   30.62     2.2    -0.6     0.8     0.9     7.0     1.47 C     3.7     1.1     2.5    99.9    85.4    97.2   0.369   0.308 -99.000 -99.000 -99.000     7.0     8.1 -9999.0 -9999.0 -9999.0

pom.xml

<dependencies>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.4</version>
        </dependency>

    </dependencies>

MaxTempJob.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Driver for the weather example: wires MaxTempMapper and MaxTempReducer
 * into a single MapReduce job and submits it.
 *
 * Usage: MaxTempJob [generic options] &lt;input&gt; &lt;output&gt;
 */
public class MaxTempJob extends Configured implements Tool {

    /**
     * Configures and runs the job.
     *
     * @param args two positional arguments: input path, output path
     * @return 0 on success, 1 if the job failed, -1 on bad usage
     * @throws Exception if job setup or submission fails
     */
    public int run(String[] args) throws Exception {

        // Verify the number of parameters
        if (args.length != 2) {
            System.err.printf("Usage : %s [generic options] <input> <output>\n", getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }

        // Create a configuration (getConf() includes generic options parsed by ToolRunner)
        Configuration conf = getConf();

        // Create a job from the default configuration that will use the MaxTempJob class
        Job job = Job.getInstance(conf, "weather example");

        // Configure the job: name, mapper, reducer, and combiner.
        // The reducer is an identity pass-through, so it is safe to reuse as a combiner.
        job.setJarByClass(MaxTempJob.class);
        job.setMapperClass(MaxTempMapper.class);
        job.setReducerClass(MaxTempReducer.class);
        job.setCombinerClass(MaxTempReducer.class);

        // Configure the input/output format and key, value class
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Define our input path as the first command line argument and our output path as the second
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        // Create File Input/Output formats for these paths (in the job)
        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Delete the output path from HDFS up front so that we don't have to delete it explicitly
        // (MapReduce fails fast if the output directory already exists)
        outputPath.getFileSystem(conf).delete(outputPath, true);

        // Run the job and propagate its result.
        // BUG FIX: the original ignored waitForCompletion's return value and always
        // returned 0, so a failed job still exited with a success status.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception
    {
        // Start the MaxTemp MapReduce application via ToolRunner so generic
        // Hadoop options (-D, -files, ...) are handled for us.
        int res = ToolRunner.run( new Configuration(), new MaxTempJob(), args );
        System.exit( res );
    }
}

MaxTempMapper.java

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


import java.io.IOException;

/**
 * Parses one fixed-width NCDC daily record per input line and emits
 * ("Hot Day yyyymmdd", maxTemp) / ("Cold Day yyyymmdd", minTemp) pairs
 * for records passing the temperature thresholds.
 */
public class MaxTempMapper extends Mapper<LongWritable, Text, Text, Text> {

    // Sentinel for a missing reading.
    // NOTE(review): the sample input uses -9999.0 as the missing value, not 9999,
    // so the "!= MISSING" comparisons below never actually fire; missing rows are
    // excluded by the threshold comparisons instead. Kept for compatibility —
    // confirm the intended sentinel against the NCDC daily01 README.
    public static final int MISSING = 9999;

   @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();

        // ROBUSTNESS FIX: the original only skipped empty lines; any non-empty
        // line shorter than 52 chars made substring() throw and killed the task.
        if (line.length() < 52) {
            return;
        }

        String date = line.substring(6, 14);   // columns 7-14: observation date, yyyymmdd

        float maxTemp;
        float minTemp;
        try {
            maxTemp = Float.parseFloat(line.substring(39, 45).trim());
            minTemp = Float.parseFloat(line.substring(47, 52).trim());
        } catch (NumberFormatException e) {
            // ROBUSTNESS FIX: skip malformed records instead of failing the whole task
            return;
        }

        if (maxTemp > 35.0 && maxTemp != MISSING) {
            context.write(new Text("Hot Day " + date), new Text(String.valueOf(maxTemp)));
        }

        // NOTE(review): "> 10.0" matches the documented sample output, but a
        // "Cold Day" filter would conventionally be "minTemp < 10.0" — confirm intent.
        if (minTemp > 10.0 && minTemp != MISSING) {
            context.write(new Text("Cold Day " + date), new Text(String.valueOf(minTemp)));
        }
    }
}

MaxTempReducer.java

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * Identity reducer: writes every (key, value) pair through unchanged.
 *
 * BUG FIX: the original declared reduce(Text, Iterator&lt;Text&gt;, Context).
 * Reducer#reduce takes an Iterable, so that signature does not override it —
 * with @Override it is a compile error, and without @Override Hadoop would
 * silently fall back to the default reduce and never call this method.
 */
public class MaxTempReducer extends Reducer<Text, Text, Text, Text> {

   @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        // Pass each value through unchanged (one output line per mapper emission).
        for (Text value : values) {
            context.write(key, value);
        }
    }
}


output 예시

Cold Day 20150128 13.0
Cold Day 20150208 12.0
Cold Day 20150209 13.0
Cold Day 20150210 12.0
...
Hot Day 20150907 36.5
Hot Day 20150908 35.5

댓글 없음:

댓글 쓰기