提交 5754c95a 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/develop' into hotfix/crash

......@@ -1213,7 +1213,11 @@ class Task():
self._executeInternal(te, wt) # TODO: no return value?
except taos.error.ProgrammingError as err:
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme
if ( errno2 in [0x200, 0x360, 0x362, 0x36A, 0x36B, 0x36D, 0x381, 0x380, 0x383, 0x503, 0x600,
if ( errno2 in [
0x05, # TSDB_CODE_RPC_NOT_READY
0x200, 0x360, 0x362, 0x36A, 0x36B, 0x36D, 0x381, 0x380, 0x383, 0x503,
0x510, # vnode not in ready state
0x600,
1000 # REST catch-all error
]) : # allowed errors
self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql))
......@@ -1629,73 +1633,219 @@ class MyLoggingAdapter(logging.LoggerAdapter):
# return '[%s] %s' % (self.extra['connid'], msg), kwargs
class SvcManager:
MAX_QUEUE_SIZE = 10000
def __init__(self):
print("Starting service manager")
signal.signal(signal.SIGTERM, self.sigIntHandler)
signal.signal(signal.SIGINT, self.sigIntHandler)
signal.signal(signal.SIGUSR1, self.sigUsrHandler)
self.ioThread = None
self.subProcess = None
self.shouldStop = False
self.status = MainExec.STATUS_RUNNING
# self.status = MainExec.STATUS_RUNNING # set inside _startTaosService()
def svcOutputReader(self, out: IO, queue):
# print("This is the svcOutput Reader...")
for line in out : # iter(out.readline, b''):
# Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
print("This is the svcOutput Reader...")
# for line in out :
for line in iter(out.readline, b''):
# print("Finished reading a line: {}".format(line))
queue.put(line.rstrip()) # get rid of new lines
print("No more output from incoming IO") # meaning sub process must have died
# print("Adding item to queue...")
line = line.decode("utf-8").rstrip()
queue.put(line) # This might block, and then causing "out" buffer to block
print("_i", end="", flush=True)
# Trim the queue if necessary
oneTenthQSize = self.MAX_QUEUE_SIZE // 10
if (queue.qsize() >= (self.MAX_QUEUE_SIZE - oneTenthQSize) ) : # 90% full?
print("Triming IPC queue by: {}".format(oneTenthQSize))
for i in range(0, oneTenthQSize) :
try:
queue.get_nowait()
except Empty:
break # break out of for loop, no more trimming
if self.shouldStop :
print("Stopping to read output from sub process")
break
# queue.put(line)
print("\nNo more output (most likely) from IO thread managing TDengine service") # meaning sub process must have died
out.close()
def sigIntHandler(self, signalNumber, frame):
def _doMenu(self):
choice = ""
while True:
print("\nInterrupting Service Program, Choose an Action: ")
print("1: Resume")
print("2: Terminate")
print("3: Restart")
# Remember to update the if range below
# print("Enter Choice: ", end="", flush=True)
while choice == "":
choice = input("Enter Choice: ")
if choice != "":
break # done with reading repeated input
if choice in ["1", "2", "3"]:
break # we are done with whole method
print("Invalid choice, please try again.")
choice = "" # reset
return choice
def sigUsrHandler(self, signalNumber, frame) :
print("Interrupting main thread execution upon SIGUSR1")
if self.status != MainExec.STATUS_RUNNING :
print("Ignoring repeated SIGINT...")
print("Ignoring repeated SIG...")
return # do nothing if it's already not running
self.status = MainExec.STATUS_STOPPING
choice = self._doMenu()
if choice == "1" :
self.sigHandlerResume() # TODO: can the sub-process be blocked due to us not reading from queue?
elif choice == "2" :
self.stopTaosService()
elif choice == "3" :
self.stopTaosService()
self.startTaosService()
else:
raise RuntimeError("Invalid menu choice: {}".format(choice))
def sigIntHandler(self, signalNumber, frame):
print("Sig INT Handler starting...")
if self.status != MainExec.STATUS_RUNNING :
print("Ignoring repeated SIG_INT...")
return
self.status = MainExec.STATUS_STOPPING # immediately set our status
self.stopTaosService()
print("INT signal handler returning...")
print("Terminating program...")
self.subProcess.send_signal(signal.SIGINT)
self.shouldStop = True
self.joinIoThread()
def sigHandlerResume(self) :
print("Resuming TDengine service manager thread (main thread)...\n\n")
self.status = MainExec.STATUS_RUNNING
def joinIoThread(self):
if self.ioThread :
self.ioThread.join()
self.ioThread = None
else :
print("Joining empty thread, doing nothing")
def run(self):
TD_READY_MSG = "TDengine is initialized successfully"
def _procIpcBatch(self):
# Process all the output generated by the underlying sub process, managed by IO thread
while True :
try:
line = self.ipcQueue.get_nowait() # getting output at fast speed
print("_o", end="", flush=True)
if self.status == MainExec.STATUS_STARTING : # we are starting, let's see if we have started
if line.find(self.TD_READY_MSG) != -1 : # found
self.status = MainExec.STATUS_RUNNING
except Empty:
# time.sleep(2.3) # wait only if there's no output
# no more output
return # we are done with THIS BATCH
else: # got line
print(line)
def _procIpcAll(self):
while True :
print("<", end="", flush=True)
self._procIpcBatch() # process one batch
# check if the ioThread is still running
if (not self.ioThread) or (not self.ioThread.is_alive()):
print("IO Thread (with subprocess) has ended, main thread now exiting...")
self.stopTaosService()
self._procIpcBatch() # one more batch
return # TODO: maybe one last batch?
# Maybe handler says we should exit now
if self.shouldStop:
print("Main thread ending all IPC processing with IOThread/SubProcess")
self._procIpcBatch() # one more batch
return
print(">", end="", flush=True)
time.sleep(0.5)
def startTaosService(self):
ON_POSIX = 'posix' in sys.builtin_module_names
svcCmd = ['../../build/build/bin/taosd', '-c', '../../build/test/cfg']
# svcCmd = ['vmstat', '1']
self.subProcess = subprocess.Popen(svcCmd, stdout=subprocess.PIPE, bufsize=1, close_fds=ON_POSIX, text=True)
q = Queue()
self.ioThread = threading.Thread(target=self.svcOutputReader, args=(self.subProcess.stdout, q))
if self.subProcess : # already there
raise RuntimeError("Corrupt process state")
self.subProcess = subprocess.Popen(
svcCmd,
stdout=subprocess.PIPE,
# bufsize=1, # not supported in binary mode
close_fds=ON_POSIX) # had text=True, which interferred with reading EOF
self.ipcQueue = Queue()
if self.ioThread :
raise RuntimeError("Corrupt thread state")
self.ioThread = threading.Thread(target=self.svcOutputReader, args=(self.subProcess.stdout, self.ipcQueue))
self.ioThread.daemon = True # thread dies with the program
self.ioThread.start()
self.shouldStop = False # don't let the main loop stop
self.status = MainExec.STATUS_STARTING
# wait for service to start
for i in range(0, 10) :
time.sleep(1.0)
self._procIpcBatch() # pump messages
print("_zz_", end="", flush=True)
if self.status == MainExec.STATUS_RUNNING :
print("TDengine service READY to process requests")
return # now we've started
raise RuntimeError("TDengine service did not start successfully") # TODO: handle this better?
def stopTaosService(self):
# can be called from both main thread or signal handler
print("Terminating TDengine service running as the sub process...")
# Linux will send Control-C generated SIGINT to the TDengine process already, ref: https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
if not self.subProcess :
print("Process already stopped")
return
retCode = self.subProcess.poll()
if retCode : # valid return code, process ended
self.subProcess = None
else: # process still alive, let's interrupt it
print("Sub process still running, sending SIG_INT and waiting for it to stop...")
self.subProcess.send_signal(signal.SIGINT) # sub process should end, then IPC queue should end, causing IO thread to end
try :
self.subProcess.wait(10)
except subprocess.TimeoutExpired as err:
print("Time out waiting for TDengine service process to exit")
else:
print("TDengine service process terminated successfully from SIG_INT")
self.subProcess = None
if self.subProcess and (not self.subProcess.poll()):
print("Sub process is still running... pid = {}".format(self.subProcess.pid))
self.shouldStop = True
self.joinIoThread()
def run(self):
self.startTaosService()
# proc = subprocess.Popen(['echo', '"to stdout"'],
# stdout=subprocess.PIPE,
# )
# stdout_value = proc.communicate()[0]
# print('\tstdout: {}'.format(repr(stdout_value)))
while True :
try:
line = q.get_nowait() # getting output at fast speed
except Empty:
# print('no output yet')
time.sleep(2.3) # wait only if there's no output
else: # got line
print(line)
# print("----end of iteration----")
if self.shouldStop:
print("Ending main Svc thread")
break
self._procIpcAll()
print("end of loop")
self.joinIoThread()
print("Finished")
print("End of loop reading from IPC queue")
self.joinIoThread() # should have started already
print("SvcManager Run Finished")
class ClientManager:
def __init__(self):
......@@ -1747,6 +1897,10 @@ class ClientManager:
self._printLastNumbers()
def run(self):
if gConfig.auto_start_service :
svcMgr = SvcManager()
svcMgr.startTaosService()
self._printLastNumbers()
dbManager = DbManager() # Regular function
......@@ -1758,6 +1912,8 @@ class ClientManager:
# print("exec stats: {}".format(self.tc.getExecStats()))
# print("TC failed = {}".format(self.tc.isFailed()))
self.conclude()
if gConfig.auto_start_service :
svcMgr.stopTaosService()
# print("TC failed (2) = {}".format(self.tc.isFailed()))
return 1 if self.tc.isFailed() else 0 # Linux return code: ref https://shapeshed.com/unix-exit-codes/
......@@ -1767,8 +1923,9 @@ class ClientManager:
class MainExec:
STATUS_RUNNING = 1
STATUS_STOPPING = 2
STATUS_STARTING = 1
STATUS_RUNNING = 2
STATUS_STOPPING = 3
# STATUS_STOPPED = 3 # Not used yet
@classmethod
......@@ -1837,6 +1994,8 @@ def main():
'''))
parser.add_argument('-a', '--auto-start-service', action='store_true',
help='Automatically start/stop the TDengine service (default: false)')
parser.add_argument('-c', '--connector-type', action='store', default='native', type=str,
help='Connector type to use: native, rest, or mixed (default: 10)')
parser.add_argument('-d', '--debug', action='store_true',
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册