Changeset created on Mon Apr 19 14:28:35 CEST 2010 by Seek You Too Description: Timeout adapted to work with "Complete in one attempt". Process timeout is now per harvest iteration instead of the whole process. Baseline version: meresco-harvester/tags/version_6.0 diff --unidirectional-new-file '--exclude=.svn' '--exclude=*.pyc' '--exclude=applied' --recursive --unified version_6.0/merescoharvester/harvester/startharvester.py version_6.1/merescoharvester/harvester/startharvester.py --- version_6.0/merescoharvester/harvester/startharvester.py 2010-03-15 16:57:48.000000000 +0100 +++ version_6.1/merescoharvester/harvester/startharvester.py 2010-04-19 14:27:01.000000000 +0200 @@ -34,21 +34,22 @@ from eventlogger import EventLogger, NilEventLogger, CompositeLogger, StreamEventLogger from harvester import Harvester from virtualuploader import LoggingUploader -import sys, os, optparse from saharaget import SaharaGet from time import sleep import traceback from timedprocess import TimedProcess from urllib import urlopen from os.path import join -from sys import stderr, stdout +from sys import stderr, stdout, exit, argv +from optparse import OptionParser +AGAIN_EXITCODE = 42 class StartHarvester(object): def __init__(self): - if len(sys.argv[1:]) == 0: - sys.argv.append('-h') - self.parser = optparse.OptionParser() + if len(argv[1:]) == 0: + argv.append('-h') + self.parser = OptionParser() args = self.parse_args() self.__dict__.update(args.__dict__) @@ -64,8 +65,92 @@ self.repository = self.repositoryId and self.saharaget.getRepository(self.domainId, self.repositoryId) + + def parse_args(self): + self.parser.add_option("-d", "--domain", + dest="domainId", + help="Mandatory argument denoting the domain.", + metavar="DOMAIN") + self.parser.add_option("-s", "--saharaurl", + dest="saharaurl", + help="The url of the SAHARA web interface, e.g. 
https://username:password@sahara.example.org", + default="http://localhost") + self.parser.add_option("-r", "--repository", + dest="repositoryId", + help="Process a single repository within the given domain. Defaults to all repositories from the domain.", + metavar="REPOSITORY") + self.parser.add_option("-t", "--set-process-timeout", + dest="processTimeout", + type="int", + default=60*60, + metavar="TIMEOUT", + help="Subprocess will be timed out after amount of seconds.") + self.parser.add_option("--logDir", "", + dest="_logDir", + help="Override the logDir in the apache configuration.", + metavar="DIRECTORY", + default=None) + self.parser.add_option("--stateDir", + dest="_stateDir", + help="Override the stateDir in the apache configuration.", + metavar="DIRECTORY", + default=None) + self.parser.add_option("--uploadLog", "", + dest="uploadLog", + help="Set the mockUploadLogFile to which the fields are logged instead of uploading to a server.", + metavar="FILE") + self.parser.add_option("--force-target", "", + dest="forceTarget", + help="Overrides the repository's target", + metavar="TARGETID") + self.parser.add_option("--force-mapping", "", + dest="forceMapping", + help="Overrides the repository's mapping", + metavar="MAPPINGID") + self.parser.add_option("--no-action-done", "", + action="store_false", + dest="setActionDone", + default=True, + help="Do not set SAHARA's actions", + metavar="TARGETID") + self.parser.add_option("--runOnce", "", + dest="runOnce", + action="store_true", + default=False, + help="Prevent harvester from looping (if combined with --repository)") + + (options, args) = self.parser.parse_args() + return options + + def start(self): if not self.repository: - self.restartWithLoop(self.domainId, self.processTimeout) + self._restartWithLoop() + elif not self.runOnce: + self._startRepositoryWithChild() + else: + self._startRepository() + + def _restartWithLoop(self): + for key in self.saharaget.getRepositoryIds(self.domainId): + 
self._startChildProcess(['--repository='+key, '--runOnce']) + + def _startRepositoryWithChild(self): + self._startChildProcess(['--runOnce']) + + def _startChildProcess(self, extraArgs): + args = argv[:1] + extraArgs + argv[1:] + exitstatus = AGAIN_EXITCODE + while exitstatus == AGAIN_EXITCODE: + t = TimedProcess() + try: + SIG_INT = 2 + exitstatus = t.executeScript(args, self.processTimeout, SIG_INT) + except KeyboardInterrupt, e: + t.terminate() + raise + + + def _startRepository(self): if self.forceTarget: self.repository.targetId = self.forceTarget @@ -81,51 +166,11 @@ if self.uploadLog: self.repository.mockUploader = LoggingUploader(EventLogger(self.uploadLog)) - def parse_args(self): - self.parser.add_option("-d", "--domain", dest="domainId", - help="Mandatory argument denoting the domain.", metavar="DOMAIN") - self.parser.add_option("-s", "--saharaurl", dest="saharaurl", - help="The url of the SAHARA web interface, e.g. https://username:password@sahara.example.org", default="http://localhost") - self.parser.add_option("-r", "--repository", dest="repositoryId", - help="Process a single repository within the given domain. 
Defaults to all repositories from the domain.", metavar="REPOSITORY") - self.parser.add_option("-t", "--set-process-timeout", dest="processTimeout", - type="int", default=60*60, metavar="TIMEOUT", - help="Subprocess will be timed out after amount of seconds.") - self.parser.add_option("--logDir", "", dest="_logDir", - help="Override the logDir in the apache configuration.", metavar="DIRECTORY", default=None) - self.parser.add_option("--stateDir", dest="_stateDir", - help="Override the stateDir in the apache configuration.", metavar="DIRECTORY", default=None) - self.parser.add_option("--uploadLog", "", dest="uploadLog", - help="Set the mockUploadLogFile to which the fields are logged instead of uploading to a server.", metavar="FILE") - self.parser.add_option("--force-target", "", dest="forceTarget", - help="Overrides the repository's target", metavar="TARGETID") - self.parser.add_option("--force-mapping", "", dest="forceMapping", - help="Overrides the repository's mapping", metavar="MAPPINGID") - self.parser.add_option("--no-action-done", "", action="store_false", - dest="setActionDone", default=True, - help="Do not set SAHARA's actions", metavar="TARGETID") - - (options, args) = self.parser.parse_args() - return options - - def restartWithLoop(self, domainId, processTimeout=60*60): - for key in self.saharaget.getRepositoryIds(domainId): - args = sys.argv[:1] + ['--repository='+key] + sys.argv[1:] - t = TimedProcess() - try: - SIG_INT = 2 - t.executeScript(args, processTimeout, SIG_INT) - except KeyboardInterrupt, e: - t.terminate() - raise - sys.exit() - - def start(self): - again = True - while again: - messageIgnored, again = self.repository.do( - stateDir=join(self._stateDir, self.domainId), - logDir=join(self._logDir, self.domainId), - generalHarvestLog=self._generalHarvestLog) + messageIgnored, again = self.repository.do( + stateDir=join(self._stateDir, self.domainId), + logDir=join(self._logDir, self.domainId), + generalHarvestLog=self._generalHarvestLog) 
sleep(1) + if again: + exit(AGAIN_EXITCODE) diff --unidirectional-new-file '--exclude=.svn' '--exclude=*.pyc' '--exclude=applied' --recursive --unified version_6.0/merescoharvester/harvester/timedprocess.py version_6.1/merescoharvester/harvester/timedprocess.py --- version_6.0/merescoharvester/harvester/timedprocess.py 2010-03-15 16:57:48.000000000 +0100 +++ version_6.1/merescoharvester/harvester/timedprocess.py 2010-04-19 14:27:01.000000000 +0200 @@ -29,13 +29,10 @@ # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA # ## end license ## -# -# (c) 2005 Seek You Too B.V. -# -# $Id: timedprocess.py 4825 2007-04-16 13:36:24Z TJ $ -# -import os, sys +from os import spawnvp, waitpid, kill, P_NOWAIT +from sys import executable + from threading import Timer class TimedProcess(object): @@ -57,19 +54,22 @@ def terminate(self): if self._pid != -1: - os.kill(self._pid, self._signal) + kill(self._pid, self._signal) self._wasTimeout = True self.timer.cancel() def executeScript(self, args, timeout, signal=9): self._signal = signal - self._pid = os.spawnvp(os.P_NOWAIT, sys.executable, - [sys.executable] + args) + self._pid = spawnvp(P_NOWAIT, executable, + [executable] + args) self.timer = Timer(timeout, self.terminate) self.timer.start() - os.waitpid(self._pid, 0) + resultpid, status = waitpid(self._pid, 0) + exitstatus = status >> 8 if not self._wasTimeout: self.timer.cancel() self._wasSuccess = True self._wasTimeout = False + return exitstatus + diff --unidirectional-new-file '--exclude=.svn' '--exclude=*.pyc' '--exclude=applied' --recursive --unified version_6.0/test/timedprocesstest.py version_6.1/test/timedprocesstest.py --- version_6.0/test/timedprocesstest.py 2010-03-15 16:57:47.000000000 +0100 +++ version_6.1/test/timedprocesstest.py 2010-04-19 14:27:00.000000000 +0200 @@ -7,7 +7,7 @@ # Seek You Too B.V. (CQ2) http://www.cq2.nl # Copyright (C) 2006-2007 SURFnet B.V. http://www.surfnet.nl # Copyright (C) 2007-2008 SURF Foundation. 
http://www.surf.nl -# Copyright (C) 2007-2009 Seek You Too (CQ2) http://www.cq2.nl +# Copyright (C) 2007-2010 Seek You Too (CQ2) http://www.cq2.nl # Copyright (C) 2007-2009 Stichting Kennisnet Ict op school. # http://www.kennisnetictopschool.nl # Copyright (C) 2009 Tilburg University http://www.uvt.nl @@ -30,45 +30,43 @@ # ## end license ## -import unittest, tempfile, os +from cq2utils import CQ2TestCase from merescoharvester.harvester.timedprocess import TimedProcess +from os.path import join -class TimedProcessTest(unittest.TestCase): - - def setUp(self): - fd, self.filename = tempfile.mkstemp() - - def tearDown(self): - os.remove(self.filename) +class TimedProcessTest(CQ2TestCase): def testSuccess(self): - fd = open(self.filename,'w') + fd = open(self.tempfile,'w') try: - fd.write('pass') + fd.write("""import sys +sys.exit(42)""") finally: fd.close() tp = TimedProcess() - tp.executeScript([self.filename], 10) - self.assert_(not tp.wasTimeout()) - self.assert_(tp.wasSuccess()) + exitstatus = tp.executeScript([self.tempfile], 10) + self.assertFalse(tp.wasTimeout()) + self.assertTrue(tp.wasSuccess()) + self.assertEquals(42, exitstatus) def testSuccessParameters(self): - fd = open(self.filename,'w') + fd = open(self.tempfile,'w') try: fd.write("""import sys -#print len(sys.argv[1:]) -""") +open('%s', 'w').write(str(len(sys.argv[1:]))) +""" % join(self.tempdir, 'output.txt')) finally: fd.close() tp = TimedProcess() - tp.executeScript([self.filename, 'it','is','difficult'], 10) - self.assert_(not tp.wasTimeout()) - self.assert_(tp.wasSuccess()) + tp.executeScript([self.tempfile, 'it','is','difficult'], 10) + self.assertFalse(tp.wasTimeout()) + self.assertTrue(tp.wasSuccess()) + self.assertEquals('3', open(join(self.tempdir, 'output.txt')).read()) def testTimeout(self): - fd = open(self.filename,'w') + fd = open(self.tempfile,'w') try: fd.write("""while True: pass @@ -77,7 +75,7 @@ fd.close() tp = TimedProcess() - tp.executeScript([self.filename], 1) - 
self.assert_(tp.wasTimeout()) - self.assert_(not tp.wasSuccess()) + tp.executeScript([self.tempfile], 1) + self.assertTrue(tp.wasTimeout()) + self.assertFalse(tp.wasSuccess())