提交 3a0fc274 编写于 作者: A Alex Duan

feat(tsdb): sync code from v24 after add delete case log

上级 be112f71
......@@ -23,7 +23,7 @@
#include "tscSubquery.h"
void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupMsg* pVgroupInfo);
void tscDumpEpSetFromVgroupInfo(SRpcEpSet *pEpSet, SNewVgroupInfo *pVgroupInfo);
//
// handle error
......@@ -85,6 +85,7 @@ void tscSubDeleteCallback(void *param, TAOS_RES *tres, int code) {
return;
}
// record
tscInfo("0x%"PRIx64":CDEL sub:0x%"PRIx64" query complete, ep:%s, vgId:%d, orderOfSub:%d, retrieve row(s)=%d tables(s)=%d", trsupport->pParentSql->self,
pSql->self, pVgroup->epAddr[pSql->epSet.inUse].fqdn, pVgroup->vgId, trsupport->subqueryIndex, pSql->res.numOfRows, pSql->res.numOfTables);
......@@ -156,8 +157,9 @@ SSqlObj *tscCreateSTableSubDelete(SSqlObj *pSql, SVgroupMsg* pVgroupMsg, SRetrie
registerSqlObj(pNew);
tscDebug("0x%"PRIx64":CDEL new sub insertion: %p", pSql->self, pNew);
// set vnode epset
tscSetDnodeEpSet(&pNew->epSet, pVgroupMsg);
SNewVgroupInfo vgroupInfo = {0};
taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &pVgroupMsg->vgId, sizeof(pVgroupMsg->vgId), NULL, &vgroupInfo);
tscDumpEpSetFromVgroupInfo(&pNew->epSet, &vgroupInfo);
return pNew;
}
......
......@@ -73,7 +73,7 @@ static int32_t removeDupVgid(int32_t *src, int32_t sz) {
return ret;
}
void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupMsg* pVgroupInfo) {
static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupMsg* pVgroupInfo) {
assert(pEpSet != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);
// Issue the query to one of the vnode among a vgroup randomly.
......@@ -130,7 +130,7 @@ void tscUpdateMgmtEpSet(SSqlObj *pSql, SRpcEpSet *pEpSet) {
taosCorEndWrite(&pCorEpSet->version);
}
static void tscDumpEpSetFromVgroupInfo(SRpcEpSet *pEpSet, SNewVgroupInfo *pVgroupInfo) {
void tscDumpEpSetFromVgroupInfo(SRpcEpSet *pEpSet, SNewVgroupInfo *pVgroupInfo) {
if (pVgroupInfo == NULL) { return;}
int8_t inUse = pVgroupInfo->inUse;
pEpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0;
......
......@@ -3332,6 +3332,9 @@ bool tscShouldBeFreed(SSqlObj* pSql) {
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd* pCmd, int32_t tableIndex) {
assert(pCmd != NULL);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if(pQueryInfo == NULL) {
return NULL;
}
return tscGetMetaInfo(pQueryInfo, tableIndex);
}
......
......@@ -164,10 +164,19 @@ void dnodeFreeVWriteQueue(void *pWqueue) {
void* waitingResultThread(void* param) {
SVWriteMsg* pWrite = (SVWriteMsg* )param;
// wait AddWaitThread to list finished
dDebug(":SDEL pVnode:%p wait AddWaitThread finished... pWrite=%p", pWrite->pVnode, pWrite);
tsem_t* psem = vnodeSemWait(pWrite->pVnode);
tsem_wait(psem);
tsem_post(psem);
dDebug(":SDEL pVnode:%p wait AddWaitThread ok pWrite=%p", pWrite->pVnode, pWrite);
// wait request deal finished
int32_t ret = tsem_wait(pWrite->rspRet.psem);
dDebug(":SDEL pVnode:%p wait request ok pWrite=%p", pWrite->pVnode, pWrite);
if(ret == 0) {
// success
}
tsem_destroy(pWrite->rspRet.psem);
tfree(pWrite->rspRet.psem);
......@@ -196,7 +205,6 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) {
if (count <= 1) return;
if(pWrite->rspRet.psem == 0) {
SRpcMsg rpcRsp = {
.handle = pWrite->rpcMsg.handle,
.pCont = pWrite->rspRet.rsp,
......@@ -204,13 +212,26 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) {
.code = pWrite->code,
};
if(pWrite->rspRet.psem == 0) {
// no wait response
rpcSendResponse(&rpcRsp);
vnodeFreeFromWQueue(pVnode, pWrite);
} else {
if (vnodeWaitTooMany(pVnode)) {
// too many wait , so can not wait again
rpcRsp.code = TSDB_CODE_APP_NOT_READY;
rpcSendResponse(&rpcRsp);
vnodeFreeFromWQueue(pVnode, pWrite);
} else {
tsem_t* psem = vnodeSemWait(pVnode);
tsem_wait(psem);
// need async to wait result in another thread
pthread_t* thread = taosCreateThread(waitingResultThread, pWrite);
// add to wait thread manager
vnodeAddWait(pVnode, thread, pWrite->rspRet.psem, pWrite);
dDebug(":SDEL pVnode=%p vnode add wait %p ok, tsem_post.", pVnode, pWrite);
tsem_post(psem);
}
}
}
......
......@@ -249,6 +249,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_NO_WRITE_AUTH TAOS_DEF_ERROR_CODE(0, 0x0512) //"Database write operation denied"
#define TSDB_CODE_VND_IS_SYNCING TAOS_DEF_ERROR_CODE(0, 0x0513) //"Database is syncing"
#define TSDB_CODE_VND_INVALID_TSDB_STATE TAOS_DEF_ERROR_CODE(0, 0x0514) //"Invalid tsdb state"
#define TSDB_CODE_WAIT_THREAD_TOO_MANY TAOS_DEF_ERROR_CODE(0, 0x0515) //"Wait threads too many"
// tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) //"Invalid table ID")
......
......@@ -61,9 +61,9 @@ typedef struct {
typedef struct {
int32_t startTime;
pthread_t* pthread;
tsem_t* psem;
void* param;
pthread_t * pthread;
tsem_t * psem;
void * param;
} SWaitThread;
// vnodeStatus
......@@ -107,6 +107,9 @@ int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead);
// wait thread
void vnodeAddWait(void* pVnode, pthread_t* pthread, tsem_t* psem, void* param);
void vnodeRemoveWait(void* pVnode, void* param);
// get wait thread count
bool vnodeWaitTooMany(void* vparam);
tsem_t* vnodeSemWait(void* vparam);
#ifdef __cplusplus
}
......
......@@ -2390,6 +2390,9 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
taosHashCleanup(pRuntimeEnv->pTableRetrieveTsMap);
pRuntimeEnv->pTableRetrieveTsMap = NULL;
taosHashCleanup(pRuntimeEnv->pTablesRead);
pRuntimeEnv->pTablesRead = NULL;
taosHashCleanup(pRuntimeEnv->pResultRowListSet);
pRuntimeEnv->pResultRowListSet = NULL;
......
......@@ -1212,6 +1212,9 @@ void SqlInfoDestroy(SSqlInfo *pInfo) {
taosArrayDestroy(&pInfo->funcs);
if (pInfo->type == TSDB_SQL_SELECT) {
destroyAllSqlNode(pInfo->list);
} else if (pInfo->type == TSDB_SQL_DELETE_DATA) {
tSqlExprDestroy(pInfo->pDelData->pWhere);
tfree(pInfo->pDelData);
} else if (pInfo->type == TSDB_SQL_CREATE_TABLE) {
pInfo->pCreateTableInfo = destroyCreateTableSql(pInfo->pCreateTableInfo);
} else if (pInfo->type == TSDB_SQL_ALTER_TABLE) {
......
......@@ -142,6 +142,7 @@ static int32_t tsRpcNum = 0;
#define RPC_CONN_UDPC 1
#define RPC_CONN_TCPS 2
#define RPC_CONN_TCPC 3
#define RPC_CONN_AUTO 4 // need tcp use tcp
void *(*taosInitConn[])(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = {
taosInitUdpConnection,
......@@ -405,7 +406,7 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64
// connection type is application specific.
// for TDengine, all the query, show commands shall have TCP connection
char type = pMsg->msgType;
if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_CM_RETRIEVE
if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_CM_RETRIEVE || type == TSDB_MSG_TYPE_SUBMIT
|| type == TSDB_MSG_TYPE_FETCH || type == TSDB_MSG_TYPE_CM_STABLE_VGROUP
|| type == TSDB_MSG_TYPE_CM_TABLES_META || type == TSDB_MSG_TYPE_CM_TABLE_META
|| type == TSDB_MSG_TYPE_CM_SHOW || type == TSDB_MSG_TYPE_DM_STATUS || type == TSDB_MSG_TYPE_CM_ALTER_TABLE)
......
......@@ -58,11 +58,20 @@ static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
}
static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) {
int64_t fid;
if (key < 0) {
return (int)((key + 1) / tsTickPerDay[precision] / days - 1);
fid = ((key + 1) / tsTickPerDay[precision] / days - 1);
} else {
return (int)((key / tsTickPerDay[precision] / days));
fid = ((key / tsTickPerDay[precision] / days));
}
// check fid over int max or min, set with int max or min
if (fid > INT32_MAX) {
fid = INT32_MAX;
} else if(fid < INT32_MIN){
fid = INT32_MIN;
}
return (int)fid;
}
#endif /* _TD_TSDB_COMMIT_H_ */
\ No newline at end of file
......@@ -162,7 +162,7 @@ static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) {
}
static FORCE_INLINE TSKEY tsdbGetTableLastKeyImpl(STable* pTable) {
ASSERT((pTable->lastRow == NULL) || (pTable->lastKey == memRowKey(pTable->lastRow)));
ASSERT((pTable->lastRow == NULL) || (pTable->lastKey == TSKEY_INITIAL_VAL) || (pTable->lastKey == memRowKey(pTable->lastRow)));
return pTable->lastKey;
}
......
......@@ -208,6 +208,7 @@ static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo, SArray
int sFid = TSDB_KEY_FID(win.skey, pCfg->daysPerFile, pCfg->precision);
int eFid = TSDB_KEY_FID(win.ekey, pCfg->daysPerFile, pCfg->precision);
if(sFid > eFid) {
tsdbError("vgId:%d :SDEL sFid > eFid no fid to delete. sFid=%d eFid=%d", REPO_ID(pRepo), sFid, eFid);
tsdbDestroyDeleteH(&deleteH);
return -1;
}
......@@ -357,10 +358,12 @@ static int tsdbInitDeleteH(SDeleteH *pdh, STsdbRepo *pRepo) {
tsdbFSIterInit(&(pdh->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD);
if (tsdbInitReadH(&(pdh->readh), pRepo) < 0) {
tsdbError("vgId:%d :SDEL tsdbInitReadH return -1. malloc memory failed.", REPO_ID(pRepo));
return -1;
}
if (tsdbInitDeleteTblArray(pdh) < 0) {
tsdbError("vgId:%d :SDEL tsdbInitDeleteTblArray return -1. maybe malloc memory failed or lock meta error.", REPO_ID(pRepo));
tsdbDestroyDeleteH(pdh);
return -1;
}
......@@ -666,6 +669,8 @@ static int tsdbModifyBlocks(SDeleteH *pdh, STableDeleteH *pItem) {
// get pSchema for del table
if ((pSchema = tsdbGetTableSchemaImpl(pItem->pTable, true, true, -1, -1)) == NULL) {
tsdbError("vgId:%d :SDEL tsdbGetTableSchemaImpl return NULL tid=%d. errno=%d (%s)", REPO_ID(pdh->pRepo),
pItem->pTable->tableId.tid, errno, tstrerror(terrno));
return -1;
}
......@@ -700,6 +705,7 @@ static int tsdbModifyBlocks(SDeleteH *pdh, STableDeleteH *pItem) {
// border block need load to delete no-use data
if (tsdbLoadBlockData(pReadh, pBlock, pItem->pInfo) < 0) {
tsdbError("vgId:%d :SDEL tsdbLoadBlockData return -1. i=%d. errno=%d (%s)", REPO_ID(pdh->pRepo), i, errno, tstrerror(terrno));
return -1;
}
......@@ -710,6 +716,7 @@ static int tsdbModifyBlocks(SDeleteH *pdh, STableDeleteH *pItem) {
SBlock newBlock = {0};
if (tsdbWriteBlockToFile(pdh, pItem->pTable, pdh->pDCols, ppBuf, ppCBuf, ppExBuf, &newBlock) < 0) {
tsdbError("vgId:%d :SDEL tsdbWriteBlockToFile return -1. i=%d. errno=%d (%s)", REPO_ID(pdh->pRepo), i, errno, tstrerror(terrno));
return -1;
}
......@@ -720,6 +727,8 @@ static int tsdbModifyBlocks(SDeleteH *pdh, STableDeleteH *pItem) {
// write block info for each table
if (tsdbWriteBlockInfoImpl(TSDB_DELETE_HEAD_FILE(pdh), pItem->pTable, pdh->aSupBlk, pdh->aSubBlk,
ppBuf, &blkIdx) < 0) {
tsdbError("vgId:%d :SDEL tsdbWriteBlockInfoImpl return -1. tid=%d. errno=%d (%s)", REPO_ID(pdh->pRepo),
pItem->pTable->tableId.tid, errno, tstrerror(terrno));
return -1;
}
......@@ -797,6 +806,7 @@ static int tsdbFSetDeleteImpl(SDeleteH *pdh) {
// 3.WRITE INDEX OF ALL TABLE'S BLOCK TO HEAD FILE
if (tsdbWriteBlockIdx(TSDB_DELETE_HEAD_FILE(pdh), pdh->aBlkIdx, ppBuf) < 0) {
tsdbError("vgId:%d :SDEL tsdbWriteBlockIdx return -1. errno=%d (%s)", REPO_ID(pdh->pRepo), terrno, tstrerror(terrno));
return -1;
}
......
......@@ -855,6 +855,8 @@ int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockI
} else {
pTable->lastRow = lastRow;
}
} else {
taosTZfree(lastRow);
}
TSDB_WUNLOCK_TABLE(pTable);
......
......@@ -1165,12 +1165,16 @@ int32_t tsdbInsertControlData(STsdbRepo* pRepo, SSubmitBlk* pBlock, SShellSubmit
ret = tsdbQuerySTableByTagCond(pRepo, pBlock->uid, pCtlData->win.skey, pCtlData->tagCond, pCtlData->tagCondLen, &tableGroupInfo, NULL, 0);
if (ret != TSDB_CODE_SUCCESS) {
tsdbError(":SDEL vgId:%d failed to get child tables id from stable with tag condition. uid=%" PRIu64, REPO_ID(pRepo), pBlock->uid);
if(tableGroupInfo.pGroupList)
tsdbDestroyTableGroup(&tableGroupInfo);
return ret;
}
tnum = tsdbTableGroupInfo(&tableGroupInfo, NULL);
if (tnum == 0) {
tsdbWarn(":SDEL vgId:%d super table no child tables after filter by tag. uid=%" PRIu64, REPO_ID(pRepo), pBlock->uid);
if(tableGroupInfo.pGroupList)
tsdbDestroyTableGroup(&tableGroupInfo);
return TSDB_CODE_SUCCESS;
}
} else {
......@@ -1178,11 +1182,13 @@ int32_t tsdbInsertControlData(STsdbRepo* pRepo, SSubmitBlk* pBlock, SShellSubmit
tnum = 1;
}
// INIT SEM FOR ASYNC WAIT COMMIT RESULT
if (ppSem) {
// if need response (pRsp not null) , malloc ppSem for async wait response
if (ppSem && pRsp) {
*ppSem = (tsem_t* )tmalloc(sizeof(tsem_t));
ret = tsem_init(*ppSem, 0, 0);
if(ret != 0) {
if(tableGroupInfo.pGroupList)
tsdbDestroyTableGroup(&tableGroupInfo);
return TAOS_SYSTEM_ERROR(ret);
}
}
......@@ -1220,5 +1226,8 @@ int32_t tsdbInsertControlData(STsdbRepo* pRepo, SSubmitBlk* pBlock, SShellSubmit
tfree(pNew);
}
if(tableGroupInfo.pGroupList)
tsdbDestroyTableGroup(&tableGroupInfo);
return ret;
}
\ No newline at end of file
......@@ -47,8 +47,6 @@ void *taosAllocateQitem(int size);
void taosFreeQitem(void *item);
int taosWriteQitem(taos_queue, int type, void *item);
int taosReadQitem(taos_queue, int *type, void **pitem);
// special type search Qitem
int taosSearchQitem(taos_queue, int type, void **pitem);
taos_qall taosAllocateQall();
void taosFreeQall(taos_qall);
......
......@@ -256,6 +256,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, "Database suspended")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, "Database write operation denied")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_SYNCING, "Database is syncing")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TSDB_STATE, "Invalid tsdb state")
TAOS_DEFINE_ERROR(TSDB_CODE_WAIT_THREAD_TOO_MANY, "Wait threads too many")
// tsdb
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")
......
......@@ -160,47 +160,6 @@ int taosReadQitem(taos_queue param, int *type, void **pitem) {
return code;
}
// search Qitem with type
int taosSearchQitem(taos_queue param, int type, void **pitem) {
STaosQueue *queue = (STaosQueue *)param;
STaosQnode *pNode = NULL;
STaosQnode *pPre = NULL;
int code = 0;
pthread_mutex_lock(&queue->mutex);
pNode = queue->head;
while (pNode) {
if(pNode->type == type) {
// found
*pitem = pNode->item;
if(pPre == NULL) {
queue->head = pNode->next;
} else {
pPre->next = pNode->next;
}
if (queue->head == NULL)
queue->tail = NULL;
// reduce number
queue->numOfItems--;
if (queue->qset) {
atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
}
code = 1;
uDebug("item:%p is read out from queue:%p, type:%d items:%d", *pitem, queue, type, queue->numOfItems);
break;
}
// move next
pPre = pNode;
pNode = pNode->next;
}
pthread_mutex_unlock(&queue->mutex);
return code;
}
void *taosAllocateQall() {
void *p = calloc(sizeof(STaosQall), 1);
return p;
......
......@@ -76,6 +76,9 @@ typedef struct {
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
pthread_mutex_t statusMutex;
void * tqueue; // async threads queue
// thread for wait deal result to response client
SList * waitThreads;
tsem_t semWait;
} SVnodeObj;
#ifdef __cplusplus
......
......@@ -274,6 +274,9 @@ int32_t vnodeOpen(int32_t vgId) {
tsem_init(&pVnode->sem, 0, 0);
pthread_mutex_init(&pVnode->statusMutex, NULL);
vnodeSetInitStatus(pVnode);
// wait thread init
tsem_init(&pVnode->semWait, 0, 1);
pVnode->waitThreads = tdListNew(sizeof(SWaitThread));
tsdbIncCommitRef(pVnode->vgId);
......@@ -422,14 +425,29 @@ int32_t vnodeOpen(int32_t vgId) {
}
#define LOOP_CNT 10
void freeWaitThread(SVnodeObj* pVnode) {
void vnodeStopWaitingThread(SVnodeObj* pVnode) {
// check wait thread empty
int type = 0;
SWaitThread* pWaitThread = NULL;
while(taosReadQitem(pVnode->tqueue, &type, (void** )&pWaitThread) > 0) {
vDebug("vgId:%d :SDEL stop waiting thread count=%d", pVnode->vgId, listNEles(pVnode->waitThreads));
if(listNEles(pVnode->waitThreads) == 0) {
return;
}
vInfo("vgId:%d :SDEL stop waiting thread not zero. count=%d", pVnode->vgId, listNEles(pVnode->waitThreads));
// get lock
tsem_wait(&pVnode->semWait);
// loop stop
while (1) {
SListNode * pNode = tdListPopHead(pVnode->waitThreads);
if(pNode == NULL)
break;
// thread is running
pWaitThread = (SWaitThread *)pNode->data;
int32_t loop = LOOP_CNT;
while (taosThreadRunning(pWaitThread->pthread)) {
vInfo("vgId:%d :SDEL loop=%d thread runing post to quit. pthread=%p", pVnode->vgId, loop, pWaitThread->pthread);
// only post once
if(loop == LOOP_CNT)
tsem_post(pWaitThread->psem);
......@@ -441,6 +459,7 @@ void freeWaitThread(SVnodeObj* pVnode) {
// free all
if(loop == 0) {
vInfo("vgId:%d :SDEL force kill thread to quit. pthread=%p pWrite=%p", pVnode->vgId, pWaitThread->pthread, pWaitThread->param);
// thread not stop , so need kill
taosDestoryThread(pWaitThread->pthread);
// write msg need remove from queue
......@@ -448,13 +467,17 @@ void freeWaitThread(SVnodeObj* pVnode) {
if (pWrite)
vnodeFreeFromWQueue(pWrite->pVnode, pWrite);
} else {
vInfo("vgId:%d :SDEL quit thread ok. pthread=%p pWrite=%p", pVnode->vgId, pWaitThread->pthread, pWaitThread->param);
free(pWaitThread->pthread);
}
tsem_destroy(pWaitThread->psem);
taosFreeQitem(pWaitThread);
// free node
free(pNode);
}
taosCloseQueue(pVnode->tqueue);
// unlock
tsem_post(&pVnode->semWait);
}
int32_t vnodeClose(int32_t vgId) {
......@@ -482,6 +505,9 @@ void vnodeDestroy(SVnodeObj *pVnode) {
int32_t code = 0;
int32_t vgId = pVnode->vgId;
// stop wait thread if have
vnodeStopWaitingThread(pVnode);
if (pVnode->qMgmt) {
qCleanupQueryMgmt(pVnode->qMgmt);
pVnode->qMgmt = NULL;
......@@ -547,6 +573,8 @@ void vnodeDestroy(SVnodeObj *pVnode) {
dnodeSendStatusMsgToMnode();
}
pVnode->waitThreads = tdListFree(pVnode->waitThreads);
tsem_destroy(pVnode->semWait);
tsem_destroy(&pVnode->sem);
pthread_mutex_destroy(&pVnode->statusMutex);
free(pVnode);
......@@ -614,33 +642,58 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
// wait thread
void vnodeAddWait(void* vparam, pthread_t* pthread, tsem_t* psem, void* param) {
SVnodeObj* pVnode = (SVnodeObj* )vparam;
if(pVnode->tqueue == NULL) {
pVnode->tqueue = taosOpenQueue();
}
SWaitThread waitThread = {0};
SWaitThread* pWaitThread = (SWaitThread* )taosAllocateQitem(sizeof(SWaitThread));
pWaitThread->pthread = pthread;
pWaitThread->startTime = taosGetTimestampSec();
pWaitThread->psem = psem;
pWaitThread->param = param;
waitThread.pthread = pthread;
waitThread.startTime = taosGetTimestampSec();
waitThread.psem = psem;
waitThread.param = param;
int32_t crc = crc32c_sf(0, (crc_stream)param, sizeof(void* ));
taosWriteQitem(pVnode->tqueue, crc, pWaitThread);
// append
tdListAppend(pVnode->waitThreads, &waitThread);
vDebug("vgId:%d :SDEL add wait thread %p wait list count=%d ", pVnode->vgId, param, listNEles(pVnode->waitThreads));
}
// called in wait thread
void vnodeRemoveWait(void* vparam, void* param) {
SVnodeObj* pVnode = (SVnodeObj* )vparam;
int32_t crc = crc32c_sf(0, (crc_stream)param, sizeof(void* ));
SListIter iter = {0};
SWaitThread* pWaitThread = NULL;
taosSearchQitem(pVnode->tqueue, crc, (void** )&pWaitThread);
if (pWaitThread == NULL) {
// not found
return ;
}
tsem_wait(&pVnode->semWait);
tdListInitIter(pVnode->waitThreads, &iter, TD_LIST_FORWARD);
while (1) {
SListNode* pNode = tdListNext(&iter);
if (pNode == NULL)
break;
// free thread
SWaitThread * pWaitThread = (SWaitThread *)pNode->data;
if (pWaitThread->param == param) {
// found , free SWaitThread memeber
free(pWaitThread->pthread);
taosFreeQitem(pWaitThread);
tdListPopNode(pVnode->waitThreads, pNode);
vDebug("vgId:%d :SDEL removed wait thread %p wait list count=%d ", pVnode->vgId, param, listNEles(pVnode->waitThreads));
// free pListNode self
free(pNode);
break;
}
}
tsem_post(&pVnode->semWait);
}
// get wait thread count
bool vnodeWaitTooMany(void* vparam) {
SVnodeObj* pVnode = (SVnodeObj* )vparam;
int32_t count = listNEles(pVnode->waitThreads);
if( count > 32 ) {
vError("vgId:%d :SDEL wait threads too many. count=%d", pVnode->vgId, count);
return true;
}
return false;
}
tsem_t* vnodeSemWait(void* vparam) {
SVnodeObj* pVnode = (SVnodeObj* )vparam;
return &pVnode->semWait;
}
\ No newline at end of file
......@@ -63,6 +63,10 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
SRspRet *pRspRet = NULL;
if (pWrite != NULL) pRspRet = &pWrite->rspRet;
// if wal and forward write , no need response
if( qtype == TAOS_QTYPE_WAL || qtype == TAOS_QTYPE_FWD) {
pRspRet = NULL;
}
if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) {
vError("vgId:%d, msg:%s not processed since no handle, qtype:%s hver:%" PRIu64, pVnode->vgId,
......@@ -107,7 +111,7 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
code = walWrite(pVnode->wal, pHead);
}
if (code < 0) {
if (syncCode > 0) atomic_sub_fetch_32(&pWrite->processedCount, 1);
if (syncCode > 0 && pWrite) atomic_sub_fetch_32(&pWrite->processedCount, 1);
vError("vgId:%d, hver:%" PRIu64 " vver:%" PRIu64 " code:0x%x", pVnode->vgId, pHead->version, pVnode->version, code);
pHead->version = 0;
return code;
......@@ -118,7 +122,7 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
// write data locally
code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, pRspRet);
if (code < 0) {
if (syncCode > 0) atomic_sub_fetch_32(&pWrite->processedCount, 1);
if (syncCode > 0 && pWrite) atomic_sub_fetch_32(&pWrite->processedCount, 1);
return code;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册