提交 b82f27ec 编写于 作者: dengyihao's avatar dengyihao

tsc: handle schedule error

上级 4b4d8d7f
...@@ -31,7 +31,6 @@ typedef struct SSchedMsg { ...@@ -31,7 +31,6 @@ typedef struct SSchedMsg {
void *thandle; void *thandle;
} SSchedMsg; } SSchedMsg;
typedef struct { typedef struct {
char label[TSDB_LABEL_LEN]; char label[TSDB_LABEL_LEN];
tsem_t emptySem; tsem_t emptySem;
...@@ -48,7 +47,6 @@ typedef struct { ...@@ -48,7 +47,6 @@ typedef struct {
void *pTimer; void *pTimer;
} SSchedQueue; } SSchedQueue;
/** /**
* Create a thread-safe ring-buffer based task queue and return the instance. A thread * Create a thread-safe ring-buffer based task queue and return the instance. A thread
* pool will be created to consume the messages in the queue. * pool will be created to consume the messages in the queue.
...@@ -57,7 +55,7 @@ typedef struct { ...@@ -57,7 +55,7 @@ typedef struct {
* @param label the label of the queue * @param label the label of the queue
* @return the created queue scheduler * @return the created queue scheduler
*/ */
void *taosInitScheduler(int32_t capacity, int32_t numOfThreads, const char *label, SSchedQueue* pSched); void *taosInitScheduler(int32_t capacity, int32_t numOfThreads, const char *label, SSchedQueue *pSched);
/** /**
* Create a thread-safe ring-buffer based task queue and return the instance. * Create a thread-safe ring-buffer based task queue and return the instance.
...@@ -83,7 +81,7 @@ void taosCleanUpScheduler(void *queueScheduler); ...@@ -83,7 +81,7 @@ void taosCleanUpScheduler(void *queueScheduler);
* @param queueScheduler the queue scheduler instance * @param queueScheduler the queue scheduler instance
* @param pMsg the message for the task * @param pMsg the message for the task
*/ */
void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg); int taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -1399,7 +1399,12 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { ...@@ -1399,7 +1399,12 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
arg->msg = *pMsg; arg->msg = *pMsg;
arg->pEpset = tEpSet; arg->pEpset = tEpSet;
taosAsyncExec(doProcessMsgFromServer, arg, NULL); if (0 != taosAsyncExec(doProcessMsgFromServer, arg, NULL)) {
tscError("failed to sched msg to tsc, tsc ready to quit");
rpcFreeCont(pMsg->pCont);
taosMemoryFree(arg->pEpset);
taosMemoryFree(arg);
}
} }
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) { TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
......
...@@ -134,8 +134,7 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) ...@@ -134,8 +134,7 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code)
schedMsg.thandle = execParam; schedMsg.thandle = execParam;
schedMsg.msg = code; schedMsg.msg = code;
taosScheduleTask(&pTaskQueue, &schedMsg); return taosScheduleTask(&pTaskQueue, &schedMsg);
return 0;
} }
void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
...@@ -472,5 +471,3 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) { ...@@ -472,5 +471,3 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -149,18 +149,18 @@ void *taosProcessSchedQueue(void *scheduler) { ...@@ -149,18 +149,18 @@ void *taosProcessSchedQueue(void *scheduler) {
return NULL; return NULL;
} }
void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) { int taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) {
SSchedQueue *pSched = (SSchedQueue *)queueScheduler; SSchedQueue *pSched = (SSchedQueue *)queueScheduler;
int32_t ret = 0; int32_t ret = 0;
if (pSched == NULL) { if (pSched == NULL) {
uError("sched is not ready, msg:%p is dropped", pMsg); uError("sched is not ready, msg:%p is dropped", pMsg);
return; return -1;
} }
if (atomic_load_8(&pSched->stop)) { if (atomic_load_8(&pSched->stop)) {
uError("sched is already stopped, msg:%p is dropped", pMsg); uError("sched is already stopped, msg:%p is dropped", pMsg);
return; return -1;
} }
if ((ret = tsem_wait(&pSched->emptySem)) != 0) { if ((ret = tsem_wait(&pSched->emptySem)) != 0) {
...@@ -185,6 +185,7 @@ void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) { ...@@ -185,6 +185,7 @@ void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) {
uFatal("post %s fullSem failed(%s)", pSched->label, strerror(errno)); uFatal("post %s fullSem failed(%s)", pSched->label, strerror(errno));
ASSERT(0); ASSERT(0);
} }
return ret;
} }
void taosCleanUpScheduler(void *param) { void taosCleanUpScheduler(void *param) {
...@@ -192,11 +193,11 @@ void taosCleanUpScheduler(void *param) { ...@@ -192,11 +193,11 @@ void taosCleanUpScheduler(void *param) {
if (pSched == NULL) return; if (pSched == NULL) return;
uDebug("start to cleanup %s schedQsueue", pSched->label); uDebug("start to cleanup %s schedQsueue", pSched->label);
atomic_store_8(&pSched->stop, 1); atomic_store_8(&pSched->stop, 1);
taosMsleep(200); taosMsleep(200);
for (int32_t i = 0; i < pSched->numOfThreads; ++i) { for (int32_t i = 0; i < pSched->numOfThreads; ++i) {
if (taosCheckPthreadValid(pSched->qthread[i])) { if (taosCheckPthreadValid(pSched->qthread[i])) {
tsem_post(&pSched->fullSem); tsem_post(&pSched->fullSem);
...@@ -220,7 +221,7 @@ void taosCleanUpScheduler(void *param) { ...@@ -220,7 +221,7 @@ void taosCleanUpScheduler(void *param) {
if (pSched->queue) taosMemoryFree(pSched->queue); if (pSched->queue) taosMemoryFree(pSched->queue);
if (pSched->qthread) taosMemoryFree(pSched->qthread); if (pSched->qthread) taosMemoryFree(pSched->qthread);
//taosMemoryFree(pSched); // taosMemoryFree(pSched);
} }
// for debug purpose, dump the scheduler status every 1min. // for debug purpose, dump the scheduler status every 1min.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册