00001 __copyright__ = """
00002 Copyright 2008 Sean Ross-Ross
00003 """
00004 __license__ = """
00005 This file is part of SLIMpy .
00006
00007 SLIMpy is free software: you can redistribute it and/or modify
00008 it under the terms of the GNU Lesser General Public License as published by
00009 the Free Software Foundation, either version 3 of the License, or
00010 (at your option) any later version.
00011
00012 SLIMpy is distributed in the hope that it will be useful,
00013 but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
00015 GNU Lesser General Public License for more details.
00016
00017 You should have received a copy of the GNU Lesser General Public License
00018 along with SLIMpy . If not, see <http://www.gnu.org/licenses/>.
00019 """
00020
00021
00022
00023 from subprocess import Popen, PIPE as __PIPE__
00024 from slimpy_base.Core.Command.Drivers.Unix_Pipes import Unix_Pipes
00025 from os.path import join
00026 from pdb import set_trace
00027 import re
00028
00029
00030
00031
00032 from slimpy_base.Environment.InstanceManager import InstanceManager
00033
# Finder for scalar placeholders of the form ``${SCALAR[<digits>]}``.
# SCALAR(text) returns the list of every placeholder occurring in ``text``,
# in order of appearance (an empty list when there is none).
_SCALAR_PLACEHOLDER = re.compile( r'(\$\{SCALAR\[\d*\]\})' )
SCALAR = _SCALAR_PLACEHOLDER.findall
00035
class RemoteDriver( object ):
    """
    Chain of commands that is run as one shell command on a work node.

    Commands are appended with addCommand(); an optional source is fed to
    the chain and an optional target receives its output.  The node is
    selected through the ``node_name`` property (or set_work_node()) and
    the chain is executed with run(), or by calling the instance.
    """
    # Shared environment lookup.  The keys read by this class are
    # 'slimvars', 'record', 'center' and 'table'.
    env = InstanceManager()
    _stderr_logfile = "stderr_logfile.log"

    def _get_stderr_logfile(self):
        """Return the log file name joined onto slimvars['localtmpdir']."""
        return join( self.env['slimvars']['localtmpdir'], self._stderr_logfile )

    stderr_logfile = property( _get_stderr_logfile )

    def __init__( self ):
        """Create an empty chain: no commands, source, target or node."""
        self.__cmnd = []
        self.__Target = None
        self.__Source = None
        self._node_name = None

    def get_num_proc( self ):
        """
        Return the largest process count reported by the chained commands.

        Raises ValueError (from max) when the chain holds no command.
        """
        # NOTE(review): 'get_nub_proc' looks like a misspelling of
        # 'get_num_proc' - confirm against the command class before renaming.
        numbers = [com.get_nub_proc() for com in self.getCmnd()]
        return max( numbers )

    def getCmnd( self ):
        """Return the internal (mutable) list of chained commands."""
        return self.__cmnd

    def get_command_list(self):
        """Return the internal (mutable) list of chained commands."""
        return self.__cmnd

    def addCommand( self, command ):
        """
        Append ``command`` to the chain and reset the target to None.

        Returns the result of list.append, i.e. None.
        """
        self.setTarget( None )
        return self.__cmnd.append( command )

    def is_muti_proc(self):
        """Return True if any command has command_type 'multi_process'."""
        for cmd in self.get_command_list():
            if cmd.command_type == 'multi_process':
                return True
        return False

    def setSource( self, src ):
        """
        Set the source of the chain.

        Only allowed while the chain holds no command; otherwise the
        assertion below fails.
        """
        assert not self.getCmnd(), "command already has attributes to it, cannot add a source"
        self.__Source = src

    def getSource( self ):
        """Return the source of the chain (None when unset)."""
        return self.__Source

    def setTarget( self, tgt ):
        """Set the target of the chain."""
        self.__Target = tgt

    def getTarget( self ):
        """Return the target of the chain (None when unset)."""
        return self.__Target

    def get_node_name(self):
        """Return the name of the node this chain is assigned to."""
        return self._node_name

    def set_node_name(self, val):
        """Store ``val`` as node name here and on every chained command."""
        self._node_name = val
        for cmd in self.get_command_list():
            cmd.node_name = val

    source = property( getSource, setSource )
    target = property( getTarget, setTarget )
    node_name = property( get_node_name, set_node_name )

    def get_targets(self):
        """
        Return the set made of this chain's target (when truthy) and of
        everything the commands report through _get_target_cont().
        """
        tgts = set()
        if self.target:
            tgts.add( self.target )

        for com in self.__cmnd:
            tgts.update( com._get_target_cont() )
        return tgts

    def make_locals_list(self):
        """
        Build one "is local" flag per chained command.

        An entry is False as soon as one of the command's targets has
        ``is_global`` set, True when the targets seen were not global, and
        None when the command reports no target.  When the last entry is
        not False and the chain has a target, the last entry becomes
        ``node_name != 'localhost'``.

        Raises IndexError when the chain holds no command.
        """
        # NOTE(review): the inner 'else' is read as pairing with the 'if'
        # (not with the 'for'), which leaves None for a command without
        # targets - confirm against the original layout.
        lst = []
        push = lst.append

        for com in self.get_command_list():
            push( None )
            for target in com._get_target_cont():
                if target.is_global:
                    lst[-1] = False
                    break
                else:
                    lst[-1] = True

        if lst[-1] is False:
            # a global target on the last command takes precedence
            pass
        elif self.target:
            is_local = bool( self.node_name != 'localhost' )
            lst[-1] = is_local

        return lst

    def make_tmp_list(self):
        """
        Build one "is temporary" flag per chained command.

        An entry is True as soon as one of the command's targets answers
        True to istmp(), False when the targets seen did not, and None
        when the command reports no target.  When the chain has a target,
        the last entry is replaced by ``self.target.istmp()``.

        Raises IndexError when the chain has a target but no command.
        """
        # NOTE(review): the inner 'else' is read as pairing with the 'if'
        # (not with the 'for') - confirm against the original layout.
        lst = []
        push = lst.append

        for com in self.__cmnd:
            push( None )
            for target in com._get_target_cont():
                if target.istmp():
                    lst[-1] = True
                    break
                else:
                    lst[-1] = False

        if self.target:
            lst[-1] = self.target.istmp()

        return lst

    def get_sources(self):
        """
        Return the set made of this chain's source (when truthy) and of
        everything the commands report through _get_source_cont().
        """
        srcs = set()
        if self.source:
            srcs.add( self.source )
        for com in self.__cmnd:
            srcs.update( com._get_source_cont() )
        return srcs

    def __str__( self ):
        """
        Return a readable form of the chain: the formatted commands joined
        with " | ", prefixed by "< source" and suffixed by "> target" when
        those are set.
        """
        parts = [com.func.format( self.node_name, com.params, com.kparams )
                 for com in self.getCmnd()]

        command = " | ".join( parts )
        if self.source:
            command = "< %s %s" % ( self.source, command )
        if self.target:
            command = "%s > %s" % ( command, self.target )

        return command

    def __call__( self, *params, **kparams ):
        """
        Run the chain; equivalent to run().

        Positional and keyword arguments are accepted but ignored.
        """
        return self.run()

    def run(self):
        """
        Execute the chain on the assigned node.

        When the first attempt raises IOError and slimvars['run_again'] is
        set, the sources are checked with diagnose(); if every check
        passes the chain is executed a second time, otherwise the original
        IOError is re-raised.

        Raises Exception when no node name has been set, and IOError when
        the command fails and is not (successfully) retried.
        """
        if self.node_name is None:
            raise Exception( "job host not set" )

        log = self.env['record']( 5, 'cmd', 'err' )

        try:
            self._run()
        except IOError:
            print >> log, "command failed, running diagnostic"
            if self.env['slimvars']['run_again']:
                if self.diagnose():
                    print >> log, "All diagnostics passed: running command again"
                    self._run()
                else:
                    print >> log, "diagnostic failed: raising exception"
                    raise
            else:
                raise
        return

    def diagnose(self):
        """Return True only if diagnostic() passes for every source."""
        for source in self.get_sources():
            if not source.diagnostic():
                return False

        return True

    def add_node_name_to_targets( self ):
        """
        Record on the target which node holds its data, then let every
        command do the same for its own targets.

        A target with ``is_local`` set is given this chain's node name,
        any other target is given 'localhost'.
        """
        if self.target:
            if self.target.is_local:
                self.target.add_node_name( self.node_name )
            else:
                self.target.add_node_name( 'localhost' )

        for cmd in self.get_command_list():
            cmd.add_node_name_to_targets()

        return

    def copy_sources(self):
        """
        Copy the source to the node the chain will run on, when needed,
        then let every command copy its own sources.

        Only a source with ``is_local`` set is copied: to 'localhost' for
        a multi-process chain, to this chain's node otherwise.
        """
        if self.source and self.source.is_local:
            if self.is_muti_proc():
                self.source.node_copy( 'localhost' )
            else:
                self.source.node_copy( self.node_name )

        for cmd in self.get_command_list():
            cmd.copy_sources()

    def _run(self):
        """
        Copy the sources, spawn the shell command and wait for it.

        The pid is registered with env['center'] while the process runs.
        Whatever the process wrote to stderr is logged on success and
        embedded in the exception on failure.

        Raises IOError(returncode, message) when the process exits with a
        non-zero status; OSError from wait() is propagated unless it is
        ECHILD.
        """
        import errno

        center = self.env['center']

        self.copy_sources()

        # The lock is released even if formatting or spawning fails, so a
        # failing job cannot leave 'center' locked for everybody else.
        center.acquire()
        try:
            cmd = self.pipe_format()

            log = self.env['record']
            print >> log( 5, 'cmd' ), cmd
            p3 = Popen( cmd, stderr=__PIPE__, shell=True )
            pid = p3.pid
            center.add_pid( pid )
        finally:
            center.release()

        # Drain stderr *before* waiting: a child that fills the pipe buffer
        # blocks on write and would never exit while nobody is reading.
        # Closing in 'finally' keeps the descriptor from leaking on errors.
        try:
            last_line = "".join( p3.stderr.readlines() )
        finally:
            p3.stderr.close()

        try:
            err = p3.wait()
            print >> log( 5 ), "finished::", self.node_name
        except OSError as e:
            # ECHILD: the child has already been reaped elsewhere; fall
            # back on whatever return code Popen has recorded.
            if e.errno == errno.ECHILD:
                err = p3.returncode
            else:
                raise

        center.acquire()
        try:
            self.add_node_name_to_targets()
            center.remove_pid( pid )
        finally:
            center.release()

        if err:
            details = {
                'pid': pid,
                'node': self.node_name,
                'err': err,
                'last_line': last_line,
                'cmd': cmd,
            }
            raise IOError( err, "process %(pid)s on node %(node)s, returned %(err)s: \n%(last_line)s\nCommand:%(cmd)s" % details )

        if last_line:
            print >> log( 5, 'cmd', 'err' ), last_line

        return

    def pipe_format(self):
        """
        Return the complete shell command string for the chain.

        The formatted commands, the per-command local/tmp flags and the
        source/target data for this node are handed to
        Unix_Pipes.CreateComand; every ``${SCALAR[...]}`` placeholder in
        the result is then replaced by the string form of its entry in
        env['table'].scalars_map.

        Raises KeyError when a placeholder has no entry in scalars_map.
        """
        clist = self.format()

        is_local = self.make_locals_list()
        is_tmp = self.make_tmp_list()

        if self.source is None:
            # Without a source the chain is headed by the shell no-op
            # 'true'; the flag lists get a matching placeholder entry.
            clist.insert( 0, 'true' )
            is_local.insert( 0, None )
            is_tmp.insert( 0, None )

        if self.is_muti_proc():
            nodename = 'localhost'
        else:
            nodename = self.node_name

        source = self.source
        if source:
            source = source.get_data( self.node_name )

        target = self.target
        if target:
            target = target.get_data( self.node_name )

        pipecommand = Unix_Pipes.CreateComand( clist,
                                               node_name=nodename,
                                               source=source,
                                               target=target,
                                               is_local=is_local,
                                               is_tmp=is_tmp,
                                               )

        all_scalars = SCALAR( pipecommand )
        scalars_map = self.env['table'].scalars_map
        for scal in all_scalars:
            scalar_value = str( scalars_map[scal] )
            pipecommand = pipecommand.replace( scal, scalar_value )

        return pipecommand

    def set_work_node(self, node_name ):
        """Assign the chain (and its commands) to ``node_name``."""
        self.node_name = node_name

    def format( self ):
        """
        Return a list with one formatted entry per chained command.

        Each entry is ``com.func.format(node_name, *com.do_runtime_map())``.
        """
        cmnd = [com.func.format( self.node_name, *com.do_runtime_map() )
                for com in self.getCmnd()]

        return cmnd