00001 '''
00002 posting class for worker threads to get info about new jobs
00003 '''
00004
00005
00006 __copyright__ = """
00007 Copyright 2008 Sean Ross-Ross
00008 """
00009 __license__ = """
00010 This file is part of SLIMpy .
00011
00012 SLIMpy is free software: you can redistribute it and/or modify
00013 it under the terms of the GNU Lesser General Public License as published by
00014 the Free Software Foundation, either version 3 of the License, or
00015 (at your option) any later version.
00016
00017 SLIMpy is distributed in the hope that it will be useful,
00018 but WITHOUT ANY WARRANTY; without even the implied warranty of
00019 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
00020 GNU Lesser General Public License for more details.
00021
00022 You should have received a copy of the GNU Lesser General Public License
00023 along with SLIMpy . If not, see <http://www.gnu.org/licenses/>.
00024 """
00025
00026 from threading import RLock,Event
00027 from slimpy_base.Core.MutiProcessUtils.JobRecord import JobRecord
00028 from slimpy_base.Environment.InstanceManager import InstanceManager
00029 import time
00030
00031 class JobPosting( object ):
00032 '''
00033 posting class for worker threads to get info about new jobs
00034 Like a message board.
00035 '''
00036 env = InstanceManager()
00037
00038 def __init__( self, name, processor ):
00039
00040 self.lock = RLock()
00041 self.event = Event()
00042 self.lock.acquire()
00043
00044 self.name = name
00045 self.processor = processor
00046 self.finished_jobs = set()
00047 self.current = set()
00048 self.todo = set()
00049 self._is_waiting = False
00050
00051 self._first_wait = True
00052 self.start_time = 0
00053 self.stop_time = 0
00054 self.current_rec = None
00055 self.finished_rec = set()
00056
00057 self.lock.release()
00058
00059 def _get_waiting(self):
00060 'true if worker is waiting for a job'
00061 return self._is_waiting
00062
00063 def _set_waiting(self,val):
00064 self._is_waiting = val
00065
00066 waiting = property( _get_waiting, _set_waiting )
00067
00068 def get_time_since_start(self):
00069 'get the time from the start of the job'
00070 if self.current_rec is None:
00071 return 0
00072 else:
00073 return time.time() - self.current_rec.created
00074
00075 time_passed = property( get_time_since_start )
00076
00077 def notify(self):
00078 'notify the listening worker'
00079 self.event.set()
00080
00081 def new_todo( self, job ):
00082 'add a job to the message board'
00083 self.lock.acquire()
00084 self.todo.add( job )
00085 self.lock.release()
00086
00087 def get( self ):
00088 'get a job from the todo pile'
00089 self.lock.acquire()
00090 job = self.todo.pop()
00091 self.current.add( job )
00092 self.current_rec = JobRecord( job ,self.name, self.processor)
00093 self.current_rec.start()
00094
00095 self.env['record'].add_job( self.current_rec )
00096
00097 self.lock.release()
00098 return job
00099
00100 def finished( self, job ):
00101 '''
00102 adds current job to the finished pile
00103
00104 '''
00105 self.lock.acquire()
00106 self.current.remove( job )
00107 self.finished_jobs.add( str(job) )
00108
00109 rec = self.current_rec
00110 rec.stop()
00111 self.finished_rec.add( rec )
00112 print >> self.env['record'](10,'time'), "Command execution time: %0.4f seconds" %(rec.finished-rec.created)
00113 self.current_rec = None
00114
00115 self.lock.release()
00116
00117 def is_working( self ):
00118 'true if the worker is working on a job'
00119 return len( self.current )
00120
00121 def has_todo( self ):
00122 'returns the len of the todo pile'
00123 return len( self.todo )
00124
00125 def busy( self ):
00126 'true if job is working or has stuff todo'
00127 self.lock.acquire()
00128 ret = self.is_working() or self.has_todo()
00129 self.lock.release()
00130 return ret
00131
00132 def post(self,job):
00133 'post a job to the todo'
00134 self.lock.acquire()
00135
00136 self.todo.add( job )
00137 self.event.set()
00138
00139 self.lock.release()
00140
00141 def start_timer(self):
00142 'starts timer '
00143 self.start_time = time.time()
00144
00145 def stop_timer(self):
00146 'stops timer '
00147 self.stop_time = time.time()
00148
00149 def wait_for_job(self):
00150 "wait for slef.notify to be called"
00151 self.event.wait()
00152 self.event.clear()
00153 return
00154
00155 def acquire(self):
00156 self.lock.acquire()
00157
00158 def release(self):
00159 self.lock.release()
00160
00161 def total_time(self):
00162 "total time between start and stop"
00163 return self.stop_time - self.start_time
00164
00165 def time_idol(self):
00166 'total time spent idle'
00167 tot = self.total_time()
00168 for jobrecord in self.finished_rec:
00169 tot -= jobrecord.total_time()
00170 return tot
00171