1  import os 
  2  import re 
  3  import signal 
  4  import socket 
  5  import string 
  6  import tempfile 
  7  import time 
  8   
  9  from inca.SimpleUnitReporter import SimpleUnitReporter 
 10   
 12    """GlobusUnitReporter - Convenience module for creating simple unit reporters 
 13       that submit a test via globus:: 
 14   
 15         from inca.GlobusUnitReporter import GlobusUnitReporter 
 16         reporter = GlobusUnitReporter( 
 17           name = 'Reporter Name', 
 18           version = 0.1, 
 19           description = 'A really helpful reporter description', 
 20           url = 'http://url.to.more.reporter.info' 
 21           unit_name = 'What this reporter tests' 
 22         ) 
 23   
 24       This module is a subclass of SimpleUnitReporter that provides convenience 
 25       methods for submitting a unit test via globus. 
 26    """ 
 27   
 29      """Class constructor that returns a new GlobusUnitReporter object.  See 
 30         SimpleUnitReporter for parameters. 
 31      """ 
 32      SimpleUnitReporter.__init__(self, **attributes) 
 33      self.addDependency('inca.GlobusUnitReporter') 
 34      self.addDependency('inca.GridProxyReporter') 
  35   
 37      """Submit a small C program to execute via a local GRAM.  In addition to 
 38         the parameters recognized by submitJob, the required attrs['code'] 
 39         specifies the source to compile. 
 40      """ 
 41      if not attrs.has_key('code') or attrs['code'] == None: 
 42        return (None, 'No code passed to submitCSource') 
 43   
 44      clean = 0 
 45      if attrs.has_key('cleanup') and attrs['cleanup'] != None: 
 46        clean = attrs['cleanup'] 
 47      dir = tempfile.mkdtemp() 
 48      flavor = '' 
 49      if attrs.has_key('flavor') and attrs['flavor'] != None: 
 50        flavor = '--flavor=' + attrs['flavor'] 
 51      os.chdir(dir) 
 52   
 53      cc = '$(GLOBUS_CC)' 
 54      ld = '$(GLOBUS_LD)' 
 55      if attrs.has_key('mpi') and attrs['mpi']: 
 56        cc = 'mpicc' 
 57        ld = 'mpicc' 
 58      (status, makeFile) = \ 
 59        self.loggedCommandStatusOutput('globus-makefile-header ' + flavor) 
 60      if status != 0: 
 61        return (None, 'globus-makefile-header failed: ' + makeFile + '\n') 
 62      makeFile += ''' 
 63   
 64  all: 
 65          ''' + cc + ''' $(GLOBUS_CFLAGS) $(GLOBUS_INCLUDES) -c gh.c 
 66          ''' + ld + ''' -o gh gh.o $(GLOBUS_LDFLAGS) $(GLOBUS_PKG_LIBS) $(GLOBUS_LIBS) 
 67  ''' 
 68   
 69      out = open('Makefile', 'w') 
 70      if not out: 
 71        return (None, 'Unable to write Makefile\n') 
 72      out.write(makeFile) 
 73      out.close() 
 74      out = open('gh.c', 'w') 
 75      if not out: 
 76        return (None, 'Unable to write source\n') 
 77      out.write(attrs['code']) 
 78      out.close() 
 79   
 80      (status, output) = self.loggedCommandStatusOutput('make') 
 81      if status != 0: 
 82        error = 'make failed: ' + output + '\n' 
 83        output = None 
 84      else: 
 85        if attrs.has_key('env') and attrs['env'] != None and attrs['env'] != '': 
 86          attrs['env'] += ':' 
 87        else: 
 88          attrs['env'] = '' 
 89        attrs['env'] += 'LD_LIBRARY_PATH='+os.environ['GLOBUS_LOCATION']+'/lib' 
 90        attrs['executable'] = dir + '/gh' 
 91        attrs['remote'] = 0 
 92        (output, error) = self.submitJob(**attrs) 
 93      if clean: 
 94        os.chdir(os.environ['HOME']) 
 95        self.loggedCommandOutput('/bin/rm -fr ' + dir) 
 96      return (output, error) 
  97   
 99      """Submit a job to execute a command via Globus. 
100         Recognized parameters:: 
101   
102           arguments 
103             arguments to pass to executable; default '' 
104   
105           check 
106             poll job for completion every this many seconds; default 30 
107   
108           cleanup 
109             remove temporary files after run; default true 
110   
111           count 
112             number of hosts to use; default 1 
113   
114           debug 
115             log the submision command and the result with -dumprsl; default false 
116   
117           duroc 
118             add (resourceManagerContact=xx) to rsl; default false 
119   
120           executable 
121             the program to run; required 
122   
123           env 
124             environment variable to set; default '' 
125   
126           host 
127             host where run takes place; default localhost 
128   
129           mpi 
130             execute as an MPI program; default false 
131   
132           queue 
133             name of batch queue to submit job to; default none 
134   
135           remote 
136             executable is already on the jobmanager resource; default true 
137   
138           service 
139             the Globus service to invoke; default to Globus default 
140   
141           timeout 
142             kill the job and report an error after this many seconds; default 
143             3600 (1 hr) 
144   
145      """ 
146      if not attrs.has_key('executable') or attrs['executable'] == None: 
147        return (None, 'No executable supplied to submitJob\n') 
148   
149      clean = 1 
150      if attrs.has_key('cleanup') and attrs['cleanup'] != None: 
151        clean = attrs['cleanup'] 
152      if attrs.has_key('host') and attrs['host'] != None: 
153        contact = attrs['host'] 
154      else: 
155        contact = socket.gethostname() 
156      if attrs.has_key('service') and attrs['service'] != None: 
157        contact += '/' + attrs['service'] 
158      count = 1 
159      if attrs.has_key('count') and attrs['count'] != None: 
160        count = attrs['count'] 
161      debug = '' 
162      if attrs.has_key('debug') and attrs['debug']: 
163        debug = '-dumprsl' 
164      err = os.environ['HOME'] + '/.inca.tmp.' + str(os.getpid()) + '.err' 
165      extraRsl = '(host_count=' + str(count) + ')' 
166      if attrs.has_key('duroc') and attrs['duroc'] != None: 
167        extraRsl += '(resourceManagerContact=' + contact + ')' 
168      if attrs.has_key('mpi') and attrs['mpi']: 
169        extraRsl += '(jobtype=mpi)' 
170      out = os.environ['HOME'] + '/.inca.tmp.' + str(os.getpid()) + '.out' 
171      pollTime = 30 
172      if attrs.has_key('check') and attrs['check'] != None: 
173        pollTime = attrs['check'] 
174      timeout = 3600 
175      if attrs.has_key('timeout') and attrs['timeout'] != None: 
176        timeout = attrs['timeout'] 
177   
178      cmd = \ 
179        'globus-job-submit ' + debug + ' -stderr -s ' + err + ' -stdout -s ' + \ 
180        out + ' ' + contact + ' -count ' + str(count) + ' -maxtime ' + \ 
181        str(int(int(timeout) / 60)) + " -x '" + extraRsl + "'" 
182      if attrs.has_key('env') and attrs['env'] != None: 
183        env = attrs['env'] 
184        env = re.sub(r'^|:', ' -env ', env) 
185        cmd += env 
186      if attrs.has_key('queue') and attrs['queue'] != None: 
187        cmd += ' -q ' + attrs['queue'] 
188      if attrs.has_key('executable') and attrs['executable'] != None: 
189        if attrs.has_key('remote') and attrs['remote']: 
190          cmd += ' -l ' + attrs['executable'] 
191        else: 
192          cmd += ' -s ' + attrs['executable'] 
193      if attrs.has_key('arguments') and attrs['arguments'] != None: 
194        cmd += ' ' + attrs['arguments'] 
195   
196      (status, jobId) = self.loggedCommandStatusOutput(cmd) 
197      if status != 0: 
198        return (None, "call to '" + cmd + "' failed: " + jobId + "\n") 
199      if not re.search('https', jobId): 
200        return (None, "invalid job id returned: '" + jobId + "'\n") 
201   
202      oldHandler = signal.signal(signal.SIGALRM, self._timeoutException) 
203      try: 
204        jobStatus = 'ACTIVE' 
205        signal.alarm(int(timeout)) 
206        while re.match('(PENDING|ACTIVE|UNSUBMITTED)$', jobStatus): 
207          time.sleep(int(pollTime)) 
208          jobStatus = self.loggedCommandOutput('globus-job-status ' + jobId) 
209        signal.alarm(0) 
210      except (KeyboardInterrupt, SystemExit): 
211        self.loggedCommandOutput('globus-job-cancel -f ' + jobId) 
212        return (None, 'job did not complete within '+str(timeout)+' seconds\n') 
213      signal.signal(signal.SIGALRM, oldHandler) 
214   
215      output = None 
216      file = open(out, 'r') 
217      if file: 
218        lines = file.readlines() 
219        file.close() 
220        output = string.join(lines, '\n') 
221        if clean: 
222          os.unlink(out) 
223      error = None 
224      file = open(err, 'r') 
225      if file: 
226        lines = file.readlines() 
227        file.close() 
228        error = string.join(lines, '\n') 
229        if clean: 
230          os.unlink(err) 
231   
232      return (output, error) 
  233