[ 深入雲計算 ] Hadoop 的安裝和配置: Hadoop Eclipse 簡介與使用

Hadoop 是一個強大的併行軟體開發框架, 它讓任務在分佈式叢集上並行處理, 從而提高執行效率. 但是它也有缺點, 如編碼, 調試 Hadoop 程序難度較高, 這樣的缺點直接導致開發人員入門門檻提高. Hadoop 開發者為了降低 Hadoop 的使用難度, 開發出 Hadoop Eclipse 插件 (plugin), 透過它可以直接在 Eclipse IDE 上開發 Hadoop 程式, 從而降低編程的難度. 

Eclipse 插件開發配置: 
Hadoop 有提供一個插件 hadoop-0.20.2-eclipse-plugin.jar 讓使用者可以配置 Eclipse 並使用此 IDE 開發 MapReduce 應用程序. 下面說明如何在 Eclipse IDE 進行 MapReduce 開發: 
(這邊我使用 Eclipse v3.7
Step1: 將 hadoop-0.20.2-eclipse-plugin.jar 下載並置放於 Eclipse 的 plugins 目錄中, 然後重啟 Eclipse. 
Step2: 設置 Windows Perspective 
在上面 Menu 選擇: Windows > Open Perspective > Other > Map/Reduce 
接著會出現下面圖標. 點擊 "Map/Reduce": 

接著會出現另一個 Perspective, 在上面點擊滑鼠右鍵並選擇 "New Hadoop Location": 

再出現的 Pop windows 填入"Location Name", "Map/Reduce Master" 與 "DFS Master" 
(Map/Reduce 與 DFS 使用的 port 與當初你在建置環境的設置有關, 請注意!

此時在左方面板的 Project Explorer 可以使用 DFS Locations 來瀏覽之前在建置環境產生的一些目錄與檔案: 

新增 Map/Reduce Project: 
Step1: 建立專案 
接著我們要來利用 Eclipse ID 來開發 Map/Reduce 程式. 首先在上方 Menu 選擇: File > New > Project > Map/Reduce Project 

在出現的 Dialog 作如下設定: 

在出現的 Dialog 作如下設定: 

回到上一步後點擊 "Finish" 回到 Project Perspective 並完成專案建立: 

Step2: 撰寫 Word Count 程式 
接著要來撰寫 Map/Reduce 程式, 首先是 mapper. 
- WCMapper.java 
  1. package demo;  
  3. import java.io.IOException;  
  4. import java.util.StringTokenizer;  
  6. import org.apache.hadoop.io.IntWritable;  
  7. import org.apache.hadoop.io.Text;  
  8. import org.apache.hadoop.mapreduce.Mapper;  
  10. public class WCMapper extends Mapper{      
  11.     private final static IntWritable one = new IntWritable(1);  
  12.     private Text word = new Text();  
  14.     @Override  
  15.     public void map(Object key, Text value, Context context)  
  16.             throws IOException, InterruptedException {  
  17.         System.out.printf("\t[demo] Mapper: Key='%s'; value='%s'...\n", key, value);  
  18.         StringTokenizer itr = new StringTokenizer(value.toString());  
  19.         while (itr.hasMoreTokens()) {  
  20.             word.set(itr.nextToken());  
  21.             context.write(word, one);  
  22.         }  
  23.     }  
  24. }  
接下來是 Reducer: 
- WCReducer.java 
  1. package demo;  
  3. import java.io.IOException;  
  5. import org.apache.hadoop.io.IntWritable;  
  6. import org.apache.hadoop.io.Text;  
  7. import org.apache.hadoop.mapreduce.Reducer;  
  9. public class WCReducer extends Reducer {  
  10.     private IntWritable result = new IntWritable();  
  12.     @Override  
  13.     public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {  
  14.       int sum = 0;        
  15.       for (IntWritable val : values) {  
  16.         sum += val.get();  
  17.       }  
  18.       System.out.printf("\t[demo] Reducer: key='%s' (%d)...\n", key, sum);  
  19.       result.set(sum);  
  20.       context.write(key, result);  
  21.     }  
  22.   }  
最後是主程式, 提供使用者呼叫並傳入參數: 
- WordCount.java 
  1. package demo;  
  3. import org.apache.hadoop.conf.Configuration;  
  4. import org.apache.hadoop.fs.Path;  
  5. import org.apache.hadoop.io.IntWritable;  
  6. import org.apache.hadoop.io.Text;  
  7. import org.apache.hadoop.mapreduce.Job;  
  8. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  9. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  10. import org.apache.hadoop.util.GenericOptionsParser;  
  12. public class WordCount {  
  13.     public static void main(String args[]) throws Exception  
  14.     {  
  15.         Configuration conf = new Configuration();  
  16.         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  
  17.         if (otherArgs.length != 2) {  
  18.           System.err.println("Usage: wordcount ");  
  19.           System.exit(2);  
  20.         }  
  21.         Job job = new Job(conf, "word count");  
  22.         job.setJarByClass(WordCount.class);  
  23.         job.setMapperClass(WCMapper.class);  
  24.         job.setCombinerClass(WCReducer.class);  
  25.         job.setReducerClass(WCReducer.class);  
  26.         job.setOutputKeyClass(Text.class);  
  27.         job.setOutputValueClass(IntWritable.class);  
  28.         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  
  29.         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  
  30.         System.exit(job.waitForCompletion(true) ? 0 : 1);  
  31.     }  
  32. }  
Step3: 包裝成 JAR 檔 
接著請在專案 MRTest 上點節滑鼠右鍵: Export > 選擇 Java/Jar file 

Step4: 執行 
請將剛剛產生的 JAR 檔 "MRTest.jar" 置放於 NameNode 下面 (假設是 /home/john/ ): 
# 現在我們是在 NameNode /home/john 下面
$ hadoop dfs -rmr /input/output # 刪除之前跑過的結果 rmr=Recursive version of delete.
Deleted hdfs://ubuntun:9000/input/output
$ hadoop jar MRTest.jar demo.WordCount /input /input/output # 執行 Word Count
13/11/01 01:32:43 INFO mapred.JobClient: map 100% reduce 100%

$ hadoop dfs -ls /input/output
Found 2 items
drwxr-xr-x - john supergroup 0 2013-11-01 01:32 /input/output/_logs
-rw-r--r-- 3 john supergroup 41 2013-11-01 01:32 /input/output/part-r-00000

$ hadoop dfs -cat /input/output/part-r-00000 # 檢視 Word Count 結果
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2

$ ssh # 登入 DataNode1
$ ls hadoop/logs/userlogs/ # 剛剛 WordCount 分配下來的 task, 跑完的 log 都放在這裡
attempt_201310310045_0002_m_000000_0 attempt_201310310045_0002_m_000003_0 attempt_201310310045_0003_m_000001_0
attempt_201310310045_0002_m_000002_0 attempt_201310310045_0002_r_000000_0

$ cat hadoop/logs/userlogs/attempt_201310310045_0002_r_000000_0/stdout # 檢視 Reducer 在 DataNode1 產生的 stdout log
[demo] Reducer: key='Bye' (1)...
[demo] Reducer: key='Goodbye' (1)...
[demo] Reducer: key='Hadoop' (2)...
[demo] Reducer: key='Hello' (2)...
[demo] Reducer: key='World' (2)...

這時回去 Eclipse IDE 的 Project Explorer 並在 DFSLocations 上應該可以看到跑完的結果 (記得要 refresh): 

hadoop 0.20 程式開發 
Debugging a Hadoop MapReduce Program in Eclipse 
