코드출처: https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html
import java.io.IOException; // 예외처리
import java.util.StringTokenizer; //스트링 토큰 처리기
import org.apache.hadoop.conf.Configuration; // 하둡 구성 정보
import org.apache.hadoop.fs.Path; // 파일 시스템 경로
import org.apache.hadoop.io.IntWritable; // 정수형 쓰기 가능 데이터 삽입
import org.apache.hadoop.io.Text; //텍스트 데이터타입
import org.apache.hadoop.mapreduce.Job; //맵리듀스 Job
import org.apache.hadoop.mapreduce.Mapper; // 맵리듀스 매퍼
import org.apache.hadoop.mapreduce.Reducer; //맵리듀스 리듀서
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; //파일 입력 포맷
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; //파일 입력 출력
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{ // 매퍼클래스에서 상속을 받는다. 첫번째 두번째는 인풋용 세번째 네번째는 아우풋 <인풋키, 인풋밸류, 아웃풋키, 아웃풋밸류>
// 변수 선언
private final static IntWritable one = new IntWritable(1); // one 변수 선언 IntWritable이란 하둡 시스템에서 만들어 놓은 특별한 타입 빅데이터 처리시 직렬화 역질렬화 시 네트워크에 타고 갈 수 있도록 한다.병렬로 작업했던 것이 직렬화할 수 있도록 한다.
private Text word = new Text(); // word 변수 선언 java에서는 String의 형태라고 보면 된다. 라인 한 줄씩 들어 온다고 보면 된다.
// 키와 밸류로 되어 있고 컨텍스트라는 파라미터를 받는다. 나중에 맵과 리듀스들 끼리 커뮤니케이션할 때 사용한다.
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException { // 예외가 날때 예외 핸들러가 처리한다.
StringTokenizer itr = new StringTokenizer(value.toString()); // StringTokenizer써서 밸류를 하나씩 잘라낸다.
while (itr.hasMoreTokens()) {
word.set(itr.nextToken()); // 단어를 하나씩 추출
context.write(word, one); // (word, 1)의 쌍으로 출력
}
}
}
// Writable 자바에서의 인터페이스 구현과 유사 자바의 인터페이스 완전 추상 클래스
// 자바 기본형에 대한 Wrapper 클래스로 동작 int -> intWriatble String -> Text
// 하둡에서 직렬화 데이터 타입 생성 시 사용 - 직렬화는 하둡에서 데이터의 크기를 줄이고 쉽게 전송할 수 있게 함
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable(); // 결과를 담을 result 변수 선언
// 중복을 허용하는 key의 형대로 입력을 받는다.
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get(); // value의 값을 얻어서 합계
}
result.set(sum);
context.write(key, result); //(ket, result)의 쌍으로 출력
}
}
// main 함수를 드라이브 함수라고 하며 실제로 구동시키는 함수이다.
public static void main(String[] args) throws Exception { // 아규먼트를 받는다.
Configuration conf = new Configuration(); // conf 하둡의 구성 정보를 담고 있다.
Job job = Job.getInstance(conf, "word count"); // word count라는 이름을 가진 job을 만든다.
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0])); // 입력 파일 지정
FileOutputFormat.setOutputPath(job, new Path(args[1])); // 목적지 파일 지정
System.exit(job.waitForCompletion(true) ? 0 : 1); // 맵리듀스 job제출 - 완성이 끝날때 까지 기다린다. 결국 이 부분이 잡이 실행되는 부분
}
}