Commit 83d0c53e authored by Marbin Tan, committed by Larry Hamel

Add daemon flag to the WorkerPool

* In some test cases, we would like the main process to shut down
  without waiting for the pool threads. This is possible when the pool
  threads are daemons.

- Modified patch from Jimmy
Signed-off-by: Larry Hamel <lhamel@pivotal.io>
Parent 708ecfac
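For context, a minimal sketch of what the new flag buys (standard library only; the names below are illustrative, not part of the patch): a daemon thread does not keep the interpreter alive, so the main process can exit while a worker is still blocked on its queue.

```python
# Illustrative only -- not part of the patch. Python 2, matching base.py.
import threading
import Queue

work_queue = Queue.Queue()

def worker():
    # Blocks forever waiting for an item; if this thread were not a
    # daemon, the interpreter would hang here at shutdown.
    work_queue.get(block=True)

t = threading.Thread(target=worker)
t.daemon = True    # what WorkerPool(daemonize=True) now sets on each Worker
t.start()
# The main thread falls off the end and the process exits immediately,
# abandoning the daemon worker.
```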
......@@ -1135,6 +1135,7 @@ class AnalyzeWorkerPool(WorkerPool):
self.completed_queue=Queue()
self.should_stop=False
self.num_assigned=0
self.daemonize=False
if items is not None:
for item in items:
self.work_queue.put(item)
......@@ -1154,8 +1155,8 @@ class AnalyzeWorker(Worker):
"""
def __init__(self,name,pool):
Worker.__init__(self, name, pool)
def run(self):
while True:
try:
......@@ -1163,7 +1164,7 @@ class AnalyzeWorker(Worker):
self.cmd = self.pool.getNextWorkItem()
except TypeError:
# misleading exception raised during interpreter shutdown
return
# we must have got a command to run here
if self.cmd is None:
......@@ -1198,7 +1199,7 @@ class AnalyzeWorker(Worker):
self.logger.debug("[%s] finished cmd with exception: %s" % (self.name, self.cmd))
self.pool.addFinishedWorkItem(self.cmd)
self.cmd=None
if __name__ == '__main__':
sys.argv[0] = EXECNAME
simple_main(create_parser, AnalyzeDb)
#!/usr/bin/env python
#
# Copyright (c) Greenplum Inc 2008. All Rights Reserved.
#
'''
base.py

common base for the commands execution framework. Units of work are defined as Operations
as found in other modules like unix.py. These units of work are then packaged up and executed
within a GpCommand. A GpCommand is just a common infrastructure for executing an Operation.
The general idea is that the application developer breaks the problem down into a set of
GpCommands that need to be executed. This class also provides a queue and set of workers
for executing this set of commands.
'''
from Queue import Queue,Empty
......@@ -50,56 +50,57 @@ SSH_RETRY_DELAY=.5
class WorkerPool(object):
"""TODO:"""
halt_command='halt command'
def __init__(self,numWorkers=16,items=None,daemonize=False):
self.workers=[]
self.should_stop=False
self.work_queue=Queue()
self.completed_queue=Queue()
self.num_assigned=0
self.daemonize=daemonize
if items is not None:
for item in items:
self.work_queue.put(item)
self.num_assigned += 1
for i in range(0,numWorkers):
w = Worker("worker%d" % i,self)
w = Worker("worker%d" % i, self)
self.workers.append(w)
w.start()
self.numWorkers = numWorkers
self.logger = logger
###
def getNumWorkers(self):
return self.numWorkers
def getNextWorkItem(self):
return self.work_queue.get(block=True)
def addFinishedWorkItem(self,command):
self.completed_queue.put(command)
self.work_queue.task_done()
def markTaskDone(self):
self.work_queue.task_done()
def addCommand(self,cmd):
self.logger.debug("Adding cmd to work_queue: %s" % cmd.cmdStr)
self.work_queue.put(cmd)
self.num_assigned += 1
def wait_and_printdots(self,command_count,quiet=True):
while self.completed_queue.qsize() < command_count:
time.sleep(1)
if not quiet:
sys.stdout.write(".")
sys.stdout.flush()
if not quiet:
print " "
print " "
self.join()
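A hedged usage sketch of the pool API in this hunk. Only WorkerPool, Command, addCommand, wait_and_printdots, check_results, haltWork and joinWorkers come from this file; the import path, host names, and the ping command are assumptions for illustration.

```python
# Sketch, assuming gppylib is importable; hosts and command are made up.
from gppylib.commands.base import WorkerPool, Command

pool = WorkerPool(numWorkers=4)           # workers are non-daemon by default
for host in ['sdw1', 'sdw2']:
    pool.addCommand(Command(name='ping ' + host,
                            cmdStr='ping -c 1 %s' % host))
pool.wait_and_printdots(2, quiet=False)   # prints dots until both commands finish
pool.check_results()                      # raises ExecutionError on any failure
pool.haltWork()
pool.joinWorkers()
```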
def print_progress(self, command_count):
while True:
num_completed = self.completed_queue.qsize()
......@@ -110,11 +111,11 @@ class WorkerPool(object):
if num_completed >= command_count:
return
time.sleep(10)
def join(self):
self.work_queue.join()
return True
def joinWorkers(self):
for w in self.workers:
w.join()
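The two waits above are easy to conflate: join() drains the work queue via Queue.join()/task_done() accounting while the workers keep running, whereas joinWorkers() joins the Thread objects themselves. The distinction in miniature:

```python
# Illustrative sketch of the underlying Queue semantics (Python 2).
import Queue, threading

q = Queue.Queue()
q.put('item')

def worker():
    q.get()
    q.task_done()   # q.join() below can only return after this call

t = threading.Thread(target=worker)
t.start()
q.join()   # like WorkerPool.join(): all queued work accounted for
t.join()   # like joinWorkers(): the thread itself has exited
```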
......@@ -125,41 +126,41 @@ class WorkerPool(object):
while True:
item=self.completed_queue.get(False)
if item is not None:
completedList.append(item)
except Empty:
return completedList
return completedList #just to be sure
def check_results(self):
""" goes through all items in the completed_queue and throws an exception at the
first one that didn't execute successfully
throws ExecutionError
"""
"""
try:
while True:
item=self.completed_queue.get(False)
if not item.get_results().wasSuccessful():
raise ExecutionError("Error Executing Command: ",item)
raise ExecutionError("Error Executing Command: ",item)
except Empty:
return
def empty_completed_items(self):
while not self.completed_queue.empty():
self.completed_queue.get(False)
def isDone(self):
#TODO: not sure that qsize() is safe
return (self.num_assigned == self.completed_queue.qsize())
def haltWork(self):
self.logger.debug("WorkerPool haltWork()")
self.should_stop=True
for w in self.workers:
w.haltWork()
self.work_queue.put(self.halt_command)
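haltWork() combines a should_stop flag with the halt_command sentinel pushed onto the work queue, so a worker blocked in getNextWorkItem() wakes up. A common variant of that sentinel pattern, reduced to the essentials; re-queuing the sentinel so every sibling sees it is an assumption of this sketch, not necessarily what the pool above does:

```python
# Sketch of sentinel-based shutdown (Python 2); HALT mirrors halt_command.
import Queue, threading

HALT = 'halt command'
q = Queue.Queue()

def worker():
    while True:
        if q.get() == HALT:
            q.put(HALT)    # pass it on so the other workers stop too
            return

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
q.put(HALT)                # one sentinel cascades to all four workers
for t in threads:
    t.join()
```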
class OperationWorkerPool(WorkerPool):
......@@ -183,14 +184,14 @@ class Worker(Thread):
cmd=None
name=None
logger=None
def __init__(self,name,pool):
self.name=name
self.pool=pool
self.logger=logger
Thread.__init__(self)
self.daemon=pool.daemonize
def run(self):
while True:
try:
......@@ -198,7 +199,7 @@ class Worker(Thread):
self.cmd = self.pool.getNextWorkItem()
except TypeError:
# misleading exception raised during interpreter shutdown
return
# we must have got a command to run here
if self.cmd is None:
......@@ -226,18 +227,18 @@ class Worker(Thread):
self.logger.debug("[%s] finished cmd with exception: %s" % (self.name, self.cmd))
self.pool.addFinishedWorkItem(self.cmd)
self.cmd=None
def haltWork(self):
self.logger.debug("[%s] haltWork" % self.name)
# this was originally coded as
#
# if self.cmd is not None:
# self.cmd.interrupt()
# self.cmd.cancel()
#
# but as observed in MPP-13808, the worker thread's run() loop may set self.cmd to None
# past the point where the calling thread checks self.cmd for None, leading to a curious
# "'NoneType' object has no attribute 'cancel' exception" which may prevent the worker pool's
# haltWorkers() from actually halting all the workers.
#
......@@ -245,29 +246,29 @@ class Worker(Thread):
if c is not None and isinstance(c, Command):
c.interrupt()
c.cancel()
"""
TODO: consider just having a single interface that needs to be implemented for
describing work to allow the Workers to use it. This would allow the user
to better provide the necessary logic. i.e. even though the user wants to
execute a unix command... how the results are interpreted is highly
application specific. So we should have a separate level of abstraction
for executing UnixCommands and DatabaseCommands from this one.
other things to think about:
-- how to support cancel
-- how to support progress
-- undo?
-- blocking vs. unblocking
"""
......@@ -275,11 +276,11 @@ TODO: consider just having a single interface that needs to be implemented for
class CommandResult():
""" Used as a way to package up the results from a GpCommand
"""
#rc,stdout,stderr,completed,halt
def __init__(self,rc,stdout,stderr,completed,halt):
self.rc=rc
self.stdout=stdout
......@@ -292,7 +293,7 @@ class CommandResult():
res = "cmd had rc=%d completed=%s halted=%s\n stdout='%s'\n " \
"stderr='%s'" % (self.rc,str(self.completed), str(self.halt), self.stdout, self.stderr)
return res
def wasSuccessful(self):
if self.halt:
return False
......@@ -304,9 +305,9 @@ class CommandResult():
def __str__(self):
return self.printResult()
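A small sketch of the result object in isolation. The checks wasSuccessful() performs besides halt are elided in this diff, so treat its full logic as assumed:

```python
# Illustrative: build a CommandResult by hand and inspect it (Python 2).
res = CommandResult(rc=0, stdout='ok\n', stderr='', completed=True, halt=False)
print res.wasSuccessful()   # True: not halted (and, presumably, rc == 0 and completed)
print res                   # __str__ delegates to printResult()
```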
def split_stdout(self, how=':'):
"""
"""
TODO: AK: This doesn't belong here if it pertains only to pg_controldata.
MPP-16318: Skip over discrepancies in the pg_controldata stdout, as it's
......@@ -320,7 +321,7 @@ class CommandResult():
yield ret
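split_stdout() walks stdout splitting on ':' so callers can tolerate pg_controldata-style "key: value" lines. The same idea standalone, with illustrative sample lines:

```python
# Illustrative 'key: value' parsing of pg_controldata-like output (Python 2).
stdout = ("Database cluster state: in production\n"
          "Latest checkpoint location: 0/60000A8")
for line in stdout.splitlines():
    key, _, value = line.partition(':')
    print key.strip(), '->', value.strip()
```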
class ExecutionError(Exception):
def __init__(self,summary,cmd):
self.summary=summary
self.cmd=cmd
......@@ -364,27 +365,27 @@ def createExecutionContext(execution_context_id,remoteHost,stdin, nakedExecution
raise Exception("Programmer Error. Specified NAKED execution context but didn't provide a NakedExecutionInfo")
return NakedExecutionContext(remoteHost, stdin, nakedExecutionInfo)
class ExecutionContext():
""" An ExecutionContext defines where and how to execute the Command and how to
gather up information that are the results of the command.
""" An ExecutionContext defines where and how to execute the Command and how to
gather up information that are the results of the command.
"""
propagate_env_map = {}
"""
"""
Dict. mapping environment variables to their values. See gpcoverage.py for example usage.
"""
def __init__(self):
pass
def execute(self,cmd):
pass
def interrupt(self,cmd):
pass
def cancel(self,cmd):
pass
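The three no-op methods above are effectively the strategy interface that createExecutionContext() picks an implementation of. A hypothetical subclass shows the minimum surface area (DryRunExecutionContext is not in the codebase):

```python
# Hypothetical sketch: a context that only logs what it would run.
class DryRunExecutionContext(ExecutionContext):
    def execute(self, cmd):
        print "would run: %s" % cmd.cmdStr
    def interrupt(self, cmd):
        pass
    def cancel(self, cmd):
        pass
```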
......@@ -398,23 +399,23 @@ class LocalExecutionContext(ExecutionContext):
ExecutionContext.__init__(self)
self.stdin = stdin
pass
def execute(self, cmd, wait=True):
# prepend env. variables from ExecutionContext.propagate_env_map
# e.g. Given {'FOO': 1, 'BAR': 2}, we'll produce "FOO=1 BAR=2 ..."
for k, v in self.__class__.propagate_env_map.iteritems():
cmd.cmdStr = "%s=%s %s" % (k, v, cmd.cmdStr)
# also propagate env from command instance specific map
for k, v in cmd.propagate_env_map.iteritems():
cmd.cmdStr = "%s=%s %s" % (k, v, cmd.cmdStr)
# executable='/bin/bash' is to ensure the shell is bash. bash isn't the
# actual command executed, but the shell that command string runs under.
self.proc = gpsubprocess.Popen(cmd.cmdStr, env=None, shell=True,
executable='/bin/bash',
stdin=subprocess.PIPE,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE, close_fds=True)
if wait:
(rc,stdout_value,stderr_value)=self.proc.communicate2(input=self.stdin)
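The env-propagation loops near the top of execute() just string-prepend K=V pairs onto the command. In isolation (values are illustrative, and dict iteration order is arbitrary in Python 2, hence "e.g."):

```python
# Illustrative: {'FOO': '1', 'BAR': '2'} + "ls /tmp" -> e.g. "FOO=1 BAR=2 ls /tmp"
cmdStr = 'ls /tmp'
for k, v in {'FOO': '1', 'BAR': '2'}.iteritems():
    cmdStr = '%s=%s %s' % (k, v, cmdStr)
print cmdStr
```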
......@@ -433,7 +434,7 @@ class LocalExecutionContext(ExecutionContext):
self.halt=True
if self.proc:
self.proc.cancel()
##########################################################################
# Naked Execution is used to run commands where ssh keys are not exchanged
class NakedExecutionInfo:
......@@ -558,7 +559,7 @@ class NakedExecutionContext(LocalExecutionContext):
try:
stdin, stdout, stderr = self.client.exec_command(cmd.cmdStr)
rc = stdout.channel.recv_exit_status()
self.completed=True
cmd.set_results(CommandResult(rc,stdout.readlines(),stderr.readlines(),self.completed, self.halt))
stdin.close()
......@@ -601,25 +602,25 @@ class NakedExecutionContext(LocalExecutionContext):
self.halt=True
self.client.close()
cmd.set_results(CommandResult(1,"","command on host " + self.targetHost + " interrupted ", False, False))
def cancel(self, cmd):
self.client.close()
cmd.set_results(CommandResult(1,"","command on host " + self.targetHost + " canceled ", False, False))
class RemoteExecutionContext(LocalExecutionContext):
trail = set()
"""
Leaves a trail of hosts to which we've ssh'ed, during the life of a particular interpreter.
"""
def __init__(self,targetHost,stdin):
LocalExecutionContext.__init__(self, stdin)
self.targetHost=targetHost
pass
def execute(self,cmd):
# prepend env. variables from ExecutionContext.propagate_env_map
# e.g. Given {'FOO': 1, 'BAR': 2}, we'll produce "FOO=1 BAR=2 ..."
......@@ -638,7 +639,7 @@ class RemoteExecutionContext(LocalExecutionContext):
if (cmd.get_results().stderr.startswith('ssh_exchange_identification: Connection closed by remote host')):
self.__retry(cmd)
pass
def __retry(self, cmd, count=0):
if count == SSH_MAX_RETRY:
return
......@@ -646,8 +647,8 @@ class RemoteExecutionContext(LocalExecutionContext):
LocalExecutionContext.execute(self, cmd)
if (cmd.get_results().stderr.startswith('ssh_exchange_identification: Connection closed by remote host')):
self.__retry(cmd, count + 1)
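__retry() recurses up to SSH_MAX_RETRY times when stderr shows the transient ssh_exchange_identification failure. An iterative sketch of the same bounded-retry shape; the sleep between attempts is an assumption, since this hunk does not show whether __retry pauses (SSH_RETRY_DELAY is defined near the top of base.py):

```python
# Sketch of a bounded retry loop (Python 2); run/is_transient are stand-ins.
import time

def retry(run, is_transient, max_retry, delay=0.5):
    result = run()
    for _ in range(max_retry):
        if not is_transient(result):
            break
        time.sleep(delay)
        result = run()
    return result
```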
class RMIExecutionContext(ExecutionContext):
""" Leave this as a big old TODO: for now. see agent.py for some more details"""
def __init__(self):
......@@ -657,7 +658,7 @@ class RMIExecutionContext(ExecutionContext):
class Command:
""" TODO:
""" TODO:
"""
name=None
cmdStr=None
......@@ -667,10 +668,10 @@ class Command:
def __init__(self,name,cmdStr,ctxt=LOCAL,remoteHost=None,stdin=None,nakedExecutionInfo=None):
self.name=name
self.cmdStr=cmdStr
self.exec_context=createExecutionContext(ctxt,remoteHost,stdin=stdin,nakedExecutionInfo=nakedExecutionInfo)
self.remoteHost=remoteHost
def __str__(self):
if self.results:
return "%s cmdStr='%s' had result: %s" % (self.name,self.cmdStr,self.results)
......@@ -692,75 +693,75 @@ class Command:
else:
# simulate error
self.results = CommandResult(1,'Fault Injection','Fault Injection' ,False,True)
if validateAfter:
self.validate()
pass
def set_results(self,results):
self.results=results
def get_results(self):
return self.results
def get_stdout_lines(self):
return self.results.stdout.splitlines()
def get_stderr_lines(self):
return self.results.stderr.splitlines()
def cancel(self):
if self.exec_context and isinstance(self.exec_context, ExecutionContext):
self.exec_context.cancel(self)
def interrupt(self):
if self.exec_context and isinstance(self.exec_context, ExecutionContext):
self.exec_context.interrupt(self)
def was_successful(self):
if self.results is None:
return False
else:
return self.results.wasSuccessful()
def validate(self,expected_rc=0):
"""Plain vanilla validation which expects a 0 return code."""
"""Plain vanilla validation which expects a 0 return code."""
if self.results.rc != expected_rc:
raise ExecutionError("non-zero rc: %d" % self.results.rc, self)
class SQLCommand(Command):
"""Base class for commands that execute SQL statements. Classes
that inherit from SQLCommand should set cancel_conn to the pygresql
connection they wish to cancel and check self.cancel_flag."""
def __init__(self,name):
Command.__init__(self, name, cmdStr=None)
self.cancel_flag = False
self.cancel_conn = None
def run(self,validateAfter=False):
raise ExecutionError("programmer error. implementors of SQLCommand must implement run()", self)
def interrupt(self):
# No execution context for SQLCommands
pass
def cancel(self):
# assignment is an atomic operation in python
self.cancel_flag = True
# if self.conn is not set we cannot cancel.
if self.cancel_conn:
DB(self.cancel_conn).cancel()
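SQLCommand is abstract: run() must be supplied by the subclass, which is also expected to publish its live connection via cancel_conn and poll cancel_flag. A hypothetical sketch of a concrete subclass; VacuumCommand and the query call are assumptions for illustration, not from this diff:

```python
# Hypothetical sketch of a concrete SQLCommand.
class VacuumCommand(SQLCommand):
    def __init__(self, name, conn):
        SQLCommand.__init__(self, name)
        self.conn = conn              # an already-open pygresql connection (assumed)
    def run(self, validateAfter=False):
        self.cancel_conn = self.conn  # lets cancel() reach the server
        if self.cancel_flag:          # cooperative cancellation check
            return
        self.conn.query('VACUUM')     # pygresql pg.DB-style query call (assumed)
```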
def run_remote_commands(name, commands):
"""
"""
......
......@@ -41,7 +41,7 @@ class GpRecoverseg():
cmd = Command(name='Run gprecoverseg', cmdStr='source %s/greenplum_path.sh;%s' % (self.gphome, rcvr_cmd))
tinctest.logger.info("Running gprecoverseg : %s" % cmd)
pool = WorkerPool(numWorkers=1, daemonize=True)
pool.addCommand(cmd)
def run(self,option=' ', validate=True, results=True):
......
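Note on the test-side change above: per the commit message, giving this test harness a single daemonized worker presumably lets the test process exit without being held open by a still-running gprecoverseg command.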