'''
posting class for worker threads to get info about new jobs
'''


__copyright__ = """
Copyright 2008 Sean Ross-Ross
"""
__license__ = """
This file is part of SLIMpy .

SLIMpy is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

SLIMpy is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public License
along with SLIMpy . If not, see <http://www.gnu.org/licenses/>.
"""

from threading import RLock,Event
from slimpy_base.Core.MutiProcessUtils.JobRecord import JobRecord
from slimpy_base.Environment.InstanceManager import InstanceManager
import time


class JobPosting(object):
    '''
    posting class for worker threads to get info about new jobs
    Like a message board.

    Jobs move through three sets: ``todo`` (posted, not yet taken),
    ``current`` (taken by the worker) and ``finished_jobs`` (string form
    of completed jobs).  All mutations of these sets happen while holding
    ``self.lock`` (a re-entrant lock), and ``self.event`` is used to wake
    a worker blocked in ``wait_for_job``.

    NOTE: every critical section uses try/finally rather than ``with`` so
    the lock is always released on error without requiring a
    ``__future__`` import on older Python 2 interpreters.
    '''
    # shared environment accessor; indexed with 'record' below
    env = InstanceManager()

    def __init__(self, name, processor):
        '''
        @param name: stored as self.name and passed on to each JobRecord
        @param processor: stored as self.processor and passed on to each
            JobRecord
        '''
        self.lock = RLock()
        self.event = Event()

        self.lock.acquire()
        try:
            self.name = name
            self.processor = processor
            self.finished_jobs = set()
            self.current = set()
            self.todo = set()
            self._is_waiting = False

            self._first_wait = True
            self.start_time = 0
            self.stop_time = 0
            self.current_rec = None
            self.finished_rec = set()
        finally:
            self.lock.release()

    def _get_waiting(self):
        'true if worker is waiting for a job'
        return self._is_waiting

    def _set_waiting(self, val):
        'set the waiting flag'
        self._is_waiting = val

    waiting = property(_get_waiting, _set_waiting)

    def get_time_since_start(self):
        '''
        get the time from the start of the job

        @return: 0 when there is no current record, otherwise the seconds
            elapsed since the current record's ``created`` time stamp.
        '''
        # Read the attribute once: finished() may reset it to None from
        # another thread between the check and the use.
        rec = self.current_rec
        if rec is None:
            return 0
        return time.time() - rec.created

    time_passed = property(get_time_since_start)

    def notify(self):
        'notify the listening worker'
        self.event.set()

    def new_todo(self, job):
        'add a job to the message board (does not wake the worker)'
        self.lock.acquire()
        try:
            self.todo.add(job)
        finally:
            self.lock.release()

    def get(self):
        '''
        get a job from the todo pile

        Moves an arbitrary job from ``todo`` to ``current``, creates and
        starts a JobRecord for it and registers that record with
        ``env['record']``.

        @return: the job that was taken
        @raise KeyError: if the todo pile is empty (the lock is released
            before the exception propagates)
        '''
        self.lock.acquire()
        try:
            job = self.todo.pop()
            self.current.add(job)
            self.current_rec = JobRecord(job, self.name, self.processor)
            self.current_rec.start()

            self.env['record'].add_job(self.current_rec)
        finally:
            self.lock.release()
        return job

    def finished(self, job):
        '''
        adds current job to the finished pile

        Removes ``job`` from ``current``, remembers ``str(job)`` in
        ``finished_jobs``, stops the current record, files it under
        ``finished_rec`` and reports the execution time.

        @raise KeyError: if ``job`` is not in ``current`` (the lock is
            released before the exception propagates)
        '''
        self.lock.acquire()
        try:
            self.current.remove(job)
            self.finished_jobs.add(str(job))

            rec = self.current_rec
            rec.stop()
            self.finished_rec.add(rec)
            # Python 2 print-to-stream form; env['record'](10,'time') is
            # presumably a log stream selected by verbosity/tag -- confirm
            # against the record implementation.
            print >> self.env['record'](10,'time'), "Command execution time: %0.4f seconds" %(rec.finished-rec.created)
            self.current_rec = None
        finally:
            self.lock.release()

    def is_working(self):
        'number of jobs currently being worked on (truthy if working)'
        return len(self.current)

    def has_todo(self):
        'returns the len of the todo pile'
        return len(self.todo)

    def busy(self):
        '''
        truthy if the worker is working or has stuff todo

        @return: the count of current jobs if non-zero, otherwise the
            count of todo jobs
        '''
        self.lock.acquire()
        try:
            return self.is_working() or self.has_todo()
        finally:
            self.lock.release()

    def post(self, job):
        'post a job to the todo pile and wake the listening worker'
        self.lock.acquire()
        try:
            self.todo.add(job)
            self.event.set()
        finally:
            self.lock.release()

    def start_timer(self):
        'starts timer '
        self.start_time = time.time()

    def stop_timer(self):
        'stops timer '
        self.stop_time = time.time()

    def wait_for_job(self):
        '''
        wait for self.notify (or self.post) to set the event, then
        clear the event so the next call blocks again
        '''
        self.event.wait()
        self.event.clear()
        return

    def acquire(self):
        'acquire the posting lock (re-entrant)'
        self.lock.acquire()

    def release(self):
        'release the posting lock'
        self.lock.release()

    def total_time(self):
        "total time between start and stop"
        return self.stop_time - self.start_time

    def time_idol(self):
        '''
        total time spent idle

        @return: total_time() minus the total_time() of every finished
            job record
        '''
        # Copy under the lock: finished() adds to finished_rec from the
        # worker thread, and a set must not change size while iterated.
        self.lock.acquire()
        try:
            records = list(self.finished_rec)
        finally:
            self.lock.release()

        tot = self.total_time()
        for jobrecord in records:
            tot -= jobrecord.total_time()
        return tot