1 import os
2 import socket
3 import string
4 import tempfile
5 import time
6
7 from inca.SimpleUnitReporter import SimpleUnitReporter
8
10 """GlobusUnitReporter - Convenience module for creating simple unit reporters
11 that submit a test via globus.
12
13 from inca.GlobusUnitReporter import GlobusUnitReporter
14 reporter = GlobusUnitReporter(
15 name = 'Reporter Name',
16 version = 0.1,
17 description = 'A really helpful reporter description',
18 url = 'http://url.to.more.reporter.info'
19 unit_name = 'What this reporter tests'
20 )
21
22 This module is a subclass of SimpleUnitReporter that provides convenience
23 methods for submitting a unit test via globus.
24 """
25
33
35 """Submit a small C program to execute via a local GRAM. In addition to
36 the parameters recognized by submitJob, the required attrs['code']
37 specifies the source to compile.
38 """
39 if not attrs.has_key('code'):
40 return (None, 'No code passed to submitCSource')
41
42 clean = 0
43 if attrs.has_key['cleanup']:
44 clean = attrs['cleanup']
45 dir = tempfile.gettempdir()
46 flavor = ''
47 if attrs.has_key['flavor']:
48 flavor = '--flavor=' + attrs['flavor']
49 os.chdir(dir);
50
51 cc = None
52 if os.environ.has_key('GLOBUS_CC'):
53 cc = os.environ['GLOBUS_CC']
54 ld = None
55 if os.environ.has_key('GLOBUS_LD'):
56 ld = os.environ['GLOBUS_LD']
57 if attrs.has_key['mpi'] and attrs['mpi'] != None:
58 cc = 'mpicc';
59 ld = 'mpicc';
60 makeFile = self.loggedCommand('globus-makefile-header ' + flavor)
61 if self.getLastExitCode():
62 return (None, 'globus-makefile-header failed: ' + makeFile + '\n')
63 makeFile += '''
64
65 all:
66 ''' + cc + ''' $(GLOBUS_CFLAGS) $(GLOBUS_INCLUDES) -c gh.c
67 ''' + ld + ''' -o gh gh.o $(GLOBUS_LDFLAGS) $(GLOBUS_PKG_LIBS) $(GLOBUS_LIBS)
68 '''
69
70 out = open('Makefile', 'w')
71 if not out:
72 return (None, 'Unable to write Makefile\n')
73 out.write(makefile)
74 out.close()
75 out = open('gh.c', 'w')
76 if not out:
77 return (None, 'Unable to write source\n')
78 out.write(attrs['code'])
79 out.close()
80
81 output = self.loggedCommand('make')
82 if self.getLastExitCode():
83 output = None
84 error = 'make failed: ' + output + '\n'
85 else:
86 attrs['remote'] = 0
87 (output, error) = self.submitJob(
88 executable = dir + '/gh', remote = 0,
89 env = 'LD_LIBRARY_PATH=' + os.environ['GLOBUS_LOCATION'] + '/lib',
90 **attrs
91 )
92 if clean:
93 os.chdir(os.environ['HOME'])
94 exit_code = self.getLastExitCode()
95 self.loggedCommand('/bin/rm -fr ' + dir)
96 self.setLastExitCode(exit_code)
97 return (output, error);
98
100 """Submit a job to execute a command via Globus. Recognized parameters:
101
102 arguments
103 arguments to pass to executable; default ''
104
105 check
106 poll job for completion every this many seconds; default 30
107
108 cleanup
109 remove temporary files after run; default true
110
111 count
112 number of hosts to use; default 1
113
114 debug
115 log the submision command and the result with -dumprsl; default false
116
117 duroc
118 add (resourceManagerContact=xx) to rsl; default false
119
120 executable
121 the program to run; required
122
123 env
124 environment variable to set; default ''
125
126 host
127 host where run takes place; default localhost
128
129 mpi
130 execute as an MPI program; default false
131
132 queue
133 name of batch queue to submit job to; default none
134
135 remote
136 executable is already on the jobmanager resource; default 1
137
138 service
139 the Globus service to invoke; default to Globus default
140
141 timeout
142 kill the job and report an error after this many seconds; default
143 3600 (1 hr)
144
145 """
146 if not attrs.has_key('executable'):
147 return (None, 'No executable supplied to submitJob\n')
148
149 clean = 1
150 if attrs.has_key('cleanup'):
151 clean = attrs['cleanup']
152 if attrs.has_key('host'):
153 contact = attrs['host'];
154 else:
155 contact = socket.gethostname()
156 if attrs.has_key('service'):
157 contact += '/' + attrs['service']
158 count = 1
159 if attrs.has_key('count'):
160 count = attrs['count']
161 debug = ''
162 if attrs.has_keys('debug') and attrs['debug']:
163 debug = '-dumprsl'
164 err = os.environ['HOME'] + '/.inca.tmp.' + os.getpid() + '.err'
165 extraRsl = '(host_count=' + count + ')'
166 if attrs.has_keys('duroc'):
167 extraRsl += '(resourceManagerContact=' + contact + ')'
168 if attrs.has_keys('mpi'):
169 extraRsl += '(jobtype=mpi)'
170 out = os.environ['HOME'] + '/.inca.tmp.' + os.getpid() + '.out'
171 pollTime = 30
172 if attrs.has_keys('check'):
173 pollTime = attrs['check']
174 timeout = 3600
175 if attrs.has_keys('timeout'):
176 timeout = attrs['timeout']
177
178 cmd = \
179 'globus-job-submit ' + debug + ' -stderr -s ' + err + ' -stdout -s ' + \
180 out + ' ' + contact + ' -count ' + count + ' -maxtime ' + \
181 int(timeout / 60) + " -x '" + extraRsl + "'"
182 if attrs.has_key('env'):
183 env = attrs['env'];
184 env = re.sub(r'^|:', ' -env ', env)
185 cmd += env
186 if attrs.has_key('queue'):
187 cmd += ' -q ' . attrs['queue']
188 if attrs.has_key('executable'):
189 if attrs.has_key('remote') and attrs['remote']:
190 cmd += ' -l ' + attrs['executable']
191 else:
192 cmd += ' -s ' + attrs['executable']
193 if attrs.has_key('arguments'):
194 cmd += ' ' + attrs['arguments']
195
196 jobId = self.loggedCommand(cmd);
197 if self.getLastExitCode():
198 return (None, "call to '" + cmd + "' failed: " + jobId + "\n")
199 if not re.search('https', jobId):
200 return (None, "invalid job id returned: '" + jobId + "'\n")
201
202 oldHandler = signal.signal(signal.SIGALRM, self._timeoutException)
203 try:
204 status = 'ACTIVE'
205 signal.alarm(int(timeout))
206 while re.match('(PENDING|ACTIVE|UNSUBMITTED)$', status):
207 time.sleep(int(pollTime))
208 status = self.loggedCommand('globus-job-status ' + jobId)
209 signal.alarm(0)
210 except SystemExit:
211 self.loggedCommand('globus-job-cancel -f ' + jobId)
212 return (None, 'job did not complete within '+str(timeout)+' seconds\n')
213 signal.signal(signal.SIGALRM, oldHandler)
214
215 output = None
216 file = open(out, 'r')
217 if file:
218 lines = file.readlines()
219 file.close()
220 output = string.join(lines, '\n')
221 if clean:
222 os.unlink(out)
223 error = None
224 file = open(err, 'r')
225 if file:
226 lines = file.readlines()
227 file.close()
228 error = string.join(lines, '\n')
229 if clean:
230 os.unlink(err)
231
232 return (output, error);
233