Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 40 additions & 2 deletions adaptdl/adaptdl/torch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
import portpicker
import requests
import torch.distributed

import adaptdl.collective
import adaptdl.env
from .epoch import current_epoch, finished_epochs, remaining_epochs_until
from .data import current_dataloader, AdaptiveDataLoader, ElasticSampler
from .parallel import AdaptiveDataParallel
from .accumulator import Accumulator

import os
import getpass
logging.basicConfig(level=logging.INFO)
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.INFO)
Expand Down Expand Up @@ -66,6 +66,44 @@ def init_process_group(backend):
LOG.info("torch.distributed initialized")


def write_config():
url = adaptdl.env.supervisor_url()
if url:
key = adaptdl.env.job_id()
group = adaptdl.env.num_restarts()
while True:
response = requests.get(url=f"{url}/discover_gpu/{key}/{group}")
if response.status_code != 408: # Timeout.
break
response.raise_for_status()
master_addr = response.json()[0][0]
else:
raise ValueError("supervisor url not found.")
# write to the share path
path = os.path.join(adaptdl.env.share_path(), "resource_spec.yml")
LOG.info(f"writing to {path}")

f = open(path, "w")
f.write("nodes: \n")
num_nodes = len(response.json())
for i in range(num_nodes):
f.write(f" - address: {response.json()[i][0]} \n")
f.write(f" gpus: {list(range(response.json()[i][1]))} \n")
if i == 0: # chief
master_addr = response.json()[i][0]
f.write(" chief: true \n")
else:
f.write(" ssh_config: conf \n")
f.write("ssh: \n")
f.write(" conf: \n")
f.write(f" username: '{getpass.getuser()}' \n")
f.write(" key_file: '/root/.ssh/id_rsa' \n")
f.close()
# Initialize collective module.
master_port = adaptdl.env.master_port()
adaptdl.collective.initialize(master_addr, master_port)
Comment thread
DachengLi1 marked this conversation as resolved.
Outdated


__all__ = [
"init_process_group",
"current_epoch",
Expand Down
96 changes: 96 additions & 0 deletions examples/integration/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Copyright 2020 Petuum, Inc. All Rights Reserved.
Comment thread
DachengLi1 marked this conversation as resolved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


FROM python:3.6.12-buster
WORKDIR /root

FROM pytorch/pytorch:1.4-cuda10.1-cudnn7-runtime
WORKDIR /root
Comment thread
DachengLi1 marked this conversation as resolved.

FROM tensorflow/tensorflow:2.2.0-gpu

# Set default shell to /bin/bash
# SHELL ["/bin/bash", "-cu"]

# RUN rm -rf /etc/bash.bashrc

# Install apps
COPY adaptdl adaptdl
COPY examples/requirements.txt .

RUN cd adaptdl && python3 setup.py bdist_wheel

ARG ADAPTDL_VERSION=0.0.0
RUN ADAPTDL_VERSION=${ADAPTDL_VERSION} pip install adaptdl/dist/*.whl
RUN pip install -r requirements.txt

RUN rm -rf adaptdl/dist
WORKDIR /root
COPY examples examples_adaptdl
#COPY examples examples
#RUN apt-get update && apt-get install -y --no-install-recommends apt-utils

# autodist env
SHELL ["/bin/bash", "-cu"]

RUN rm -rf /etc/bash.bashrc

RUN apt-get update && apt-get install -y --allow-downgrades --allow-change-held-packages --no-install-recommends \
build-essential \
git \
curl \
vim \
wget \
unzip

RUN curl -O https://bootstrap.pypa.io/get-pip.py && \
python get-pip.py && \
rm get-pip.py

WORKDIR /root
COPY bert_config.json bert_config.json
COPY tf_examples.tfrecord tf_examples.tfrecord
COPY autodist autodist

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of these COPY commands cannot work in a fresh clone of the AdaptDL repo? Can you make sure this example can work in that setting? Maybe git clone autodist instead of assuming it exists locally?

RUN cd autodist
RUN pip install tensorflow_hub
RUN wget https://github.com/protocolbuffers/protobuf/releases/download/v3.11.0/protoc-3.11.0-linux-x86_64.zip
COPY autodist/protoc-3.11.0-linux-x86_64.zip protoc-3.11.0-linux-x86_64.zip
RUN unzip protoc-3.11.0-linux-x86_64.zip
RUN PROTOC=autodist/bin/protoc python autodist/setup.py build
WORKDIR autodist
RUN rm ./examples/resource_spec.yml
RUN pip install -e .[dev]

# setup ssh
# Install OpenSSH to communicate between containers
RUN apt-get install -y --no-install-recommends openssh-client openssh-server && \
mkdir -p /var/run/sshd

WORKDIR /root
RUN mkdir /root/.ssh
RUN ssh-keygen -f /root/.ssh/id_rsa && cat /root/.ssh/id_rsa.pub | cat >> /root/.ssh/authorized_keys
RUN chown -R root /root/.ssh
RUN chmod 700 /root/.ssh
RUN chmod 600 /root/.ssh/authorized_keys

RUN sed -i 's/#PermitRootLogin prohibit-password/PermitRootLogin yes/' /etc/ssh/sshd_config
RUN sed -i 's/#PubkeyAuthentication yes/PubkeyAuthentication yes/' /etc/ssh/sshd_config

# Allow OpenSSH to talk to containers without asking for confirmation
RUN cat /etc/ssh/ssh_config | grep -v StrictHostKeyChecking > /etc/ssh/ssh_config.new && \
echo " StrictHostKeyChecking no" >> /etc/ssh/ssh_config.new && \
mv /etc/ssh/ssh_config.new /etc/ssh/ssh_config

ENV PYTHONUNBUFFERED=true
25 changes: 25 additions & 0 deletions examples/integration/adaptdljob.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
apiVersion: adaptdl.petuum.com/v1
kind: AdaptDLJob
metadata:
generateName: integration-
spec:
minReplicas: 2
template:
spec:
containers:
- name: main
command:
- python3
- /root/autodist/examples/benchmark/bert.py
- -input_files=/root/tf_examples.tfrecord
- --bert_config_file=/root/bert_config.json
- --num_train_epochs=1
- --num_steps_per_epoch=1000
- --learning_rate=5e-5
- --steps_per_loop=1
- --autodist_strategy=PS
resources:
limits:
nvidia.com/gpu: 1


13 changes: 13 additions & 0 deletions examples/integration/bert_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"attention_probs_dropout_prob": 0.1,
"hidden_act": "gelu",
"hidden_dropout_prob": 0.1,
"hidden_size": 1024,
"initializer_range": 0.02,
"intermediate_size": 4096,
"max_position_embeddings": 512,
"num_attention_heads": 16,
"num_hidden_layers": 24,
"type_vocab_size": 2,
"vocab_size": 30522
}
Binary file added examples/integration/tf_examples.tfrecord
Binary file not shown.
41 changes: 41 additions & 0 deletions sched/adaptdl_sched/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,45 @@ async def _handle_discover(self, request):
return web.json_response(pod_ip_list)
return web.json_response(status=408) # Timeout.

async def _handle_discover_gpu(self, request):
Comment thread
DachengLi1 marked this conversation as resolved.
Outdated
# Long-polling endpoint used for discoverin
# pod IPs and GPU for a given job.
namespace = request.match_info["namespace"]
name = request.match_info["name"]
group = request.match_info["group"]
timeout = int(request.query.get("timeout", "30"))
pod_ip_list = None
pod_gpu_list = None
async with kubernetes.watch.Watch() as w:
stream = w.stream(self._core_api.list_namespaced_pod, namespace,
label_selector="adaptdl/job={}".format(name),
field_selector="status.podIP!=",
timeout_seconds=timeout)
async for event in stream:
pod = event["object"]
replicas = int(pod.metadata.annotations["adaptdl/replicas"])
rank = int(pod.metadata.annotations["adaptdl/rank"])
if pod.metadata.annotations["adaptdl/group"] == group:
if pod_ip_list is None:
pod_ip_list = [None] * replicas
pod_ip_list[rank] = pod.status.pod_ip
if pod_gpu_list is None:
pod_gpu_list = [None] * replicas
container = pod.spec.containers
assert len(container) == 1
pod_gpu_list[rank] = \
int(container[0].resources.requests[
'nvidia.com/gpu'])
if all(pod_gpu is not None for pod_gpu in pod_gpu_list)\
and all(pod_ip is not None
for pod_ip in pod_ip_list):
assert len(pod_ip_list) == len(pod_gpu_list)
return_list = [(pod_ip_list[i], pod_gpu_list[i])
for i in range(len(pod_ip_list))]
LOG.info(return_list)
return web.json_response(return_list)
return web.json_response(status=408) # Timeout.

async def _handle_report(self, request):
namespace = request.match_info['namespace']
name = request.match_info['name']
Expand All @@ -85,6 +124,8 @@ def run(self):
web.get('/healthz', self._handle_healthz),
web.get('/discover/{namespace}/{name}/{group}',
self._handle_discover),
web.get('/discover_gpu/{namespace}/{name}/{group}',
self._handle_discover_gpu),
web.put('/hints/{namespace}/{name}', self._handle_report),
])
LOG.info("%s %s", self._host, self._port)
Expand Down