00001 __copyright__ = """
00002 Copyright 2008 Henryk Modzelewski
00003 """
00004 __license__ = """
00005 This file is part of SLIMpy .
00006
00007 SLIMpy is free software: you can redistribute it and/or modify
00008 it under the terms of the GNU Lesser General Public License as published by
00009 the Free Software Foundation, either version 3 of the License, or
00010 (at your option) any later version.
00011
00012 SLIMpy is distributed in the hope that it will be useful,
00013 but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
00015 GNU Lesser General Public License for more details.
00016
00017 You should have received a copy of the GNU Lesser General Public License
00018 along with SLIMpy . If not, see <http://www.gnu.org/licenses/>.
00019 """
00020
00021 '''
00022 A simple parallel execution of arbitrary commands
00023 '''
00024
00025
00026 import os
00027 import sys
00028 if sys.version_info < (2,4,0):
00029 print "FATAL ERROR: Too old version of python. Use 2.4 or newer."
00030 sys.exit(1)
00031 import time
00032 import subprocess
00033 import threading
00034 from datetime import timedelta,datetime
00035
00036
# Defaults for the optional keyword parameters of ExecCMD and ExecCMDs.
RSH = 'ssh'                     # command used to execute the remote shell
TIMEOUT = timedelta(0, 3600)    # per-command wall-time limit: 0 days, 3600 seconds
TIMECHECK = 10                  # how often to check for progress, in seconds
SHOWTIMES = False               # show start, end, and wall times
VERBOSE = False                 # be verbose
KILL = True                     # kill a command that exceeds TIMEOUT
00043
def ExecCMD(nodes,cmd,
        RSH=RSH,TIMEOUT=TIMEOUT,TIMECHECK=TIMECHECK,SHOWTIMES=SHOWTIMES,VERBOSE=VERBOSE,KILL=KILL):
    '''
    Execute a command on the remote nodes

    One RemoteCMD worker thread is started per node. Finished workers have
    their output printed and are dropped from the bookkeeping; the function
    returns once no worker is left.

    Requires:
        Standard python modules.

    Parameters:
        nodes: list of nodes (user@hostname or hostname)
        cmd: a command string (with &&, ||, ;, or pipes)

    Keyword parameters (optional):
        RSH: command to execute remote shell
        TIMEOUT: timedelta object with timeout for each command
        TIMECHECK: how often to check for progress in seconds
        SHOWTIMES: show start, end, and wall times
        VERBOSE: be verbose
        KILL: kill the command exceeding the timeout

    Returns:
        Cumulative status of all nodes (0 is OK)

    Notes:
        A KeyboardInterrupt aborts every worker still registered; whatever
        output they produced is then printed as leftovers.
    '''
    if VERBOSE:
        print('Nodes: %s' % (nodes,))
        print('Command: %s' % (cmd,))
        print('RSH: %s' % (RSH,))
        print('TIMEOUT: %s' % (TIMEOUT,))
        print('TIMECHECK: %s' % (TIMECHECK,))
        print('SHOWTIMES: %s' % (SHOWTIMES,))
        print('KILL: %s' % (KILL,))

    status = 0
    threads = {}    # position of the node in `nodes` -> its worker thread

    try:
        # launch one worker per node
        for key, node in enumerate(nodes):
            worker = RemoteCMD(RSH,node,cmd)
            threads[key] = worker
            worker.start()
            if VERBOSE:
                # Wait for the worker to publish its pid, but stop waiting
                # if the worker died before it could do so.
                while not worker.pid and worker.isAlive(): time.sleep(1)
                print('%s \n\t@ %s PPID: %s PID: %s'
                      % (worker.remote, worker.stime, worker.ppid, worker.pid))

        # collect results as workers finish
        while check_active(threads,TIMEOUT,KILL) > 0:
            while check_passive(threads,TIMEOUT,KILL) < 1: time.sleep(TIMECHECK)
            for key in get_passive(threads):
                status += printout(threads[key],SHOWTIMES)
                del threads[key]
    except KeyboardInterrupt:
        print('\rException KeyboardInterrupt requested.')
        print('\tKilling all active remote threads.')
        for key in list(threads):
            print('\t\tKilling  %s' % (threads[key].remote,))
            threads[key].abort()

    # anything still registered (all finished at once, or interrupted)
    if threads:
        print('Showing leftovers')
        # iterate over a snapshot of the keys because entries are deleted
        for key in list(threads):
            status += printout(threads[key],SHOWTIMES)
            del threads[key]

    return status
00119
def ExecCMDs(nodes,cmds,
        RSH=RSH,TIMEOUT=TIMEOUT,TIMECHECK=TIMECHECK,SHOWTIMES=SHOWTIMES,VERBOSE=VERBOSE,KILL=KILL):
    '''
    Execute a set of commands on the remote nodes

    Every node runs at most one command at a time. Commands are handed out
    in order to idle nodes; when no node is idle the function waits until at
    least one worker finishes, prints its output, and reuses that node.

    Requires:
        Standard python modules.

    Parameters:
        nodes: list of nodes (user@hostname or hostname)
        cmds: a list of commands (separated by &&, ||, ;, or pipes) per element

    Keyword parameters (optional):
        RSH: command to execute remote shell
        TIMEOUT: timedelta object with timeout for each command
        TIMECHECK: how often to check for progress in seconds
        SHOWTIMES: show start, end, and wall times
        VERBOSE: be verbose
        KILL: kill the command exceeding the timeout

    Returns:
        Cumulative status of all commands (0 is OK)

    Raises:
        ValueError: commands were given but the list of nodes is empty
        (no node could ever become idle, so waiting would never end).

    Notes:
        A KeyboardInterrupt aborts every worker still registered; whatever
        output they produced is then printed as leftovers.
    '''
    if VERBOSE:
        print('Nodes: %s' % (nodes,))
        print('Commands: %s' % (cmds,))
        print('RSH: %s' % (RSH,))
        print('TIMEOUT: %s' % (TIMEOUT,))
        print('TIMECHECK: %s' % (TIMECHECK,))
        print('SHOWTIMES: %s' % (SHOWTIMES,))
        print('KILL: %s' % (KILL,))

    if cmds and not nodes:
        raise ValueError('ExecCMDs: cannot run commands on an empty list of nodes')

    status = 0
    # idle node indices, reversed so that pop() hands out node 0 first
    passive = list(range(len(nodes)))
    passive.reverse()
    threads = {}    # node index -> worker currently running on that node

    try:
        # hand out the commands one by one
        for command in cmds:
            try:
                key = passive.pop()
            except IndexError:
                # no idle node: wait for at least one worker to finish,
                # report it, and recycle its node
                while check_passive(threads,TIMEOUT,KILL) < 1: time.sleep(TIMECHECK)
                passive.extend(get_passive(threads))
                for done in passive:
                    status += printout(threads[done],SHOWTIMES)
                    del threads[done]
                key = passive.pop()
            worker = RemoteCMD(RSH,nodes[key],command)
            threads[key] = worker
            worker.start()
            if VERBOSE:
                # Wait for the worker to publish its pid, but stop waiting
                # if the worker died before it could do so.
                while not worker.pid and worker.isAlive(): time.sleep(1)
                print('%s \n\t@ %s PPID: %s PID: %s'
                      % (worker.remote, worker.stime, worker.ppid, worker.pid))

        # all commands dispatched: collect the remaining results
        while check_active(threads,TIMEOUT,KILL) > 0:
            while check_passive(threads,TIMEOUT,KILL) < 1: time.sleep(TIMECHECK)
            for key in get_passive(threads):
                status += printout(threads[key],SHOWTIMES)
                del threads[key]
    except KeyboardInterrupt:
        print('\rException KeyboardInterrupt requested.')
        print('\tKilling all active remote threads.')
        for key in list(threads):
            print('\t\tKilling  %s' % (threads[key].remote,))
            threads[key].abort()

    # anything still registered (all finished at once, or interrupted)
    if threads:
        print('Showing leftovers')
        # iterate over a snapshot of the keys because entries are deleted
        for key in list(threads):
            status += printout(threads[key],SHOWTIMES)
            del threads[key]

    return status
00205
class RemoteCMD(threading.Thread):
    '''
    Remote command definitions and execution

    Each instance is a thread that runs one shell command line and keeps
    its result:
        host:     node specification as given by the caller
        cmd:      the command (single-quoted unless host is 'localhost')
        remote:   the full command line handed to the local shell
        ppid:     pid of the python process running the thread
        pid:      pid of the spawned shell (None until run() has spawned it)
        stime:    start time (None until run() begins)
        etime:    end time (None until the command has ended)
        status:   exit status of the command (None until it has ended)
        contents: combined stdout/stderr as a list of lines;
                  ['NULL\\n'] until real output has been collected
    '''
    def __init__(self,rsh,host,cmd):
        '''
        Parameters:
            rsh:  command to execute remote shell
            host: node to run on; 'localhost' runs cmd directly without rsh
            cmd:  command string
        '''
        threading.Thread.__init__(self)
        self.host = host
        self.account = host
        if self.host == 'localhost':
            self.cmd = cmd
            self.remote = self.cmd
        else:
            # Single-quote the command for the remote shell. An embedded
            # single quote is emitted as '"'"' (close quote, quoted quote,
            # reopen quote) so it cannot terminate the quoting early.
            self.cmd = "'" + cmd.replace("'", "'\"'\"'") + "'"
            self.remote = ' '.join([rsh,self.account,self.cmd])
        self.ppid = None
        self.pid = None
        self.stime = None
        self.etime = None
        self.status = None
        # placeholder output, replaced by run() once the command has ended
        self.contents = ['NULL'+'\n']

    def _running(self):
        '''
        True while the thread is alive (works with either spelling of the
        Thread liveness method, whichever this python version provides)
        '''
        probe = getattr(self,'is_alive',None) or self.isAlive
        return probe()

    def run(self):
        '''
        Run the command through the local shell, wait for it to end, and
        store pid, status, times, and the combined stdout/stderr lines
        '''
        import tempfile
        self.ppid = os.getpid()
        self.stime = datetime.now()
        # anonymous temporary file that receives stdout and stderr
        pout = tempfile.TemporaryFile(mode='w+')
        try:
            sP = subprocess.Popen(self.remote, shell=True, bufsize=-1,
                stdin=None, stdout=pout, stderr=subprocess.STDOUT, close_fds=False)
            self.pid = sP.pid
            self.status = sP.wait()
            self.etime = datetime.now()
            pout.flush()
            pout.seek(0,0)
            self.contents = pout.readlines()
        finally:
            # always record an end time and release the temporary file,
            # even when the command could not be spawned
            if self.etime is None: self.etime = datetime.now()
            pout.close()

    def abort(self):
        '''
        Kill the spawned shell with signal 9 and wait up to 30 seconds
        for the thread to finish; does nothing if the thread is not running
        '''
        # wait for run() to publish the pid, but only while it still can
        while not self.pid and self._running(): time.sleep(1)
        if not self.pid or not self._running():
            # never spawned, or already finished: the pid (if any) is stale
            return
        try:
            os.kill(self.pid,9)
        except OSError:
            # the process is already gone
            pass
        else:
            self.join(30)
00248
def check_active(t,tout,kill):
    '''
    returns the number of RSH threads that are active
    and kills threads exceeding the wall-time limit

    Parameters:
        t:    dictionary of worker threads
        tout: timedelta with the wall-time limit
        kill: if true, abort() workers that exceeded the limit
    '''
    now = datetime.now()
    active = 0
    for key in list(t.keys()):
        worker = t[key]
        if not worker.isAlive():
            continue
        started = worker.stime
        # a worker that is alive but has not recorded its start time yet
        # cannot have exceeded the limit
        if started is not None and now-started > tout:
            print('WALL-TIME EXCEEDED for PID %s : %s' % (worker.pid, worker.remote))
            if kill: worker.abort()
        active += 1
    return active
00263
def check_passive(t,tout,kill):
    '''
    returns the number of RSH threads that are passive (not alive)
    and kills threads exceeding the wall-time limit

    Parameters:
        t:    dictionary of worker threads
        tout: timedelta with the wall-time limit
        kill: if true, abort() workers that exceeded the limit
    '''
    now = datetime.now()
    passive = 0
    for key in list(t.keys()):
        worker = t[key]
        if not worker.isAlive():
            passive += 1
            continue
        started = worker.stime
        # a worker that is alive but has not recorded its start time yet
        # cannot have exceeded the limit
        if started is not None and now-started > tout:
            print('WALL-TIME EXCEEDED for PID %s : %s' % (worker.pid, worker.remote))
            if kill: worker.abort()
    return passive
00279
def get_passive(t):
    '''
    returns the list of keys of the RSH threads in t that are passive
    (not alive)
    '''
    return [key for key in t.keys() if not t[key].isAlive()]
00289
def printout(res,shawtime):
    '''
    print thread results

    Every output line is prefixed with the host label. A host of the form
    'flag user hostname' is labelled 'user@hostname' (everything after the
    first word joined by '@'); otherwise the host itself is the label.

    Parameters:
        res:      worker thread whose results are printed; if it is still
                  alive it is aborted first
        shawtime: if true, also print command, start, end, and wall times

    Returns:
        Exit status of the command, or 1 if the command never produced a
        status (so that the caller's cumulative status reports a failure).
    '''
    if res.isAlive():
        print('WARNING: trying to kill again the process that should be gone by now')
        print(' / '.join([res.remote,str(res.ppid),str(res.pid)]))
        res.abort()

    words = res.host.split()
    if len(words) > 1:
        hostname = '@'.join(words[1:])
    elif words:
        hostname = words[0]
    else:
        # empty or blank host: nothing to split, use it unchanged
        hostname = res.host
    tag = hostname+':'

    # a worker that never started or never ended has no wall time
    timing = None
    if res.stime is not None and res.etime is not None:
        timing = res.etime-res.stime

    if shawtime:
        print('')
        print('%s COMMAND: %s' % (tag, res.cmd))
        print('%s START-TIME: %s' % (tag, res.stime))
    for line in (res.contents or []):
        sys.stdout.write('%s %s' % (tag, line))
        # the last line of the output may lack its newline
        if not line.endswith('\n'): sys.stdout.write('\n')
    if shawtime:
        print('%s END-TIME: %s' % (tag, res.etime))
        print('%s WALL-TIME: %s' % (tag, timing))
    sys.stdout.flush()

    if res.status is None:
        return 1
    return res.status
00316
def get_nodes(nodelist):
    '''
    get list of nodes from the file

    The file holds one node per line; blank lines are skipped and
    surrounding whitespace is removed. An entry containing '@' is split at
    '@' and rewritten as '-l' followed by the pieces (user@hostname becomes
    '-l user hostname').

    Parameters:
        nodelist: name of the file with the nodes

    Returns:
        list of node strings

    Exits the program with status 1 if the file cannot be read.
    '''
    try:
        listing = open(nodelist,'r')
        try:
            lines = listing.read().splitlines()
        finally:
            listing.close()
    except (IOError, OSError):
        print('FATAL ERROR: file %s not found' % (nodelist,))
        sys.exit(1)

    nodes = []
    for line in lines:
        entry = line.strip()
        if not entry:
            continue
        pieces = entry.split('@')
        if len(pieces) > 1:
            entry = ' '.join(['-l']+pieces)
        nodes.append(entry)
    return nodes
00333
def get_cmds(cmdlist,exedir):
    '''
    get list of commands from the file
    and prepend a change to the common execution directory if not None

    The file holds one command per line; blank lines are skipped. When
    exedir is given, each command becomes 'cd <exedir> && <command>'.

    Parameters:
        cmdlist: name of the file with the commands
        exedir:  common execution directory, or None (or empty) for none

    Returns:
        list of command strings

    Exits the program with status 1 if the file cannot be read.
    '''
    try:
        listing = open(cmdlist,'r')
        try:
            lines = listing.read().splitlines()
        finally:
            listing.close()
    except (IOError, OSError):
        print('FATAL ERROR: file %s not found' % (cmdlist,))
        sys.exit(1)

    cmds = [line for line in lines if line.strip()]
    if exedir:
        cmds = ['cd '+exedir+' && '+line for line in cmds]
    return cmds
00350