From 3b2b281045543555f9f637138d797415d87a919b Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 28 Jun 2020 09:23:23 +0800 Subject: [PATCH] [TD-771] refCount while process sdbqueue --- src/mnode/src/mnodeSdb.c | 35 ++++++++++++++++++++++++++--------- src/mnode/src/mnodeTable.c | 2 +- tests/script/loop.sh | 9 +++++++-- 3 files changed, 34 insertions(+), 12 deletions(-) diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index a9d450aff0..a6d373de99 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -551,16 +551,20 @@ static int sdbWrite(void *param, void *data, int type) { // from app, oper is created if (pOper != NULL) { - sdbTrace("record from app is disposed, version:%" PRIu64 " result:%s", pHead->version, tstrerror(code)); + sdbTrace("record from app is disposed, table:%s action:%s record:%s version:%" PRIu64 " result:%s", + pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, + tstrerror(code)); return code; } - + // from wal or forward msg, oper not created, should add into hash if (tsSdbObj.sync != NULL) { - sdbTrace("record from wal forward is disposed, version:%" PRIu64 " confirm it", pHead->version); + sdbTrace("record from wal forward is disposed, table:%s action:%s record:%s version:%" PRIu64 " confirm it", + pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); syncConfirmForward(tsSdbObj.sync, pHead->version, code); } else { - sdbTrace("record from wal restore is disposed, version:%" PRIu64 , pHead->version); + sdbTrace("record from wal restore is disposed, table:%s action:%s record:%s version:%" PRIu64, pTable->tableName, + sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); } if (action == SDB_ACTION_INSERT) { @@ -626,9 +630,11 @@ int32_t sdbInsertRow(SSdbOper *pOper) { memcpy(pNewOper, pOper, sizeof(SSdbOper)); if (pNewOper->pMsg != NULL) { - sdbTrace("app:%p:%p, insert action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg); + sdbTrace("app:%p:%p, table:%s record:%p:%s, insert action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle, + pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); } + sdbIncRef(pNewOper->table, pNewOper->pObj); taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper); return TSDB_CODE_SUCCESS; } @@ -674,9 +680,11 @@ int32_t sdbDeleteRow(SSdbOper *pOper) { memcpy(pNewOper, pOper, sizeof(SSdbOper)); if (pNewOper->pMsg != NULL) { - sdbTrace("app:%p:%p, delete action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg); + sdbTrace("app:%p:%p, table:%s record:%p:%s, delete action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle, + pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); } + sdbIncRef(pNewOper->table, pNewOper->pObj); taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper); return TSDB_CODE_SUCCESS; } @@ -722,9 +730,11 @@ int32_t sdbUpdateRow(SSdbOper *pOper) { memcpy(pNewOper, pOper, sizeof(SSdbOper)); if (pNewOper->pMsg != NULL) { - sdbTrace("app:%p:%p, update action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg); + sdbTrace("app:%p:%p, table:%s record:%p:%s, update action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle, + pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); } + sdbIncRef(pNewOper->table, pNewOper->pObj); taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper); return TSDB_CODE_SUCCESS; } @@ -943,7 +953,9 @@ static void *sdbWorkerFp(void *param) { } if (pOper != NULL && pOper->pMsg != NULL) { - sdbTrace("app:%p:%p, will be processed in sdb queue", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg); + sdbTrace("app:%p:%p, table:%s record:%p:%s version:%" PRIu64 ", will be processed in sdb queue", + pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, ((SSdbTable *)pOper->table)->tableName, pOper->pObj, + sdbGetKeyStr(pOper->table, pOper->pObj), pHead->version); } int32_t code = sdbWrite(pOper, pHead, type); @@ -959,15 +971,20 @@ static void *sdbWorkerFp(void *param) { if (type == TAOS_QTYPE_RPC) { pOper = (SSdbOper *)item; if (pOper != NULL && pOper->cb != NULL) { + sdbTrace("app:%p:%p, will do callback func, index:%d", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, i); pOper->retCode = (*pOper->cb)(pOper->pMsg, pOper->retCode); } - + if (pOper != NULL && pOper->pMsg != NULL) { sdbTrace("app:%p:%p, msg is processed, result:%s", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, tstrerror(pOper->retCode)); } dnodeSendRpcMnodeWriteRsp(pOper->pMsg, pOper->retCode); + + if (pOper != NULL) { + sdbDecRef(pOper->table, pOper->pObj); + } } taosFreeQitem(item); } diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 4906aeaeb0..da998a6578 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -1420,7 +1420,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { continue; } if (pTable->vgHash == NULL) { - mError("app:%p:%p, stable:%s, not vgroup exist while get stable vgroup info", pMsg->rpcMsg.ahandle, pMsg, + mError("app:%p:%p, stable:%s, no vgroup exist while get stable vgroup info", pMsg->rpcMsg.ahandle, pMsg, stableName); mnodeDecTableRef(pTable); diff --git a/tests/script/loop.sh b/tests/script/loop.sh index 3e6306d624..b435eef123 100755 --- a/tests/script/loop.sh +++ b/tests/script/loop.sh @@ -11,8 +11,9 @@ set -e CMD_NAME= LOOP_TIMES=5 +SLEEP_TIME=0 -while getopts "f:t:" arg +while getopts "f:t:s:" arg do case $arg in f) @@ -21,6 +22,9 @@ do t) LOOP_TIMES=$OPTARG ;; + s) + SLEEP_TIME=$OPTARG + ;; ?) echo "unknow argument" ;; @@ -29,6 +33,7 @@ done echo LOOP_TIMES ${LOOP_TIMES} echo CMD_NAME ${CMD_NAME} +echo SLEEP_TIME ${SLEEP_TIME} GREEN='\033[1;32m' GREEN_DARK='\033[0;32m' @@ -40,5 +45,5 @@ do echo -e $GREEN loop $i $NC echo -e $GREEN cmd $CMD_NAME $NC $CMD_NAME - sleep 2 + sleep ${SLEEP_TIME} done -- GitLab