Package inca :: Module GlobusUnitReporter
[hide private]
[frames | no frames]

Source Code for Module inca.GlobusUnitReporter

  1  import os 
  2  import re 
  3  import signal 
  4  import socket 
  5  import string 
  6  import tempfile 
  7  import time 
  8   
  9  from inca.SimpleUnitReporter import SimpleUnitReporter 
 10   
class GlobusUnitReporter(SimpleUnitReporter):
    """GlobusUnitReporter - Convenience module for creating simple unit
    reporters that submit a test via globus::

      from inca.GlobusUnitReporter import GlobusUnitReporter
      reporter = GlobusUnitReporter(
        name = 'Reporter Name',
        version = 0.1,
        description = 'A really helpful reporter description',
        url = 'http://url.to.more.reporter.info',
        unit_name = 'What this reporter tests'
      )

    This module is a subclass of SimpleUnitReporter that provides convenience
    methods for submitting a unit test via globus.
    """

    def __init__(self, **attributes):
        """Class constructor that returns a new GlobusUnitReporter object.  See
        SimpleUnitReporter for parameters.
        """
        SimpleUnitReporter.__init__(self, **attributes)
        self.addDependency('inca.GlobusUnitReporter')
        self.addDependency('inca.GridProxyReporter')

    def _globusOption(self, attrs, key, default=None):
        """Return attrs[key], or default when the key is missing or None."""
        value = attrs.get(key)
        if value is None:
            return default
        return value

    def _globusWriteFile(self, path, text):
        """Write text to path; return True on success, False on an I/O error."""
        try:
            handle = open(path, 'w')
            try:
                handle.write(text)
            finally:
                handle.close()
        except IOError:
            return False
        return True

    def _globusRemoveFile(self, path):
        """Remove path if it exists; a missing file is not an error."""
        try:
            os.unlink(path)
        except OSError:
            # Best-effort cleanup: the job may never have produced the file.
            pass

    def _globusCollect(self, path, clean):
        """Return the contents of path, or None if it cannot be read.  The
        file is removed afterwards when clean is true.
        """
        try:
            handle = open(path, 'r')
        except IOError:
            # open() raises rather than returning a false value, so a job
            # that produced no such file must be handled here.
            return None
        try:
            text = handle.read()
        finally:
            handle.close()
        if clean:
            self._globusRemoveFile(path)
        return text

    def submitCSource(self, **attrs):
        """Submit a small C program to execute via a local GRAM.  In addition
        to the parameters recognized by submitJob, the required attrs['code']
        specifies the source to compile.

        Recognized parameters beyond those of submitJob::

          code
            the C source text to compile and run; required

          flavor
            passed to globus-makefile-header as --flavor=xx; default none

        Note that 'cleanup' defaults to false in this method: the temporary
        build directory is only removed when it is set.  The working
        directory of the process is changed to the build directory (and to
        $HOME when cleanup is requested).

        Returns the (output, error) tuple from submitJob, or (None, message)
        if the program could not be built.
        """
        code = self._globusOption(attrs, 'code')
        if code is None:
            return (None, 'No code passed to submitCSource')

        clean = self._globusOption(attrs, 'cleanup', 0)
        flavor = ''
        flavorName = self._globusOption(attrs, 'flavor')
        if flavorName is not None:
            flavor = '--flavor=' + flavorName
        cc = '$(GLOBUS_CC)'
        ld = '$(GLOBUS_LD)'
        if attrs.get('mpi'):
            cc = 'mpicc'
            ld = 'mpicc'

        workDir = tempfile.mkdtemp()
        os.chdir(workDir)
        try:
            (status, makeFile) = self.loggedCommandStatusOutput(
                'globus-makefile-header ' + flavor)
            if status != 0:
                return (None,
                        'globus-makefile-header failed: ' + makeFile + '\n')
            # make requires each recipe line to start with a tab character.
            makeFile += (
                '\n\nall:\n'
                '\t' + cc + ' $(GLOBUS_CFLAGS) $(GLOBUS_INCLUDES) -c gh.c\n'
                '\t' + ld + ' -o gh gh.o $(GLOBUS_LDFLAGS) $(GLOBUS_PKG_LIBS)'
                ' $(GLOBUS_LIBS)\n')

            if not self._globusWriteFile('Makefile', makeFile):
                return (None, 'Unable to write Makefile\n')
            if not self._globusWriteFile('gh.c', code):
                return (None, 'Unable to write source\n')

            (status, output) = self.loggedCommandStatusOutput('make')
            if status != 0:
                return (None, 'make failed: ' + output + '\n')

            # Append the Globus library directory to any caller-supplied
            # environment settings (colon-separated, see submitJob).
            env = self._globusOption(attrs, 'env', '')
            if env != '':
                env += ':'
            attrs['env'] = env + 'LD_LIBRARY_PATH=' + \
                os.environ['GLOBUS_LOCATION'] + '/lib'
            attrs['executable'] = workDir + '/gh'
            # The binary was built locally, so it must be staged to the
            # jobmanager rather than looked up there.
            attrs['remote'] = 0
            return self.submitJob(**attrs)
        finally:
            # Runs on every exit path so a failed build no longer leaves the
            # temporary directory behind when cleanup was requested.
            if clean:
                # Step out of the directory before deleting it.
                os.chdir(os.environ['HOME'])
                self.loggedCommandOutput('/bin/rm -fr ' + workDir)

    def submitJob(self, **attrs):
        """Submit a job to execute a command via Globus.
        Recognized parameters::

          arguments
            arguments to pass to executable; default ''

          check
            poll job for completion every this many seconds; default 30

          cleanup
            remove temporary files after run; default true

          count
            number of hosts to use; default 1

          debug
            log the submission command and the result with -dumprsl; default
            false

          duroc
            add (resourceManagerContact=xx) to rsl; default false

          executable
            the program to run; required

          env
            environment variable to set, as NAME=value; several settings may
            be given separated by ':'; default ''

          host
            host where run takes place; default localhost

          mpi
            execute as an MPI program; default false

          queue
            name of batch queue to submit job to; default none

          remote
            executable is already on the jobmanager resource; default true

          service
            the Globus service to invoke; default to Globus default

          timeout
            kill the job and report an error after this many seconds; default
            3600 (1 hr)

        Returns a tuple (output, error) holding the text the job wrote to
        stdout and stderr (None for a stream whose file could not be read).
        If the job could not be submitted or did not finish in time, the
        tuple is (None, message).
        """
        executable = self._globusOption(attrs, 'executable')
        if executable is None:
            return (None, 'No executable supplied to submitJob\n')

        clean = self._globusOption(attrs, 'cleanup', 1)
        contact = self._globusOption(attrs, 'host')
        if contact is None:
            contact = socket.gethostname()
        service = self._globusOption(attrs, 'service')
        if service is not None:
            contact += '/' + service
        count = self._globusOption(attrs, 'count', 1)
        debug = ''
        if attrs.get('debug'):
            debug = '-dumprsl'
        pollTime = self._globusOption(attrs, 'check', 30)
        timeout = self._globusOption(attrs, 'timeout', 3600)

        # Per-process scratch files that receive the job's stdout/stderr.
        scratch = os.environ['HOME'] + '/.inca.tmp.' + str(os.getpid())
        err = scratch + '.err'
        out = scratch + '.out'

        extraRsl = '(host_count=' + str(count) + ')'
        # duroc and mpi are boolean switches: test for truth so that an
        # explicit false value disables them.
        if attrs.get('duroc'):
            extraRsl += '(resourceManagerContact=' + contact + ')'
        if attrs.get('mpi'):
            extraRsl += '(jobtype=mpi)'

        # -maxtime is passed in whole minutes, timeout is in seconds.
        cmd = \
            'globus-job-submit ' + debug + ' -stderr -s ' + err + \
            ' -stdout -s ' + out + ' ' + contact + ' -count ' + str(count) + \
            ' -maxtime ' + str(int(timeout) // 60) + " -x '" + extraRsl + "'"
        env = self._globusOption(attrs, 'env')
        if env:
            # 'A=1:B=2' becomes ' -env A=1 -env B=2'.  An empty string is
            # skipped so that no dangling '-env' is emitted.
            cmd += re.sub(r'^|:', ' -env ', env)
        queue = self._globusOption(attrs, 'queue')
        if queue is not None:
            cmd += ' -q ' + queue
        if attrs.get('remote'):
            cmd += ' -l ' + executable
        else:
            cmd += ' -s ' + executable
        arguments = self._globusOption(attrs, 'arguments')
        if arguments is not None:
            cmd += ' ' + arguments

        (status, jobId) = self.loggedCommandStatusOutput(cmd)
        if status != 0:
            return (None, "call to '" + cmd + "' failed: " + jobId + "\n")
        if not re.search('https', jobId):
            return (None, "invalid job id returned: '" + jobId + "'\n")

        # NOTE(review): _timeoutException is not defined in this class; it is
        # presumably inherited from SimpleUnitReporter and expected to raise
        # one of the exceptions caught below -- confirm.
        oldHandler = signal.signal(signal.SIGALRM, self._timeoutException)
        try:
            try:
                jobStatus = 'ACTIVE'
                signal.alarm(int(timeout))
                while re.match('(PENDING|ACTIVE|UNSUBMITTED)$', jobStatus):
                    time.sleep(int(pollTime))
                    jobStatus = self.loggedCommandOutput(
                        'globus-job-status ' + jobId)
            except (KeyboardInterrupt, SystemExit):
                # Disarm first so the alarm cannot fire during the cancel.
                signal.alarm(0)
                self.loggedCommandOutput('globus-job-cancel -f ' + jobId)
                if clean:
                    self._globusRemoveFile(out)
                    self._globusRemoveFile(err)
                return (None, 'job did not complete within ' +
                        str(timeout) + ' seconds\n')
        finally:
            # Always disarm the alarm and put the previous handler back,
            # including on the timeout path above.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, oldHandler)

        output = self._globusCollect(out, clean)
        error = self._globusCollect(err, clean)
        return (output, error)
233