6fbe7097d21b9e5244bcea16066fda82
从零实现机器学习参数服务器框架(七)

简介

序号 名称 简介
1 框架架构 介绍整体框架架构、涉及模块
2 模型、数据计算并行 实现参数服务器中的Worker与Server
3 节点间通信接口 实现应用常用接口,使用ZeroMQ实现节点间通信
4 一致性控制 在Server端实现常见的一致性控制模式:BSP/ASP/SSP
5 容错实现 使用CheckPoint Rollback、心跳监测等方式实现计算容错
6 Yarn资源管理系统支持 编写Yarn Application以使用Yarn进行资源分配
7 机器学习算法实现 实现Logistic Regression与KMeans两种算法

通过前面的6篇文章,一个具备基础功能的参数服务器框架已经成型,这一篇文章我将通过之前的框架实现常见的分类、聚类两种算法:逻辑回归与K-Means。

算法实现

Logistic Regression

逻辑回归是通过脚本来启动的,主要思路在于:

  1. 指定参数,包括hdfs路径、一致性模型类型、数据维度、存储类型等
  2. 通过分配的节点id、ip与port来依次启动节点

scripts/logistic_regression.py

# Determine the launch mode from argv[1]:
#   "local" / "relocal"    -> run in the local debug environment (vs. cluster)
#   "relaunch" / "relocal" -> restart node(s) after a failure
local_debug = len(sys.argv) >= 2 and sys.argv[1] in ("local", "relocal")
relaunch = len(sys.argv) >= 2 and sys.argv[1] in ("relaunch", "relocal")

# Id of the failed node to restart; -1 is the "no failed node" sentinel.
failed_node_id = -1
if relaunch:
    # argv entries are strings; convert so the value is comparable with the
    # integer sentinel above.
    # NOTE(review): confirm relaunch_nodes() expects an int id — it previously
    # received the raw string from argv.
    failed_node_id = int(sys.argv[2])

# Node list and executable location differ between local and cluster runs.
hostfile = "config/localnodes" if local_debug else "config/clusternodes"
progfile = ("cmake-build-debug" if local_debug else "debug") + "/LRExample"

script_path = os.path.realpath(__file__)
proj_dir = dirname(dirname(script_path))

# Runtime parameters handed to every launched node.
params = {
    "hdfs_namenode": "localhost" if local_debug else "proj10",
    "hdfs_namenode_port": 9000,
    "assigner_master_port": 18011,
    "input": "hdfs:///a2a" if local_debug else "hdfs:///datasets/classification/webspam",
    "kStaleness": 0,
    "kSpeculation": 5,
    "kModelType": "SSP",  # consistency model: {ASP/SSP/BSP/SparseSSP}
    "kSparseSSPRecorderType": "Vector",  # {Vector/Map}
    "num_dims": 123 if local_debug else 16609143,
    "batch_size": 1,
    "num_workers_per_node": 2,
    "num_servers_per_node": 1,
    "num_local_load_thread": 2 if local_debug else 100,
    "num_iters": 1000,
    "alpha": 0.1,  # learning rate
    "with_injected_straggler": 1,  # {0/1}
    "kStorageType": "Vector",  # {Vector/Map}
    "checkpoint_toggle": True,
    "use_weight_file": False,
    "init_dump": local_debug,
    "weight_file_prefix": "",
    "heartbeat_interval": 10 if local_debug else 15,
    "checkpoint_file_prefix": "hdfs://localhost:9000/dump/dump_" if local_debug else "hdfs://proj10:9000/ybai/dump_",
    "checkpoint_raw_prefix": "hdfs:///dump/dump_" if local_debug else "hdfs:///ybai/dump_",
    "relaunch_cmd": relaunch_cmd,  # NOTE(review): defined elsewhere in this script
    "report_prefix": join(proj_dir, "local/report_lr_webspam.txt"),
    "report_interval": -1,
}

# glog environment for the launched processes.
env_params = (
    "GLOG_logtostderr=true "
    "GLOG_v=-1 "
    "GLOG_minloglevel=0 "
)

# this is to enable hdfs short-circuit read (disable the warning info)
# change this path accordingly when we use other cluster
# the current setting is for proj5-10
if not local_debug:
    env_params += "LIBHDFS3_CONF=/data/opt/course/hadoop/etc/hadoop/hdfs-site.xml"

# Launch every node via ssh, or restart only the failed node.
if not relaunch:
    launch_util(progfile, hostfile, env_params, params, sys.argv)
else:
    relaunch_nodes(progfile, hostfile, env_params, params, failed_node_id)

apps/lr_example.cpp

```cpp
// Loads training data from HDFS, starts the engine, creates the parameter
// table, and builds/runs the logistic-regression ML task on each worker.
// NOTE(review): template arguments on the next two std::vector uses appear to
// have been stripped by the article extraction (cf. std::vector<SVMItem>
// below) — likely std::vector<Node> and std::vector<SVMItem>; confirm against
// the original apps/lr_example.cpp. The function body is truncated in this
// excerpt.
void Training(Node &my_node, std::vector &nodes, Node &master_node) {
// 1. Load the training data via HDFSManager.
std::vector data;
HDFSManager::Config config;
config.url = FLAGS_input;
config.worker_host = my_node.hostname;
config.worker_port = my_node.port;
config.master_port = FLAGS_assigner_master_port;
// Node 0 acts as the data-assignment master.
config.master_host = nodes[0].hostname;
config.hdfs_namenode = FLAGS_hdfs_namenode;
config.hdfs_namenode_port = FLAGS_hdfs_namenode_port;
config.num_local_load_thread = FLAGS_num_local_load_thread;

// Parse each input line in libsvm format into an SVMItem.
lib::Parser<SVMItem> parser;
std::function<SVMItem(boost::string_ref)> parse = [parser](boost::string_ref line) {
    // parse data
    return parser.parse_libsvm(line);
};
lib::AbstractDataLoader<SVMItem, std::vector<SVMItem>> loader;
loader.load(config, my_node, nodes, parse, data);

LOG(INFO) << "Finished loading data on node " << my_node.id;

// 2. Start the engine.
Engine engine(my_node, nodes, master_node);
engine.StartEverything();

// 3. Create the kv_table that stores the model parameters.
nodes = engine.getNodes();
std::vector<third_party::Range> range = engine.getRanges();

uint32_t kTableId = engine.CreateTable<double>(range, model_type, storage_type, FLAGS_kStaleness);

// 4. Build the machine-learning task: allocate workers per node and bind
// the parameter table created above.
MLTask task;
std::vector<WorkerAlloc> worker_alloc;
for (auto &node : nodes) {
    worker_alloc.push_back({node.id, (uint32_t) FLAGS_num_workers_per_node});
}
task.SetWorkerAlloc(worker_alloc);
task.SetTables({kTableId});

// Logistic-regression training logic executed by each worker.
task.SetLambda([kTableId, &data, &engine, &recovering](const Info &info) {
    if (info.worker_id == 0) {
        LOG(INFO) << "Start Logistic Regression Training...";
    }

    // Sample the data into mini-batches of FLAGS_batch_size.
    BatchDataSampler<SVMItem> batch_data_sampler(data, FLAGS_batch_size);
    ...
    // Iterative training loop (truncated in this excerpt).