个性化阅读
专注于IT技术分析

从本地计算机到具有Terraform的Dask集群

本文概述

为了不断改进回购并将其转变为市场领导者, 我们最近决定解决客户服务代理商的挑战。第一步, 创建了带标签的电子邮件转储, 并设定了第一个目标:构建一个POC, 该POC自动标记电子邮件。为此, 必须使用NLP, 并且必须执行冗长的(和贪婪的)网格搜索。如此冗长, 以至于笔记本的4个核心工作了几个小时而没有结果。这就是我决定探索dask及其同级对象时的重点。

在本教程中, 你将探索如何将使用Scikit-Learn进行网格搜索的本地代码带到AWS(EC2)节点的集群中。

从本地开始:Dask和LocalCluster

你从数据加载和网格搜索超参数的最小示例开始。该项目的结构可能是:

.
├── data
├── models
└── src

在./src中, 你可能会包含一些想要在项目中或更复杂的管道中使用的特殊工具, 函数和类。本教程稍后将显示如何在分布式环境中包括这些工具。

请注意, 它具有Python项目的结构, 并且应在项目的根目录中包含setup.py。

让我们从一个简单的例子开始:

from sklearn.datasets import load_digits
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV
# from src import myfoo # An example included from `src`

param_space = {'C': [1e-4, 1, 1e4], 'gamma': [1e-3, 1, 1e3], 'class_weight': [None, 'balanced']}

model = SVC(kernel='rbf')

digits = load_digits()

search = GridSearchCV(model, param_space, cv=3)
search.fit(digits.data, digits.target)

可以在此处定义的Docker映像中找到该示例的详细说明。你可以通过克隆此存储库并运行以下命令来进行尝试:

docker build . -t dask-example
docker run --rm dask-example ./gridsearch_local.py

到现在为止还挺好。但是, 想象一下数据集更大并且超参数的空间更复杂。事情几乎变成不可能在本地计算机上运行。在这一点上, 至少有两种可能的行动方案:

  1. 使用更多的计算能力
  2. 优化搜索和/或更聪明

在本文中, 你将采用前者。将本地计算机扩展到群集的一种看似简单的方法很简单。首先, 留在本地计算机上, 让我们试用LocalCluster。签出gridsearch_local_dask.py, 你可以尝试

docker run -it --rm dask-example ./gridsearch_local_dask.py

感觉已经快了一点, 不是吗?但是, 你需要横向扩展, 并为此目的, 要拥有一组可以使用的EC2节点。主要有两个步骤:

  1. 将计算环境捆绑在Docker映像中
  2. 运行一个dask集群, 其中每个节点都有计算环境

将计算环境与Docker捆绑在一起

为了使dask集群正常运行, 每个节点必须具有相同的计算环境。 Docker是实现这一目标的直接方法。要做的方法是定义一个Dockerfile:

FROM continuumio/miniconda3

RUN mkdir project

COPY requirements.txt /project/requirements.txt
COPY src/ /project/src
COPY setup.py /project/setup.py
WORKDIR /project
RUN pip install -r requirements.txt

本地requirements.txt和setup.py已加载到映像中。建议在Requirements.txt中包含bokeh;否则dask的网络信息中心将无法正常运行。 Dockerfile可以包括更多步骤, 例如RUN apt-get update && apt-get install -y build-essential freetds-dev或RUN python -m nltk.downloader punkt。如果./src包含所需的类, 函数等, 请确保包含-e之类的内容。或仅仅是。在requirements.txt中;这样, 这些依赖关系将在映像中可用。在Dockerfile中包含计算环境所需的所有组件非常重要!

接下来, 应将映像放置在EC2实例可访问的位置。现在是将映像推送到Docker注册表的时候了。在本教程中, 你将使用AWS服务-ECS, 但可以使用其他选项, 例如DockerHub。我假设你已安装awscli, 并且凭据已知。你可以通过以下方式登录到注册表

# Execute from the project's root
$(aws ecr get-login --no-include-email)
docker build -t image-name .
docker tag image-name:latest repo.url/image-name:latest
docker push repo.url/image-name:latest

现在是时候设置集群的节点了。

定义Dask集群

我们采用声明性方法, 并使用terraform设置群集的节点。请注意, 在此示例中, 你利用了AWS Spots。你可以轻松地更改代码并使用常规的按需实例。这留作练习。你使用两组文件来定义集群:

  • .tf指令:由terraform解析并定义要使用的实例, 标记, 区域等。
  • 设置Shell脚本:在节点上安装所需的工具

.tf文件

使用terraform时, 将读取并连接所有.tf文件。当然还有更多细节。一个好的切入点就是这个。在我们的示例中, 你组织.tf文件的方式如下:

  • terraform.tf:常规设置
  • vars.tf:可以从CLI使用的变量定义
  • Provision.tf:有关如何调用配置脚本的说明
  • resources.tf:资源的定义
  • output.tf:terraform提供的输出的定义

terraform.tf

provider "aws" {
  region = "eu-west-1"
}

vars.tf

variable "instanceType" {
  type    = "string"
  default = "c5.2xlarge"
}

variable "spotPrice" {
  # Not needed for on-demand instances
  default = "0.1"
}

variable "contact" {
  type = "string"
  default = "d.atariah"
}

variable "department" {
  type = "string"
  default = "My wonderful department"
}

variable "subnet" {
  default = "subnet-007"
}

variable "securityGroup" {
  type = "string"
  default = "sg-42"
}

variable "workersNum" {
  default = "4"
}

variable "schedulerPrivateIp" {
  # We predefine a private IP for the scheduler; it will be used by the workers
  default = "172.31.36.190"
}

variable "dockerRegistry" {
  default = ""
}

# By defining the AWS keys as variables we can get them from the command line
# and pass them to the provisioning scripts
variable "awsKey" {}
variable "awsPrivateKey" {}

provision.tf

data "template_file" "scheduler_setup" {
  template = "${file("scheulder_setup.sh")}" # see the shell script bellow
  vars {
    # Use the AWS keys passed from the terraform CLI
    AWS_KEY = "${var.awsKey}"
    AWS_PRIVATE_KEY = "${var.awsPrivateKey}"
    DOCKER_REG = "${var.dockerRegistry}"
  }
}

data "template_file" "worker_setup" {
  template = "${file("worker_setup.sh")}" # see the shell script bellow
  vars {
    AWS_KEY = "${var.awsKey}"
    AWS_PRIVATE_KEY = "${var.awsPrivateKey}"
    DOCKER_REG = "${var.dockerRegistry}"
    SCHEDULER_IP = "${var.schedulerPrivateIp}"
  }
}

resources.tf

这是设置的核心, 在这里你将所有内容放在一起并定义了对AWS Spot的请求。

resource "aws_spot_instance_request" "dask-scheduler" {
  ami                         = "ami-4cbe0935" # [1]
  instance_type               = "${var.instanceType}"
  spot_price                  = "${var.spotPrice}"
  wait_for_fulfillment        = true
  key_name                    = "dask_poc"
  security_groups             = ["${var.securityGroup}"]
  subnet_id                   = "${var.subnet}"
  associate_public_ip_address = true
  private_ip                  = "${var.schedulerPrivateIp}" # [2]
  user_data                   = "${data.template_file.scheduler_setup.rendered}"
  tags {
    Name = "${terraform.workspace}-dask-scheduler", Department = "${var.department}", contact = "${var.contact}"
  }
}

resource "aws_spot_instance_request" "dask-worker" {
  count                       = "${var.workersNum}" # [3]
  ami                         = "ami-4cbe0935" # [1]
  instance_type               = "${var.instanceType}"
  spot_price                  = "${var.spotPrice}"
  wait_for_fulfillment        = true
  key_name                    = "dask_poc"
  subnet_id                   = "${var.subnet}"
  security_groups             = ["${var.securityGroup}"]
  associate_public_ip_address = true
  user_data                   = "${data.template_file.worker_setup.rendered}"
  tags {
    Name = "${terraform.workspace}-dask-worker${count.index}", Department = "${var.department}", contact = "${var.contact}"
  }
}

这里有一些重要的注意事项:

  1. 我使用的AMI是针对eu-west-1的一种, 它是为Docker优化并由Amazon提供的。可以使用其他映像, 但是它们必须支持docker, 这一点很重要。
  2. 定义调度程序的专用IP。在启动工作人员时将需要使用它, 并且比找到IP更容易了解IP。
  3. 指出应雇用多少工人

output.tf

terraform允许定义各种输出。与往常一样, 可以在此处找到更多详细信息。

output "scheduler-info" {
  value = "${aws_spot_instance_request.dask-scheduler.public_ip}"
}

output "workers-info" {
  value = "${join(", ", aws_spot_instance_request.dask-worker.*.public_ip)}"
}

output "scheduler-status" {
  value = "http://${aws_spot_instance_request.dask-scheduler.public_ip}:8787/status"
}

设置脚本

resources.tf中的user_data字段指示应使用哪些脚本在节点上进行供应。你提供了两个脚本模板, 这些模板将填充terraform中所需的变量。一个用于调度程序的脚本, 另一个用于工人的脚本。

#!/bin/bash

# scheduler_setup.sh

exec > >(tee /var/log/user-data.log|logger -t user-data -s 2>/dev/console) 2>&1
set -x

echo "Installing pip"
curl -O https://bootstrap.pypa.io/get-pip.py
python get-pip.py --user
~/.local/bin/pip install awscli --upgrade --user
echo "Logging in to ECS registry"
export AWS_ACCESS_KEY_ID=${AWS_KEY}
export AWS_SECRET_ACCESS_KEY=${AWS_PRIVATE_KEY}
export AWS_DEFAULT_REGION=eu-west-1
$(~/.local/bin/aws ecr get-login --no-include-email)

# Assigning tags to instance derived from spot request
# See https://github.com/hashicorp/terraform/issues/3263#issuecomment-284387578
REGION=eu-west-1
INSTANCE_ID=$(curl -s http://169.254.169.254/latest/meta-data/instance-id)
SPOT_REQ_ID=$(~/.local/bin/aws --region $REGION ec2 describe-instances --instance-ids "$INSTANCE_ID"  --query 'Reservations[0].Instances[0].SpotInstanceRequestId' --output text)
if [ "$SPOT_REQ_ID" != "None" ] ; then
  TAGS=$(~/.local/bin/aws --region $REGION ec2 describe-spot-instance-requests --spot-instance-request-ids "$SPOT_REQ_ID" --query 'SpotInstanceRequests[0].Tags')
  ~/.local/bin/aws --region $REGION ec2 create-tags --resources "$INSTANCE_ID" --tags "$TAGS"
fi

echo "Starting docker container from image"
docker run -d -it --network host ${DOCKER_REG} /opt/conda/bin/dask-scheduler

除了最后一行外, 工作程序和调度程序的脚本是相同的。对于工人, 你应该有

docker run -d -it --network host ${DOCKER_REG} /opt/conda/bin/dask-worker ${SCHEDULER_IP}:8786

请注意, 你启动dask-worker而不是dask-scheduler, 并且专用于调度程序的专用IP。重要的是要注意–network主机。直观地, 这确保了容器的网络及其对应的主机将是相同的, 因此, 不同主机上的不同容器将能够通信。

运行集群

现在, 你可以运行集群。为此, 你需要执行两个命令。首先, terraform init。这是一个准备工具, 并准备好启动节点。接下来, 你必须应用说明。你可以通过调用以下方法进行操作:

TF_VAR_awsKey=YOUR_AWS_KEY \
TF_VAR_awsPrivateKey=YOUR_AWS_PRIVATE_KEY \
terraform apply -var 'workersNum=2' -var 'instanceType="t2.small"' \
-var 'spotPrice=0.2' -var 'schedulerPrivateIp="172.31.36.170"' \
-var 'dockerRegistry="repo.url/image-name:latest"'

请注意, 你将两个环境变量用于AWS密钥。 var.tf中定义的其他变量作为参数传递。完成后, 你可以通过以下方式访问新创建的调度程序节点:ssh -i〜/ .aws / key.pem ec2-user @ $(terraform output scheduler-info)。在集群中, 你可以在/var/log/user-data.log中查看日志。你还可以使用docker ps检查正在运行的Docker容器的状态。最后, 如果一切顺利, 你应该可以访问群集的Web界面。可以通过调用terraform输出调度程序状态来找到其地址。

集群上的Scikit-Learn网格搜索

你一直在等待的时刻:在dask群集上运行超参数网格搜索。为此, 你可以使用类似于./gridsearch_local_dask.py的代码。只需要更改客户的地址:

#!/usr/bin/env python

from sklearn.datasets import load_digits
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split as tts
from sklearn.metrics import classification_report
from distributed import Client, LocalCluster
from dask_searchcv import GridSearchCV
# from src import myfoo # An example included from `src`


def main():
    param_space = {'C': [1e-4, 1, 1e4], 'gamma': [1e-3, 1, 1e3], 'class_weight': [None, 'balanced']}

    model = SVC(kernel='rbf')

    digits = load_digits()

    X_train, X_test, y_train, y_test = tts(digits.data, digits.target, test_size=0.3)

    print("Starting local cluster")
    client = Client(x.y.z.w:8786)
    print(client)

    print("Start searching")
    search = GridSearchCV(model, param_space, cv=3)
    search.fit(X_train, y_train)

    print("Prepare report")
    print(classification_report(
        y_true=y_test, y_pred=search.best_estimator_.predict(X_test))
    )


if __name__ == '__main__':
    main()

运行此脚本将在dask群集上启动网格搜索。可以在Web仪表板上对其进行监视。如果你在x.y.z.w上有一个正在运行的集群, 则可以尝试一下:

docker run -it --rm -p 8786:8786 dask-example ./gridsearch_cluster_dask.py x.y.z.w

尚待讨论

  • 你可能想要探索terraform工作区;这可以帮助你从同一目录运行多个集群。例如, 当同时运行不同的实验时。
  • 使用Jupyter服务器启用节点, 因此不需要本地笔记本
赞(0)
未经允许不得转载:srcmini » 从本地计算机到具有Terraform的Dask集群

评论 抢沙发

评论前必须登录!