-
Hadoop WordCount 소스 코드 레벨에서 살펴보기Software Development/Big Data 2020. 7. 1. 13:08
import java.io.IOException; // 예외처리 import java.util.StringTokenizer; //스트링 토큰 처리기 import org.apache.hadoop.conf.Configuration; // 하둡 구성 정보 import org.apache.hadoop.fs.Path; // 파일 시스템 경로 import org.apache.hadoop.io.IntWritable; // 정수형 쓰기 가능 데이터 삽입 import org.apache.hadoop.io.Text; //텍스트 데이터타입 import org.apache.hadoop.mapreduce.Job; //맵리듀스 Job import org.apache.hadoop.mapreduce.Mapper; // 맵리듀스 매퍼 import org.apache.hadoop.mapreduce.Reducer; //맵리듀스 리듀서 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; //파일 입력 포맷 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; //파일 입력 출력 public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ // 매퍼클래스에서 상속을 받는다. 첫번째 두번째는 인풋용 세번째 네번째는 아우풋 <인풋키, 인풋밸류, 아웃풋키, 아웃풋밸류> // 변수 선언 private final static IntWritable one = new IntWritable(1); // one 변수 선언 IntWritable이란 하둡 시스템에서 만들어 놓은 특별한 타입 빅데이터 처리시 직렬화 역질렬화 시 네트워크에 타고 갈 수 있도록 한다.병렬로 작업했던 것이 직렬화할 수 있도록 한다. private Text word = new Text(); // word 변수 선언 java에서는 String의 형태라고 보면 된다. 라인 한 줄씩 들어 온다고 보면 된다. // 키와 밸류로 되어 있고 컨텍스트라는 파라미터를 받는다. 나중에 맵과 리듀스들 끼리 커뮤니케이션할 때 사용한다. public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { // 예외가 날때 예외 핸들러가 처리한다. StringTokenizer itr = new StringTokenizer(value.toString()); // StringTokenizer써서 밸류를 하나씩 잘라낸다. while (itr.hasMoreTokens()) { word.set(itr.nextToken()); // 단어를 하나씩 추출 context.write(word, one); // (word, 1)의 쌍으로 출력 } } } // Writable 자바에서의 인터페이스 구현과 유사 자바의 인터페이스 완전 추상 클래스 // 자바 기본형에 대한 Wrapper 클래스로 동작 int -> intWriatble String -> Text // 하둡에서 직렬화 데이터 타입 생성 시 사용 - 직렬화는 하둡에서 데이터의 크기를 줄이고 쉽게 전송할 수 있게 함 public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); // 결과를 담을 result 변수 선언 // 중복을 허용하는 key의 형대로 입력을 받는다. public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); // value의 값을 얻어서 합계 } result.set(sum); context.write(key, result); //(ket, result)의 쌍으로 출력 } } // main 함수를 드라이브 함수라고 하며 실제로 구동시키는 함수이다. public static void main(String[] args) throws Exception { // 아규먼트를 받는다. Configuration conf = new Configuration(); // conf 하둡의 구성 정보를 담고 있다. Job job = Job.getInstance(conf, "word count"); // word count라는 이름을 가진 job을 만든다. job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); // 입력 파일 지정 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 목적지 파일 지정 System.exit(job.waitForCompletion(true) ? 0 : 1); // 맵리듀스 job제출 - 완성이 끝날때 까지 기다린다. 결국 이 부분이 잡이 실행되는 부분 } }
'Software Development > Big Data' 카테고리의 다른 글
[빅데이터 전문가의 하둡관리] 1. 하둡 소개 및 하둡의 주변 환경 (0) 2022.08.20 [스파크 완벽 가이드] 1. 아파치 스파크란 (0) 2022.05.08 [Spark] 튜닝, 디버깅, 그리고 개발자가 신경 쓰지 않는 것들 (2) 2021.03.28 [Spark] 효율적인 트랜스포메이션 (0) 2021.02.14 hadoop wordcount 예제 eclilpse maven build 시 발생하는 오류 (0) 2020.07.01