00001 """
00002 runner for many processors on the same CPU
00003 """
00004 from __future__ import with_statement
00005
00006 __copyright__ = """
00007 Copyright 2008 Sean Ross-Ross
00008 """
00009 __license__ = """
00010 This file is part of SLIMpy .
00011
00012 SLIMpy is free software: you can redistribute it and/or modify
00013 it under the terms of the GNU Lesser General Public License as published by
00014 the Free Software Foundation, either version 3 of the License, or
00015 (at your option) any later version.
00016
00017 SLIMpy is distributed in the hope that it will be useful,
00018 but WITHOUT ANY WARRANTY; without even the implied warranty of
00019 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
00020 GNU Lesser General Public License for more details.
00021
00022 You should have received a copy of the GNU Lesser General Public License
00023 along with SLIMpy . If not, see <http://www.gnu.org/licenses/>.
00024 """
00025
00026
00027 from slimpy_base.Core.Interface.ContainerBase import DataContainer
00028 from slimpy_base.Core.MutiProcessUtils.WorkerThread import Worker
00029 from slimpy_base.Core.Runners.PresidenceStack import presStack
00030 from slimpy_base.Environment.InstanceManager import InstanceManager
00031 from threading import ThreadError
00032 from threading import currentThread
00033 import pdb
00034
00035 class MultiCoreRunner( presStack ):
00036 """
00037 Runs commands from the graph. uses two stacks to keep
00038 track of dependencies and perhaps run several commands
00039 at the same time.
00040 """
00041 env = InstanceManager()
00042
00043
00044 def __init__( self ):
00045
00046 self.created_nodes = set()
00047 self.nodes = None
00048 presStack.__init__( self )
00049
00050 def set_graph(self, graph):
00051 """
00052 runner.set_graph( graph )
00053 sets the graph to graph
00054 """
00055 presStack.set_graph( self, graph )
00056 center = self.env['center']
00057 center.reset()
00058
00059 self.nodes = { }
00060
00061 if len( center.node_names ) == 0:
00062 raise Exception("No nodes to work on")
00063
00064 for nodename in center.node_names:
00065
00066 proc_list = self.nodes.setdefault(nodename,[])
00067 ppn = len(proc_list)
00068
00069 worker = Worker( nodename, self.env.current_env, processor=ppn )
00070 proc_list.append( worker )
00071
00072 center.set_event( )
00073
00074 assert not center.done
00075
00076 def run(self):
00077 """
00078 required by Thread
00079 """
00080 center = self.env['center']
00081
00082 try:
00083 self.main_loop()
00084 except Exception, msg:
00085 center.error = True, msg
00086
00087 self.safe_release_lock()
00088 center.abort_all( )
00089
00090 self.join_all()
00091 raise
00092
00093 self.join_all( )
00094 center.reset()
00095
00096 def join_all(self):
00097 """
00098 Wait for all to finish
00099 """
00100
00101
00102 for node in self.nodes.values():
00103 for processor in node:
00104 if processor.isAlive():
00105 processor.join()
00106 return
00107
00108 def check_if_job_is_cmd( self, job_id ):
00109 """
00110 The job is a command or data
00111 """
00112 if isinstance( job_id, tuple ):
00113 return job_id
00114 else:
00115 self.created_nodes.add( job_id )
00116 self.pop( job_id )
00117 self.env['center'].set_event()
00118 return None
00119
00120 def safe_release_lock(self):
00121
00122 this = currentThread()
00123 rlock = self.env['center'].lock
00124
00125 while rlock._is_owned() and rlock._RLock__owner == this:
00126 rlock.release( )
00127
00128
00129 def __str__(self):
00130 num_nodes = len(self.nodes)
00131 num_proc = sum( map( len, self.nodes.values()) )
00132 ready = self.has_ready()
00133 working = self.num_working()
00134 return ( "<MutiCoreRunner %s nodes, "
00135 "%s processors, "
00136 "%s ready, "
00137 "%s working>"
00138 % (num_nodes, num_proc,ready,working ) )
00139
00140 def has_work_and_worker(self):
00141 rdy = self.has_ready()
00142 idle = self.env['center'].has_idle()
00143 return ( rdy and idle )
00144
00145 def have_no_work_or_workers(self):
00146 nothing_to_do = not self.has_ready()
00147 nothing_to_do |= not self.env['center'].has_idle()
00148 return nothing_to_do
00149
00150
00151 def main_loop(self):
00152
00153
00154 log = self.env['record']
00155 logt = log(10,'thread')
00156
00157 for node in self.nodes.values():
00158
00159 for processor in node:
00160 print >> logt,"starting",node,processor
00161 processor.start()
00162
00163
00164 with self.env['center'] as center:
00165
00166 while self.has_more_jobs():
00167
00168 print >> logt ,self, "has more jobs and is waiting"
00169
00170 if self.have_no_work_or_workers():
00171 center.wait_for_avail( )
00172
00173 if center.error:
00174 raise ThreadError( "Another thread signaled error" )
00175
00176 for job_id in center.dump_finished():
00177 self.pop( job_id )
00178
00179
00180
00181
00182 while self.has_work_and_worker():
00183
00184 job_id, (node, proc) = self.choose( )
00185
00186
00187 if job_id:
00188
00189 self.pull( job_id )
00190
00191
00192 print >> log(10,'thread') ,self, "posting job"
00193 center.post(node, proc, job_id)
00194
00195
00196
00197
00198 print >> log(10,'thread') ,self, "is done!!!!!"
00199
00200 center.done = True
00201
00202
00203 return
00204
00205 @classmethod
00206 def get_src_from_id(cls,job_id):
00207 jlist = [ cls.env['table'][jid] for jid in job_id]
00208
00209 srcs = []
00210 if isinstance( jlist[0], DataContainer ):
00211 srcs.append( jlist[0] )
00212
00213 sc = lambda job: job.source_containers
00214 hc = lambda job: hasattr( job, 'source_containers' )
00215 slist = [ sc(job) for job in jlist if hc(job) and sc(job) ]
00216
00217 map( srcs.extend , slist )
00218
00219 return srcs
00220
00221
00222 def choose(self):
00223
00224 while self.has_ready_data():
00225 data_id = self.pull_data()
00226 self.created_nodes.add( data_id )
00227 self.pop( data_id )
00228
00229
00230 cmds = [ key for key in self.ready.keys() if isinstance( key, tuple) ]
00231
00232 if cmds:
00233 center = self.env['center']
00234 idle = center.idle
00235 nodelist = [ node for node,_ in idle ]
00236 jid, node = self._choose_node_command( cmds, nodelist )
00237 proc = None
00238 for n,p in idle:
00239 if n == node:
00240 proc = p
00241 break
00242 else:
00243 jid, (node, proc) = None,(None,None)
00244
00245 self.env['center'].set_event( )
00246
00247
00248 return jid, (node, proc)
00249
00250 @classmethod
00251 def _choose_node_command(cls, job_ids, nodelist ):
00252 res = []
00253 for job_id in job_ids:
00254 datalist = cls.get_src_from_id(job_id)
00255 r2 = []
00256 for p in nodelist:
00257 hits = len([ 1 for data in datalist if p in data.nodenames])
00258 miss = len( datalist ) - hits
00259 r2.append( (job_id,p,(hits,miss)) )
00260 r2.sort( mycmp )
00261 res.append( r2[0] )
00262
00263 res.sort( mycmp )
00264 return res[0][:2]
00265
00266
00267
def mycmp( x, y ):
    """
    Comparison function for ( job_id, node, ( hits, miss ) ) triples.

    Orders by ascending miss count; equal miss counts are ordered
    by descending hit count. Returns -1, 0 or 1.
    """
    _, _, ( hits_x, miss_x ) = x
    _, _, ( hits_y, miss_y ) = y

    # fewest misses first
    by_miss = ( miss_x > miss_y ) - ( miss_x < miss_y )
    if by_miss != 0:
        return by_miss

    # same misses: most hits first, hence the swapped operands
    return ( hits_y > hits_x ) - ( hits_y < hits_x )
00276