00001 """
00002 Container classes to work on commands on separate threads
00003 """
00004 from __future__ import with_statement
00005
00006 __copyright__ = """
00007 Copyright 2008 Sean Ross-Ross
00008 """
00009 __license__ = """
00010 This file is part of SLIMpy .
00011
00012 SLIMpy is free software: you can redistribute it and/or modify
00013 it under the terms of the GNU Lesser General Public License as published by
00014 the Free Software Foundation, either version 3 of the License, or
00015 (at your option) any later version.
00016
00017 SLIMpy is distributed in the hope that it will be useful,
00018 but WITHOUT ANY WARRANTY; without even the implied warranty of
00019 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
00020 GNU Lesser General Public License for more details.
00021
00022 You should have received a copy of the GNU Lesser General Public License
00023 along with SLIMpy . If not, see <http://www.gnu.org/licenses/>.
00024 """
00025
00026
00027
00028
00029 from slimpy_base.Core.Runners.RunnerBase import Runner
00030 from slimpy_base.Environment.InstanceManager import InstanceManager
00031 from random import randint
00032 from sys import _current_frames
00033 from thread import get_ident
00034 from threading import Thread, Lock
00035 import traceback
00036
00037
# Module-level lock shared by every Worker instance; it is held around the
# print statements in Worker.print_ and Worker.run_job.
00038 printlock = Lock()
00039
00040
00041 class Worker( Thread , Runner ):
00042 """
00043 Worker runs jobs
00044 """
00045 env = InstanceManager()
00046
00047 def __init__( self, name, master_id , processor=0 ):
00048
00049 self.master_id = master_id
00050
00051 self.processor = processor
00052 self.env['center'].subscribe( name, processor )
00053 Thread.__init__( self, name=name )
00054 self.pid = 0
00055
00056 self.thread_state = "init"
00057
00058 def _get_ts(self):
00059 return self._thread_state
00060
00061 def _set_ts(self,val):
00062 self._thread_state = val
00063 return
00064
00065
00066 thread_state = property( _get_ts, _set_ts )
00067
00068
00069 def status(self):
00070 """
00071 print status of thread
00072 """
00073 if self.isAlive():
00074 f = _current_frames()[self._ident]
00075 traceback.print_stack(f)
00076 else:
00077 print "Not Alive"
00078
00079 def run( self ):
00080 '''
00081 run method needed by thread class
00082 calls self.main_loop
00083 '''
00084 self.thread_state = "running"
00085 self._ident = get_ident( )
00086 self.env.current_env = self.master_id
00087
00088 try:
00089 self.main_loop()
00090 except Exception, msg:
00091
00092 self.env['center'].error = True, ( str( self ), str( msg ) )
00093 self.safe_release_lock()
00094
00095 self.thread_state = "exit error"
00096 raise
00097
00098 self.thread_state = "exit"
00099
00100 return
00101
00102 def safe_release_lock( self ):
00103 '''
00104 class will release the all nested locks
00105 in the recursive lock
00106 '''
00107
00108 rlock = self.env['center'].lock
00109
00110 if rlock._is_owned() and rlock._RLock__owner == self:
00111
00112
00113 for _ in range( rlock._RLock__count ):
00114 rlock.release()
00115 return
00116 else:
00117 return
00118
00119
00120
00121 def getMyPost( self ):
00122 'get this workers job post'
00123 return self.env['center'][self.getName(), self.processor]
00124
00125 mypost = property( getMyPost )
00126
00127 def __str__( self ):
00128
00129 return "<Worker: node %s, processor %s, %%s jobs todo>" % ( self.getName(), self.processor)
00130
00131 def print_( self, *val ):
00132 'print using print lock'
00133
00134 log = self.env['record']( 10, 'thread' )
00135 with printlock:
00136 print >> log, self, " ".join( map( str, val ) )
00137
00138
00139 def main_loop( self ):
00140 'run until error or until done'
00141 center = self.env['center']
00142 self.thread_state = "main loop"
00143
00144 self.thread_state = "got center"
00145 name = self.getName()
00146 proc = self.processor
00147 mypost = center[name, proc]
00148 mypost.start_timer( )
00149
00150 with center:
00151
00152
00153 while 1:
00154
00155 self.print_( "waiting for job... " )
00156
00157 if center.done or center.error:
00158 self.print_( "Ending work term: done=", center.done, "error=", center.error )
00159 mypost.stop_timer( )
00160 break
00161
00162
00163 if center.has_nothing_todo( name, proc ):
00164 self.thread_state = "waiting for job"
00165 center.wait_for_job( name, proc )
00166 self.thread_state = "finished waiting for job"
00167
00168
00169 elif mypost.has_todo():
00170 self.print_( " ... working" )
00171
00172 job_id = center.get_my_job( name, proc )
00173
00174 self.thread_state = "running job"
00175
00176 center.release( )
00177 self.run_job( job_id )
00178 center.acquire( )
00179
00180 self.thread_state = "finished running job"
00181
00182 center.finished( name, proc, job_id )
00183 self.print_( " ... finished" )
00184 else:
00185 center[name, proc].event.clear()
00186
00187 def run_job( self, job_id ):
00188 '''
00189 run a job
00190 '''
00191
00192 name = self.getName()
00193 proc = self.processor
00194 ri = randint( 1, 5 )
00195
00196 if not isinstance( job_id, tuple ):
00197 raise TypeError( "job_id should be a tuple" )
00198
00199 table = self.env['table']
00200 items = [ table[item] for item in job_id]
00201
00202
00203 job = self.add( items )
00204
00205
00206
00207 with printlock:
00208 log = self.env['record']( 1, 'runner' )
00209 print >> log, '%(name)s-%(proc)d ::' % vars(), job.nice_str()
00210
00211 center = self.env['center']
00212 if self.env['slimvars']['runtype'] == 'dryrun':
00213 pass
00214 else:
00215 num_proc = job.num_proc
00216 if num_proc == 'all':
00217 num_proc = center.semaphore._initial_value
00218
00219 for i in xrange(num_proc):
00220 if center.semaphore._Semaphore__value == 0:
00221 num = num_proc - i
00222 print "waiting for %s more proccesors" %num
00223 if center.error:
00224 return
00225 center.semaphore.acquire( )
00226
00227
00228 job.node_name = self.getName()
00229
00230 for i in xrange(num_proc):
00231 center.semaphore.release( )
00232 job.run()
00233
00234