欢迎投稿

今日深度:

Hadoop 研发之远程调试详细剖析--WordCount V2.0,hadoop--wordcount

Hadoop 研发之远程调试详细剖析--WordCount V2.0,hadoop--wordcount


希望本篇博文能帮助到那些想快速开始Hadoop程序开发调试的朋友们~

前言

之前学习Hadoop时,曾经错误的以为开发的Hadoop程序必须在运行的Hadoop集群上才能运行和调试,基于这个错误认识,花费了较多的时间在mac上搭建伪分布式Hadoop集群和IDE集成开发环境,虽然走了不少弯路,但现在回头想想还是很值得了,至少对Hadoop的认识加深了不少。

之所以花费了较多的时间,是因为:

随着知识的积累,发现关于Hadoop开发调试的一些认识其实是错误的,譬如:

有了上面的认识,再结合一下Java远程调试就可以非常方便的开发调试Hadoop程序了。

以下还是以Hadoop官网WordCount为例,详细分析Hadoop研发远程调试;以下源码中除了改造部分WordCount源码外,还增加了一个工具类RunJobTool,方便在Terminal下调试Hadoop程序,也因为这些原因,所以称本示例为WordCount V2.0。

本次调试输入的文件内容为:

a c b d
d b c a
a c d b
c a r s
d s g h

一、源码展示

本文源码分为两个部分:

WordCountV2源码

package hadoopTest;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountV2 {
  // Enables verbose stdout tracing in the mapper/reducer.
  // NOTE(review): this is a static, client-JVM flag. It only takes effect when
  // the job runs in the same JVM (mapreduce.framework.name=local, as RunJobTool
  // configures). On a real cluster the tasks run in separate JVMs and will not
  // see a value set here — confirm before reusing outside local debugging.
  private static boolean debugFlag = false;

  /** Turns debug tracing on or off for subsequent map/reduce calls. */
  public static void setDebug(boolean debugMode){
      debugFlag = debugMode;
  }

  /**
   * MR Mapper: tokenizes each input line on whitespace and emits
   * (word, 1) for every token.
   * */
  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    // Reused across calls to avoid allocating a Text per token.
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);

        if(debugFlag){
            System.out.println("raw data: key= " + key + ", value= " + value + " maper output : " + word + " " + one);
        }

      }
    }
  }


  /**
   * MR Reducer: sums the counts for each word. Also used as the combiner,
   * which is why the debug output shows each key twice (combine + reduce).
   * */
  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      // StringBuilder is sufficient here: reduce() is single-threaded,
      // so the synchronized StringBuffer buys nothing.
      StringBuilder sb = new StringBuilder();
      for (IntWritable val : values) {
        sum += val.get();
        sb.append(val).append(' ');
      }

      if(debugFlag){
          System.out.println("raw data: key= " + key + ", value= " + sb.toString());
      }

      result.set(sum);
      context.write(key, result);
    }
  }

  /**
   * Configures the job, submits it, and blocks until it finishes.
   *
   * @param conf       Hadoop configuration to run under
   * @param jobName    display name for the job
   * @param inputPath  input file or directory
   * @param outputPath output directory (must not exist)
   * @param debug      whether to enable verbose map/reduce tracing
   * @throws IOException            on job-submission I/O failure
   * @throws ClassNotFoundException if a job class cannot be resolved
   * @throws InterruptedException   if interrupted while waiting
   * */
  public static void run(Configuration conf, String jobName, Path inputPath, Path outputPath, boolean debug) throws IOException, ClassNotFoundException, InterruptedException{
      // Job.getInstance(conf, jobName) already sets the job name, so the
      // previous extra setJobName(jobName) call was redundant.
      Job job = Job.getInstance(conf, jobName);

      job.setJarByClass(WordCountV2.class);
      WordCountV2.setDebug(debug);

      job.setMapperClass(TokenizerMapper.class);
      job.setCombinerClass(IntSumReducer.class);
      job.setReducerClass(IntSumReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);

      FileInputFormat.addInputPath(job, inputPath);
      FileOutputFormat.setOutputPath(job, outputPath);

      // waitForCompletion(true) submits the job itself, so the previous
      // explicit job.submit() was redundant. The previous System.exit(...)
      // here killed the JVM, making the success message below — and any
      // post-run logic in the caller (e.g. RunJobTool's result reporting) —
      // unreachable. Report the outcome instead and let the caller decide.
      if (job.waitForCompletion(true)) {
          System.out.println("execute success!");
      } else {
          System.out.println("execute failed!");
      }
  }
}

源码分析:

RunJobTool源码

package hadoopTest;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.lexicalscope.jewel.cli.CliFactory;
import com.lexicalscope.jewel.cli.Option;

public class RunJobTool extends Configured implements Tool{

    /**
     * Generic MapReduce command-line options, parsed with
     * com.lexicalscope.jewel.cli.CliFactory.
     * */
    public interface MapReduceCommandLineOptions{
        @Option(longName = "job-priority", description = "job priority", defaultValue = "normal")
        String jobPriority();

        // Bug fix: the option was only spelled "job-queueu" (typo), forcing
        // users to type the misspelling. jewel-cli's longName accepts multiple
        // aliases, so the correct "job-queue" is added while keeping the old
        // spelling for backward compatibility with existing scripts.
        @Option(longName = {"job-queue", "job-queueu"}, description = "job queue", defaultValue= "default")
        String jobQueue();
    }

    /** Job-specific options: input and output locations. */
    public interface CommandLineOptions extends MapReduceCommandLineOptions{
        @Option(longName = "input-dir")
        String inputDir();

        @Option(longName = "output-dir")
        String outputDir();
    }

    /**
     * Parses command-line arguments, configures the job, and delegates to
     * {@link WordCountV2#run}.
     *
     * @return 0 — NOTE(review): always returns success regardless of the job
     *         outcome; confirm whether the exit code should reflect failures.
     * */
    @Override
    public int run(String [] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = getConf();
        CommandLineOptions cli = CliFactory.parseArguments(CommandLineOptions.class, args);
        configureMapReduce(conf, cli);

        Path inputPath = new Path(cli.inputDir());
        Path outputPath = new Path(cli.outputDir());
        boolean debug = true;
        String jobName = "WordCountV2";

        // Submits the job and waits for completion.
        WordCountV2.run(conf, jobName, inputPath, outputPath, debug);

        return 0;
    }

    /**
     * Applies the generic MapReduce options to the configuration and forces
     * the job to run in the local (in-process) framework so it can be
     * debugged in a single JVM.
     * */
    public static void configureMapReduce(Configuration conf, MapReduceCommandLineOptions cli) {
        // configure JobTracker to local run mode
        System.out.println("This job will be running locally.");
        conf.set("mapreduce.framework.name", "local");
        conf.set("fs.defaultFS", "file:///");

        // configure job priority
        JobPriority jobPriority = parseJobPriority(cli.jobPriority());
        conf.set("mapreduce.job.priority", jobPriority.name());

        // configure job queue
        conf.set("mapreduce.job.queuename", cli.jobQueue());
    }

    /**
     * Maps a case-insensitive priority string to a {@link JobPriority},
     * falling back to NORMAL for unrecognized values.
     * (Previous javadoc was a copy-paste of configureMapReduce's.)
     * */
    private static JobPriority parseJobPriority(String s) {
        for (JobPriority p : JobPriority.values()) {
            if (p.name().equalsIgnoreCase(s)) {
                return p;
            }
        }
        System.out.println("Unknown job priority: " + s + ". Fallback to NORMAL.");
        return JobPriority.NORMAL;
    }

    /**
     * Job entry point: runs the tool via Hadoop's ToolRunner so that
     * generic Hadoop options (-D, -conf, ...) are handled as well.
     * */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        RunJobTool stJobTool = new RunJobTool();
        try {
            int exitCode = ToolRunner.run(conf, stJobTool, args);
            if(exitCode == 0){
                System.out.println("Job Success Completed!");
            }else{
                System.out.println("Job Failed!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

源码分析:

二、终端下启动远程debugee

在终端下使用以下命令启动远程debugee:
java -cp `hadoop classpath`:/Users/zq/jars/jewelcli-0.8.6.jar:/Users/zq/Documents/workspace/hadoopLocal/bin/hadoopTest.jar -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=6789 -Djava.library.path=/Users/zq/big_data_workshop/usr_big_data/hadoop/hadoop-2.6.0/lib/native  hadoopTest/RunJobTool --input-dir=/Users/zq/tmp/word.txt --output-dir=/Users/zq/tmp/tmp
该命令分为5个部分:

三、IDE中启动本地debuger

详细分析过程参见博文Java远程调试 。

四、远程debugee部分打印结果

map部分打印结果

raw data: key= 0, value= a c b d maper output : a 1
raw data: key= 0, value= a c b d maper output : c 1
raw data: key= 0, value= a c b d maper output : b 1
raw data: key= 0, value= a c b d maper output : d 1
raw data: key= 8, value= d b c a maper output : d 1
raw data: key= 8, value= d b c a maper output : b 1
raw data: key= 8, value= d b c a maper output : c 1
raw data: key= 8, value= d b c a maper output : a 1
raw data: key= 16, value= a c d b maper output : a 1
raw data: key= 16, value= a c d b maper output : c 1
raw data: key= 16, value= a c d b maper output : d 1
raw data: key= 16, value= a c d b maper output : b 1
raw data: key= 24, value= c a r s maper output : c 1
raw data: key= 24, value= c a r s maper output : a 1
raw data: key= 24, value= c a r s maper output : r 1
raw data: key= 24, value= c a r s maper output : s 1
raw data: key= 32, value= d s g h maper output : d 1
raw data: key= 32, value= d s g h maper output : s 1
raw data: key= 32, value= d s g h maper output : g 1
raw data: key= 32, value= d s g h maper output : h 1

reduce部分打印结果

raw data: key= a, value= 1 1 1 1
raw data: key= b, value= 1 1 1
raw data: key= c, value= 1 1 1 1
raw data: key= d, value= 1 1 1 1
raw data: key= g, value= 1
raw data: key= h, value= 1
raw data: key= r, value= 1
raw data: key= s, value= 1 1
raw data: key= a, value= 4
raw data: key= b, value= 3
raw data: key= c, value= 4
raw data: key= d, value= 4
raw data: key= g, value= 1
raw data: key= h, value= 1
raw data: key= r, value= 1
raw data: key= s, value= 2

四、附录

so tired ><…终于整理完了,希望能帮助到读到本篇博客的你~

www.htsjk.Com true http://www.htsjk.com/Hadoop/28215.html NewsArticle Hadoop 研发之远程调试详细剖析--WordCount V2.0,hadoop--wordcount 希望本篇博文能帮助到那些想快速开始Hadoop程序开发调试的朋友们~ 前言 之前学习Hadoop时,曾经错误的以为开发的Hadoop程序必...
相关文章
    暂无相关文章
评论暂时关闭