00001 """
00002 Central thread scheduler info center a common ground for
00003 Worker nodes and main scheduler node to sync
00004 """
00005
00006 from __future__ import with_statement
00007
00008 __copyright__ = """
00009 Copyright 2008 Sean Ross-Ross
00010 """
00011 __license__ = """
00012 This file is part of SLIMpy .
00013
00014 SLIMpy is free software: you can redistribute it and/or modify
00015 it under the terms of the GNU Lesser General Public License as published by
00016 the Free Software Foundation, either version 3 of the License, or
00017 (at your option) any later version.
00018
00019 SLIMpy is distributed in the hope that it will be useful,
00020 but WITHOUT ANY WARRANTY; without even the implied warranty of
00021 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
00022 GNU Lesser General Public License for more details.
00023
00024 You should have received a copy of the GNU Lesser General Public License
00025 along with SLIMpy . If not, see <http://www.gnu.org/licenses/>.
00026 """
00027
00028
00029
00030 from slimpy_base.Core.MutiProcessUtils.JobPosting import JobPosting
00031 from slimpy_base.Environment.InstanceManager import InstanceManager
00032 from slimpy_base.Environment.Singleton import Singleton
00033 from os import kill
00034 from os.path import isfile
00035 from signal import SIGALRM, alarm, signal, SIGKILL
00036 from threading import RLock, Event, BoundedSemaphore
00037 from time import sleep
00038 from pdb import set_trace
00039 from threading import currentThread
00040 import pdb
00041 import sys
00042
00043
00044 def action( signum, stack_frame ):
00045 '''
00046 action to catch alarm
00047 '''
00048 print
00049 print "WallTime exceded"
00050 print
00051 env = InstanceManager()
00052 env['center'].error = True
00053
00054
00055 signal( SIGALRM, action )
00056
00057 class EmploymentCenter( Singleton ):
00058 """
00059 Singleton class to store the state of the multi core runner
00060 communicates with the workers and scheduler
00061 """
00062
00063 def __new_instance__( self, name ):
00064 '''
00065 create a new instance
00066 '''
00067 Singleton.__new_instance__( self, name )
00068
00069 self.lock = RLock()
00070
00071 with self:
00072 self._jobs_posted = 0
00073 self._jobs_finished = 0
00074 self._done = False
00075 self._error = False
00076 self._error_message = None
00077 self._pids = set()
00078 self._aborted_pids = set()
00079 self._waiting_list = set()
00080 self._event = Event()
00081 self.__node_names = [ ]
00082 self.nodes = {}
00083 self._idle = set()
00084 self._fin = set()
00085 self._head_node = ( "__main__", "__main__" )
00086
00087 self.env = InstanceManager()
00088
00089 self._semaphore = None
00090
00091 def _get_semaphore(self):
00092 if self._semaphore is None:
00093 self._semaphore = BoundedSemaphore( len(self.node_names) )
00094 return self._semaphore
00095
00096 semaphore = property( _get_semaphore )
00097
00098 def reset( self ):
00099 '''
00100 set self.done and self.error to false
00101 '''
00102 self.done = False
00103 self.error = False
00104
00105 def set_alarm( self ):
00106 '''
00107 set the alarm for the amount of time
00108 walltime given minus the time of the
00109 longest running job
00110 '''
00111 with self:
00112 if self._jobs_posted == self._jobs_finished:
00113 aval = 0
00114 else:
00115 max_time = 0
00116 walltime = self.env['slimvars']['walltime']
00117 for procs in self.nodes.itervalues():
00118 for proc in procs.itervalues():
00119 time_passed = proc.time_passed
00120 if time_passed > max_time:
00121
00122 max_time = int( time_passed )
00123 aval = walltime - max_time
00124
00125 log = self.env['record']
00126 print >> log( 2, 'alarm' ), "alarm set for", aval
00127 alarm( aval )
00128 return
00129
00130 def add_to_waiting_list( self, name, proc ):
00131 """
00132 add a worker to wait for a job from the
00133 scheduler
00134 """
00135
00136
00137 self._waiting_list.add( ( name, proc ) )
00138
00139 if self._head_node in self._waiting_list:
00140 all = set( [self._head_node] )
00141 for name, val in self.nodes.iteritems():
00142 for proc in val.iterkeys():
00143 all.add( ( name, proc ) )
00144 assert all.issuperset( self._waiting_list )
00145 if all == self._waiting_list:
00146 pass
00147
00148 else:
00149 pass
00150
00151 return
00152
00153 def remove_from_waiting_list( self, name, proc ):
00154 """
00155 remove a worker from the waiting_list
00156 """
00157
00158 self._waiting_list.remove( ( name, proc ) )
00159
00160
00161 def add_pid( self, pid ):
00162 """
00163 add the pid of a running job to the
00164 set of pids so they may be killed upon
00165 error
00166 """
00167 with self:
00168 self._pids.add( pid )
00169
00170 def remove_pid( self, pid ):
00171 """
00172 remove a pid from set
00173 """
00174 with self:
00175
00176 self._pids.remove( pid )
00177
00178 def abort_all( self, kill_signal=SIGKILL ):
00179 """
00180 abort all of the pids of working jobs by the
00181 kill command
00182 """
00183 log = self.env['record']( 1, 'thread' )
00184 print >> log , "Abort All:"
00185 with self.lock:
00186 print >> log, 'killing', self._pids
00187 for pid in self._pids.copy():
00188 try:
00189 kill( pid, kill_signal )
00190 except OSError:
00191 pass
00192 else:
00193
00194
00195 print >> log , "removing" , pid
00196 self.remove_pid( pid )
00197
00198 return
00199
00200 def _set_error( self, val ):
00201 """
00202 set the error value
00203 if true then all listeners will be notified
00204 """
00205 log = self.env['record'](2,'err')
00206 with self:
00207 if isinstance( val, tuple ):
00208 msg = val[1]
00209 val = val[0]
00210 else:
00211 msg = None
00212
00213 self._error_message = msg
00214
00215 if val and not self._error:
00216 print >> log, "Fatal Error Occured, Waiting for all nodes to finish", str( msg )
00217 self.set_event()
00218 self.notify()
00219
00220
00221
00222 self._error = val
00223
00224
00225 def _get_error( self ):
00226 """
00227 True if error occurred
00228 """
00229 return self._error
00230
00231
00232
00233
00234 def _get_nodenames( self ):
00235 """
00236 retuns a list of nodenames from
00237 slimvars['NODELIST'] or slimvars['NODEFILE']
00238 """
00239
00240 if 'NODELIST' in self.env['slimvars']:
00241 node_list =self.env['slimvars']['NODELIST']
00242 else:
00243 node_list = None
00244
00245 if node_list:
00246 assert isinstance( node_list, ( list, tuple ) )
00247 return self.env['slimvars']['NODELIST']
00248
00249 elif self.env['slimvars']['NODEFILE']:
00250 PBS_NODEFILE = self.env['slimvars']['NODEFILE']
00251 if isfile( PBS_NODEFILE ):
00252
00253 lines = open( PBS_NODEFILE , 'r' ).read()
00254 nodes = str.split( lines )
00255
00256 self.env['slimvars']['NODELIST'] = nodes
00257 return nodes
00258 else:
00259 raise EnvironmentError( "nodefile '%s' does not exist" %PBS_NODEFILE )
00260
00261 else:
00262 raise Exception( "no nodefile or nodelist" )
00263
00264
00265
00266
00267 def idle_add( self, name, proc ):
00268 """
00269 add idle worker
00270 """
00271 self._idle.add( ( name, proc ) )
00272
00273 def idle_discard( self, name, proc ):
00274 "discard worker from idle"
00275
00276 self._idle.discard( ( name, proc ) )
00277
00278 def set_event( self ):
00279 "notify schedular"
00280
00281 self._event.set()
00282
00283 node_names = property( _get_nodenames )
00284 error = property( _get_error, _set_error )
00285
00286 def wait_for_avail( self ):
00287 """
00288 wait for center.set_event()
00289 """
00290 self.release( )
00291
00292 self.add_to_waiting_list( "__main__", "__main__" )
00293 while not self._event.isSet():
00294 sleep( 0.01 )
00295
00296 self.remove_from_waiting_list( "__main__", "__main__" )
00297 self._event.clear()
00298
00299 self.acquire( )
00300
00301 return
00302
00303
00304 def wait_for_job( self, name, proc ):
00305 """
00306 non synchronous: worker node wait for job to be posted or error
00307 """
00308 self.release( )
00309
00310 mypost = self[name, proc]
00311 if mypost.event.isSet():
00312 pass
00313 else:
00314 self.add_to_waiting_list( name, proc )
00315 mypost.wait_for_job( )
00316 self.remove_from_waiting_list( name, proc )
00317
00318 self.acquire( )
00319
00320 return
00321
00322 def __enter__(self):
00323 self.acquire()
00324 return self
00325
00326 def __exit__( self, t, v, tb ):
00327
00328 if self.lock._RLock__owner != currentThread( ):
00329 pass
00330 else:
00331 self.release( )
00332 return
00333
00334 def subscribe( self, name, processor ):
00335 """
00336 subscribe worker to the jobs list
00337 """
00338 with self:
00339
00340 procs = self.nodes.setdefault( name , {} )
00341 procs[processor] = JobPosting( name, processor )
00342 self.idle_add( name, processor )
00343
00344 def _set_done( self, val ):
00345 """
00346 notify all that all jobs are done
00347 """
00348 self._done = val
00349 self.notify()
00350
00351 def _get_done( self ):
00352 """
00353 returns True if scheduler is done
00354 """
00355 return self._done
00356
00357 done = property( _get_done , _set_done )
00358
00359 def notify( self, nodename=None, processor=None ):
00360 """
00361 if nodename and processor are not None set then notify that
00362 worker, otherwise notify all
00363 """
00364
00365 with self:
00366
00367 if nodename is None:
00368 for node in self.nodes.values():
00369 for processor in node.values():
00370 processor.notify()
00371 else:
00372 node = self[nodename, processor]
00373 node.notify()
00374
00375 return
00376
00377 def finished( self, name, processor, job ):
00378 """
00379 add a job to the finished list and
00380 add a worker to the idle set
00381 notify the scheduler
00382 """
00383 with self:
00384 self._fin.add( job )
00385 self[name, processor].finished( job )
00386 self.idle_add( name, processor )
00387 self.set_event()
00388 self._jobs_finished += 1
00389
00390 self.set_alarm()
00391
00392 def dump_finished( self ):
00393 """
00394 return all finished jobs
00395 """
00396 with self:
00397 ret = self._fin
00398 self.__class__._fin = set()
00399
00400 return ret
00401
00402
00403
00404
00405
00406
00407 def __getitem__( self, ( name, processor ) ):
00408 """
00409 return a worker's job posting from name,processor
00410 """
00411 with self:
00412
00413 node = self.nodes[name][processor]
00414
00415 return node
00416
00417 def post( self, name, processor, job ):
00418 """
00419 post a job to the job posting for the
00420 (name, processor) worker node
00421 """
00422
00423 with self:
00424
00425 self[name, processor].post( job )
00426 self.idle_discard( name, processor )
00427 self._jobs_posted += 1
00428 self.set_alarm()
00429
00430
00431 def has_idle( self ):
00432 """
00433 returns true if the idle set is
00434 non empty
00435 """
00436 with self:
00437 ret = len( self._idle )
00438 return ret
00439
00440 def pop( self ):
00441 '''
00442 pop a (name, proc) pair from the idle list
00443 '''
00444 with self:
00445
00446 name, proc = list( self._idle.__iter__().next() )
00447
00448 return name, proc
00449
00450 def _get_idle(self):
00451
00452 return self._idle
00453
00454 idle = property( _get_idle )
00455
00456 def acquire( self ):
00457 """
00458 acquire self.lock
00459 """
00460 self.lock.acquire()
00461
00462 def release( self ):
00463 'release self.lock'
00464 if self.lock._RLock__owner != currentThread( ):
00465 raise Exception("cannot wait: this thread does not own lock")
00466 self.lock.release()
00467
00468
00469 def prettyprint( self ):
00470 """
00471 print info to screen
00472 """
00473 d_spacer = "+".join( ['-'*20]*5 )
00474 eqspacer = "+".join( ['='*20]*5 )
00475
00476 print "*"*22
00477 print "<Current Job Postings>"
00478 print len( self._idle ) , "processors are idle"
00479 print eqspacer
00480
00481 catagories = ['*Name*', '*Event*', '*doing*', '*todo*', '*finished*']
00482 title = "|".join( [ s.center( 20 ) for s in catagories] )
00483 print title
00484 print d_spacer
00485
00486
00487 for node in self.nodes.values():
00488 for proc in node.values():
00489 print self.format_poc( proc )
00490 print d_spacer
00491 print eqspacer
00492
00493 def format_poc( self, proc ):
00494 l = []
00495
00496 push = l.append
00497 push( proc.name )
00498 push( proc.event.isSet() )
00499 doing = map( str, list( proc.current ) )
00500 if proc.current:
00501 push( str( list( proc.current )[0] ) )
00502 else:
00503 push( 'None' )
00504
00505 num_todo = len( proc.todo )
00506 if num_todo > 0:
00507 tdstr = list( proc.todo )[0]
00508 if num_todo > 1:
00509 tdstr = tdstr+' ...'
00510 push( tdstr )
00511 else:
00512 push( 'None' )
00513
00514 push( len( proc.finished_jobs ) )
00515 return "|".join( [ str( s ).center( 20 ) for s in l] )
00516
00517 def display_stats( self ):
00518 '''
00519 display stats of finish scheduler
00520 '''
00521 col_space = 20
00522 d_spacer = "+".join( ['-'*col_space]*5 )
00523 eqspacer = "+".join( ['='*col_space]*5 )
00524 catagories = ['*Node Name*', '*Num Jobs*', '*Total Time*', '*Time Idle*', '*Utility*']
00525 title = "|".join( [ s.center( col_space ) for s in catagories] )
00526
00527 print "*"*col_space
00528 print "Job Post Stats"
00529 print eqspacer
00530 print title
00531 num_jobs = 0
00532 totaltime = 0
00533 totalidol = 0
00534
00535 for node in self.nodes.values():
00536 for proc in node.values():
00537 print d_spacer
00538 nj, tt, ti, out = self.pocstat( proc )
00539 num_jobs += nj
00540 totaltime += tt
00541 totalidol += ti
00542 print out
00543 print d_spacer
00544
00545 print "|".join( [ str( s ).center( col_space ) for s in
00546 ['Total',
00547 "%d" % num_jobs,
00548 "%.2f" % totaltime,
00549 "%.2f" % totalidol,
00550 "%.2f %%" % ( ( 1 - totalidol/totaltime )*100 ) ]] )
00551 print eqspacer
00552
00553 def pocstat( self, proc ):
00554 ls = []
00555 push = ls.append
00556
00557 push( proc.name +":" +str( proc.processor ) )
00558 nj = len( proc.finished_rec )
00559 push( nj )
00560 tt = proc.total_time()
00561 push( "%.2f sec" % tt )
00562 ti = proc.time_idol()
00563 push( "%.2f sec" % ti )
00564 push( "%.2f %%" %( 100 - proc.time_idol() / proc.total_time() *100 ) )
00565
00566 return nj, tt, ti, "|".join( [ str( s ).center( 20 ) for s in ls] )
00567
00568 def get_my_job( self, name, proc ):
00569 'returns the job of a worker'
00570 with self:
00571 job_id = self[name, proc].get()
00572 return job_id
00573
00574 def has_todo( self, name, proc ):
00575 """
00576 true if there is a job waiting for the (name,proc) worker
00577 """
00578 with self:
00579 val = self[name, proc].has_todo()
00580 return val
00581
00582 def has_nothing_todo( self, name, proc ):
00583 '''
00584 returns 'not has_todo'
00585 '''
00586 return not self.has_todo( name, proc )
00587
00588
00589