Package inca :: Module GlobusUnitReporter
[hide private]
[frames | no frames]

Source Code for Module inca.GlobusUnitReporter

import os
import re
import signal
import socket
import string
import tempfile
import time

from inca.SimpleUnitReporter import SimpleUnitReporter
  8   
class GlobusUnitReporter(SimpleUnitReporter):
    """GlobusUnitReporter - Convenience module for creating simple unit
    reporters that submit a test via globus.

      from inca.GlobusUnitReporter import GlobusUnitReporter
      reporter = GlobusUnitReporter(
        name = 'Reporter Name',
        version = 0.1,
        description = 'A really helpful reporter description',
        url = 'http://url.to.more.reporter.info',
        unit_name = 'What this reporter tests'
      )

    This module is a subclass of SimpleUnitReporter that provides convenience
    methods for submitting a unit test via globus.
    """

    def __init__(self, **attributes):
        """Class constructor that returns a new GlobusUnitReporter object.  See
        SimpleUnitReporter for parameters.
        """
        SimpleUnitReporter.__init__(self, **attributes)
        self.addDependency('inca.GlobusUnitReporter')
        self.addDependency('inca.GridProxy')

    def _writeTextFile(self, path, text):
        """Write text to path, closing the file even if the write fails."""
        handle = open(path, 'w')
        try:
            handle.write(text)
        finally:
            handle.close()

    def _readTextFile(self, path, remove):
        """Return the contents of path, or None if it cannot be read.

        When remove is true the file is deleted after a successful read.
        """
        try:
            handle = open(path, 'r')
        except (IOError, OSError):
            return None
        try:
            # read() keeps the file's own line endings; joining readlines()
            # with '\n' would double every newline.
            text = handle.read()
        finally:
            handle.close()
        if remove:
            os.unlink(path)
        return text

    def submitCSource(self, **attrs):
        """Submit a small C program to execute via a local GRAM.  In addition
        to the parameters recognized by submitJob, the following are used:

        code
          the C source to compile; required

        flavor
          Globus flavor passed to globus-makefile-header; default none

        mpi
          compile and link with mpicc instead of the Globus compiler;
          default false

        cleanup
          remove the temporary build directory after the run; default true

        The source is built in a private temporary directory and the resulting
        binary is staged to the job manager (remote = 0).  Returns a tuple
        (output, error); output is None when the build or submission fails.
        """
        if 'code' not in attrs:
            return (None, 'No code passed to submitCSource')

        # Default follows the documented 'cleanup' default (true).
        clean = attrs.get('cleanup', 1)
        flavor = ''
        if attrs.get('flavor'):
            flavor = '--flavor=' + attrs['flavor']

        if attrs.get('mpi'):
            cc = 'mpicc'
            ld = 'mpicc'
        else:
            # Without GLOBUS_CC/GLOBUS_LD in the environment fall back to
            # make's built-in $(CC) so the recipe never contains None.
            cc = os.environ.get('GLOBUS_CC') or '$(CC)'
            ld = os.environ.get('GLOBUS_LD') or '$(CC)'

        # Build in a private directory: it is removed with 'rm -fr' below,
        # which must never be pointed at the shared system temp directory.
        try:
            workDir = tempfile.mkdtemp(prefix='inca.')
        except (IOError, OSError):
            return (None, 'Unable to create temporary directory\n')
        prevDir = os.getcwd()

        try:
            os.chdir(workDir)

            header = self.loggedCommand('globus-makefile-header ' + flavor)
            if self.getLastExitCode():
                return (
                    None,
                    'globus-makefile-header failed: ' + (header or '') + '\n'
                )
            # Recipe lines must begin with a tab character.
            rules = [
                '',
                '',
                'all:',
                '\t' + cc + ' $(GLOBUS_CFLAGS) $(GLOBUS_INCLUDES) -c gh.c',
                '\t' + ld + ' -o gh gh.o $(GLOBUS_LDFLAGS) $(GLOBUS_PKG_LIBS)'
                + ' $(GLOBUS_LIBS)',
                '',
            ]
            makeFile = (header or '') + '\n'.join(rules)

            try:
                self._writeTextFile('Makefile', makeFile)
            except (IOError, OSError):
                return (None, 'Unable to write Makefile\n')
            try:
                self._writeTextFile('gh.c', attrs['code'])
            except (IOError, OSError):
                return (None, 'Unable to write source\n')

            output = self.loggedCommand('make')
            if self.getLastExitCode():
                return (None, 'make failed: ' + (output or '') + '\n')

            # Build the keyword set for submitJob from a copy so no keyword
            # is supplied twice and the caller's dict is left untouched.
            jobAttrs = dict(attrs)
            jobAttrs.pop('code', None)
            jobAttrs.pop('flavor', None)
            jobAttrs['executable'] = workDir + '/gh'
            jobAttrs['remote'] = 0
            globusLocation = os.environ.get('GLOBUS_LOCATION')
            if globusLocation:
                env = 'LD_LIBRARY_PATH=' + globusLocation + '/lib'
                # submitJob splits 'env' on ':' into separate -env options.
                if jobAttrs.get('env'):
                    env += ':' + jobAttrs['env']
                jobAttrs['env'] = env
            return self.submitJob(**jobAttrs)
        finally:
            os.chdir(prevDir)
            if clean:
                # Keep the exit code of the real work, not of the rm.
                exitCode = self.getLastExitCode()
                self.loggedCommand('/bin/rm -fr ' + workDir)
                self.setLastExitCode(exitCode)

    def submitJob(self, **attrs):
        """Submit a job to execute a command via Globus.  Recognized
        parameters:

        arguments
          arguments to pass to executable; default ''

        check
          poll job for completion every this many seconds; default 30

        cleanup
          remove temporary files after run; default true

        count
          number of hosts to use; default 1

        debug
          log the submission command and the result with -dumprsl; default
          false

        duroc
          add (resourceManagerContact=xx) to rsl; default false

        executable
          the program to run; required

        env
          environment variable to set; default ''

        host
          host where run takes place; default localhost

        mpi
          execute as an MPI program; default false

        queue
          name of batch queue to submit job to; default none

        remote
          executable is already on the jobmanager resource; default 1

        service
          the Globus service to invoke; default to Globus default

        timeout
          kill the job and report an error after this many seconds; default
          3600 (1 hr)

        Returns a tuple (output, error) holding the job's captured stdout and
        stderr.  output is None if the job could not be submitted or did not
        finish within the timeout, in which case error describes the problem.
        """
        if 'executable' not in attrs:
            return (None, 'No executable supplied to submitJob\n')

        clean = attrs.get('cleanup', 1)
        contact = attrs.get('host') or socket.gethostname()
        if attrs.get('service'):
            contact += '/' + attrs['service']
        count = str(attrs.get('count', 1))
        debug = ''
        if attrs.get('debug'):
            debug = '-dumprsl'
        pollTime = int(attrs.get('check', 30))
        timeout = int(attrs.get('timeout', 3600))
        remote = attrs.get('remote', 1)

        # Per-process scratch files that receive the job's stdout/stderr.
        scratch = os.environ['HOME'] + '/.inca.tmp.' + str(os.getpid())
        err = scratch + '.err'
        out = scratch + '.out'

        extraRsl = '(host_count=' + count + ')'
        if attrs.get('duroc'):
            extraRsl += '(resourceManagerContact=' + contact + ')'
        if attrs.get('mpi'):
            extraRsl += '(jobtype=mpi)'

        # -maxtime is given in minutes, timeout in seconds.
        cmd = (
            'globus-job-submit ' + debug + ' -stderr -s ' + err +
            ' -stdout -s ' + out + ' ' + contact + ' -count ' + count +
            ' -maxtime ' + str(int(timeout / 60)) + " -x '" + extraRsl + "'"
        )
        if attrs.get('env'):
            # 'A=1:B=2' becomes ' -env A=1 -env B=2'.
            cmd += re.sub(r'^|:', ' -env ', attrs['env'])
        if attrs.get('queue'):
            cmd += ' -q ' + attrs['queue']
        if remote:
            cmd += ' -l ' + attrs['executable']
        else:
            cmd += ' -s ' + attrs['executable']
        if attrs.get('arguments'):
            cmd += ' ' + attrs['arguments']

        jobId = self.loggedCommand(cmd) or ''
        if self.getLastExitCode():
            return (None, "call to '" + cmd + "' failed: " + jobId + "\n")
        if not re.search('https', jobId):
            return (None, "invalid job id returned: '" + jobId + "'\n")
        jobId = jobId.strip()

        # NOTE(review): _timeoutException is not defined in this class; it is
        # presumably inherited and raises SystemExit when the alarm fires --
        # confirm against the base class.
        oldHandler = signal.signal(signal.SIGALRM, self._timeoutException)
        try:
            try:
                status = 'ACTIVE'
                signal.alarm(timeout)
                while re.match('(PENDING|ACTIVE|UNSUBMITTED)$', status or ''):
                    time.sleep(pollTime)
                    status = self.loggedCommand('globus-job-status ' + jobId)
            except SystemExit:
                self.loggedCommand('globus-job-cancel -f ' + jobId)
                return (
                    None,
                    'job did not complete within ' + str(timeout) + ' seconds\n'
                )
        finally:
            # Always cancel the alarm and restore the previous handler, also
            # on the timeout path and on unexpected exceptions.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, oldHandler)

        output = self._readTextFile(out, clean)
        error = self._readTextFile(err, clean)
        return (output, error)
233