A2cb5da1a2ca2908a3cd8a8b27a97a4a
从零实现机器学习参数服务器框架(六)

简介

序号 名称 简介
1 框架架构 介绍整体框架架构、涉及模块
2 模型、数据计算并行 实现参数服务器中的Worker与Server
3 节点间通信接口 实现应用常用接口,使用ZeroMQ实现节点间通信
4 一致性控制 在Server端实现常见的一致性控制模式:BSP/ASP/SSP
5 容错实现 使用CheckPoint Rollback、心跳监测等方式实现计算容错
6 Yarn资源管理系统支持 编写Yarn Application以使用Yarn进行资源分配
7 机器学习算法实现 实现Logistic Regression与KMeans两种算法

在之前的框架实现中,启动一个机器学习算法应用需要使用到指定的节点ip地址与port列表,随后使用python脚本启动多个节点的任务。

然而作为一个占用整体集群节点资源的计算任务,最好是能够通过一个资源管理系统来启动,这样系统就可以统一调配整个集群的资源。Hadoop Yarn正是这样一个系统,如下图所示,一些比较著名的大数据处理框架任务都是运行在Yarn的管理之下的。我们的参数服务器框架也将运行在Yarn上,要实现这个的前提是使用Java写一个Yarn应用。

Hadoop Yarn

Hadoop Yarn

Yarn Application分为主要的两部分:ApplicationMaster与Client:

  1. 通过脚本运行预先编译的Java程序,使用Client提交任务
  2. ApplicationMaster随后将单独运行在一个container中,通过AMRMClientAsync.CallbackHandler回调启动集群任务在不同container中。

Yarn Application

Yarn 应用的启动需要使用gradle的编译,生成jar包后使用python脚本进行启动。

yarn/build.gradle

apply plugin: 'java'

version = '0.5'

repositories {
    jcenter()
    mavenCentral()
}

dependencies {
// 依赖
  compile 'org.slf4j:jcl-over-slf4j:1.7.2'
    compile 'commons-io:commons-io:2.1'
    compile 'org.apache.hadoop:hadoop-yarn-server-tests:2.4.0'
    compile 'org.apache.hadoop:hadoop-common:2.4.0'
    compile 'org.apache.hadoop:hadoop-mapreduce-client-core:2.4.0'
    compile 'org.apache.hadoop:hadoop-yarn-client:2.4.0'
    compile 'junit:junit:4.10'
}

使用gradle可以方便的为Java程序添加依赖。当前Android官方开发中也是使用这种方法进行Android程序的编译的。

使用命令gradle build将会生成yarn-0.5.jar的可执行程序到yarn/build/libs目录下。

Client

yarn/local/launch_on_yarn.py

#!/usr/bin/env python

import os
from os.path import dirname
from os.path import join
import time

app_dir = dirname(dirname(dirname(os.path.realpath(__file__))))
proj_dir = dirname(app_dir)

# 参数设置
params = {
    "jar": join(proj_dir, "yarn/build/libs/yarn-0.5.jar")
    , "launch_script_path": join(app_dir, "yarn/local/yarn_example.py")
    , "container_memory": 500
    , "container_vcores": 1
    , "master_memory": 350
    , "priority": 10
    , "num_nodes": 1
}

# 使用hadooop 启动Client
cmd = "hadoop jar "
cmd += join(proj_dir, "yarn/build/libs/yarn-0.5.jar") + " "
cmd += "cn.edu.buaa.act.petuumOnYarn.Client"
cmd += "".join([" --%s %s" % (k,v) for k,v in params.items()])
print cmd
os.system(cmd)

yarn/Client.java

public boolean run() throws IOException, YarnException {
        ...
        appContext.setAMContainerSpec(amContainer);

        Priority pri = Priority.newInstance(amPriority);
        appContext.setPriority(pri);
        appContext.setQueue(amQueue);

        LOG.info("Submitting application to ASM");

        yarnClient.submitApplication(appContext);
}

通过各类参数初始化,最终Client将会提交任务到Yarn系统上。

ApplicationMaster

yarn/ApplicationMaster.java

```java
public void run() throws YarnException, IOException {
LOG.info("Starting ApplicationMaster");

...
String appSubmitterUserName = System
    .getenv(ApplicationConstants.Environment.USER.name());
appSubmitterUgi = UserGroupInformation
    .createRemoteUser(appSubmitterUserName);
appSubmitterUgi.addCredentials(credentials);

// 设置ResourceManager回调
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
amRMClient.init(conf);
amRMClient.start();

containerListener = createNMCallbackHandler();
nmClientAsync = new NMClientAsyncImpl(containerListener);
nmClientAsync.init(conf);
nmClientAsync.start();

appMasterHostname = NetUtils.getHostname();
RegisterApplicationMasterResponse response = amRMClient
    .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
                               appMasterTrackingUrl);
...
top Created with Sketch.