2014年1月7日星期二

hadoop作业提交过程分析(第二篇)

本邮件内容由第三方提供,如果您不想继续收到该邮件,可 点此退订
hadoop作业提交过程分析(第二篇)  阅读原文»

用户名:zengzhaozheng
文章数:8
评论数:6
访问量:671
无忧币:166
博客积分:142
博客等级:2
注册日期:2013-11-18

hadoop作业提交过程分析(源码分析第二篇)

说明:本文基于Hadoop1.0.0版本源码进行分析。

(一)概述

本文基于Hadoop1.0.0版本的源代进行分析,研究用户从输入作业提交命令到作业提交到jobTracker的整个流程,其中涉及到的组件JobClient和JobTracker的具体工作细节。

(二)具体分析

从源代码来看,hadoop作业的提交过程是比较简单的,主要包含了几个过程:运行提交作业脚本、创建目录、上传作业文件以及产生InputSplit文件。

(1)提交作业命令过程

假设用户用java写了一个MapReduce程序,并且打包成了一个jar文件,wordCount.jar,然后运行下面命令进行作业的提交操作:

$HADOOP_HOME/bin/hadoop jar xx.jar \

-D mapred.job.name="wordCount" \

-D mapred.reduce.tasks=5 \

-files=resources1.txt,resources2.txt \

-libjars=depend.jar \

-archives=dictionary.zip \

-input /test/input \

-output /test/output

然后,我们在来看看$HADOOP_HOME/bin/hadoop 脚本对作业提交jar命令处理,调用了org.apache.hadoop.util.RunJar类。

wKioL1LGZhLCsm4jAABY5n5GHsU204.jpg

在RunJar类中通过unJar(File jarFile, File toDir)方法对jar进行解压;创建相应的临时目录然后将运行参数传递给MapReduce程序运行。源码定位到org.apache.hadoop.util.RunJar类的main方法

  /** Run a Hadoop job jar.  If the main class is not in the jar's manifest,     * then it must be provided on the command line. */    public static void main(String[] args) throws Throwable {      String usage = "RunJar jarFile [mainClass] args...";      if (args.length < 1) {        System.err.println(usage);        System.exit(-1);      }      int firstArg = 0;      String fileName = args[firstArg++];      File file = new File(fileName);      String mainClassName = null;      JarFile jarFile;      try {        jarFile = new JarFile(fileName);      } catch(IOException io) {        throw new IOException("Error opening job jar: " + fileName)          .initCause(io);      }      Manifest manifest = jarFile.getManifest();      if (manifest != null) {        mainClassName = manifest.getMainAttributes().getValue("Main-Class");      }      jarFile.close();      if (mainClassName == null) {        if (args.length < 2) {          System.err.println(usage);          System.exit(-1);        }        mainClassName = args[firstArg++];      }          //进行相应的一些目录处理工作      mainClassName = mainClassName.replaceAll("/", ".");      File tmpDir = new File(new Configuration().get("hadoop.tmp.dir"));      tmpDir.mkdirs();      if (!tmpDir.isDirectory()) {        System.err.println("Mkdirs failed to create " + tmpDir);        System.exit(-1);      }      final File workDir = File.createTempFile("hadoop-unjar", "", tmpDir);      workDir.delete();      workDir.mkdirs();      if (!workDir.isDirectory()) {        System.err.println("Mkdirs failed to create " + workDir);        System.exit(-1);      }      Runtime.getRuntime().addShutdownHook(new Thread() {          public void run() {            try {              FileUtil.fullyDelete(workDir);            } catch (IOException e) {            }          }        });      unJar(file, workDir);//解压jar包      ArrayList<URL> classPath = new ArrayList<URL>();      classPath.add(new File(workDir+"/").toURL());      classPath.add(file.toURL());      classPath.add(new File(workDir, "classes/").toURL());      File[] libs = new File(workDir, "lib").listFiles();      if (libs != null) {        for (int i = 0; i < libs.length; i++) {          classPath.add(libs.toURL());        }      }      ClassLoader loader =        new URLClassLoader(classPath.toArray(new URL[0]));      Thread.currentThread().setContextClassLoader(loader);      Class<?> mainClass = Class.forName(mainClassName, true, loader);      Method main = mainClass.getMethod("main", new Class[] {        Array.newInstance(String.class, 0).getClass()      });      String[] newArgs = Arrays.asList(args)        .subList(firstArg, args.length).toArray(new String[0]);      try {        main.invoke(null, new Object[] { newArgs });      } catch (InvocationTargetException e) {        throw e.getTargetException();      }    }  

用户在提交MapReduce程序是已经设置好了各种参数,像作业名称、ReduceTask和MapTask类等,最终调用JobClient类的runJob方法,如果是新API则调用waitForCompletion(true)方法进行作业的提交,之后经过以下步骤作业将达到JobTracker端。

wKiom1LJaWHyM-mcAABTRx3RMPE210.jpg

(2)作业文件上传过程

作业从JobClient提交到JobTracker之前,首先会将作业文件和生成的Split文件上传到HDFS中(生成Split文件将会在下面第三小节说),上传动作主要在JobClient类中的submitJobInternal(job)中完成,下面来仔细看看源代码:

  /**     * Internal method for submitting jobs to the system.     * @param job the configuration to submit     * @return a proxy object for the running job     * @throws FileNotFoundException     * @throws ClassNotFoundException     * @throws InterruptedException     * @throws IOException     */    public    RunningJob submitJobInternal(final JobConf job                                 ) throws FileNotFoundException,                                          ClassNotFoundException,                                          InterruptedException,                                          IOException {      /*       * configure the command line options correctly on the submitting dfs       */      return ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {        public RunningJob run() throws FileNotFoundException,        ClassNotFoundException,        InterruptedException,        IOException{          JobConf jobCopy = job;          Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,              jobCopy);          JobID jobId = jobSubmitClient.getNewJobId();          Path submitJobDir = new Path(jobStagingArea, jobId.toString());          jobCopy.set("mapreduce.job.dir", submitJobDir.toString());          JobStatus status = null;          try {            populateTokenCache(jobCopy, jobCopy.getCredentials());            copyAndConfigureFiles(jobCopy, submitJobDir);            // get delegation token for the dir            TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),                                                new Path [] {submitJobDir},                                                jobCopy);            Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);            int reduces = jobCopy.getNumReduceTasks();            InetAddress ip = InetAddress.getLocalHost();            if (ip != null) {              job.setJobSubmitHostAddress(ip.getHostAddress());              job.setJobSubmitHostName(ip.getHostName());            }            JobContext context = new JobContext(jobCopy, jobId);            // Check the output specification            if (reduces == 0 ? jobCopy.getUseNewMapper() :              jobCopy.getUseNewReducer()) {              org.apache.hadoop.mapreduce.OutputFormat<?,?> output =                ReflectionUtils.newInstance(context.getOutputFormatClass(),                    jobCopy);              output.checkOutputSpecs(context);            } else {              jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);            }            jobCopy = (JobConf)context.getConfiguration();            // Create the spl
微软私有云Azure Pack实践系列之三创建虚拟机角色  阅读原文»

在Azure Pack for Windows(以下及以后简称WAP)版本中,通过租户门户部署的虚拟机有两种类型,一种是独立虚拟机,另外一种是虚拟机角色。

独立虚拟机类型,和在SCVMM中使用的虚拟机模板一样,模板中有什么就部署什么,而且一般只是操作系统,可定制性较差,部署后的扩展性基本上没有。

虚拟机角色类型,则类似于SCVMM中的服务模板一样,可以根据企业或个人需要进行定制,或者安装一些软件或应用,让用户在使用上有了更大的灵活性,更好的是在安装完成后,可以扩展,尤其用于WEB服务器、SQL实例等这样的场景。

clip_image002

本文有点跨度,直接进入WAP虚拟机角色的使用阶段,实际上,前期要有一些准备工作如:在SCVMM中在设置虚拟机角色使用的磁盘文件(操作系统类型、FamilyName、Tag等),并导入资源扩展包,并且要把磁盘文件放在共享库中等。在WAP中,要导入对应的库项资源定义包等。这些准备工作,我将另文详写说明如何实现。

clip_image004

Ok,接下来进行设置,如何创建虚拟机角色及一些扩展的使用。

1. 使用租户登录WAP的租户门户,在虚拟机项,依次点击新建―虚拟机角色―从库中,并点击"从库中创建虚拟机角色",会出现虚拟机创建及设置的向导。

clip_image005

2.在"虚拟机角色配置"界面,选择之前创建或是导入的库项资源定义文件,本实践中只有一个"domain controller Windows 2012",(备注:如何创作或是导入,将在系列二中讲述),点选右下角的"右向箭头"。

clip_image007

3.在"提供虚拟机角色设置"界面,在"名称"栏后输入名称,注意的是,这个名称只是虚拟机的角色的名称。并非在HyperV和SCVMM中显示的名称,更非计算机名称。"版本"后的版本号为创建和导入时设置,引处无需修改,且不能修改(在VMM中,导入资源扩展包时自定义)。点选右下角的"右向箭头"。

clip_image009

4. 在"Virtual Machine properties"界面,需设置要创建虚拟机的属性,包括:CPU核心数及内存大小、操作系统盘使用的磁盘文件、本地管理员密码、时区等(注意中间的灰色条状下拉后,还有显示,但不需要设置)。具体设置在以下步骤中详细描述,注意的是,均在本界面配置。

clip_image011

4-1. 选择新建虚拟机的硬件配置大小:

clip_image013

4-2. 选择需要使用的磁盘文件:

clip_image015

4-3. 设定本地管理员(因是创建域,为域管理员)密码及设定相应的时区,计算机命名、使用的网络等。

计算机命名规则可以修改,###表示数字,是从001开始的,可以根据实际需要选择是几个#来表示。

本界面设置完成,点选右下角的"右向箭头"。

clip_image017

5. 在"Active Directory Properties"界面,输入你要建的域名及安全模式的密码。点选右下角的"对号",开始虚拟机角色的创建进程。

clip_image019

6. 下图是在WAP中的进展状态,很不好的就是,提供的信息太简单了,只有短短的两行字。但愿在新版中,能有所改进,能把在VMM中详细的进展状态给搞过来(不过,太多了是不是会影响体验呢,哈哈。纠结吧)

clip_image021

7. 这是在VMM控制台中的进展状态,这个比较详细哟。

clip_image023

8. 在HyperV管理器中的状态显示

clip_image025

9. 在这里,以两张图来说明建域的过程:

在VMM控制台中,作业状态显示为99%时,会有一段时间的耗用,后经观察,从99%至完成,是升级独立服务器至域控制器的时间。

clip_image027

clip_image029

10. 显示创建完成

clip_image031

11. 下第二图显示了,角色创建完成后,此角色的使用概览(点击10中的角色名称进入,仪表盘,就能显示,注意,不是虚拟机的使用概览)。

clip_image033

clip_image035

12. 点击11中的第1个图示中的"实例",此处显示的就是虚拟机的名称了。

可以对此进行所谓生命周期的管理:创建、连接、删除、启动和停止、修改(通过缩放)等。 <

阅读更多内容

没有评论:

发表评论