diff --git a/src/balance/src/bnThread.c b/src/balance/src/bnThread.c index c5dca2da8596dffbbee6417281a739cec7bba016..20da83ccba4c192a733cdbb530a2b6aab358896a 100644 --- a/src/balance/src/bnThread.c +++ b/src/balance/src/bnThread.c @@ -23,7 +23,7 @@ static SBnThread tsBnThread; static void *bnThreadFunc(void *arg) { - setThreadName("bnThreadd"); + setThreadName("balance"); while (1) { pthread_mutex_lock(&tsBnThread.mutex); diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 2af44db62b34cfcd1313966aabd36ced25a3ab6e..a7c2862f51984d6d9a0b17ea29b250f50ab8e684 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -355,6 +355,8 @@ char* strdup_throw(const char* str); bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src); SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg); +void tscRemoveTableMetaBuf(STableMetaInfo* pTableMetaInfo, uint64_t id); + #ifdef __cplusplus } #endif diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 83ec28898cde23256f1048b1f2eba55b5f22fd33..904f5d4503321f36e8b44c93404a2a0a68843f22 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -151,7 +151,8 @@ typedef struct STableDataBlocks { typedef struct { STableMeta *pTableMeta; - SVgroupsInfo *pVgroupInfo; + SArray *vgroupIdList; +// SVgroupsInfo *pVgroupsInfo; } STableMetaVgroupInfo; typedef struct SInsertStatementParam { @@ -375,6 +376,8 @@ void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta); */ void tscFreeSqlResult(SSqlObj *pSql); +void* tscCleanupTableMetaMap(SHashObj* pTableMetaMap); + /** * free sql object, release allocated resource * @param pObj @@ -415,7 +418,8 @@ int32_t tscValidateSqlInfo(SSqlObj *pSql, struct SSqlInfo *pInfo); int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows); extern int32_t sentinel; extern SHashObj *tscVgroupMap; -extern SHashObj *tscTableMetaInfo; +extern SHashObj *tscTableMetaMap; +extern SCacheObj *tscVgroupListBuf; extern int tscObjRef; extern void *tscTmr; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index d857d00e159d06df266187693913a6b6e404a2b3..c8c9fe85e39161e02623561f401574eb38465e1e 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -325,61 +325,6 @@ void tscAsyncResultOnError(SSqlObj* pSql) { int tscSendMsgToServer(SSqlObj *pSql); -static int32_t updateMetaBeforeRetryQuery(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SQueryInfo* pQueryInfo) { - // handle the invalid table error code for super table. - // update the pExpr info, colList info, number of table columns - // TODO Re-parse this sql and issue the corresponding subquery as an alternative for this case. - if (pSql->retryReason == TSDB_CODE_TDB_INVALID_TABLE_ID) { - int32_t numOfExprs = (int32_t) tscNumOfExprs(pQueryInfo); - int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta); - int32_t numOfTags = tscGetNumOfTags(pTableMetaInfo->pTableMeta); - - SSchema *pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); - SSchema *pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta); - - for (int32_t i = 0; i < numOfExprs; ++i) { - SSqlExpr *pExpr = &(tscExprGet(pQueryInfo, i)->base); - - // update the table uid - pExpr->uid = pTableMetaInfo->pTableMeta->id.uid; - - if (pExpr->colInfo.colIndex >= 0) { - int32_t index = pExpr->colInfo.colIndex; - - if ((TSDB_COL_IS_NORMAL_COL(pExpr->colInfo.flag) && index >= numOfCols) || - (TSDB_COL_IS_TAG(pExpr->colInfo.flag) && (index < 0 || index >= numOfTags))) { - return pSql->retryReason; - } - - if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) { - if ((pTagSchema[pExpr->colInfo.colIndex].colId != pExpr->colInfo.colId) && - strcasecmp(pExpr->colInfo.name, pTagSchema[pExpr->colInfo.colIndex].name) != 0) { - return pSql->retryReason; - } - } else if (TSDB_COL_IS_NORMAL_COL(pExpr->colInfo.flag)) { - if ((pSchema[pExpr->colInfo.colIndex].colId != pExpr->colInfo.colId) && - strcasecmp(pExpr->colInfo.name, pSchema[pExpr->colInfo.colIndex].name) != 0) { - return pSql->retryReason; - } - } else { // do nothing for udc - } - } - } - - // validate the table columns information - for (int32_t i = 0; i < taosArrayGetSize(pQueryInfo->colList); ++i) { - SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i); - if (pCol->columnIndex >= numOfCols) { - return pSql->retryReason; - } - } - } else { - // do nothing - } - - return TSDB_CODE_SUCCESS; -} - void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)param); if (pSql == NULL) return; @@ -391,7 +336,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { pRes->code = code; SSqlObj *sub = (SSqlObj*) res; - const char* msg = (sub->cmd.command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"table-meta"; + const char* msg = (sub->cmd.command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"multi-tableMeta"; if (code != TSDB_CODE_SUCCESS) { tscError("0x%"PRIx64" get %s failed, code:%s", pSql->self, msg, tstrerror(code)); goto _error; @@ -401,85 +346,56 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { if (pSql->pStream == NULL) { SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); - // check if it is a sub-query of super table query first, if true, enter another routine - if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY | - TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) { - tscDebug("0x%" PRIx64 " update cached table-meta, continue to process sql and send the corresponding query", pSql->self); - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - - code = tscGetTableMeta(pSql, pTableMetaInfo); - assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS || code == TSDB_CODE_SUCCESS); + if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) { + tscDebug("0x%" PRIx64 " continue parse sql after get table-meta", pSql->self); + code = tsParseSql(pSql, false); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { taosReleaseRef(tscObjRef, pSql->self); return; - } - - assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0)); - code = updateMetaBeforeRetryQuery(pSql, pTableMetaInfo, pQueryInfo); - if (code != TSDB_CODE_SUCCESS) { + } else if (code != TSDB_CODE_SUCCESS) { goto _error; } - // tscBuildAndSendRequest can add error into async res - tscBuildAndSendRequest(pSql, NULL); - taosReleaseRef(tscObjRef, pSql->self); - return; - } else { // continue to process normal async query - if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) { - tscDebug("0x%" PRIx64 " continue parse sql after get table-meta", pSql->self); - - code = tsParseSql(pSql, false); + if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_STMT_INSERT)) { // stmt insert + STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + code = tscGetTableMeta(pSql, pTableMetaInfo); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { taosReleaseRef(tscObjRef, pSql->self); return; - } else if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_STMT_INSERT)) { - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - code = tscGetTableMeta(pSql, pTableMetaInfo); - if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - taosReleaseRef(tscObjRef, pSql->self); - return; - } else { - assert(code == TSDB_CODE_SUCCESS); - } - - (*pSql->fp)(pSql->param, pSql, code); } else { - if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_FILE_INSERT)) { - tscImportDataFromFile(pSql); - } else { - tscHandleMultivnodeInsert(pSql); - } - } - } else { - if (pSql->retryReason != TSDB_CODE_SUCCESS) { - tscDebug("0x%" PRIx64 " update cached table-meta, re-validate sql statement and send query again", - pSql->self); - tscResetSqlCmd(pCmd, false); - pSql->retryReason = TSDB_CODE_SUCCESS; - } else { - tscDebug("0x%" PRIx64 " cached table-meta, continue validate sql statement and send query", pSql->self); + assert(code == TSDB_CODE_SUCCESS); } - code = tsParseSql(pSql, true); - if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - taosReleaseRef(tscObjRef, pSql->self); - return; - } else if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + (*pSql->fp)(pSql->param, pSql, code); + } else if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_FILE_INSERT)) { // file insert + tscImportDataFromFile(pSql); + } else { // sql string insert + tscHandleMultivnodeInsert(pSql); + } + } else { + if (pSql->retryReason != TSDB_CODE_SUCCESS) { + tscDebug("0x%" PRIx64 " update cached table-meta, re-validate sql statement and send query again", pSql->self); + tscResetSqlCmd(pCmd, false); + pSql->retryReason = TSDB_CODE_SUCCESS; + } else { + tscDebug("0x%" PRIx64 " cached table-meta, continue validate sql statement and send query", pSql->self); + } - SQueryInfo *pQueryInfo1 = tscGetQueryInfo(pCmd); - executeQuery(pSql, pQueryInfo1); + code = tsParseSql(pSql, true); + if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + taosReleaseRef(tscObjRef, pSql->self); + return; + } else if (code != TSDB_CODE_SUCCESS) { + goto _error; } - taosReleaseRef(tscObjRef, pSql->self); - return; + SQueryInfo *pQueryInfo1 = tscGetQueryInfo(pCmd); + executeQuery(pSql, pQueryInfo1); } + + taosReleaseRef(tscObjRef, pSql->self); + return; } else { // stream computing tscDebug("0x%"PRIx64" stream:%p meta is updated, start new query, command:%d", pSql->self, pSql->pStream, pCmd->command); diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index d1a325be3592a1789ac62661e61a84e6ccb969d3..ec7cb228ddbb66ddc88a75cb2150a771338e9ba5 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -920,7 +920,8 @@ int tscProcessLocalCmd(SSqlObj *pSql) { } else if (pCmd->command == TSDB_SQL_SHOW_CREATE_DATABASE) { pRes->code = tscProcessShowCreateDatabase(pSql); } else if (pCmd->command == TSDB_SQL_RESET_CACHE) { - taosHashClear(tscTableMetaInfo); + taosHashClear(tscTableMetaMap); + taosCacheEmpty(tscVgroupListBuf); pRes->code = TSDB_CODE_SUCCESS; } else if (pCmd->command == TSDB_SQL_SERV_VERSION) { pRes->code = tscProcessServerVer(pSql); diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index 26d52ad12f3d6f636e70a2c1b479dd8a484b9864..3613bad5347be64333903bce040927e85f5cf095 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -457,7 +457,7 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema, SSm uint32_t size = tscGetTableMetaMaxSize(); STableMeta* tableMeta = calloc(1, size); - taosHashGetClone(tscTableMetaInfo, fullTableName, strlen(fullTableName), NULL, tableMeta, -1); + taosHashGetClone(tscTableMetaMap, fullTableName, strlen(fullTableName), NULL, tableMeta); tstrncpy(schema->sTableName, tableName, strlen(tableName)+1); schema->precision = tableMeta->tableInfo.precision; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 1c12f19834b4c012489a6722c58ae3d84714b404..ef4b295990409809f825525d56bf4ba416dff128 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -80,8 +80,8 @@ static void getColumnName(tSqlExprItem* pItem, char* resultFieldName, char* rawN static int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t colIndex, tSqlExprItem* pItem, bool finalResult, SUdfInfo* pUdfInfo); -static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes, - int8_t type, char* fieldName, SExprInfo* pSqlExpr); +static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pColList, int16_t bytes, + int8_t type, char* fieldName, SExprInfo* pSqlExpr); static uint8_t convertRelationalOperator(SStrToken *pToken); @@ -7247,7 +7247,7 @@ void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex) { } tmpLen = - sprintf(tmpBuf, "%s(uid:%" PRId64 ", %d)", name, pExpr->base.uid, pExpr->base.colInfo.colId); + sprintf(tmpBuf, "%s(uid:%" PRIu64 ", %d)", name, pExpr->base.uid, pExpr->base.colInfo.colId); if (tmpLen + offset >= totalBufSize - 1) break; @@ -8123,6 +8123,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } } + pTableMeta = calloc(1, maxSize); plist = taosArrayInit(4, POINTER_BYTES); @@ -8138,9 +8139,13 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { size_t len = strlen(name); memset(pTableMeta, 0, maxSize); - taosHashGetClone(tscTableMetaInfo, name, len, NULL, pTableMeta, -1); + taosHashGetClone(tscTableMetaMap, name, len, NULL, pTableMeta); if (pTableMeta->id.uid > 0) { + tscDebug("0x%"PRIx64" retrieve table meta %s from local buf", pSql->self, name); + + // avoid mem leak, may should update pTableMeta + void* pVgroupIdList = NULL; if (pTableMeta->tableType == TSDB_CHILD_TABLE) { code = tscCreateTableMetaFromSTableMeta(pTableMeta, name, pSql->pBuf); @@ -8152,23 +8157,34 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { } } else if (pTableMeta->tableType == TSDB_SUPER_TABLE) { // the vgroup list of super table is not kept in local buffer, so here need retrieve it from the mnode each time - char* t = strdup(name); - taosArrayPush(pVgroupList, &t); - } + tscDebug("0x%"PRIx64" try to acquire cached super table %s vgroup id list", pSql->self, name); + void* pv = taosCacheAcquireByKey(tscVgroupListBuf, name, len); + if (pv == NULL) { + char* t = strdup(name); + taosArrayPush(pVgroupList, &t); + tscDebug("0x%"PRIx64" failed to retrieve stable %s vgroup id list in cache, try fetch from mnode", pSql->self, name); + } else { + tFilePage* pdata = (tFilePage*) pv; + pVgroupIdList = taosArrayInit((size_t) pdata->num, sizeof(int32_t)); + if (pVgroupIdList == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } - //STableMeta* pMeta = tscTableMetaDup(pTableMeta); - //STableMetaVgroupInfo p = { .pTableMeta = pMeta }; + taosArrayAddBatch(pVgroupIdList, pdata->data, (int32_t) pdata->num); + taosCacheRelease(tscVgroupListBuf, &pv, false); + } + } - //const char* px = tNameGetTableName(pname); - //taosHashPut(pCmd->pTableMetaMap, px, strlen(px), &p, sizeof(STableMetaVgroupInfo)); - // avoid mem leak, may should update pTableMeta - const char* px = tNameGetTableName(pname); - if (taosHashGet(pCmd->pTableMetaMap, px, strlen(px)) == NULL) { + if (taosHashGet(pCmd->pTableMetaMap, name, len) == NULL) { STableMeta* pMeta = tscTableMetaDup(pTableMeta); - STableMetaVgroupInfo p = { .pTableMeta = pMeta, .pVgroupInfo = NULL}; - taosHashPut(pCmd->pTableMetaMap, px, strlen(px), &p, sizeof(STableMetaVgroupInfo)); + STableMetaVgroupInfo tvi = { .pTableMeta = pMeta, .vgroupIdList = pVgroupIdList}; + taosHashPut(pCmd->pTableMetaMap, name, len, &tvi, sizeof(STableMetaVgroupInfo)); } - } else { // add to the retrieve table meta array list. + } else { + // Add to the retrieve table meta array list. + // If the tableMeta is missing, the cached vgroup list for the corresponding super table will be ignored. + tscDebug("0x%"PRIx64" failed to retrieve table meta %s from local buf", pSql->self, name); + char* t = strdup(name); taosArrayPush(plist, &t); } @@ -8282,22 +8298,44 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod strncpy(pTableMetaInfo->aliasName, tNameGetTableName(&pTableMetaInfo->name), tListLen(pTableMetaInfo->aliasName)); } - const char* name = tNameGetTableName(&pTableMetaInfo->name); - STableMetaVgroupInfo* p = taosHashGet(pCmd->pTableMetaMap, name, strlen(name)); + char fname[TSDB_TABLE_FNAME_LEN] = {0}; + tNameExtractFullName(&pTableMetaInfo->name, fname); + STableMetaVgroupInfo* p = taosHashGet(pCmd->pTableMetaMap, fname, strnlen(fname, TSDB_TABLE_FNAME_LEN)); pTableMetaInfo->pTableMeta = tscTableMetaDup(p->pTableMeta); assert(pTableMetaInfo->pTableMeta != NULL); - if (p->pVgroupInfo != NULL) { - pTableMetaInfo->vgroupList = tscVgroupsInfoDup(p->pVgroupInfo); - } + if (p->vgroupIdList != NULL) { + size_t s = taosArrayGetSize(p->vgroupIdList); - if (code != TSDB_CODE_SUCCESS) { - return code; + size_t vgroupsz = sizeof(SVgroupInfo) * s + sizeof(SVgroupsInfo); + pTableMetaInfo->vgroupList = calloc(1, vgroupsz); + if (pTableMetaInfo->vgroupList == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + pTableMetaInfo->vgroupList->numOfVgroups = (int32_t) s; + for(int32_t j = 0; j < s; ++j) { + int32_t* id = taosArrayGet(p->vgroupIdList, j); + + // check if current buffer contains the vgroup info. If not, add it + SNewVgroupInfo existVgroupInfo = {.inUse = -1,}; + taosHashGetClone(tscVgroupMap, id, sizeof(*id), NULL, &existVgroupInfo); + + assert(existVgroupInfo.inUse >= 0); + SVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[j]; + + pVgroup->numOfEps = existVgroupInfo.numOfEps; + pVgroup->vgId = existVgroupInfo.vgId; + for (int32_t k = 0; k < existVgroupInfo.numOfEps; ++k) { + pVgroup->epAddr[k].port = existVgroupInfo.ep[k].port; + pVgroup->epAddr[k].fqdn = strndup(existVgroupInfo.ep[k].fqdn, TSDB_FQDN_LEN); + } + } } } - return TSDB_CODE_SUCCESS; + return code; } static STableMeta* extractTempTableMetaFromSubquery(SQueryInfo* pUpstream) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 38c676f36f46250e156dd0edb4435881b1f295ae..eaf397529b73b6078dfd820ea0580d1711fba5e7 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -157,7 +157,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pSql, SRpcEpSet *pEpSet) { assert(vgId > 0); SNewVgroupInfo vgroupInfo = {.vgId = -1}; - taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo)); + taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo); assert(vgroupInfo.numOfEps > 0 && vgroupInfo.vgId > 0); tscDebug("before: Endpoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps); @@ -344,6 +344,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { rpcFreeCont(rpcMsg->pCont); return; } + assert(pSql->self == handle); STscObj *pObj = pSql->pTscObj; @@ -389,33 +390,40 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { pSql->cmd.insertParam.schemaAttached = 1; } + // single table query error need to be handled here. if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) && - (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || - rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID || - rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || + (((rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || // change the retry procedure + rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) || + rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || // change the retry procedure rpcMsg->code == TSDB_CODE_APP_NOT_READY)) { - - pSql->retry++; - tscWarn("0x%"PRIx64" it shall renew table meta, code:%s, retry:%d", pSql->self, tstrerror(rpcMsg->code), pSql->retry); - - pSql->res.code = rpcMsg->code; // keep the previous error code - if (pSql->retry > pSql->maxRetry) { - tscError("0x%"PRIx64" max retry %d reached, give up", pSql->self, pSql->maxRetry); - } else { - // wait for a little bit moment and then retry - // todo do not sleep in rpc callback thread, add this process into queueu to process - if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { - int32_t duration = getWaitingTimeInterval(pSql->retry); - taosMsleep(duration); - } - pSql->retryReason = rpcMsg->code; - rpcMsg->code = tscRenewTableMeta(pSql, 0); - // if there is an error occurring, proceed to the following error handling procedure. - if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - taosReleaseRef(tscObjRef, handle); - rpcFreeCont(rpcMsg->pCont); - return; + if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY | + TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) && + !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) { + // do nothing in case of super table subquery + } else { + pSql->retry += 1; + tscWarn("0x%" PRIx64 " it shall renew table meta, code:%s, retry:%d", pSql->self, tstrerror(rpcMsg->code), pSql->retry); + + pSql->res.code = rpcMsg->code; // keep the previous error code + if (pSql->retry > pSql->maxRetry) { + tscError("0x%" PRIx64 " max retry %d reached, give up", pSql->self, pSql->maxRetry); + } else { + // wait for a little bit moment and then retry + // todo do not sleep in rpc callback thread, add this process into queue to process + if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { + int32_t duration = getWaitingTimeInterval(pSql->retry); + taosMsleep(duration); + } + + pSql->retryReason = rpcMsg->code; + rpcMsg->code = tscRenewTableMeta(pSql, 0); + // if there is an error occurring, proceed to the following error handling procedure. + if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + taosReleaseRef(tscObjRef, handle); + rpcFreeCont(rpcMsg->pCont); + return; + } } } } @@ -614,7 +622,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; SNewVgroupInfo vgroupInfo = {0}; - taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo)); + taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo); tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo); tscDebug("0x%"PRIx64" submit msg built, numberOfEP:%d", pSql->self, pSql->epSet.numOfEps); @@ -687,7 +695,7 @@ static char *doSerializeTableInfo(SQueryTableMsg *pQueryMsg, SSqlObj *pSql, STab vgId = pTableMeta->vgId; SNewVgroupInfo vgroupInfo = {0}; - taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo)); + taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo); tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo); } @@ -1582,7 +1590,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta; SNewVgroupInfo vgroupInfo = {.vgId = -1}; - taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo)); + taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo); assert(vgroupInfo.vgId > 0); tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo); @@ -1809,34 +1817,6 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return TSDB_CODE_SUCCESS; } -int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { -#if 0 - SSqlCmd *pCmd = &pSql->cmd; - SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd); - - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - STableInfoMsg *pInfoMsg = (STableInfoMsg *)pCmd->payload; - - int32_t code = tNameExtractFullName(&pTableMetaInfo->name, pInfoMsg->tableFname); - if (code != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_OPERATION; - } - - pInfoMsg->createFlag = htons(pSql->cmd.autoCreated ? 1 : 0); - - char *pMsg = (char *)pInfoMsg + sizeof(STableInfoMsg); - - if (pCmd->autoCreated && pCmd->tagData.dataLen != 0) { - pMsg = serializeTagData(&pCmd->tagData, pMsg); - } - - pCmd->payloadLen = (int32_t)(pMsg - (char*)pInfoMsg); - pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META; -#endif - - return TSDB_CODE_SUCCESS; -} - /** * multi table meta req pkg format: * |SMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ...... @@ -1996,20 +1976,17 @@ static int32_t tableMetaMsgConvert(STableMetaMsg* pMetaMsg) { } // update the vgroupInfo if needed -static void doUpdateVgroupInfo(STableMeta *pTableMeta, SVgroupMsg *pVgroupMsg) { - if (pTableMeta->vgId > 0) { - int32_t vgId = pTableMeta->vgId; - assert(pTableMeta->tableType != TSDB_SUPER_TABLE); - - SNewVgroupInfo vgroupInfo = {.inUse = -1}; - taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo)); - - // vgroup info exists, compare with it - if (((vgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&vgroupInfo, pVgroupMsg)) || (vgroupInfo.inUse < 0)) { - vgroupInfo = createNewVgroupInfo(pVgroupMsg); - taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo)); - tscDebug("add new VgroupInfo, vgId:%d, total cached:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap)); - } +static void doUpdateVgroupInfo(int32_t vgId, SVgroupMsg *pVgroupMsg) { + assert(vgId > 0); + + SNewVgroupInfo vgroupInfo = {.inUse = -1}; + taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo); + + // vgroup info exists, compare with it + if (((vgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&vgroupInfo, pVgroupMsg)) || (vgroupInfo.inUse < 0)) { + vgroupInfo = createNewVgroupInfo(pVgroupMsg); + taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo)); + tscDebug("add/update new VgroupInfo, vgId:%d, total cached:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap)); } } @@ -2022,18 +1999,18 @@ static void doAddTableMetaToLocalBuf(STableMeta* pTableMeta, STableMetaMsg* pMet if (updateSTable) { STableMeta* pSupTableMeta = createSuperTableMeta(pMetaMsg); uint32_t size = tscGetTableMetaSize(pSupTableMeta); - int32_t code = taosHashPut(tscTableMetaInfo, pTableMeta->sTableName, len, pSupTableMeta, size); + int32_t code = taosHashPut(tscTableMetaMap, pTableMeta->sTableName, len, pSupTableMeta, size); assert(code == TSDB_CODE_SUCCESS); tfree(pSupTableMeta); } CChildTableMeta* cMeta = tscCreateChildMeta(pTableMeta); - taosHashPut(tscTableMetaInfo, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), cMeta, sizeof(CChildTableMeta)); + taosHashPut(tscTableMetaMap, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), cMeta, sizeof(CChildTableMeta)); tfree(cMeta); } else { uint32_t s = tscGetTableMetaSize(pTableMeta); - taosHashPut(tscTableMetaInfo, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), pTableMeta, s); + taosHashPut(tscTableMetaMap, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), pTableMeta, s); } } @@ -2058,7 +2035,9 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { assert(strncmp(pMetaMsg->tableFname, name, tListLen(pMetaMsg->tableFname)) == 0); doAddTableMetaToLocalBuf(pTableMeta, pMetaMsg, true); - doUpdateVgroupInfo(pTableMeta, &pMetaMsg->vgroup); + if (pTableMeta->tableType != TSDB_SUPER_TABLE) { + doUpdateVgroupInfo(pTableMeta->vgId, &pMetaMsg->vgroup); + } tscDebug("0x%"PRIx64" recv table meta, uid:%" PRIu64 ", tid:%d, name:%s, numOfCols:%d, numOfTags:%d", pSql->self, pTableMeta->id.uid, pTableMeta->id.tid, tNameGetTableName(&pTableMetaInfo->name), pTableMeta->tableInfo.numOfColumns, @@ -2068,6 +2047,37 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; } +static SArray* createVgroupIdListFromMsg(char* pMsg, SHashObj* pSet, char* name, int32_t* size, uint64_t id) { + SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)pMsg; + + pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups); + *size = (int32_t)(sizeof(SVgroupMsg) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsMsg)); + + SArray* vgroupIdList = taosArrayInit(pVgroupMsg->numOfVgroups, sizeof(int32_t)); + + if (pVgroupMsg->numOfVgroups <= 0) { + tscDebug("0x%" PRIx64 " empty vgroup id list, no corresponding tables for stable:%s", id, name); + } else { + // just init, no need to lock + for (int32_t j = 0; j < pVgroupMsg->numOfVgroups; ++j) { + SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j]; + vmsg->vgId = htonl(vmsg->vgId); + for (int32_t k = 0; k < vmsg->numOfEps; ++k) { + vmsg->epAddr[k].port = htons(vmsg->epAddr[k].port); + } + + taosArrayPush(vgroupIdList, &vmsg->vgId); + + if (taosHashGet(pSet, &vmsg->vgId, sizeof(vmsg->vgId)) == NULL) { + taosHashPut(pSet, &vmsg->vgId, sizeof(vmsg->vgId), "", 0); + doUpdateVgroupInfo(vmsg->vgId, vmsg); + } + } + } + + return vgroupIdList; +} + static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t id) { SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)pMsg; pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups); @@ -2092,24 +2102,14 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t vmsg->epAddr[k].port = htons(vmsg->epAddr[k].port); } - SNewVgroupInfo newVi = createNewVgroupInfo(vmsg); - pVgroup->numOfEps = newVi.numOfEps; - pVgroup->vgId = newVi.vgId; + pVgroup->numOfEps = vmsg->numOfEps; + pVgroup->vgId = vmsg->vgId; for (int32_t k = 0; k < vmsg->numOfEps; ++k) { - pVgroup->epAddr[k].port = newVi.ep[k].port; - pVgroup->epAddr[k].fqdn = strndup(newVi.ep[k].fqdn, TSDB_FQDN_LEN); + pVgroup->epAddr[k].port = vmsg->epAddr[k].port; + pVgroup->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, TSDB_FQDN_LEN); } - // check if current buffer contains the vgroup info. - // If not, add it - SNewVgroupInfo existVgroupInfo = {.inUse = -1}; - taosHashGetClone(tscVgroupMap, &newVi.vgId, sizeof(newVi.vgId), NULL, &existVgroupInfo, sizeof(SNewVgroupInfo)); - - if (((existVgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&existVgroupInfo, vmsg)) || - (existVgroupInfo.inUse < 0)) { // vgroup info exists, compare with it - taosHashPut(tscVgroupMap, &newVi.vgId, sizeof(newVi.vgId), &newVi, sizeof(newVi)); - tscDebug("0x%" PRIx64 " add new VgroupInfo, vgId:%d, total cached:%d", id, newVi.vgId, (int32_t)taosHashGetSize(tscVgroupMap)); - } + doUpdateVgroupInfo(pVgroup->vgId, vmsg); } } @@ -2187,6 +2187,8 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { char* buf = NULL; char* pMsg = pMultiMeta->meta; + + // decompresss the message payload if (pMultiMeta->compressed) { buf = malloc(pMultiMeta->rawLen - sizeof(SMultiTableMeta)); int32_t len = tsDecompressString(pMultiMeta->meta, pMultiMeta->contLen - sizeof(SMultiTableMeta), 1, @@ -2219,15 +2221,13 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { return TSDB_CODE_TSC_INVALID_VALUE; } - SName sn = {0}; - tNameFromString(&sn, pMetaMsg->tableFname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - if (pMultiMeta->metaClone == 1 || pTableMeta->tableType == TSDB_SUPER_TABLE) { STableMetaVgroupInfo p = {.pTableMeta = pTableMeta,}; + size_t keyLen = strnlen(pMetaMsg->tableFname, TSDB_TABLE_FNAME_LEN); + void* t = taosHashGet(pParentCmd->pTableMetaMap, pMetaMsg->tableFname, keyLen); + assert(t == NULL); - const char* tableName = tNameGetTableName(&sn); - size_t keyLen = strlen(tableName); - taosHashPut(pParentCmd->pTableMetaMap, tableName, keyLen, &p, sizeof(STableMetaVgroupInfo)); + taosHashPut(pParentCmd->pTableMetaMap, pMetaMsg->tableFname, keyLen, &p, sizeof(STableMetaVgroupInfo)); } else { freeMeta = true; } @@ -2245,7 +2245,7 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { // for each vgroup, only update the information once. int64_t vgId = pMetaMsg->vgroup.vgId; if (pTableMeta->tableType != TSDB_SUPER_TABLE && taosHashGet(pSet, &vgId, sizeof(vgId)) == NULL) { - doUpdateVgroupInfo(pTableMeta, &pMetaMsg->vgroup); + doUpdateVgroupInfo((int32_t) vgId, &pMetaMsg->vgroup); taosHashPut(pSet, &vgId, sizeof(vgId), "", 0); } @@ -2256,18 +2256,33 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { } for(int32_t i = 0; i < pMultiMeta->numOfVgroup; ++i) { - char* name = pMsg; - pMsg += TSDB_TABLE_NAME_LEN; + char fname[TSDB_TABLE_FNAME_LEN] = {0}; + tstrncpy(fname, pMsg, TSDB_TABLE_FNAME_LEN); + size_t len = strnlen(fname, TSDB_TABLE_FNAME_LEN); + + pMsg += TSDB_TABLE_FNAME_LEN; - STableMetaVgroupInfo* p = taosHashGet(pParentCmd->pTableMetaMap, name, strnlen(name, TSDB_TABLE_NAME_LEN)); + STableMetaVgroupInfo* p = taosHashGet(pParentCmd->pTableMetaMap, fname, len); assert(p != NULL); int32_t size = 0; - if (p->pVgroupInfo!= NULL) { - tscVgroupInfoClear(p->pVgroupInfo); - //tfree(p->pTableMeta); + if (p->vgroupIdList!= NULL) { + taosArrayDestroy(p->vgroupIdList); } - p->pVgroupInfo = createVgroupInfoFromMsg(pMsg, &size, pSql->self); + + p->vgroupIdList = createVgroupIdListFromMsg(pMsg, pSet, fname, &size, pSql->self); + + int32_t numOfVgId = (int32_t) taosArrayGetSize(p->vgroupIdList); + int32_t s = sizeof(tFilePage) + numOfVgId * sizeof(int32_t); + + tFilePage* idList = calloc(1, s); + idList->num = numOfVgId; + memcpy(idList->data, TARRAY_GET_START(p->vgroupIdList), numOfVgId * sizeof(int32_t)); + + void* idListInst = taosCachePut(tscVgroupListBuf, fname, len, idList, s, 5000); + taosCacheRelease(tscVgroupListBuf, (void*) &idListInst, false); + + tfree(idList); pMsg += size; } @@ -2332,14 +2347,18 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { SSqlCmd* pCmd = &parent->cmd; SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); + char fName[TSDB_TABLE_FNAME_LEN] = {0}; for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) { char* name = pMsg; - pMsg += TSDB_TABLE_NAME_LEN; + pMsg += TSDB_TABLE_FNAME_LEN; STableMetaInfo *pInfo = NULL; for(int32_t j = 0; j < pQueryInfo->numOfTables; ++j) { STableMetaInfo *pInfo1 = tscGetTableMetaInfoFromCmd(pCmd, j); - if (strcmp(name, tNameGetTableName(&pInfo1->name)) != 0) { + memset(fName, 0, tListLen(fName)); + + tNameExtractFullName(&pInfo1->name, fName); + if (strcmp(name, fName) != 0) { continue; } @@ -2499,24 +2518,20 @@ int tscProcessUseDbRsp(SSqlObj *pSql) { return ret; } +//todo only invalid the buffered data that belongs to dropped databases int tscProcessDropDbRsp(SSqlObj *pSql) { //TODO LOCK DB WHEN MODIFY IT //pSql->pTscObj->db[0] = 0; - taosHashClear(tscTableMetaInfo); + taosHashClear(tscTableMetaMap); + taosHashClear(tscVgroupMap); + taosCacheEmpty(tscVgroupListBuf); return 0; } int tscProcessDropTableRsp(SSqlObj *pSql) { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0); - - //The cached tableMeta is expired in this case, so clean it in hash table - char name[TSDB_TABLE_FNAME_LEN] = {0}; - tNameExtractFullName(&pTableMetaInfo->name, name); - - taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); - tscDebug("0x%"PRIx64" remove table meta after drop table:%s, numOfRemain:%d", pSql->self, name, (int32_t) taosHashGetSize(tscTableMetaInfo)); - + tscRemoveTableMetaBuf(pTableMetaInfo, pSql->self); tfree(pTableMetaInfo->pTableMeta); return 0; } @@ -2530,11 +2545,11 @@ int tscProcessAlterTableMsgRsp(SSqlObj *pSql) { tscDebug("0x%"PRIx64" remove tableMeta in hashMap after alter-table: %s", pSql->self, name); bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo); - taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); + taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); tfree(pTableMetaInfo->pTableMeta); if (isSuperTable) { // if it is a super table, iterate the hashTable and remove all the childTableMeta - taosHashClear(tscTableMetaInfo); + taosHashClear(tscTableMetaMap); } return 0; @@ -2801,7 +2816,7 @@ int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool tNameExtractFullName(&pTableMetaInfo->name, name); size_t len = strlen(name); - taosHashGetClone(tscTableMetaInfo, name, len, NULL, pTableMetaInfo->pTableMeta, -1); + taosHashGetClone(tscTableMetaMap, name, len, NULL, pTableMetaInfo->pTableMeta); // TODO resize the tableMeta assert(size < 80 * TSDB_MAX_COLUMNS); @@ -2892,6 +2907,10 @@ int32_t tscGetUdfFromNode(SSqlObj *pSql, SQueryInfo* pQueryInfo) { return code; } +static void freeElem(void* p) { + tfree(*(char**)p); +} + /** * retrieve table meta from mnode, and then update the local table meta hashmap. * @param pSql sql object @@ -2899,7 +2918,7 @@ int32_t tscGetUdfFromNode(SSqlObj *pSql, SQueryInfo* pQueryInfo) { * @return status code */ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) { - SSqlCmd *pCmd = &pSql->cmd; + SSqlCmd* pCmd = &pSql->cmd; SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex); @@ -2913,15 +2932,26 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) { STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; if (pTableMeta) { - tscDebug("0x%"PRIx64" update table meta:%s, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64, pSql->self, name, + tscDebug("0x%"PRIx64" update table meta:%s, old meta numOfTags:%d, numOfCols:%d, uid:%" PRIu64, pSql->self, name, tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid); } // remove stored tableMeta info in hash table - size_t len = strlen(name); - taosHashRemove(tscTableMetaInfo, name, len); + tscRemoveTableMetaBuf(pTableMetaInfo, pSql->self); + + pCmd->pTableMetaMap = tscCleanupTableMetaMap(pCmd->pTableMetaMap); + pCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + + SArray* pNameList = taosArrayInit(1, POINTER_BYTES); + SArray* vgroupList = taosArrayInit(1, POINTER_BYTES); - return getTableMetaFromMnode(pSql, pTableMetaInfo, false); + char* n = strdup(name); + taosArrayPush(pNameList, &n); + code = getMultiTableMetaFromMnode(pSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true); + taosArrayDestroyEx(pNameList, freeElem); + taosArrayDestroyEx(vgroupList, freeElem); + + return code; } static bool allVgroupInfoRetrieved(SQueryInfo* pQueryInfo) { @@ -2971,8 +3001,6 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, SQueryInfo* pQueryInfo) { tscDebug("0x%"PRIx64" svgroupRid from %" PRId64 " to %" PRId64 , pSql->self, pSql->svgroupRid, pNew->self); pSql->svgroupRid = pNew->self; - - tscDebug("0x%"PRIx64" new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql->self, pNew, pNewQueryInfo->numOfTables); pNew->fp = tscTableMetaCallBack; @@ -3015,7 +3043,6 @@ void tscInitMsgsFp() { tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg; tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg; -// tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg; tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg; tscBuildMsg[TSDB_SQL_RETRIEVE_FUNC] = tscBuildRetrieveFuncMsg; diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index da5bdf669fa41e4e44cd5551743c4feb77f447ac..502ef22d4bff87548dd02be7a521251f3224c4c2 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -206,7 +206,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf char name[TSDB_TABLE_FNAME_LEN] = {0}; tNameExtractFullName(&pTableMetaInfo->name, name); - taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); + taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); tfree(pTableMetaInfo->pTableMeta); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 771e92a7e4af5d2c13a78c10ab26310fc330480d..af7a18ca7a18fdee5b315d6fce3416931f098305 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2704,8 +2704,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO tstrerror(pParentSql->res.code)); // release allocated resource - tscDestroyGlobalMergerEnv(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, - pState->numOfSub); + tscDestroyGlobalMergerEnv(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, pState->numOfSub); tscFreeRetrieveSup(pSql); @@ -2713,7 +2712,35 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd); if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { - (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code); + + int32_t code = pParentSql->res.code; + if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry) { + // remove the cached tableMeta and vgroup id list, and then parse the sql again + STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentSql->cmd, 0); + tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self); + + tscResetSqlCmd(&pParentSql->cmd, true); + pParentSql->res.code = TSDB_CODE_SUCCESS; + pParentSql->retry++; + + tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self, + tstrerror(code), pParentSql->retry); + + code = tsParseSql(pParentSql, true); + if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + return; + } + + if (code != TSDB_CODE_SUCCESS) { + pParentSql->res.code = code; + tscAsyncResultOnError(pParentSql); + return; + } + + executeQuery(pParentSql, pQueryInfo); + } else { + (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code); + } } else { // regular super table query if (pParentSql->res.code != TSDB_CODE_SUCCESS) { tscAsyncResultOnError(pParentSql); @@ -2996,7 +3023,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { assert(code == taos_errno(pSql)); - if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) { + if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && (code != TSDB_CODE_TDB_INVALID_TABLE_ID)) { tscError("0x%"PRIx64" sub:0x%"PRIx64" failed code:%s, retry:%d", pParentSql->self, pSql->self, tstrerror(code), trsupport->numOfRetry); int32_t sent = 0; @@ -3005,7 +3032,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { return; } } else { - tscError("0x%"PRIx64" sub:0x%"PRIx64" reach the max retry times, set global code:%s", pParentSql->self, pSql->self, tstrerror(code)); + tscError("0x%"PRIx64" sub:0x%"PRIx64" reach the max retry times or no need to retry, set global code:%s", pParentSql->self, pSql->self, tstrerror(code)); atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code); // set global code and abort } @@ -3125,12 +3152,10 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) for(int32_t i = 0; i < pParentObj->cmd.insertParam.numOfTables; ++i) { char name[TSDB_TABLE_FNAME_LEN] = {0}; tNameExtractFullName(pParentObj->cmd.insertParam.pTableNameList[i], name); - taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); + taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); } pParentObj->res.code = TSDB_CODE_SUCCESS; -// pParentObj->cmd.parseFinished = false; - tscResetSqlCmd(&pParentObj->cmd, false); // in case of insert, redo parsing the sql string and build new submit data block for two reasons: diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 7b8f24a093eb48f8a0f06b3b93f4fe1d33edcca5..c04765b0651f59066dd5897f2eaf0924b7113a21 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -19,15 +19,12 @@ #include "trpc.h" #include "tnote.h" #include "ttimer.h" -#include "tutil.h" #include "tsched.h" #include "tscLog.h" -#include "tscUtil.h" #include "tsclient.h" #include "tglobal.h" #include "tconfig.h" #include "ttimezone.h" -#include "tlocale.h" #include "qScript.h" // global, not configurable @@ -36,8 +33,10 @@ int32_t sentinel = TSC_VAR_NOT_RELEASE; -SHashObj *tscVgroupMap; // hash map to keep the global vgroup info -SHashObj *tscTableMetaInfo; // table meta info +SHashObj *tscVgroupMap; // hash map to keep the vgroup info from mnode +SHashObj *tscTableMetaMap; // table meta info buffer +SCacheObj *tscVgroupListBuf; // super table vgroup list information, only survives 5 seconds for each super table vgroup list + int32_t tscObjRef = -1; void *tscTmr; void *tscQhandle; @@ -45,17 +44,21 @@ int32_t tscRefId = -1; int32_t tscNumOfObj = 0; // number of sqlObj in current process. static void *tscCheckDiskUsageTmr; void *tscRpcCache; // cache to keep rpc obj -int32_t tscNumOfThreads = 1; // num of rpc threads -char tscLogFileName[12] = "taoslog"; -int tscLogFileNum = 10; -static pthread_mutex_t rpcObjMutex; // mutex to protect open the rpc obj concurrently -static pthread_once_t tscinit = PTHREAD_ONCE_INIT; +int32_t tscNumOfThreads = 1; // num of rpc threads +char tscLogFileName[12] = "taoslog"; +int tscLogFileNum = 10; + +static pthread_mutex_t rpcObjMutex; // mutex to protect open the rpc obj concurrently +static pthread_once_t tscinit = PTHREAD_ONCE_INIT; + +// pthread_once can not return result code, so result code is set to a global variable. static volatile int tscInitRes = 0; void tscCheckDiskUsage(void *UNUSED_PARAM(para), void *UNUSED_PARAM(param)) { taosGetDisk(); taosTmrReset(tscCheckDiskUsage, 20 * 1000, NULL, tscTmr, &tscCheckDiskUsageTmr); } + void tscFreeRpcObj(void *param) { assert(param); SRpcObj *pRpcObj = (SRpcObj *)(param); @@ -67,10 +70,9 @@ void tscReleaseRpc(void *param) { if (param == NULL) { return; } - pthread_mutex_lock(&rpcObjMutex); - taosCacheRelease(tscRpcCache, (void *)¶m, false); - pthread_mutex_unlock(&rpcObjMutex); -} + + taosCacheRelease(tscRpcCache, (void *)¶m, false); +} int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncrypt, void **ppRpcObj) { pthread_mutex_lock(&rpcObjMutex); @@ -80,7 +82,7 @@ int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncry *ppRpcObj = pRpcObj; pthread_mutex_unlock(&rpcObjMutex); return 0; - } + } SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); @@ -104,7 +106,8 @@ int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncry pthread_mutex_unlock(&rpcObjMutex); tscError("failed to init connection to TDengine"); return -1; - } + } + pRpcObj = taosCachePut(tscRpcCache, rpcObj.key, strlen(rpcObj.key), &rpcObj, sizeof(rpcObj), 1000*5); if (pRpcObj == NULL) { rpcClose(rpcObj.pDnodeConn); @@ -118,7 +121,7 @@ int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncry } void taos_init_imp(void) { - char temp[128] = {0}; + char temp[128] = {0}; errno = TSDB_CODE_SUCCESS; srand(taosGetTimestampSec()); @@ -151,36 +154,41 @@ void taos_init_imp(void) { rpcInit(); scriptEnvPoolInit(); + tscDebug("starting to initialize TAOS client ..."); tscDebug("Local End Point is:%s", tsLocalEp); } taosSetCoreDump(); tscInitMsgsFp(); - int queueSize = tsMaxConnections*2; double factor = (tscEmbedded == 0)? 2.0:4.0; tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor); if (tscNumOfThreads < 2) { tscNumOfThreads = 2; } + + int32_t queueSize = tsMaxConnections*2; tscQhandle = taosInitScheduler(queueSize, tscNumOfThreads, "tsc"); if (NULL == tscQhandle) { - tscError("failed to init scheduler"); + tscError("failed to init task queue"); tscInitRes = -1; return; } + tscDebug("client task queue is initialized, numOfWorkers: %d", tscNumOfThreads); + tscTmr = taosTmrInit(tsMaxConnections * 2, 200, 60000, "TSC"); if(0 == tscEmbedded){ taosTmrReset(tscCheckDiskUsage, 20 * 1000, NULL, tscTmr, &tscCheckDiskUsageTmr); } - if (tscTableMetaInfo == NULL) { - tscObjRef = taosOpenRef(40960, tscFreeRegisteredSqlObj); - tscVgroupMap = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); - tscTableMetaInfo = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - tscDebug("TableMeta:%p", tscTableMetaInfo); + if (tscTableMetaMap == NULL) { + tscObjRef = taosOpenRef(40960, tscFreeRegisteredSqlObj); + tscVgroupMap = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + tscTableMetaMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + tscVgroupListBuf = taosCacheInit(TSDB_DATA_TYPE_BINARY, 5, false, NULL, "stable-vgroup-list"); + tscDebug("TableMeta:%p, vgroup:%p is initialized", tscTableMetaMap, tscVgroupMap); } int refreshTime = 5; @@ -189,14 +197,17 @@ void taos_init_imp(void) { tscRefId = taosOpenRef(200, tscCloseTscObj); - // in other language APIs, taos_cleanup is not available yet. - // So, to make sure taos_cleanup will be invoked to clean up the allocated - // resource to suppress the valgrind warning. + // In the APIs of other program language, taos_cleanup is not available yet. + // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning. atexit(taos_cleanup); + tscDebug("client is initialized successfully"); } -int taos_init() { pthread_once(&tscinit, taos_init_imp); return tscInitRes;} +int taos_init() { + pthread_once(&tscinit, taos_init_imp); + return tscInitRes; +} // this function may be called by user or system, or by both simultaneously. void taos_cleanup(void) { @@ -205,11 +216,13 @@ void taos_cleanup(void) { if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) { return; } + if (tscEmbedded == 0) { scriptEnvPoolCleanup(); } - taosHashCleanup(tscTableMetaInfo); - tscTableMetaInfo = NULL; + + taosHashCleanup(tscTableMetaMap); + tscTableMetaMap = NULL; taosHashCleanup(tscVgroupMap); tscVgroupMap = NULL; @@ -236,6 +249,9 @@ void taos_cleanup(void) { pthread_mutex_destroy(&rpcObjMutex); } + taosCacheCleanup(tscVgroupListBuf); + tscVgroupListBuf = NULL; + if (tscEmbedded == 0) { rpcCleanup(); taosCloseLog(); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 34644dc5778f123f32754766002b18526b050260..9dc541b1a6efce68c4f27fc4cbaed7e8c5b542a8 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1388,7 +1388,7 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta) { if (pCmd->pTableMetaMap != NULL) { STableMetaVgroupInfo* p = taosHashIterate(pCmd->pTableMetaMap, NULL); while (p) { - tscVgroupInfoClear(p->pVgroupInfo); + taosArrayDestroy(p->vgroupIdList); tfree(p->pTableMeta); p = taosHashIterate(pCmd->pTableMetaMap, p); } @@ -1398,6 +1398,22 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta) { } } +void* tscCleanupTableMetaMap(SHashObj* pTableMetaMap) { + if (pTableMetaMap == NULL) { + return NULL; + } + + STableMetaVgroupInfo* p = taosHashIterate(pTableMetaMap, NULL); + while (p) { + taosArrayDestroy(p->vgroupIdList); + tfree(p->pTableMeta); + p = taosHashIterate(pTableMetaMap, p); + } + + taosHashCleanup(pTableMetaMap); + return NULL; +} + void tscFreeSqlResult(SSqlObj* pSql) { SSqlRes* pRes = &pSql->res; @@ -1522,7 +1538,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) { char name[TSDB_TABLE_FNAME_LEN] = {0}; tNameExtractFullName(&pDataBlock->tableName, name); - taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); + taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); } if (!pDataBlock->cloned) { @@ -3365,7 +3381,7 @@ void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta) { if (removeMeta) { char name[TSDB_TABLE_FNAME_LEN] = {0}; tNameExtractFullName(&pTableMetaInfo->name, name); - taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); + taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); } tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables); @@ -3481,11 +3497,9 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in SSqlCmd* pCmd = &pNew->cmd; pCmd->command = cmd; + tsem_init(&pNew->rspSem, 0 ,0); + if (tscAddQueryInfo(pCmd) != TSDB_CODE_SUCCESS) { -#ifdef __APPLE__ - // to satisfy later tsem_destroy in taos_free_result - tsem_init(&pNew->rspSem, 0, 0); -#endif // __APPLE__ tscFreeSqlObj(pNew); return NULL; } @@ -4360,7 +4374,7 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta* pChild, const char* name, v assert(pChild != NULL && buf != NULL); STableMeta* p = buf; - taosHashGetClone(tscTableMetaInfo, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, p, -1); + taosHashGetClone(tscTableMetaMap, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, p); // tableMeta exists, build child table meta according to the super table meta // the uid need to be checked in addition to the general name of the super table. @@ -4374,7 +4388,7 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta* pChild, const char* name, v memcpy(pChild->schema, p->schema, sizeof(SSchema) *total); return TSDB_CODE_SUCCESS; } else { // super table has been removed, current tableMeta is also expired. remove it here - taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); + taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); return -1; } } @@ -4873,3 +4887,19 @@ SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) { return info; } + +void tscRemoveTableMetaBuf(STableMetaInfo* pTableMetaInfo, uint64_t id) { + char fname[TSDB_TABLE_FNAME_LEN] = {0}; + tNameExtractFullName(&pTableMetaInfo->name, fname); + + int32_t len = (int32_t) strnlen(fname, TSDB_TABLE_FNAME_LEN); + if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { + void* pv = taosCacheAcquireByKey(tscVgroupListBuf, fname, len); + if (pv != NULL) { + taosCacheRelease(tscVgroupListBuf, &pv, true); + } + } + + taosHashRemove(tscTableMetaMap, fname, len); + tscDebug("0x%"PRIx64" remove table meta %s, numOfRemain:%d", id, fname, (int32_t) taosHashGetSize(tscTableMetaMap)); +} \ No newline at end of file diff --git a/src/common/inc/tcmdtype.h b/src/common/inc/tcmdtype.h index bd2c2e46f8546eb74bb32f1ef007aada14a397a0..918763ebb4b92399872f10c0dd632689eaa08d1b 100644 --- a/src/common/inc/tcmdtype.h +++ b/src/common/inc/tcmdtype.h @@ -88,10 +88,7 @@ enum { TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SHOW_CREATE_STABLE, "show-create-stable") TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SHOW_CREATE_DATABASE, "show-create-database") - /* - * build empty result instead of accessing dnode to fetch result - * reset the client cache - */ + // build empty result instead of accessing dnode to fetch result reset the client cache TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE_EMPTY_RESULT, "retrieve-empty-result" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RESET_CACHE, "reset-cache" ) diff --git a/src/dnode/src/dnodeTelemetry.c b/src/dnode/src/dnodeTelemetry.c index 59b66879d42e3133ec7143c95f31c9b5df8ddb60..22a6dc5b1993b6d15510b078ac4245909221ae78 100644 --- a/src/dnode/src/dnodeTelemetry.c +++ b/src/dnode/src/dnodeTelemetry.c @@ -245,7 +245,7 @@ static void* telemetryThread(void* param) { clock_gettime(CLOCK_REALTIME, &end); end.tv_sec += 300; // wait 5 minutes before send first report - setThreadName("telemetryThrd"); + setThreadName("telemetry"); while (!tsExit) { int r = 0; diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index e8003a8fe7996316d0b91689ea9738cbd24184b9..c404ab1a55c3788f5756c99f7914764e6e9af295 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -118,10 +118,11 @@ static void *dnodeProcessReadQueue(void *wparam) { SVReadMsg * pRead; int32_t qtype; void * pVnode; - char name[16]; - memset(name, 0, 16); - snprintf(name, 16, "%s-dnReadQ", pPool->name); + char* threadname = strcmp(pPool->name, "vquery") == 0? "dnodeQueryQ":"dnodeFetchQ"; + + char name[16] = {0}; + snprintf(name, tListLen(name), "%s", threadname); setThreadName(name); while (1) { diff --git a/src/dnode/src/dnodeVnodes.c b/src/dnode/src/dnodeVnodes.c index 8ea8e280de10f0657ad0b937fb78794175d8c20a..a5b0e9fe30e88f89af2e79af16602dac9500a305 100644 --- a/src/dnode/src/dnodeVnodes.c +++ b/src/dnode/src/dnodeVnodes.c @@ -90,7 +90,6 @@ static void *dnodeOpenVnode(void *param) { char stepDesc[TSDB_STEP_DESC_LEN] = {0}; dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum); - setThreadName("dnodeOpenVnode"); for (int32_t v = 0; v < pThread->vnodeNum; ++v) { diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 06b80eea4f9536ca20bb9183c3fae1bafb96fafa..fb5bbe6c2d2442376f8937820822f654e1b41163 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -809,7 +809,7 @@ typedef struct SMultiTableMeta { int32_t contLen; uint8_t compressed; // denote if compressed or not uint32_t rawLen; // size before compress - uint8_t metaClone; // make meta clone after retrieve meta from mnode + uint8_t metaClone; // make meta clone after retrieve meta from mnode char meta[]; } SMultiTableMeta; diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 6e5cf14b965d166e381dd1d535668580e4041d9d..0bc114ffdfe8d59f4941536b56bd95be96a03d0b 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -1812,12 +1812,8 @@ static int32_t getVgroupInfoLength(SSTableVgroupMsg* pInfo, int32_t numOfTable) } static char* serializeVgroupInfo(SSTableObj *pTable, char* name, char* msg, SMnodeMsg* pMsgBody, void* handle) { - SName sn = {0}; - tNameFromString(&sn, name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - const char* tableName = tNameGetTableName(&sn); - - strncpy(msg, tableName, TSDB_TABLE_NAME_LEN); - msg += TSDB_TABLE_NAME_LEN; + strncpy(msg, name, TSDB_TABLE_FNAME_LEN); + msg += TSDB_TABLE_FNAME_LEN; if (pTable->vgHash == NULL) { mDebug("msg:%p, app:%p stable:%s, no vgroup exist while get stable vgroup info", pMsgBody, handle, name); diff --git a/src/plugins/monitor/src/monMain.c b/src/plugins/monitor/src/monMain.c index 960a097f5d99bb430f888d01960a879e80456d31..6e583fe0dfd809bac8c0aabf56e48bb33bd910ce 100644 --- a/src/plugins/monitor/src/monMain.c +++ b/src/plugins/monitor/src/monMain.c @@ -114,7 +114,7 @@ int32_t monStartSystem() { static void *monThreadFunc(void *param) { monDebug("starting to initialize monitor module ..."); - setThreadName("monThrd"); + setThreadName("monitor"); while (1) { static int32_t accessTimes = 0; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 199599468f8ee112784b4ed809eddf1a5547ba86..66a7b66d1b028ff323673c960fbb5899e9c2b9da 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -6599,9 +6599,11 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { return NULL; } + SDistinctOperatorInfo* pInfo = pOperator->info; SSDataBlock* pRes = pInfo->pRes; + pRes->info.rows = 0; SSDataBlock* pBlock = NULL; while(1) { diff --git a/src/query/src/qScript.c b/src/query/src/qScript.c index 261164a84c0b347adc36e3e2abaf2113d5564436..74ddf5f548613be725a96fe6d691d5fc0d28c805 100644 --- a/src/query/src/qScript.c +++ b/src/query/src/qScript.c @@ -342,6 +342,7 @@ int32_t scriptEnvPoolInit() { env->lua_state = createLuaEnv(); tdListAppend(pool->scriptEnvs, (void *)(&env)); } + pool->mSize = size; pool->cSize = size; return 0; diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index e9feeef9d339a5b1a96e41fd35fb6c62e13ed94b..0449ecac8b228662455930b8caf7ff2b5a2da7b2 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -529,10 +529,9 @@ static void *taosProcessTcpData(void *param) { SFdObj *pFdObj; struct epoll_event events[maxEvents]; SRecvInfo recvInfo; - char name[16]; - memset(name, 0, sizeof(name)); - snprintf(name, 16, "%s-tcpData", pThreadObj->label); + char name[16] = {0}; + snprintf(name, tListLen(name), "%s-tcp", pThreadObj->label); setThreadName(name); while (1) { diff --git a/src/rpc/test/rclient.c b/src/rpc/test/rclient.c index de30114bd1c7fa9687a6d75bca3d7158137e29e4..2f4433f1bb32e965de66a40d7d6ae36c6804a06c 100644 --- a/src/rpc/test/rclient.c +++ b/src/rpc/test/rclient.c @@ -48,8 +48,6 @@ static void *sendRequest(void *param) { SInfo *pInfo = (SInfo *)param; SRpcMsg rpcMsg = {0}; - setThreadName("sendCliReq"); - tDebug("thread:%d, start to send request", pInfo->index); while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { diff --git a/src/rpc/test/rsclient.c b/src/rpc/test/rsclient.c index 3e94a56efb3494ac5fe1942245abd0bad8815ee7..65170d4abb2745a469dfda3e4146c2ea85405b33 100644 --- a/src/rpc/test/rsclient.c +++ b/src/rpc/test/rsclient.c @@ -40,9 +40,7 @@ static int terror = 0; static void *sendRequest(void *param) { SInfo *pInfo = (SInfo *)param; SRpcMsg rpcMsg, rspMsg; - - setThreadName("sendSrvReq"); - + tDebug("thread:%d, start to send request", pInfo->index); while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 89fdda0686ffc6d7d5b372def92e48c6cf06c2ab..c86ab8549974712658ad3d381c4141427c000762 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -415,7 +415,6 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) { } void *syncRetrieveData(void *param) { - setThreadName("syncRetrievData"); int64_t rid = (int64_t)param; SSyncPeer *pPeer = syncAcquirePeer(rid); if (pPeer == NULL) { diff --git a/src/sync/test/syncClient.c b/src/sync/test/syncClient.c index 303d2376effffa3a3b2dc01580352a43aeaac9d8..23ea54ee0c19b6ad2f93d7577d8d711874b10968 100644 --- a/src/sync/test/syncClient.c +++ b/src/sync/test/syncClient.c @@ -48,8 +48,6 @@ void *sendRequest(void *param) { SInfo * pInfo = (SInfo *)param; SRpcMsg rpcMsg = {0}; - setThreadName("sendCliReq"); - uDebug("thread:%d, start to send request", pInfo->index); while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { diff --git a/src/sync/test/syncServer.c b/src/sync/test/syncServer.c index a3d06966488b95a561c9bc688b49f7ceceb87248..4598e16a9d05be29d11612755a079ce0a228a2ff 100644 --- a/src/sync/test/syncServer.c +++ b/src/sync/test/syncServer.c @@ -178,7 +178,7 @@ void *processWriteQueue(void *param) { int type; void *item; - setThreadName("writeQ"); + setThreadName("syncWrite"); while (1) { int ret = taosReadQitem(qhandle, &type, &item); diff --git a/src/util/inc/hash.h b/src/util/inc/hash.h index 616b844c1388575130a2b1c02033cfedb7ef9e57..a53aa602c1e309ef0c25e370ae084b5a33e4144c 100644 --- a/src/util/inc/hash.h +++ b/src/util/inc/hash.h @@ -123,10 +123,9 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen); * @param keyLen * @param fp * @param d - * @param dsize * @return */ -void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d, size_t dsize); +void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d); /** * remove item with the specified key diff --git a/src/util/src/hash.c b/src/util/src/hash.c index d7bee9b67cad8fe91a182d76a443c04fd82be44c..2e18f36a17f9b3112c4d1747afa37944a1e93d28 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -294,10 +294,10 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da } void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) { - return taosHashGetClone(pHashObj, key, keyLen, NULL, NULL, 0); + return taosHashGetClone(pHashObj, key, keyLen, NULL, NULL); } -void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d, size_t dsize) { +void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d) { if (taosHashTableEmpty(pHashObj) || keyLen == 0 || key == NULL) { return NULL; } diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 4aa5b4378f79eaac7c31465cb90880c282670bcc..69b3741e13c9e0b3ee00615a29851a3f690a1e84 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -132,11 +132,11 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo return; } - pCacheObj->totalSize -= pNode->size; + atomic_sub_fetch_64(&pCacheObj->totalSize, pNode->size); int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable); assert(size > 0); - uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, totalNum:%d size:%" PRId64 "bytes", + uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, total num:%d size:%" PRId64 "bytes", pCacheObj->name, pNode->key, pNode->data, pNode->size, size - 1, pCacheObj->totalSize); if (pCacheObj->freeFp) { @@ -252,6 +252,7 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v pCacheObj->freeFp(p->data); } + atomic_sub_fetch_64(&pCacheObj->totalSize, p->size); tfree(p); } else { taosAddToTrashcan(pCacheObj, p); @@ -302,7 +303,7 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen } SCacheDataNode* ptNode = NULL; - taosHashGetClone(pCacheObj->pHashTable, key, keyLen, incRefFn, &ptNode, sizeof(void*)); + taosHashGetClone(pCacheObj->pHashTable, key, keyLen, incRefFn, &ptNode); void* pData = (ptNode != NULL)? ptNode->data:NULL; @@ -679,7 +680,7 @@ void* taosCacheTimedRefresh(void *handle) { assert(pCacheArrayList != NULL); uDebug("cache refresh thread starts"); - setThreadName("cacheTimedRefre"); + setThreadName("cacheRefresh"); const int32_t SLEEP_DURATION = 500; //500 ms int64_t count = 0; diff --git a/src/util/src/tlog.c b/src/util/src/tlog.c index 88f57e8ac24cd207fc44f581564f45a9c33c348e..1ce3eadf58432337511d0d600848ad334b96fc91 100644 --- a/src/util/src/tlog.c +++ b/src/util/src/tlog.c @@ -178,8 +178,6 @@ static void *taosThreadToOpenNewFile(void *param) { char keepName[LOG_FILE_NAME_LEN + 20]; sprintf(keepName, "%s.%d", tsLogObj.logName, tsLogObj.flag); - setThreadName("openNewFile"); - tsLogObj.flag ^= 1; tsLogObj.lines = 0; char name[LOG_FILE_NAME_LEN + 20]; @@ -689,12 +687,9 @@ static void taosWriteLog(SLogBuff *tLogBuff) { static void *taosAsyncOutputLog(void *param) { SLogBuff *tLogBuff = (SLogBuff *)param; - - setThreadName("asyncOutputLog"); + setThreadName("log"); while (1) { - //tsem_wait(&(tLogBuff->buffNotEmpty)); - taosMsleep(writeInterval); // Polling the buffer diff --git a/src/util/src/tsched.c b/src/util/src/tsched.c index 3d3dfd989926c4acf8b32d8c7df724cb2d1ac079..b86ebb38bcd6446b56357f9667636403e14d688c 100644 --- a/src/util/src/tsched.c +++ b/src/util/src/tsched.c @@ -122,7 +122,9 @@ void *taosProcessSchedQueue(void *scheduler) { SSchedQueue *pSched = (SSchedQueue *)scheduler; int ret = 0; - setThreadName("schedQ"); + char name[16] = {0}; + snprintf(name, tListLen(name), "%s-taskQ", pSched->label); + setThreadName(name); while (1) { if ((ret = tsem_wait(&pSched->fullSem)) != 0) { diff --git a/src/util/tests/trefTest.c b/src/util/tests/trefTest.c index fe3dcab201de3f5b5068c997f3759a5b8397b5b7..e01da070afd3333cf02c25b51d2e9711c1616fb0 100644 --- a/src/util/tests/trefTest.c +++ b/src/util/tests/trefTest.c @@ -35,8 +35,6 @@ void *addRef(void *param) { SRefSpace *pSpace = (SRefSpace *)param; int id; - setThreadName("addRef"); - for (int i=0; i < pSpace->steps; ++i) { printf("a"); id = random() % pSpace->refNum; @@ -54,8 +52,6 @@ void *removeRef(void *param) { SRefSpace *pSpace = (SRefSpace *)param; int id, code; - setThreadName("removeRef"); - for (int i=0; i < pSpace->steps; ++i) { printf("d"); id = random() % pSpace->refNum; @@ -74,8 +70,6 @@ void *acquireRelease(void *param) { SRefSpace *pSpace = (SRefSpace *)param; int id; - setThreadName("acquireRelease"); - for (int i=0; i < pSpace->steps; ++i) { printf("a"); @@ -97,8 +91,6 @@ void myfree(void *p) { void *openRefSpace(void *param) { SRefSpace *pSpace = (SRefSpace *)param; - setThreadName("openRefSpace"); - printf("c"); pSpace->rsetId = taosOpenRef(50, myfree); diff --git a/src/vnode/src/vnodeMgmt.c b/src/vnode/src/vnodeMgmt.c index 8b17d3a5f2b8871aa83d4daf81ff936773de736a..e14b5a385e3e0e8a3405b3a1dcdd830ef5f10126 100644 --- a/src/vnode/src/vnodeMgmt.c +++ b/src/vnode/src/vnodeMgmt.c @@ -93,7 +93,7 @@ static void vnodeIncRef(void *ptNode) { void *vnodeAcquire(int32_t vgId) { SVnodeObj *pVnode = NULL; if (tsVnodesHash != NULL) { - taosHashGetClone(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, &pVnode, sizeof(void *)); + taosHashGetClone(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, &pVnode); } if (pVnode == NULL) { diff --git a/src/vnode/src/vnodeWorker.c b/src/vnode/src/vnodeWorker.c index e94c99cbea99139a21fb7fb64c729a12d3091349..7fcc393746639777af20730f9daf8d7533c2b5e6 100644 --- a/src/vnode/src/vnodeWorker.c +++ b/src/vnode/src/vnodeWorker.c @@ -25,7 +25,7 @@ typedef enum { VNODE_WORKER_ACTION_CLEANUP, - VNODE_WORKER_ACTION_DESTROUY + VNODE_WORKER_ACTION_DESTROY } EVMWorkerAction; typedef struct { @@ -155,7 +155,7 @@ int32_t vnodeCleanupInMWorker(SVnodeObj *pVnode) { int32_t vnodeDestroyInMWorker(SVnodeObj *pVnode) { vTrace("vgId:%d, will destroy in vmworker", pVnode->vgId); - return vnodeWriteIntoMWorker(pVnode, VNODE_WORKER_ACTION_DESTROUY, NULL); + return vnodeWriteIntoMWorker(pVnode, VNODE_WORKER_ACTION_DESTROY, NULL); } static void vnodeFreeMWorkerMsg(SVMWorkerMsg *pMsg) { @@ -179,7 +179,7 @@ static void vnodeProcessMWorkerMsg(SVMWorkerMsg *pMsg) { case VNODE_WORKER_ACTION_CLEANUP: vnodeCleanUp(pMsg->pVnode); break; - case VNODE_WORKER_ACTION_DESTROUY: + case VNODE_WORKER_ACTION_DESTROY: vnodeDestroy(pMsg->pVnode); break; default: diff --git a/src/wal/src/walMgmt.c b/src/wal/src/walMgmt.c index 45f65b2c2fc9ae5412f471805a3244644e590638..05324d31eec56ee74b81c70dc451eadf83d518d2 100644 --- a/src/wal/src/walMgmt.c +++ b/src/wal/src/walMgmt.c @@ -192,7 +192,7 @@ static void walFsyncAll() { static void *walThreadFunc(void *param) { int stop = 0; - setThreadName("walThrd"); + setThreadName("wal"); while (1) { walUpdateSeq(); walFsyncAll(); diff --git a/tests/examples/c/apitest.c b/tests/examples/c/apitest.c index 50a62e550fda56d46fc99276a7be82128e21d046..01169715f3e8b5b9d6e212b4b317ecca5fa4dbcd 100644 --- a/tests/examples/c/apitest.c +++ b/tests/examples/c/apitest.c @@ -1030,8 +1030,8 @@ int main(int argc, char *argv[]) { printf("server info: %s\n", info); info = taos_get_client_info(taos); printf("client info: %s\n", info); - - printf("************ verify shemaless *************\n"); + + printf("************ verify schema-less *************\n"); verify_schema_less(taos); diff --git a/tests/pytest/insert/line_insert.py b/tests/pytest/insert/line_insert.py index ff3a32b0f79028ce4f612c12b41171a2bd45a765..53eaa55aa50a1369b4aff9c49421263788205038 100644 --- a/tests/pytest/insert/line_insert.py +++ b/tests/pytest/insert/line_insert.py @@ -77,6 +77,8 @@ class TDTestCase: "sth,t1=4i64,t2=5f64,t4=5f64,ID=\"childtable\" c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641ms", "sth,t1=4i64,t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933654ms" ]) + tdSql.execute('reset query cache') + tdSql.query('select tbname, * from sth') tdSql.checkRows(2) diff --git a/tests/pytest/tools/taosdemoAllTest/querrThreads0.json b/tests/pytest/tools/taosdemoAllTest/querrThreads0.json index 69557a784180acec3c6de059b9285df4d4b31456..3999845dec12042eecd031a4731f3aa8403d067d 100644 --- a/tests/pytest/tools/taosdemoAllTest/querrThreads0.json +++ b/tests/pytest/tools/taosdemoAllTest/querrThreads0.json @@ -7,7 +7,7 @@ "password": "taosdata", "confirm_parameter_prompt": "no", "databases": "db", - "query_times":3, + "query_times": 3, "specified_table_query": { "query_interval": 0, "concurrent": 1, @@ -34,4 +34,4 @@ ] } } - \ No newline at end of file + diff --git a/tests/pytest/tools/taosdemoAllTest/querrThreadsless0.json b/tests/pytest/tools/taosdemoAllTest/querrThreadsless0.json index 9074ae8fd1049d2dbaedfff881feefd84583ca20..646cbcfbe21a7fa7fd6f305eadda63fdce00dcf5 100644 --- a/tests/pytest/tools/taosdemoAllTest/querrThreadsless0.json +++ b/tests/pytest/tools/taosdemoAllTest/querrThreadsless0.json @@ -7,7 +7,7 @@ "password": "taosdata", "confirm_parameter_prompt": "no", "databases": "db", - "query_times":3, + "query_times": 3, "specified_table_query": { "query_interval": 0, "concurrent": 1, @@ -34,4 +34,4 @@ ] } } - \ No newline at end of file + diff --git a/tests/script/sh/stop_dnodes.sh b/tests/script/sh/stop_dnodes.sh index 430f39901e62415e780999171139fcd961cdd54c..4c6d8e03510a39c2d5d1d020b5cfe7dabee39cb0 100755 --- a/tests/script/sh/stop_dnodes.sh +++ b/tests/script/sh/stop_dnodes.sh @@ -14,7 +14,7 @@ while [ -n "$PID" ]; do echo kill -9 $PID pkill -9 taosd echo "Killing processes locking on port 6030" - if [[ "$OS_TYPE" != "Darwin" ]]; then + if [ "$OS_TYPE" != "Darwin" ]; then fuser -k -n tcp 6030 else lsof -nti:6030 | xargs kill -9 @@ -26,7 +26,7 @@ PID=`ps -ef|grep -w tarbitrator | grep -v grep | awk '{print $2}'` while [ -n "$PID" ]; do echo kill -9 $PID pkill -9 tarbitrator - if [[ "$OS_TYPE" != "Darwin" ]]; then + if [ "$OS_TYPE" != "Darwin" ]; then fuser -k -n tcp 6040 else lsof -nti:6040 | xargs kill -9