1 import os
2 import re
3 import signal
4 import socket
5 import string
6 import tempfile
7 import time
8
9 from inca.SimpleUnitReporter import SimpleUnitReporter
10
"""GlobusUnitReporter - Convenience module for creating simple unit reporters
that submit a test via globus::

  from inca.GlobusUnitReporter import GlobusUnitReporter
  reporter = GlobusUnitReporter(
    name = 'Reporter Name',
    version = 0.1,
    description = 'A really helpful reporter description',
    url = 'http://url.to.more.reporter.info',
    unit_name = 'What this reporter tests'
  )

GlobusUnitReporter is a subclass of SimpleUnitReporter that provides
convenience methods for submitting a unit test via globus.
"""
27
def __init__(self, **attributes):
    """Create a new GlobusUnitReporter object.

    Every keyword argument is handed on unchanged to the SimpleUnitReporter
    constructor; see SimpleUnitReporter for the recognized parameters.
    """
    SimpleUnitReporter.__init__(self, **attributes)
    # Register the inca modules this reporter declares as dependencies.
    for module in ('inca.GlobusUnitReporter', 'inca.GridProxyReporter'):
        self.addDependency(module)
35
def submitCSource(self, **attrs):
    """Compile a small C program and submit it to execute via a local GRAM.

    In addition to the parameters recognized by submitJob, these are
    recognized::

      code
        the C source to compile; required

      cleanup
        remove the temporary build directory afterwards; default false

      flavor
        passed to globus-makefile-header as --flavor=<flavor>; default none

      mpi
        compile and link with mpicc instead of $(GLOBUS_CC)/$(GLOBUS_LD);
        default false

    The source is written to gh.c in a fresh temporary directory, built with
    a generated Makefile, and the resulting binary is handed to submitJob
    with remote forced to 0 and LD_LIBRARY_PATH=$GLOBUS_LOCATION/lib appended
    to env.

    Returns an (output, error) tuple.  When the program cannot be built,
    output is None and error describes the failure; otherwise the tuple is
    whatever submitJob returns.

    Side effect: the process working directory is changed to the temporary
    directory, or to $HOME when cleanup is requested.
    """
    if attrs.get('code') is None:
        return (None, 'No code passed to submitCSource')

    clean = 0
    if attrs.get('cleanup') is not None:
        clean = attrs['cleanup']
    flavor = ''
    if attrs.get('flavor') is not None:
        flavor = '--flavor=' + attrs['flavor']
    cc = '$(GLOBUS_CC)'
    ld = '$(GLOBUS_LD)'
    if attrs.get('mpi'):
        cc = 'mpicc'
        ld = 'mpicc'

    workDir = tempfile.mkdtemp()
    os.chdir(workDir)
    try:
        (status, makeFile) = \
            self.loggedCommandStatusOutput('globus-makefile-header ' + flavor)
        if status != 0:
            return (None, 'globus-makefile-header failed: ' + makeFile + '\n')
        # make requires every recipe line to begin with a tab character.
        makeFile += '\n\nall:\n'
        makeFile += '\t' + cc + ' $(GLOBUS_CFLAGS) $(GLOBUS_INCLUDES) -c gh.c\n'
        makeFile += '\t' + ld + ' -o gh gh.o $(GLOBUS_LDFLAGS)' + \
            ' $(GLOBUS_PKG_LIBS) $(GLOBUS_LIBS)\n'

        # open() raises rather than returning a false value, so failures
        # have to be caught to produce the documented error tuple.
        for (fileName, content, complaint) in (
            ('Makefile', makeFile, 'Unable to write Makefile\n'),
            ('gh.c', attrs['code'], 'Unable to write source\n'),
        ):
            try:
                out = open(fileName, 'w')
                try:
                    out.write(content)
                finally:
                    out.close()
            except (IOError, OSError):
                return (None, complaint)

        (status, output) = self.loggedCommandStatusOutput('make')
        if status != 0:
            return (None, 'make failed: ' + output + '\n')

        # Append to any caller-supplied environment (':'-separated list).
        if attrs.get('env'):
            attrs['env'] += ':'
        else:
            attrs['env'] = ''
        attrs['env'] += \
            'LD_LIBRARY_PATH=' + os.environ['GLOBUS_LOCATION'] + '/lib'
        attrs['executable'] = workDir + '/gh'
        # The binary was built on this host, so submitJob must stage it.
        attrs['remote'] = 0
        (output, error) = self.submitJob(**attrs)
        return (output, error)
    finally:
        # Runs on every exit path so that a failed build does not leave the
        # temporary directory behind when cleanup was requested.
        if clean:
            # Step out of the directory before deleting it.
            os.chdir(os.environ['HOME'])
            self.loggedCommandOutput('/bin/rm -fr ' + workDir)
97
def submitJob(self, **attrs):
    """Submit a job to execute a command via Globus and wait for it.

    Recognized parameters::

      arguments
        arguments to pass to executable; default ''

      check
        poll job for completion every this many seconds; default 30

      cleanup
        remove temporary files after run; default true

      count
        number of hosts to use; default 1

      debug
        log the submission command and the result with -dumprsl; default
        false

      duroc
        add (resourceManagerContact=xx) to rsl; default false

      executable
        the program to run; required

      env
        environment variables to set, as a ':'-separated list of NAME=value
        entries; default ''

      host
        host where run takes place; default localhost

      mpi
        execute as an MPI program; default false

      queue
        name of batch queue to submit job to; default none

      remote
        executable is already on the jobmanager resource (passed with -l);
        otherwise it is passed with -s.  An absent value is treated as
        false.  NOTE(review): confirm that false is the intended default.

      service
        the Globus service to invoke; default to Globus default

      timeout
        kill the job and report an error after this many seconds; default
        3600 (1 hr)

    Returns an (output, error) tuple holding the contents of the job's
    stdout and stderr files (None for a file that could not be read).  When
    the job cannot be submitted or does not finish within timeout, output is
    None and error is a message describing the problem.
    """
    if attrs.get('executable') is None:
        return (None, 'No executable supplied to submitJob\n')

    def setting(name, default):
        # A missing key and an explicit None both select the default.
        value = attrs.get(name)
        if value is None:
            return default
        return value

    clean = setting('cleanup', 1)
    count = setting('count', 1)
    pollTime = setting('check', 30)
    timeout = setting('timeout', 3600)
    contact = attrs.get('host')
    if contact is None:
        contact = socket.gethostname()
    if attrs.get('service') is not None:
        contact += '/' + attrs['service']
    debug = ''
    if attrs.get('debug'):
        debug = '-dumprsl'

    # The job's stdout/stderr are collected in per-process files under $HOME.
    tmpBase = os.environ['HOME'] + '/.inca.tmp.' + str(os.getpid())
    err = tmpBase + '.err'
    out = tmpBase + '.out'

    extraRsl = '(host_count=' + str(count) + ')'
    # duroc is a boolean switch: an explicit false value must not enable it.
    if attrs.get('duroc'):
        extraRsl += '(resourceManagerContact=' + contact + ')'
    if attrs.get('mpi'):
        extraRsl += '(jobtype=mpi)'

    # -maxtime is given in whole minutes (rounded down).
    # NOTE(review): a timeout under 60 seconds yields "-maxtime 0".
    cmd = \
        'globus-job-submit ' + debug + ' -stderr -s ' + err + \
        ' -stdout -s ' + out + ' ' + contact + ' -count ' + str(count) + \
        ' -maxtime ' + str(int(timeout) // 60) + " -x '" + extraRsl + "'"
    # An empty env would otherwise leave a dangling "-env" on the command.
    if attrs.get('env'):
        # Turn "A=1:B=2" into " -env A=1 -env B=2".
        cmd += re.sub(r'^|:', ' -env ', attrs['env'])
    if attrs.get('queue') is not None:
        cmd += ' -q ' + attrs['queue']
    if attrs.get('remote'):
        cmd += ' -l ' + attrs['executable']
    else:
        cmd += ' -s ' + attrs['executable']
    if attrs.get('arguments') is not None:
        cmd += ' ' + attrs['arguments']

    (status, jobId) = self.loggedCommandStatusOutput(cmd)
    if status != 0:
        return (None, "call to '" + cmd + "' failed: " + jobId + "\n")
    if not re.search('https', jobId):
        return (None, "invalid job id returned: '" + jobId + "'\n")

    # Poll until the job leaves its waiting/running states.  SIGALRM bounds
    # the wait; self._timeoutException is defined outside this method and is
    # presumably what raises one of the exceptions caught below.
    timedOut = False
    oldHandler = signal.signal(signal.SIGALRM, self._timeoutException)
    try:
        try:
            jobStatus = 'ACTIVE'
            signal.alarm(int(timeout))
            while re.match('(PENDING|ACTIVE|UNSUBMITTED)$', jobStatus):
                time.sleep(int(pollTime))
                jobStatus = \
                    self.loggedCommandOutput('globus-job-status ' + jobId)
        except (KeyboardInterrupt, SystemExit):
            timedOut = True
    finally:
        # Always cancel any pending alarm and put the previous handler back,
        # including on the timeout path and on unexpected exceptions.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, oldHandler)

    if timedOut:
        self.loggedCommandOutput('globus-job-cancel -f ' + jobId)
        if clean:
            # Best effort: the files may never have been created.
            for leftover in (out, err):
                try:
                    os.unlink(leftover)
                except OSError:
                    pass
        return (None,
                'job did not complete within ' + str(timeout) + ' seconds\n')

    def collect(path):
        # Return the file's contents (None if unreadable), removing the file
        # afterwards when cleanup is enabled.
        try:
            handle = open(path, 'r')
        except (IOError, OSError):
            return None
        try:
            # Read verbatim; joining readlines() with '\n' would double
            # every line ending.
            text = handle.read()
        finally:
            handle.close()
        if clean:
            os.unlink(path)
        return text

    output = collect(out)
    error = collect(err)
    return (output, error)
233