From b00fb8c10dc79c4551da669265c4b9bb4828833e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 26 Nov 2020 00:00:20 +0000 Subject: [PATCH] reduce client mem cost --- src/client/inc/tsclient.h | 6 +- src/client/src/tscLocal.c | 7 +- src/client/src/tscServer.c | 224 ++++++++++++++++------------------- src/client/src/tscSql.c | 21 ++-- src/client/src/tscSub.c | 5 +- src/client/src/tscSubquery.c | 1 + src/client/src/tscSystem.c | 16 ++- src/client/src/tscUtil.c | 28 +++-- 8 files changed, 153 insertions(+), 155 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 4c85af4919..4a732fc11f 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -331,7 +331,7 @@ typedef struct STscObj { char superAuth : 1; uint32_t connId; uint64_t rid; // ref ID returned by taosAddRef - struct SSqlObj * pHb; + int64_t hbrid; struct SSqlObj * sqlList; struct SSqlStream *streamList; void* pDnodeConn; @@ -371,7 +371,7 @@ typedef struct SSqlObj { struct SSqlObj **pSubs; struct SSqlObj *prev, *next; - struct SSqlObj **self; + int64_t self; } SSqlObj; typedef struct SSqlStream { @@ -504,7 +504,7 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField } extern SCacheObj* tscMetaCache; -extern SCacheObj* tscObjCache; +extern int tscObjRef; extern void * tscTmr; extern void * tscQhandle; extern int tscKeepConn[]; diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 44dffab56f..6c125f76dc 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -814,8 +814,11 @@ static int32_t tscProcessClientVer(SSqlObj *pSql) { static int32_t tscProcessServStatus(SSqlObj *pSql) { STscObj* pObj = pSql->pTscObj; - if (pObj->pHb != NULL) { - if (pObj->pHb->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { + SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid); + if (pHb != NULL) { + int32_t code = pHb->res.code; + taosReleaseRef(tscObjRef, pObj->hbrid); + if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { pSql->res.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; return pSql->res.code; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index e97e95a317..80e05f1ba2 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -175,10 +175,10 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId)); } } else { - tscDebug("%p heartbeat failed, code:%s", pObj->pHb, tstrerror(code)); + tscDebug("%" PRId64 " heartbeat failed, code:%s", pObj->hbrid, tstrerror(code)); } - if (pObj->pHb != NULL) { + if (pObj->hbrid != 0) { int32_t waitingDuring = tsShellActivityTimer * 500; tscDebug("%p send heartbeat in %dms", pSql, waitingDuring); @@ -193,21 +193,11 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { STscObj *pObj = taosAcquireRef(tscRefId, rid); if (pObj == NULL) return; - SSqlObj* pHB = pObj->pHb; - - void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE)); - if (p == NULL) { - tscWarn("%p HB object has been released already", pHB); - taosReleaseRef(tscRefId, pObj->rid); - return; - } - - assert(*pHB->self == pHB); - + SSqlObj* pHB = taosAcquireRef(tscObjRef, pObj->hbrid); + assert(pHB->self == pObj->hbrid); pHB->retry = 0; int32_t code = tscProcessSql(pHB); - taosCacheRelease(tscObjCache, (void**) &p, false); - + taosReleaseRef(tscObjRef, pObj->hbrid); if (code != TSDB_CODE_SUCCESS) { tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code)); } @@ -236,7 +226,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { .msgType = pSql->cmd.msgType, .pCont = pMsg, .contLen = pSql->cmd.payloadLen, - .ahandle = pSql, + .ahandle = (void *)pSql->self, .handle = NULL, .code = 0 }; @@ -251,26 +241,25 @@ int tscSendMsgToServer(SSqlObj *pSql) { void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle; - void** p = taosCacheAcquireByKey(tscObjCache, &handle, sizeof(TSDB_CACHE_PTR_TYPE)); - if (p == NULL) { + SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle); + if (pSql == NULL) { rpcFreeCont(rpcMsg->pCont); return; - } - - SSqlObj* pSql = *p; - assert(pSql != NULL); + } + assert(pSql->self == handle); STscObj *pObj = pSql->pTscObj; SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; - assert(*pSql->self == pSql); + assert(pSql->self == handle); pSql->rpcRid = -1; if (pObj->signature != pObj) { tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature); - taosCacheRelease(tscObjCache, (void**) &p, true); + taosRemoveRef(tscObjRef, pSql->self); + taosReleaseRef(tscObjRef, pSql->self); rpcFreeCont(rpcMsg->pCont); return; } @@ -280,10 +269,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p", pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature); - void** p1 = p; - taosCacheRelease(tscObjCache, (void**) &p1, false); - - taosCacheRelease(tscObjCache, (void**) &p, true); + taosRemoveRef(tscObjRef, pSql->self); + taosReleaseRef(tscObjRef, pSql->self); rpcFreeCont(rpcMsg->pCont); return; } @@ -326,7 +313,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { // if there is an error occurring, proceed to the following error handling procedure. if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - taosCacheRelease(tscObjCache, (void**) &p, false); + taosReleaseRef(tscObjRef, pSql->self); rpcFreeCont(rpcMsg->pCont); return; } @@ -334,7 +321,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { } pRes->rspLen = 0; - + if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { tscDebug("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code)); } else { @@ -383,7 +370,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { tscDebug("%p SQL cmd:%s, code:%s rspLen:%d", pSql, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen); } } - + if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) { rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql); } @@ -394,11 +381,10 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { (*pSql->fp)(pSql->param, pSql, rpcMsg->code); } - void** p1 = p; - taosCacheRelease(tscObjCache, (void**) &p1, false); + taosReleaseRef(tscObjRef, pSql->self); if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it - taosCacheRelease(tscObjCache, (void **)&p, true); + taosRemoveRef(tscObjRef, pSql->self); tscDebug("%p sqlObj is automatically freed", pSql); } @@ -419,7 +405,7 @@ int doProcessSql(SSqlObj *pSql) { pCmd->command == TSDB_SQL_STABLEVGROUP) { pRes->code = tscBuildMsg[pCmd->command](pSql, NULL); } - + if (pRes->code != TSDB_CODE_SUCCESS) { tscQueueAsyncRes(pSql); return pRes->code; @@ -433,14 +419,14 @@ int doProcessSql(SSqlObj *pSql) { tscQueueAsyncRes(pSql); return code; } - + return TSDB_CODE_SUCCESS; } int tscProcessSql(SSqlObj *pSql) { char *name = NULL; SSqlCmd *pCmd = &pSql->cmd; - + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = NULL; uint32_t type = 0; @@ -465,7 +451,7 @@ int tscProcessSql(SSqlObj *pSql) { } else { // local handler return (*tscProcessMsgRsp[pCmd->command])(pSql); } - + return doProcessSql(pSql); } @@ -478,7 +464,7 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { // todo valid the vgroupId at the client side STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - + if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { int32_t vgIndex = pTableMetaInfo->vgroupIndex; if (pTableMetaInfo->pVgroupTables == NULL) { @@ -513,9 +499,9 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta; - + char* pMsg = pSql->cmd.payload; - + // NOTE: shell message size should not include SMsgDesc int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc); int32_t vgId = pTableMeta->vgroupInfo.vgId; @@ -529,7 +515,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pShellMsg->header.vgId = htonl(vgId); pShellMsg->header.contLen = htonl(size); // the length not includes the size of SMsgDesc pShellMsg->length = pShellMsg->header.contLen; - + pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // number of tables to be inserted // pSql->cmd.payloadLen is set during copying data into payload @@ -570,7 +556,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) { } return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + tsBufSize + - tableSerialize + 4096; + tableSerialize + 4096; } static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) { @@ -579,12 +565,12 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) { - + SVgroupInfo* pVgroupInfo = NULL; if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { int32_t index = pTableMetaInfo->vgroupIndex; assert(index >= 0); - + if (pTableMetaInfo->vgroupList->numOfVgroups > 0) { assert(index < pTableMetaInfo->vgroupList->numOfVgroups); pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index]; @@ -618,14 +604,14 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char // set the vgroup info tscSetDnodeEpSet(pSql, &pTableIdList->vgInfo); pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId); - + int32_t numOfTables = (int32_t)taosArrayGetSize(pTableIdList->itemList); pQueryMsg->numOfTables = htonl(numOfTables); // set the number of tables - + // serialize each table id info for(int32_t i = 0; i < numOfTables; ++i) { STableIdInfo* pItem = taosArrayGet(pTableIdList->itemList, i); - + STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; pTableIdInfo->tid = htonl(pItem->tid); pTableIdInfo->uid = htobe64(pItem->uid); @@ -633,10 +619,10 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char pMsg += sizeof(STableIdInfo); } } - + tscDebug("%p vgId:%d, query on table:%s, tid:%d, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name, pTableMeta->id.tid, pTableMeta->id.uid); - + return pMsg; } @@ -649,7 +635,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { tscError("%p failed to malloc for query msg", pSql); return TSDB_CODE_TSC_INVALID_SQL; // todo add test for this } - + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; @@ -661,12 +647,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return TSDB_CODE_TSC_INVALID_SQL; } - + if (pQueryInfo->interval.interval < 0) { tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, pSql, (int64_t)pQueryInfo->interval.interval); return TSDB_CODE_TSC_INVALID_SQL; } - + if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) { tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols); return TSDB_CODE_TSC_INVALID_SQL; @@ -675,7 +661,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload; int32_t numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList); - + if (pQueryInfo->order.order == TSDB_ORDER_ASC) { pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey); pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey); @@ -700,7 +686,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->numOfTags = htonl(numOfTags); pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType); pQueryMsg->queryType = htonl(pQueryInfo->type); - + size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo); pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); @@ -708,7 +694,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo); SSchema *pSchema = tscGetTableSchema(pTableMeta); - + for (int32_t i = 0; i < numOfCols; ++i) { SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i); SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex]; @@ -717,7 +703,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pColSchema->type > TSDB_DATA_TYPE_NCHAR) { tscError("%p tid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s", pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex, - pColSchema->name); + pColSchema->name); return TSDB_CODE_TSC_INVALID_SQL; } @@ -787,10 +773,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pSqlFuncExpr = (SSqlFuncMsg *)pMsg; } - + // serialize the table info (sid, uid, tags) pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg); - + SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr; if (pGroupbyExpr->numOfGroupCols > 0) { pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex); @@ -798,7 +784,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) { SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j); - + *((int16_t *)pMsg) = pCol->colId; pMsg += sizeof(pCol->colId); @@ -807,7 +793,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { *((int16_t *)pMsg) += pCol->flag; pMsg += sizeof(pCol->flag); - + memcpy(pMsg, pCol->name, tListLen(pCol->name)); pMsg += tListLen(pCol->name); } @@ -819,14 +805,14 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg += sizeof(pQueryInfo->fillVal[0]); } } - + if (numOfTags != 0) { int32_t numOfColumns = tscGetNumOfColumns(pTableMeta); int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta); int32_t total = numOfTagColumns + numOfColumns; - + pSchema = tscGetTableTagSchema(pTableMeta); - + for (int32_t i = 0; i < numOfTags; ++i) { SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i); SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex]; @@ -834,19 +820,19 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) || (pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) { tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s", - pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, total, numOfTagColumns, - pCol->colIndex.columnIndex, pColSchema->name); + pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, total, numOfTagColumns, + pCol->colIndex.columnIndex, pColSchema->name); return TSDB_CODE_TSC_INVALID_SQL; } - + SColumnInfo* pTagCol = (SColumnInfo*) pMsg; - + pTagCol->colId = htons(pColSchema->colId); pTagCol->bytes = htons(pColSchema->bytes); pTagCol->type = htons(pColSchema->type); pTagCol->numOfFilters = 0; - + pMsg += sizeof(SColumnInfo); } } @@ -854,16 +840,16 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { // serialize tag column query condition if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) { STagCond* pTagCond = &pQueryInfo->tagCond; - + SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid); if (pCond != NULL && pCond->cond != NULL) { pQueryMsg->tagCondLen = htons(pCond->len); memcpy(pMsg, pCond->cond, pCond->len); - + pMsg += pCond->len; } } - + if (pQueryInfo->tagCond.tbnameCond.cond == NULL) { *pMsg = 0; pMsg++; @@ -895,7 +881,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { tscDebug("%p msg built success,len:%d bytes", pSql, msgLen); pCmd->payloadLen = msgLen; pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY; - + pQueryMsg->head.contLen = htonl(msgLen); assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize); @@ -926,7 +912,7 @@ int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SCreateDnodeMsg *pCreate = (SCreateDnodeMsg *)pCmd->payload; strncpy(pCreate->ep, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n); - + pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE; return TSDB_CODE_SUCCESS; @@ -1287,14 +1273,14 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - + SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo; int size = tscEstimateAlterTableMsgLength(pCmd); if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { tscError("%p failed to malloc for alter table msg", pSql); return TSDB_CODE_TSC_OUT_OF_MEMORY; } - + SAlterTableMsg *pAlterTableMsg = (SAlterTableMsg *)pCmd->payload; tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pAlterTableMsg->db); @@ -1305,7 +1291,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSchema *pSchema = pAlterTableMsg->schema; for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) { TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); - + pSchema->type = pField->type; strcpy(pSchema->name, pField->name); pSchema->bytes = htons(pField->bytes); @@ -1330,7 +1316,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { SSqlCmd* pCmd = &pSql->cmd; pCmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL; - + SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload; pCmd->payloadLen = htonl(pUpdateMsg->head.contLen); @@ -1425,7 +1411,7 @@ int tscProcessDescribeTableRsp(SSqlObj *pSql) { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); - + int32_t numOfRes = tinfo.numOfColumns + tinfo.numOfTags; return tscLocalResultCommonBuilder(pSql, numOfRes); } @@ -1557,7 +1543,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize); tscDebug("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count, - pCmd->payloadLen); + pCmd->payloadLen); return pCmd->payloadLen; #endif @@ -1593,7 +1579,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; - + char* pMsg = pCmd->payload; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); @@ -1668,7 +1654,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { pMetaMsg->sversion = htons(pMetaMsg->sversion); pMetaMsg->tversion = htons(pMetaMsg->tversion); pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId); - + pMetaMsg->uid = htobe64(pMetaMsg->uid); pMetaMsg->contLen = htons(pMetaMsg->contLen); pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns); @@ -1676,7 +1662,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { if ((pMetaMsg->tableType != TSDB_SUPER_TABLE) && (pMetaMsg->tid <= 0 || pMetaMsg->vgroup.vgId < 2 || pMetaMsg->vgroup.numOfEps <= 0)) { tscError("invalid value in table numOfEps:%d, vgId:%d tid:%d, name:%s", pMetaMsg->vgroup.numOfEps, pMetaMsg->vgroup.vgId, - pMetaMsg->tid, pMetaMsg->tableId); + pMetaMsg->tid, pMetaMsg->tableId); return TSDB_CODE_TSC_INVALID_VALUE; } @@ -1711,7 +1697,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { size_t size = 0; STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size); - + // todo add one more function: taosAddDataIfNotExists(); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); assert(pTableMetaInfo->pTableMeta == NULL); @@ -1726,7 +1712,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->id.uid, pTableMeta->id.tid, pTableMetaInfo->name); free(pTableMeta); - + return TSDB_CODE_SUCCESS; } @@ -1829,19 +1815,19 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { // pMeta->index = 0; // (void)taosCachePut(tscMetaCache, pMeta->tableId, (char *)pMeta, size, tsTableMetaKeepTimer); // } - } - - pSql->res.code = TSDB_CODE_SUCCESS; - pSql->res.numOfTotal = i; - tscDebug("%p load multi-metermeta resp from complete num:%d", pSql, pSql->res.numOfTotal); +} + +pSql->res.code = TSDB_CODE_SUCCESS; +pSql->res.numOfTotal = i; +tscDebug("%p load multi-metermeta resp from complete num:%d", pSql, pSql->res.numOfTotal); #endif - - return TSDB_CODE_SUCCESS; + +return TSDB_CODE_SUCCESS; } int tscProcessSTableVgroupRsp(SSqlObj *pSql) { SSqlRes* pRes = &pSql->res; - + // NOTE: the order of several table must be preserved. SSTableVgroupRspMsg *pStableVgroup = (SSTableVgroupRspMsg *)pRes->pRsp; pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables); @@ -1850,7 +1836,7 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { // master sqlObj locates in param SSqlObj* parent = pSql->param; assert(parent != NULL); - + SSqlCmd* pCmd = &parent->cmd; for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) { STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i); @@ -1883,7 +1869,7 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { pMsg += size; } - + return pSql->res.code; } @@ -1928,7 +1914,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { size_t size = 0; STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size); - + pTableMetaInfo->pTableMeta = taosCachePut(tscMetaCache, key, strlen(key), (char *)pTableMeta, size, tsTableMetaKeepTimer * 1000); SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); @@ -1936,33 +1922,33 @@ int tscProcessShowRsp(SSqlObj *pSql) { if (pQueryInfo->colList == NULL) { pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES); } - + SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo; - + SColumnIndex index = {0}; pSchema = pMetaMsg->schema; - + for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) { index.columnIndex = i; tscColumnListInsert(pQueryInfo->colList, &index); - + TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes); SInternalField* pInfo = tscFieldInfoAppend(pFieldInfo, &f); - + pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, - pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false); + pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false); } - + pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput; tscFieldInfoUpdateOffset(pQueryInfo); - + tfree(pTableMeta); return 0; } // TODO multithread problem static void createHBObj(STscObj* pObj) { - if (pObj->pHb != NULL) { + if (pObj->hbrid != 0) { return; } @@ -1992,7 +1978,7 @@ static void createHBObj(STscObj* pObj) { registerSqlObj(pSql); tscDebug("%p HB is allocated, pObj:%p", pSql, pObj); - pObj->pHb = pSql; + pObj->hbrid = pSql->self; } int tscProcessConnectRsp(SSqlObj *pSql) { @@ -2007,7 +1993,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { assert(len <= sizeof(pObj->db)); tstrncpy(pObj->db, temp, sizeof(pObj->db)); - + if (pConnect->epSet.numOfEps > 0) { tscEpSetHtons(&pConnect->epSet); tscUpdateMgmtEpSet(&pConnect->epSet); @@ -2125,18 +2111,18 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { pRes->useconds = htobe64(pRetrieve->useconds); pRes->completed = (pRetrieve->completed == 1); pRes->data = pRetrieve->data; - + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) { return pRes->code; } - + if (pSql->pSubscription != NULL) { int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput; - + TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1); int16_t offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1); - + char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows; int32_t numOfTables = htonl(*(int32_t*)p); @@ -2209,16 +2195,16 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) { if (pTableMetaInfo->pTableMeta != NULL) { taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), false); } - + pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name)); if (pTableMetaInfo->pTableMeta != NULL) { STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); tscDebug("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns, - tinfo.numOfTags, pTableMetaInfo->pTableMeta); + tinfo.numOfTags, pTableMetaInfo->pTableMeta); return TSDB_CODE_SUCCESS; } - + return getTableMetaFromMgmt(pSql, pTableMetaInfo); } @@ -2242,7 +2228,7 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) { STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; if (pTableMetaInfo->pTableMeta) { tscDebug("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql, - tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid, pTableMeta); + tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid, pTableMeta); } taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true); @@ -2257,7 +2243,7 @@ static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) { return false; } } - + // all super tables vgroupinfo are retrieved, no need to retrieve vgroup info anymore return true; } @@ -2265,7 +2251,7 @@ static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) { int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { int code = TSDB_CODE_RPC_NETWORK_UNAVAIL; SSqlCmd *pCmd = &pSql->cmd; - + if (allVgroupInfoRetrieved(pCmd, clauseIndex)) { return TSDB_CODE_SUCCESS; } @@ -2282,7 +2268,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { tscFreeSqlObj(pNew); return code; } - + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex); for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i); @@ -2377,7 +2363,7 @@ void tscInitMsgsFp() { tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp; tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp; - + tscKeepConn[TSDB_SQL_SHOW] = 1; tscKeepConn[TSDB_SQL_RETRIEVE] = 1; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 5f8a2eb6b7..1514d75c41 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -277,8 +277,8 @@ void taos_close(TAOS *taos) { pObj->signature = NULL; taosTmrStopA(&(pObj->pTimer)); - SSqlObj* pHb = pObj->pHb; - if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) { + SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid); + if (pHb != NULL) { if (pHb->rpcRid > 0) { // wait for rsp from dnode rpcCancelRequest(pHb->rpcRid); pHb->rpcRid = -1; @@ -286,6 +286,7 @@ void taos_close(TAOS *taos) { tscDebug("%p HB is freed", pHb); taos_free_result(pHb); + taosReleaseRef(tscObjRef, pHb->self); } int32_t ref = T_REF_DEC(pObj); @@ -645,8 +646,7 @@ void taos_free_result(TAOS_RES *res) { bool freeNow = tscKillQueryInDnode(pSql); if (freeNow) { tscDebug("%p free sqlObj in cache", pSql); - SSqlObj** p = pSql->self; - taosCacheRelease(tscObjCache, (void**) &p, true); + taosReleaseRef(tscObjRef, pSql->self); } } @@ -738,15 +738,8 @@ static void tscKillSTableQuery(SSqlObj *pSql) { if (pSub == NULL) { continue; } - - void** p = taosCacheAcquireByKey(tscObjCache, &pSub, sizeof(TSDB_CACHE_PTR_TYPE)); - if (p == NULL) { - continue; - } - - SSqlObj* pSubObj = (SSqlObj*) (*p); - assert(pSubObj->self == (SSqlObj**) p); - + //SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid); + SSqlObj *pSubObj = pSub; pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; if (pSubObj->rpcRid > 0) { rpcCancelRequest(pSubObj->rpcRid); @@ -754,7 +747,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) { } tscQueueAsyncRes(pSubObj); - taosCacheRelease(tscObjCache, (void**) &p, false); + taosReleaseRef(tscObjRef, pSubObj->self); } tscDebug("%p super table query cancelled", pSql); diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 2c81bd7c7c..258db8832e 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -179,8 +179,9 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* fail: tscError("tscCreateSubscription failed at line %d, reason: %s", line, tstrerror(code)); if (pSql != NULL) { - if (pSql->self != NULL) { - taos_free_result(pSql); + if (pSql->self != 0) { + //taos_free_result(pSql); + taosReleaseRef(tscObjRef, pSql->self); } else { tscFreeSqlObj(pSql); } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index eb32e2490a..7a5d4f8c27 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2199,6 +2199,7 @@ int32_t tscHandleInsertRetry(SSqlObj* pSql) { STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, pSupporter->index); int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock); + pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); if ((pRes->code = code)!= TSDB_CODE_SUCCESS) { tscQueueAsyncRes(pSql); diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 01046a2840..69753b9623 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -16,6 +16,7 @@ #include "os.h" #include "taosmsg.h" #include "tcache.h" +#include "tref.h" #include "trpc.h" #include "tsystem.h" #include "ttimer.h" @@ -31,7 +32,7 @@ // global, not configurable SCacheObj* tscMetaCache; -SCacheObj* tscObjCache; +int tscObjRef = -1; void * tscTmr; void * tscQhandle; void * tscCheckDiskUsageTmr; @@ -143,7 +144,12 @@ void taos_init_imp(void) { int64_t refreshTime = 10; // 10 seconds by default if (tscMetaCache == NULL) { tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, tscFreeTableMetaHelper, "tableMeta"); + +#ifndef SQLOBJ_USE_CACHE + tscObjRef = taosOpenRef(4096, tscFreeRegisteredSqlObj); +#else tscObjCache = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshTime / 2, false, tscFreeRegisteredSqlObj, "sqlObj"); +#endif } tscRefId = taosOpenRef(200, tscCloseTscObj); @@ -166,11 +172,11 @@ void taos_cleanup(void) { taosCacheCleanup(m); } - m = tscObjCache; - if (m != NULL && atomic_val_compare_exchange_ptr(&tscObjCache, m, 0) == m) { - taosCacheCleanup(m); + int refId = atomic_exchange_32(&tscObjRef, -1); + if (refId != -1) { + taosCloseRef(refId); } - + m = tscQhandle; if (m != NULL && atomic_val_compare_exchange_ptr(&tscQhandle, m, 0) == m) { taosCleanUpScheduler(m); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index c452b51050..da89e6530d 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -364,18 +364,18 @@ static void tscFreeSubobj(SSqlObj* pSql) { void tscFreeRegisteredSqlObj(void *pSql) { assert(pSql != NULL); - SSqlObj** p = (SSqlObj**)pSql; - STscObj* pTscObj = (*p)->pTscObj; + SSqlObj* p = *(SSqlObj**)pSql; + STscObj* pTscObj = p->pTscObj; - assert((*p)->self != 0 && (*p)->self == (p)); - tscFreeSqlObj(*p); + assert(p->self != 0); + tscFreeSqlObj(p); int32_t ref = T_REF_DEC(pTscObj); assert(ref >= 0); - tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", *p, pTscObj, ref); + tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", p, pTscObj, ref); if (ref == 0) { - tscDebug("%p all sqlObj freed, free tscObj:%p", *p, pTscObj); + tscDebug("%p all sqlObj freed, free tscObj:%p", p, pTscObj); taosRemoveRef(tscRefId, pTscObj->rid); } } @@ -750,6 +750,10 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { dataBuf->size += (finalLen + sizeof(SSubmitBlk)); assert(dataBuf->size <= dataBuf->nAllocSize); + char* p = realloc(dataBuf->pData, dataBuf->size); + if (p != NULL) { + dataBuf->pData = p; + } // the length does not include the SSubmitBlk structure pBlocks->dataLen = htonl(finalLen); @@ -1487,6 +1491,8 @@ void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo) { } } +#ifndef SQLOBJ_USE_CACHE +#else void tscSetFreeHeatBeat(STscObj* pObj) { if (pObj == NULL || pObj->signature != pObj || pObj->pHb == NULL) { return; @@ -1499,6 +1505,7 @@ void tscSetFreeHeatBeat(STscObj* pObj) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pHeatBeat->cmd, 0); pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; } +#endif /* * the following four kinds of SqlObj should not be freed @@ -1518,7 +1525,7 @@ bool tscShouldBeFreed(SSqlObj* pSql) { } STscObj* pTscObj = pSql->pTscObj; - if (pSql->pStream != NULL || pTscObj->pHb == pSql || pSql->pSubscription != NULL) { + if (pSql->pStream != NULL || pTscObj->hbrid == pSql->self || pSql->pSubscription != NULL) { return false; } @@ -1809,13 +1816,14 @@ void tscResetForNextRetrieve(SSqlRes* pRes) { } void registerSqlObj(SSqlObj* pSql) { - int32_t DEFAULT_LIFE_TIME = 2 * 600 * 1000; // 1200 sec + //int32_t DEFAULT_LIFE_TIME = 2 * 600 * 1000; // 1200 sec int32_t ref = T_REF_INC(pSql->pTscObj); tscDebug("%p add to tscObj:%p, ref:%d", pSql, pSql->pTscObj, ref); - TSDB_CACHE_PTR_TYPE p = (TSDB_CACHE_PTR_TYPE) pSql; - pSql->self = taosCachePut(tscObjCache, &p, sizeof(TSDB_CACHE_PTR_TYPE), &p, sizeof(TSDB_CACHE_PTR_TYPE), DEFAULT_LIFE_TIME); + //TSDB_CACHE_PTR_TYPE p = (TSDB_CACHE_PTR_TYPE) pSql; + //pSql->self = taosCachePut(tscObjCache, &p, sizeof(TSDB_CACHE_PTR_TYPE), &p, sizeof(TSDB_CACHE_PTR_TYPE), DEFAULT_LIFE_TIME); + pSql->self = taosAddRef(tscObjRef, pSql); } SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) { -- GitLab