[StarCluster] Running MPI over the cluster
Saurav Prakash
sauravpr at usc.edu
Thu Oct 25 04:44:13 EDT 2018
Hi,
I am running a Python program for a distributed implementation of
gradient descent. The gradient descent step uses an Allreduce
operation to obtain the overall gradient at the workers. I had been
setting up clusters without StarCluster before, but recently I needed
larger clusters and moved to StarCluster. I was surprised to see that
the MPI.Allreduce operation is much faster in a cluster launched by
StarCluster than in a cluster set up by traditional means. I am
curious to know whether this is an artifact, or whether StarCluster
somehow optimizes the communication network to enable an efficient
Allreduce step. I am attaching the code for a cluster of 43 nodes
(1 master and 42 workers), though I have replaced the data loading
with random data initialization to mimic the gradient step. Any
insight regarding this would be extremely helpful.
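
A minimal sketch along these lines (assuming mpi4py; the vector length
just mirrors ni in the attached script) that times the Allreduce call
alone might help isolate whether the difference is purely in the
communication layer:

import time
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

n = 5000        # vector length, mirrors ni in the attached script
trials = 100    # number of timed Allreduce calls
sendbuf = np.random.normal(0, 1, n)
recvbuf = np.zeros(n)

comm.Barrier()  # line all ranks up before timing
t0 = time.time()
for _ in range(trials):
    comm.Allreduce(sendbuf, recvbuf, op=MPI.SUM)
avg = (time.time() - t0) / trials

# Allreduce completes collectively, so report the slowest rank's average
worst = comm.reduce(avg, op=MPI.MAX, root=0)
if rank == 0:
    print("mean Allreduce time over %d trials: %.6f s" % (trials, worst))

Launching it the same way on both setups (e.g. something like
mpirun -np 43 python allreduce_bench.py over the same hostfile; the
script name is just a placeholder) would keep everything fixed except
the network and instance placement.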
Thanks,
Saurav.
-------------- next part --------------
from __future__ import print_function
import time
import sys
import random
import os
import numpy as np
from mpi4py import MPI
from numpy import linalg as LA
# from util import *                    # not needed once data loading is
# from generate_data_helpers import *   # replaced by random initialization

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

expi = 2
scenario = 4
N = 42                  # number of workers
mi = 6300               # total number of data rows
ni = 5000               # number of features
# alpha = 0.1
lr = 0.000000001        # learning rate
c7 = lr / mi
rounds = 300            # gradient descent iterations
### overall data set dimensions
stragwrk = [0] * N      # straggler flags per worker (all 0 here)
lclsize = mi // N       # rows held by each worker

if rank == 0:
    print("At master", rank)
    masterComm = comm.Split(0, rank)
    timecaliter = np.zeros((rounds, 1))
    erriter = np.zeros((rounds, 1))
    timecalt = np.zeros((rounds, 1))
    beta = np.zeros(ni)
    # beta0 = load_data("beta_"+str(case)+"_0.dat")
    beta0 = np.random.normal(0, 1, (ni,))
    for i1 in range(rounds):
        print(i1)
        comm.Barrier()
        recvmsg1 = comm.gather(0, root=0)
        # print(rank, len(recvmsg1))
        timecaliter[i1] = max(recvmsg1[1:])            # slowest worker this round
        timecalt[i1] = np.sum(timecaliter[0:(i1 + 1)])
        req = comm.Irecv([beta, MPI.DOUBLE], source=1)  # beta is float64
        req.Wait()                                      # complete before using beta
        erriter[i1] = pow((LA.norm(beta - beta0) / LA.norm(beta0)), 2)
        print(timecalt[i1])
        print(erriter[i1])
    df = np.matrix(timecalt)
    # np.savetxt('time_uncoded_master.txt', df)
    np.savetxt('AGC_time_%s_%s.txt' % (expi, scenario), df)
    df = np.matrix(erriter)
    # np.savetxt('time_uncoded_master.txt', df)
    np.savetxt('AGC_error_%s_%s.txt' % (expi, scenario), df)
else:
    rk = rank
    workerComm = comm.Split(1, rank)
    strag = stragwrk[rk - 1]
    print("At worker", rk)
    Xi = np.random.normal(0, 1, (lclsize, ni))   # random data in place of loading
    yi = np.random.normal(0, 1, (lclsize,))
    beta = np.zeros(ni)
    for k in range(rounds):
        grad = np.zeros(ni)
        comm.Barrier()
        workerComm.Barrier()
        tst = time.time()
        # if rank == 1:
        #     print(Xi.shape)
        #     print(yi.shape)
        #     print(beta.shape)
        gradi = Xi.T.dot(Xi.dot(beta) - yi)      # local gradient
        tr = time.time() - tst
        if strag == 1:
            time.sleep(3 * tr)                   # simulate a straggler
        workerComm.Allreduce(gradi, grad, op=MPI.SUM)
        print("worker gradi", gradi, grad)
        beta = beta - c7 * grad                  # gradient descent update
        bpt = time.time()
        tlocal = bpt - tst
        recvmsg1 = comm.gather(tlocal, root=0)
        if rk == 1:
            sC = comm.Isend([beta, MPI.DOUBLE], dest=0)
            sC.Wait()
        # print(rk, recvmsg1)