说明:本文基于Hadoop1.0.0版本源码进行分析。
(一)概述
本文基于Hadoop1.0.0版本的源代进行分析,研究用户从输入作业提交命令到作业提交到jobTracker的整个流程,其中涉及到的组件JobClient和JobTracker的具体工作细节。
(二)具体分析
从源代码来看,hadoop作业的提交过程是比较简单的,主要包含了几个过程:运行提交作业脚本、创建目录、上传作业文件以及产生InputSplit文件。
(1)提交作业命令过程
假设用户用java写了一个MapReduce程序,并且打包成了一个jar文件,wordCount.jar,然后运行下面命令进行作业的提交操作:
$HADOOP_HOME/bin/hadoop jar xx.jar \
-D mapred.job.name="wordCount" \
-D mapred.reduce.tasks=5 \
-files=resources1.txt,resources2.txt \
-libjars=depend.jar \
-archives=dictionary.zip \
-input /test/input \
-output /test/output
然后,我们在来看看$HADOOP_HOME/bin/hadoop 脚本对作业提交jar命令处理,调用了org.apache.hadoop.util.RunJar类。
在RunJar类中通过unJar(File jarFile, File toDir)方法对jar进行解压;创建相应的临时目录然后将运行参数传递给MapReduce程序运行。源码定位到org.apache.hadoop.util.RunJar类的main方法:
/** Run a Hadoop job jar. If the main class is not in the jar's manifest, * then it must be provided on the command line. */ public static void main(String[] args) throws Throwable { String usage = "RunJar jarFile [mainClass] args..."; if (args.length < 1) { System.err.println(usage); System.exit(-1); } int firstArg = 0; String fileName = args[firstArg++]; File file = new File(fileName); String mainClassName = null; JarFile jarFile; try { jarFile = new JarFile(fileName); } catch(IOException io) { throw new IOException("Error opening job jar: " + fileName) .initCause(io); } Manifest manifest = jarFile.getManifest(); if (manifest != null) { mainClassName = manifest.getMainAttributes().getValue("Main-Class"); } jarFile.close(); if (mainClassName == null) { if (args.length < 2) { System.err.println(usage); System.exit(-1); } mainClassName = args[firstArg++]; } //进行相应的一些目录处理工作 mainClassName = mainClassName.replaceAll("/", "."); File tmpDir = new File(new Configuration().get("hadoop.tmp.dir")); tmpDir.mkdirs(); if (!tmpDir.isDirectory()) { System.err.println("Mkdirs failed to create " + tmpDir); System.exit(-1); } final File workDir = File.createTempFile("hadoop-unjar", "", tmpDir); workDir.delete(); workDir.mkdirs(); if (!workDir.isDirectory()) { System.err.println("Mkdirs failed to create " + workDir); System.exit(-1); } Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { try { FileUtil.fullyDelete(workDir); } catch (IOException e) { } } }); unJar(file, workDir);//解压jar包 ArrayList<URL> classPath = new ArrayList<URL>(); classPath.add(new File(workDir+"/").toURL()); classPath.add(file.toURL()); classPath.add(new File(workDir, "classes/").toURL()); File[] libs = new File(workDir, "lib").listFiles(); if (libs != null) { for (int i = 0; i < libs.length; i++) { classPath.add(libs.toURL()); } } ClassLoader loader = new URLClassLoader(classPath.toArray(new URL[0])); Thread.currentThread().setContextClassLoader(loader); Class<?> mainClass = Class.forName(mainClassName, true, loader); Method main = mainClass.getMethod("main", new Class[] { Array.newInstance(String.class, 0).getClass() }); String[] newArgs = Arrays.asList(args) .subList(firstArg, args.length).toArray(new String[0]); try { main.invoke(null, new Object[] { newArgs }); } catch (InvocationTargetException e) { throw e.getTargetException(); } } 用户在提交MapReduce程序是已经设置好了各种参数,像作业名称、ReduceTask和MapTask类等,最终调用JobClient类的runJob方法,如果是新API则调用waitForCompletion(true)方法进行作业的提交,之后经过以下步骤作业将达到JobTracker端。
(2)作业文件上传过程
作业从JobClient提交到JobTracker之前,首先会将作业文件和生成的Split文件上传到HDFS中(生成Split文件将会在下面第三小节说),上传动作主要在JobClient类中的submitJobInternal(job)中完成,下面来仔细看看源代码:
/** * Internal method for submitting jobs to the system. * @param job the configuration to submit * @return a proxy object for the running job * @throws FileNotFoundException * @throws ClassNotFoundException * @throws InterruptedException * @throws IOException */ public RunningJob submitJobInternal(final JobConf job ) throws FileNotFoundException, ClassNotFoundException, InterruptedException, IOException { /* * configure the command line options correctly on the submitting dfs */ return ugi.doAs(new PrivilegedExceptionAction<RunningJob>() { public RunningJob run() throws FileNotFoundException, ClassNotFoundException, InterruptedException, IOException{ JobConf jobCopy = job; Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this, jobCopy); JobID jobId = jobSubmitClient.getNewJobId(); Path submitJobDir = new Path(jobStagingArea, jobId.toString()); jobCopy.set("mapreduce.job.dir", submitJobDir.toString()); JobStatus status = null; try { populateTokenCache(jobCopy, jobCopy.getCredentials()); copyAndConfigureFiles(jobCopy, submitJobDir); // get delegation token for the dir TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(), new Path [] {submitJobDir}, jobCopy); Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir); int reduces = jobCopy.getNumReduceTasks(); InetAddress ip = InetAddress.getLocalHost(); if (ip != null) { job.setJobSubmitHostAddress(ip.getHostAddress()); job.setJobSubmitHostName(ip.getHostName()); } JobContext context = new JobContext(jobCopy, jobId); // Check the output specification if (reduces == 0 ? jobCopy.getUseNewMapper() : jobCopy.getUseNewReducer()) { org.apache.hadoop.mapreduce.OutputFormat<?,?> output = ReflectionUtils.newInstance(context.getOutputFormatClass(), jobCopy); output.checkOutputSpecs(context); } else { jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy); } jobCopy = (JobConf)context.getConfiguration(); // Create the spl微软私有云Azure Pack实践系列之三创建虚拟机角色 阅读原文» 在Azure Pack for Windows(以下及以后简称WAP)版本中,通过租户门户部署的虚拟机有两种类型,一种是独立虚拟机,另外一种是虚拟机角色。
独立虚拟机类型,和在SCVMM中使用的虚拟机模板一样,模板中有什么就部署什么,而且一般只是操作系统,可定制性较差,部署后的扩展性基本上没有。
虚拟机角色类型,则类似于SCVMM中的服务模板一样,可以根据企业或个人需要进行定制,或者安装一些软件或应用,让用户在使用上有了更大的灵活性,更好的是在安装完成后,可以扩展,尤其用于WEB服务器、SQL实例等这样的场景。
本文有点跨度,直接进入WAP虚拟机角色的使用阶段,实际上,前期要有一些准备工作如:在SCVMM中在设置虚拟机角色使用的磁盘文件(操作系统类型、FamilyName、Tag等),并导入资源扩展包,并且要把磁盘文件放在共享库中等。在WAP中,要导入对应的库项资源定义包等。这些准备工作,我将另文详写说明如何实现。
Ok,接下来进行设置,如何创建虚拟机角色及一些扩展的使用。
1. 使用租户登录WAP的租户门户,在虚拟机项,依次点击新建―虚拟机角色―从库中,并点击"从库中创建虚拟机角色",会出现虚拟机创建及设置的向导。
2.在"虚拟机角色配置"界面,选择之前创建或是导入的库项资源定义文件,本实践中只有一个"domain controller Windows 2012",(备注:如何创作或是导入,将在系列二中讲述), 点选右下角的"右向箭头"。
3.在"提供虚拟机角色设置"界面,在"名称"栏后输入名称,注意的是,这个名称只是虚拟机的角色的名称。并非在HyperV和SCVMM中显示的名称,更非计算机名称。"版本"后的版本号为创建和导入时设置,引处无需修改,且不能修改(在VMM中,导入资源扩展包时自定义)。点选右下角的"右向箭头"。
4. 在"Virtual Machine properties"界面,需设置要创建虚拟机的属性,包括:CPU核心数及内存大小、操作系统盘使用的磁盘文件、本地管理员密码、时区等(注意中间的灰色条状下拉后,还有显示,但不需要设置)。具体设置在以下步骤中详细描述,注意的是,均在本界面配置。
4-1. 选择新建虚拟机的硬件配置大小:
4-2. 选择需要使用的磁盘文件:
4-3. 设定本地管理员(因是创建域,为域管理员)密码及设定相应的时区,计算机命名、使用的网络等。
计算机命名规则可以修改,###表示数字,是从001开始的,可以根据实际需要选择是几个#来表示。
本界面设置完成,点选右下角的"右向箭头"。
5. 在"Active Directory Properties"界面,输入你要建的域名及安全模式的密码。点选右下角的"对号",开始虚拟机角色的创建进程。
6. 下图是在WAP中的进展状态,很不好的就是,提供的信息太简单了,只有短短的两行字。但愿在新版中,能有所改进,能把在VMM中详细的进展状态给搞过来(不过,太多了是不是会影响体验呢,哈哈。纠结吧)
7. 这是在VMM控制台中的进展状态,这个比较详细哟。
8. 在HyperV管理器中的状态显示
9. 在这里,以两张图来说明建域的过程:
在VMM控制台中,作业状态显示为99%时,会有一段时间的耗用,后经观察,从99%至完成,是升级独立服务器至域控制器的时间。
10. 显示创建完成
11. 下第二图显示了,角色创建完成后,此角色的使用概览(点击10中的角色名称进入,仪表盘,就能显示,注意,不是虚拟机的使用概览)。
12. 点击11中的第1个图示中的"实例",此处显示的就是虚拟机的名称了。
可以对此进行所谓生命周期的管理:创建、连接、删除、启动和停止、修改(通过缩放)等。 <
没有评论:
发表评论