From 488addaf448881ed902bac6c58522604f7264682 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 30 May 2020 05:56:24 +0000 Subject: [PATCH] [TD-437] fix definite lost while process show msg in mnode --- src/mnode/inc/mnodeDef.h | 2 +- src/mnode/src/mnodeShow.c | 80 +++++++++++++++++++++------------------ 2 files changed, 44 insertions(+), 38 deletions(-) diff --git a/src/mnode/inc/mnodeDef.h b/src/mnode/inc/mnodeDef.h index aacf8f419f..594fd3fd50 100644 --- a/src/mnode/inc/mnodeDef.h +++ b/src/mnode/inc/mnodeDef.h @@ -220,6 +220,7 @@ typedef struct SAcctObj { typedef struct { int8_t type; + int32_t index; char db[TSDB_DB_NAME_LEN + 1]; void * pIter; int16_t numOfColumns; @@ -228,7 +229,6 @@ typedef struct { int32_t numOfReads; int16_t offset[TSDB_MAX_COLUMNS]; int16_t bytes[TSDB_MAX_COLUMNS]; - void * signature; uint16_t payloadLen; char payload[]; } SShowObj; diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index d28d0b5b30..d342ea2d65 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -47,13 +47,14 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *mnodeMsg); static int32_t mnodeProcessUseMsg(SMnodeMsg *mnodeMsg); static void mnodeFreeShowObj(void *data); -static bool mnodeCheckShowObj(SShowObj *pShow); +static bool mnodeAccquireShowObj(SShowObj *pShow); static bool mnodeCheckShowFinished(SShowObj *pShow); -static void *mnodeSaveShowObj(SShowObj *pShow, int32_t size); -static void mnodeCleanupShowObj(void *pShow, bool forceRemove); +static void *mnodePutShowObj(SShowObj *pShow, int32_t size); +static void mnodeReleaseShowObj(void *pShow, bool forceRemove); extern void *tsMnodeTmr; -static void *tsQhandleCache = NULL; +static void *tsMnodeShowCache = NULL; +static int32_t tsShowObjIndex = 0; static SShowMetaFp tsMnodeShowMetaFp[TSDB_MGMT_TABLE_MAX] = {0}; static SShowRetrieveFp tsMnodeShowRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0}; @@ -64,14 +65,15 @@ int32_t mnodeInitShow() { mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg); mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg); - tsQhandleCache = taosCacheInitWithCb(tsMnodeTmr, 10, mnodeFreeShowObj); + tsMnodeShowCache = taosCacheInitWithCb(tsMnodeTmr, 10, mnodeFreeShowObj); return 0; } void mnodeCleanUpShow() { - if (tsQhandleCache != NULL) { - taosCacheCleanup(tsQhandleCache); - tsQhandleCache = NULL; + if (tsMnodeShowCache != NULL) { + mPrint("show cache is cleanup"); + taosCacheCleanup(tsMnodeShowCache); + tsMnodeShowCache = NULL; } } @@ -118,13 +120,12 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) { int32_t showObjSize = sizeof(SShowObj) + htons(pShowMsg->payloadLen); SShowObj *pShow = (SShowObj *) calloc(1, showObjSize); - pShow->signature = pShow; pShow->type = pShowMsg->type; pShow->payloadLen = htons(pShowMsg->payloadLen); strcpy(pShow->db, pShowMsg->db); memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen); - pShow = mnodeSaveShowObj(pShow, showObjSize); + pShow = mnodePutShowObj(pShow, showObjSize); if (pShow == NULL) { return TSDB_CODE_SERV_OUT_OF_MEMORY; } @@ -132,21 +133,22 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) { int32_t size = sizeof(SCMShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE; SCMShowRsp *pShowRsp = rpcMallocCont(size); if (pShowRsp == NULL) { - mnodeFreeShowObj(pShow); + mnodeReleaseShowObj(pShow, true); return TSDB_CODE_SERV_OUT_OF_MEMORY; } pShowRsp->qhandle = htobe64((uint64_t) pShow); - mTrace("show:%p, type:%s, start to get meta", pShow, mnodeGetShowType(pShowMsg->type)); + mTrace("%p, show type:%s, start to get meta", pShow, mnodeGetShowType(pShowMsg->type)); int32_t code = (*tsMnodeShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, pMsg->rpcMsg.handle); if (code == 0) { pMsg->rpcRsp.rsp = pShowRsp; pMsg->rpcRsp.len = sizeof(SCMShowRsp) + sizeof(SSchema) * pShow->numOfColumns; + mnodeReleaseShowObj(pShow, false); return TSDB_CODE_SUCCESS; } else { - mError("show:%p, type:%s, failed to get meta, reason:%s", pShow, mnodeGetShowType(pShowMsg->type), tstrerror(code)); + mError("%p, show type:%s, failed to get meta, reason:%s", pShow, mnodeGetShowType(pShowMsg->type), tstrerror(code)); rpcFreeCont(pShowRsp); - mnodeCleanupShowObj(pShow, true); + mnodeReleaseShowObj(pShow, true); return code; } } @@ -159,22 +161,20 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) { pRetrieve->qhandle = htobe64(pRetrieve->qhandle); SShowObj *pShow = (SShowObj *)pRetrieve->qhandle; - mTrace("show:%p, type:%s, retrieve data", pShow, mnodeGetShowType(pShow->type)); + mTrace("%p, show type:%s, retrieve data", pShow, mnodeGetShowType(pShow->type)); /* * in case of server restart, apps may hold qhandle created by server before * restart, which is actually invalid, therefore, signature check is required. */ - if (!mnodeCheckShowObj(pShow)) { - mError("retrieve:%p, qhandle:%p is invalid", pRetrieve, pShow); + if (!mnodeAccquireShowObj(pShow)) { + mError("%p, show is invalid", pShow); return TSDB_CODE_INVALID_QHANDLE; } if (mnodeCheckShowFinished(pShow)) { - mTrace("retrieve:%p, qhandle:%p already read finished, numOfReads:%d numOfRows:%d", pRetrieve, pShow, pShow->numOfReads, pShow->numOfRows); + mTrace("%p, show is already read finished, numOfReads:%d numOfRows:%d", pShow, pShow->numOfReads, pShow->numOfRows); pShow->numOfReads = pShow->numOfRows; - //mnodeCleanupShowObj(pShow, true); - //return TSDB_CODE_SUCCESS; } if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { @@ -200,7 +200,7 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) { if (rowsRead < 0) { rpcFreeCont(pRsp); - mnodeCleanupShowObj(pShow, false); + mnodeReleaseShowObj(pShow, false); assert(false); return TSDB_CODE_ACTION_IN_PROGRESS; } @@ -211,10 +211,11 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) { pMsg->rpcRsp.rsp = pRsp; pMsg->rpcRsp.len = size; - if (rowsToRead == 0) { - mnodeCleanupShowObj(pShow, true); + if (rowsToRead == 0 || (rowsRead == rowsToRead)) { + pRsp->completed = 1; + mnodeReleaseShowObj(pShow, true); } else { - mnodeCleanupShowObj(pShow, false); + mnodeReleaseShowObj(pShow, false); } return TSDB_CODE_SUCCESS; @@ -318,24 +319,29 @@ static bool mnodeCheckShowFinished(SShowObj *pShow) { return false; } -static bool mnodeCheckShowObj(SShowObj *pShow) { - SShowObj *pSaved = taosCacheAcquireByData(tsQhandleCache, pShow); +static bool mnodeAccquireShowObj(SShowObj *pShow) { + char key[10]; + sprintf(key, "%d", pShow->index); + + SShowObj *pSaved = taosCacheAcquireByName(tsMnodeShowCache, key); if (pSaved == pShow) { + mTrace("%p, show is accquired from cache", pShow); return true; } else { - mTrace("show:%p, is already released", pShow); return false; } } -static void *mnodeSaveShowObj(SShowObj *pShow, int32_t size) { - if (tsQhandleCache != NULL) { - char key[24]; - sprintf(key, "show:%p", pShow); - SShowObj *newQhandle = taosCachePut(tsQhandleCache, key, pShow, size, 60); +static void *mnodePutShowObj(SShowObj *pShow, int32_t size) { + if (tsMnodeShowCache != NULL) { + char key[10]; + pShow->index = atomic_add_fetch_32(&tsShowObjIndex, 1); + sprintf(key, "%d", pShow->index); + + SShowObj *newQhandle = taosCachePut(tsMnodeShowCache, key, pShow, size, 60); free(pShow); - mTrace("show:%p, is saved", newQhandle); + mTrace("%p, show is put into cache", newQhandle); return newQhandle; } @@ -345,10 +351,10 @@ static void *mnodeSaveShowObj(SShowObj *pShow, int32_t size) { static void mnodeFreeShowObj(void *data) { SShowObj *pShow = data; sdbFreeIter(pShow->pIter); - mTrace("show:%p, is destroyed", pShow); + mTrace("%p, show is destroyed", pShow); } -static void mnodeCleanupShowObj(void *pShow, bool forceRemove) { - mTrace("show:%p, is released, force:%s", pShow, forceRemove ? "true" : "false"); - taosCacheRelease(tsQhandleCache, &pShow, forceRemove); +static void mnodeReleaseShowObj(void *pShow, bool forceRemove) { + mTrace("%p, show is released, force:%s", pShow, forceRemove ? "true" : "false"); + taosCacheRelease(tsMnodeShowCache, &pShow, forceRemove); } -- GitLab