00001 """
00002 Vector Interface to RSF.
00003 """
00004
00005 __copyright__ = """
00006 Copyright 2008 Sean Ross-Ross
00007 """
00008 __license__ = """
00009 This file is part of SLIMpy .
00010
00011 SLIMpy is free software: you can redistribute it and/or modify
00012 it under the terms of the GNU Lesser General Public License as published by
00013 the Free Software Foundation, either version 3 of the License, or
00014 (at your option) any later version.
00015
00016 SLIMpy is distributed in the hope that it will be useful,
00017 but WITHOUT ANY WARRANTY; without even the implied warranty of
00018 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
00019 GNU Lesser General Public License for more details.
00020
00021 You should have received a copy of the GNU Lesser General Public License
00022 along with SLIMpy . If not, see <http://www.gnu.org/licenses/>.
00023 """
00024
00025 from pdb import set_trace
00026 from slimpy_base.Core.Command.Drivers.Unix_Pipes import gethostname
00027
00028 from slimpy_base.Core.Command.Drivers.Unix_Pipes import Unix_Pipes, is_localhost
00029 from slimpy_base.Core.Interface.ContainerBase import DataContainer
00030 from slimpy_base.Environment.InstanceManager import InstanceManager
00031 from numpy import int32, float32, float64, complex64
00032 from numpy import product, fromfile
00033 from os import tempnam, system, getpid
00034 from os.path import isfile, join, dirname, abspath, basename, exists, split
00035 from rsfScalar import Scalar as ScalarMethods
00036 from sfCommandFactory import rsfCommandFactory
00037 from string import Template
00038 from subprocess import Popen, PIPE as __PIPE__, STDOUT as _STDOUT
00039 from sys import modules
00040
00041
00042
00043
00044
00045 class rsf_data_container( DataContainer ):
00046 """
00047 rsf_data_container - to keep track of "out of core" vectors
00048 corresponding binary files on disk.
00049 """
00050 suffix = ".rsf"
00051 psuffix = ".vpl"
00052 name = "rsf"
00053
00054
00055
00056 env = InstanceManager()
00057
00058 COUNT = 0
00059 sfFactory = rsfCommandFactory()
00060
00061
00062 _scalar_methods = ScalarMethods
00063
00064
00065 @classmethod
00066 def isCompatibleWith( cls, obj ):
00067 '''
00068 statict method to determine if 'obj' is an rsf file
00069 @param obj:
00070 @type obj: any object that would be
00071 contained in a datacontainer class
00072 '''
00073
00074 obj = str(obj)
00075
00076 if obj.endswith( cls.suffix ):
00077 if not isfile( obj ):
00078 raise Exception, "the file %s can not be found" %( obj )
00079 return True
00080 if isfile( obj+cls.suffix ):
00081 return True
00082
00083 return False
00084
00085 def __init__( self , data=None , parameters=None, command=None , tmp=None, nodenames=None, target_node=None):
00086
00087
00088 self.SFRM = join( self.env['slimvars']['RSFBIN'], 'sfrm' )
00089 self.SFGREY = join( self.env['slimvars']['RSFBIN'], 'sfgrey' )
00090 self.SFMV = join( self.env['slimvars']['RSFBIN'], 'sfmv' )
00091 self.XTPEN = join( self.env['slimvars']['RSFBIN'], 'xtpen' )
00092 self.SFATTR = join( self.env['slimvars']['RSFBIN'], 'sfattr' )
00093 self.SFDD = join( self.env['slimvars']['RSFBIN'], 'sfdd' )
00094
00095 self.PLOTCMD = Template( "< ${data} ${plotcmd} | ${viewcmd} &" )
00096 self.MVCMD = Template( "${mv} ${From} ${To}" )
00097 self.RMCMD = Template( "${rm} ${file}" )
00098
00099 self.ATTRMD = Template( "${attr} < ${file} want=${want} lval=${lval} " )
00100
00101
00102 node_info = None
00103
00104 if data is not None:
00105 data = str(data)
00106 data = abspath( data )
00107 if not data.endswith( self.suffix ):
00108 data = data + self.suffix
00109
00110 dname = dirname( data )
00111 data = basename(data)
00112 node_info = dict( location=dname )
00113
00114 DataContainer.__init__( self, data=data,
00115 parameters=parameters ,
00116 command=command ,tmp=tmp,
00117 nodenames=nodenames,
00118 node_info=node_info,
00119 target_node=target_node )
00120
00121
00122
00123 def __setitem__( self, item, value ):
00124 """
00125 """
00126 raise NotImplementedError
00127
00128
00129
00130 def makeparams( self ):
00131 '''
00132 returns a param object created from the data contained
00133 ie. the rsf header info
00134 '''
00135 par = DataContainer.makeparams( self )
00136
00137
00138
00139 try:
00140 format, data_type = par['data_format'].split( '_' )
00141 except KeyError:
00142 print self, 'has no key "data_format"'
00143 raise
00144
00145 par['data_type'] = data_type
00146
00147 return par
00148
00149
00150 def genname_helper(self, string):
00151 """
00152 formatting and better output files, no more tempname just 'slim.%prog.%cmd.%id.rsf'
00153 where %prog is the main programs name and %cmd is the last command
00154 on the pipe that created the data and
00155 %id is a unique incremented identifier
00156 """
00157 main_file = "ineractive"
00158
00159 main = modules['__main__']
00160 if hasattr(main, '__file__'):
00161 main_file = main.__file__
00162
00163 prog = basename( main_file )[:-3]
00164
00165 join_dot = lambda *args:".".join(args)
00166
00167 pid = str(getpid())
00168 cur_env = self.env.current_env
00169
00170 make = lambda fcn, cnt: join_dot( 'slim',
00171 pid, cur_env,
00172 prog[:6],
00173 fcn[:5],
00174 "%.5d" % cnt) + self.suffix
00175
00176 if exists(string):
00177 string = split(string)[1]
00178 self.__class__.COUNT += 1
00179 filename = make( string, self.COUNT )
00180 while exists( filename ):
00181 self.__class__.COUNT +=1
00182 filename = make(string, self.COUNT )
00183 return filename
00184
00185
00186 def genName( self, command=None):
00187 """
00188 generate a random name if command is given
00189 then generate a unique but formatted name
00190 'slim.%prog.%cmd.%id.rsf'
00191 """
00192 if command is not None and hasattr(command, 'tag') and isinstance(command.tag, str):
00193 filename = self.genname_helper(command.tag)
00194 elif command is not None and isinstance(command, str):
00195 filename = self.genname_helper( command )
00196 else:
00197
00198 td = self.get_tmp_dir()
00199 filename = tempnam( td ) + self.suffix
00200
00201 return filename
00202
00203 def get_tmp_dir(self):
00204 if self.is_local:
00205 return self.env['slimvars']['localtmpdir']
00206 else:
00207 return self.env['slimvars']['globaltmpdir']
00208
00209
00210
00211
00212 def isempty( self ):
00213 """
00214 check if this data has bee built
00215 """
00216 local_data = self.get_data('localhost')
00217 if isfile( local_data ):
00218 return False
00219
00220 if self.data:
00221 if 'localhost' in self.nodenames:
00222 return not isfile( local_data )
00223 else:
00224 return not len( self.nodenames )
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234 @classmethod
00235 def get_converter(self ,command ):
00236
00237 if isinstance(command, str ):
00238 tag = command
00239 else:
00240 tag = command.tag
00241 return self.sfFactory[tag]
00242
00243 def __str__( self ):
00244 """Adds the current lib's suffix to the end of filename
00245 note: if no lib is set then self.plugin.suffix returns ''
00246 """
00247 if self.data != None:
00248 return str( self.data )
00249 else:
00250 return "None"
00251
00252 def __repr__( self ):
00253 return str( self )
00254
00255 def path( self ):
00256 """
00257 returns the absolute pathname to the file
00258 """
00259 pathstr = abspath( str( self ) )
00260 pathstr = dirname( pathstr )
00261 return pathstr
00262
00263 def getName( self ):
00264 'returns the name of the data contained'
00265 return self.data
00266
00267
00268 def plot( self, command=None, plotcmd=None ):
00269 """
00270 plot returns the path the the plotfile
00271 """
00272
00273 if command is None:
00274
00275 if plotcmd is None:
00276 plotcmd = self.SFGREY
00277
00278 command = self.PLOTCMD.substitute( data=self.local_data,
00279 plotcmd=plotcmd,
00280 viewcmd=self.XTPEN )
00281
00282
00283 print >> self.env['record']( 1, 'plot' ), command
00284 system( command )
00285
00286 return None
00287
00288 def _get_local_data(self):
00289 return self.get_data("localhost" )
00290
00291 local_data = property( _get_local_data )
00292
00293 def _get_any_node(self):
00294 if 'localhost' in self.nodenames or not self.nodenames:
00295 node = 'localhost'
00296 else:
00297 node = list(self.nodenames)[0]
00298
00299 return node, self.get_data(node)
00300
00301
00302 def setName( self, newname, path=None ):
00303 """wrapped by SLIMpy.serial_vector.setname"""
00304 newname = str(newname)
00305
00306 if newname.endswith( self.suffix ):
00307 newname = newname
00308 else:
00309 newname = newname + self.suffix
00310
00311 if not path:
00312 newname = abspath(newname)
00313 path = dirname( newname )
00314 newname = basename( newname )
00315
00316 if self.isfull():
00317 self.move( newname )
00318
00319 self.updateData( newname )
00320 ninfo =self._node_info.setdefault( 'localhost', {} )
00321 self.add_node_name( 'localhost' )
00322 ninfo['location'] = path
00323
00324 self.tmp( False )
00325
00326
00327 def updateData( self, newname ):
00328 '''
00329 @param newname: string to rename data to
00330 @type newname:
00331 '''
00332 self.data = newname
00333
00334 def getHeader( self ):
00335 """
00336 return open header, data must exist
00337 """
00338
00339 node,data = self._get_any_node()
00340 if node == 'localhost':
00341 return open( data )
00342
00343 else:
00344 cat = Unix_Pipes.CreateComand( ['cat'], node_name=node, source=data )
00345 p0 = Popen( cat, shell=True, stdout=__PIPE__ )
00346
00347 return p0.stdout
00348
00349 def move( self, newname ):
00350 "move data file on disk"
00351
00352 mv = self.MVCMD.substitute( mv=self.SFMV, From=self.local_data, To=abspath( newname ) )
00353
00354 err = system( mv )
00355 if err is not 0:
00356 raise Exception( "commmand %(mv)s failed " %vars() )
00357
00358 def rm( self ):
00359 """
00360 removes the file on disc
00361 """
00362
00363 print >> self.env['record']( 10, 'cleaner' ), "call to remove %s:" %self.data
00364 print >> self.env['record']( 11, 'cleaner' ), "\tistmp=%s , isfull=%s" %( self.istmp() , self.isfull() )
00365
00366 if not self.istmp():
00367 return False
00368 if self.isempty():
00369 return False
00370
00371 err = 0
00372 cmd_log1 = self.env['record'](1,'cmd')
00373
00374
00375 synccmd = self.env['slimvars']['SYNC_CMD']
00376 do_sync = self.env['slimvars']['sync_disk_om_rm']
00377
00378 def sync(synccmd):
00379
00380 sync_command = Unix_Pipes.CreateComand([synccmd], node)
00381 print >> cmd_log1 , sync_command
00382 p0 = Popen( sync_command, shell=True, stderr=__PIPE__)
00383 ret = p0.wait( )
00384 if ret:
00385 print >> self.env['record'] , 'error running %(sync_command)s ' %vars()
00386 print >> self.env['record'] , 'try running SLIMpy with "sync_disk_om_rm=False" ' %vars()
00387
00388 for node in self.nodenames.copy():
00389
00390 data = self.get_data( node )
00391 sfrm = self.RMCMD.substitute( rm=self.SFRM, file=data )
00392 rm = self.RMCMD.substitute( rm='rm', file=data )
00393 if do_sync:
00394 sync(synccmd)
00395
00396 command = Unix_Pipes.CreateComand([sfrm], node)
00397 print >> cmd_log1 , command
00398 p0 = Popen( command, shell=True, stderr=__PIPE__)
00399 ret = p0.wait()
00400 print >> self.env['record'](2) , "finished::",node,'rm'
00401 if ret:
00402 err += 1
00403 msg = p0.stderr.read( )
00404 p0.stderr.close()
00405 print >> self.env['record'] , 'error %(ret)s on %(command)s: removeing header file:\n%(msg)s' %vars()
00406 command = Unix_Pipes.CreateComand([rm], node)
00407 print >> cmd_log1 , command
00408 p0 = Popen( command, shell=True, stderr=__PIPE__)
00409 ret = p0.wait()
00410 if ret:
00411 msg = p0.stderr.read()
00412 print >> cmd_log1 ,"could not 'sfrm' or rm 'data'\n%(msg)s" % vars()
00413 else:
00414 self.nodenames.remove(node)
00415 else:
00416 self.nodenames.remove(node)
00417
00418 p0.stderr.close()
00419 return not err
00420
00421 def node_copy(self, node_name):
00422 """
00423 copy from one node to another
00424 """
00425 if self.is_global or self.isempty():
00426 return
00427 if node_name in self.nodenames:
00428 return
00429
00430
00431
00432 tmddir = self.env['slimvars']['localtmpdir']
00433 from_cmd = "%s form=xdr" % self.SFDD
00434 to_cmd = "%s form=native" % self.SFDD
00435
00436 from_node = list(self.nodenames)[0]
00437
00438 from_data = self.get_data( from_node )
00439 to_data = self.get_data( node_name )
00440
00441 is_tmp = self.istmp()
00442 to_cmd = Unix_Pipes.CreateComand([to_cmd], node_name=node_name,
00443 target=to_data ,is_local=[True],
00444 is_tmp=[is_tmp] ,ssh_localhost=True)
00445 frm_node = list(self.nodenames)[0]
00446
00447
00448 self.add_node_name( node_name )
00449
00450 cmd = Unix_Pipes.CreateComand( [from_cmd,to_cmd],
00451 is_local=[None,None],
00452 is_tmp=[is_tmp,is_tmp],
00453 node_name=frm_node,
00454 source=from_data,
00455 ssh_localhost=False)
00456
00457 print >> self.env['record'](1,'cmd'), cmd
00458 assert frm_node != node_name, "can not copy from same node to same node"
00459 p0 = Popen(cmd, shell=True, stderr=__PIPE__ )
00460 p0.wait()
00461 print >> self.env['record'](2,'cmd'), "finished:: cp",node_name
00462 if p0.returncode:
00463 out = p0.stderr.read()
00464 raise IOError("copying from node to node retuned error %s\n\n%s" %(p0.returncode,out) )
00465
00466
00467 def available_from( self, nodename ):
00468 if nodename in self.nodenames:
00469 return True
00470 elif 'localhost' in self.nodenames:
00471 return True
00472 return False
00473
00474
00475 def readattr( self ):
00476 'returns rsf header info as a dict object'
00477 header = self.getHeader()
00478 lines = header.readlines()
00479
00480 header.close()
00481
00482 file_dict = {}
00483 for line in lines:
00484 if '=' in line:
00485 line = line[1:-1]
00486 line = line.split( '=' )
00487 try:
00488 line[1] = eval(line[1])
00489 except:
00490 pass
00491 file_dict[line[0]] = line[1]
00492
00493 self.BinFile = file_dict.pop( 'in' )
00494
00495
00496
00497
00498
00499 return file_dict
00500
00501 def readbin( self, start=0, len=-1 ):
00502 '''
00503 @precondition: data must exist.
00504 @return: a numpy array.
00505
00506 @param start: element in data to start from.
00507 @type start: int
00508
00509 @param: len: lenght of data to read into array
00510 if -1 reads until the end of data.
00511 @type len:int
00512 '''
00513
00514 l = lambda x : ( ( x == 'native_int' and int32 ) or
00515 ( x == 'native_float' and float32 ) or
00516 ( x == 'native_double' and float64 ) or
00517 ( x == 'native_complex' and complex64 ) )
00518
00519 file_dict = self.readattr()
00520 typeflag = l( file_dict['data_format'] )
00521
00522
00523 fd = open( self.BinFile )
00524 fd.seek( start*typeflag().nbytes )
00525
00526 return fromfile( fd, typeflag, len )
00527
00528 def writebin( self, data, start=0, size=0 ):
00529 """
00530 write data to file
00531 not tested
00532 """
00533 header = self.path()
00534 start = start*4
00535
00536 vector_dict, file_dict = self.readattr( header )
00537 if size == 0:
00538 size = product( vector_dict['dim'] )
00539 binfile = file_dict['in']
00540 fd = open( binfile, 'r+' )
00541
00542 fd.seek( start )
00543 data[:, :size].tofile( fd )
00544 fd.close()
00545
00546 def book( self, header ):
00547 'not tested'
00548 header = str( header )
00549 f = open( header )
00550 L = f.read()
00551 return L
00552
00553 def writeattr( self, header, vector_dict, file_dict ):
00554 'not tested'
00555 file_dict = dict( file_dict )
00556 if file_dict.has_key( 'in' ):
00557 del file_dict['in']
00558 if file_dict.has_key( 'data_format' ):
00559 del file_dict['data_format']
00560 for i in range( 1, len( vector_dict['dim'] )+1 ):
00561 file_dict['n%s'%i] = vector_dict['dim'][i-1]
00562 fd=self.readattr( header )
00563 for item in fd.items():
00564 try:
00565 if file_dict[item[0]] == item[1]:
00566 del file_dict[item[0]]
00567 except:
00568 pass
00569 book=''
00570 for item in file_dict.items():
00571 book += "\t%s=%s\n" %( item[0], item[1] )
00572 fd = open( header, 'r+' )
00573 fd.seek( 0, 2 )
00574 fd.write( book )
00575 fd.close()
00576
00577 def write( self, header, data, book ):
00578 'not tested'
00579 header = str( header )
00580 fd = open( header, 'w' )
00581 fd.write( book )
00582 fd.close()
00583 dict = self.readattr( header )
00584 fd = open( dict['in'], 'w' )
00585 fd.close()
00586 self.writebin( header, data )
00587
00588 def get_data( self, nodename ):
00589
00590 data = self.data
00591 if self.is_global:
00592 loc = self.get_data_loc( 'localhost' )
00593 else:
00594 loc = self.get_data_loc( nodename )
00595
00596 return join( loc, data )
00597
00598
00599 def get_data_loc(self, nodename):
00600
00601 if nodename == 'localhost':
00602 path = self.env['slimvars']['globaltmpdir']
00603 else:
00604 path = self.env['slimvars']['localtmpdir']
00605
00606 node_info = self._node_info.setdefault(nodename,{})
00607 path = node_info.setdefault( 'location',path )
00608
00609 return path
00610
00611 def diagnostic(self, nodename=None ):
00612 """
00613 run a check if this data is valid
00614 """
00615 log = self.env['record'](1,'cmd','err')
00616 log2 = self.env['record'](2,'diagnostic')
00617 print >> log
00618 print >> log, "Runnind diagnostic on data %s" %self
00619 print >> log
00620
00621 acmd = self.ATTRMD.substitute( attr=self.SFATTR,
00622 file=str(self),
00623 want='all',
00624 lval=2 )
00625
00626 if nodename is None:
00627 nodename = list(self.nodenames)
00628 elif isinstance(nodename, str):
00629 nodename = [nodename]
00630
00631 did_run = False
00632 for node in nodename:
00633 did_run = True
00634 attr_command = Unix_Pipes.CreateComand([acmd], node)
00635 print >> log2 , attr_command
00636 p0 = Popen( attr_command, shell=True, stderr=_STDOUT,stdout=__PIPE__)
00637 ret = p0.wait( )
00638
00639 lines = p0.stdout.read( )
00640 print >> log2, lines
00641 if ret:
00642 print >> log , 'Error running Diagnostic on data "%(self)s"' %vars( )
00643 return False
00644 else:
00645 print >> log , 'File "%(self)s" is OK.' %vars( )
00646
00647 return did_run
00648
00649