未验证 提交 958be012 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #7010 from taosdata/feature/query

Feature/query
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
static SBnThread tsBnThread; static SBnThread tsBnThread;
static void *bnThreadFunc(void *arg) { static void *bnThreadFunc(void *arg) {
setThreadName("bnThreadd"); setThreadName("balance");
while (1) { while (1) {
pthread_mutex_lock(&tsBnThread.mutex); pthread_mutex_lock(&tsBnThread.mutex);
......
...@@ -355,6 +355,8 @@ char* strdup_throw(const char* str); ...@@ -355,6 +355,8 @@ char* strdup_throw(const char* str);
bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src); bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src);
SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg); SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg);
void tscRemoveTableMetaBuf(STableMetaInfo* pTableMetaInfo, uint64_t id);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -151,7 +151,8 @@ typedef struct STableDataBlocks { ...@@ -151,7 +151,8 @@ typedef struct STableDataBlocks {
typedef struct { typedef struct {
STableMeta *pTableMeta; STableMeta *pTableMeta;
SVgroupsInfo *pVgroupInfo; SArray *vgroupIdList;
// SVgroupsInfo *pVgroupsInfo;
} STableMetaVgroupInfo; } STableMetaVgroupInfo;
typedef struct SInsertStatementParam { typedef struct SInsertStatementParam {
...@@ -375,6 +376,8 @@ void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta); ...@@ -375,6 +376,8 @@ void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta);
*/ */
void tscFreeSqlResult(SSqlObj *pSql); void tscFreeSqlResult(SSqlObj *pSql);
void* tscCleanupTableMetaMap(SHashObj* pTableMetaMap);
/** /**
* free sql object, release allocated resource * free sql object, release allocated resource
* @param pObj * @param pObj
...@@ -415,7 +418,8 @@ int32_t tscValidateSqlInfo(SSqlObj *pSql, struct SSqlInfo *pInfo); ...@@ -415,7 +418,8 @@ int32_t tscValidateSqlInfo(SSqlObj *pSql, struct SSqlInfo *pInfo);
int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows); int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows);
extern int32_t sentinel; extern int32_t sentinel;
extern SHashObj *tscVgroupMap; extern SHashObj *tscVgroupMap;
extern SHashObj *tscTableMetaInfo; extern SHashObj *tscTableMetaMap;
extern SCacheObj *tscVgroupListBuf;
extern int tscObjRef; extern int tscObjRef;
extern void *tscTmr; extern void *tscTmr;
......
...@@ -325,61 +325,6 @@ void tscAsyncResultOnError(SSqlObj* pSql) { ...@@ -325,61 +325,6 @@ void tscAsyncResultOnError(SSqlObj* pSql) {
int tscSendMsgToServer(SSqlObj *pSql); int tscSendMsgToServer(SSqlObj *pSql);
static int32_t updateMetaBeforeRetryQuery(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SQueryInfo* pQueryInfo) {
// handle the invalid table error code for super table.
// update the pExpr info, colList info, number of table columns
// TODO Re-parse this sql and issue the corresponding subquery as an alternative for this case.
if (pSql->retryReason == TSDB_CODE_TDB_INVALID_TABLE_ID) {
int32_t numOfExprs = (int32_t) tscNumOfExprs(pQueryInfo);
int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
int32_t numOfTags = tscGetNumOfTags(pTableMetaInfo->pTableMeta);
SSchema *pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
SSchema *pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
for (int32_t i = 0; i < numOfExprs; ++i) {
SSqlExpr *pExpr = &(tscExprGet(pQueryInfo, i)->base);
// update the table uid
pExpr->uid = pTableMetaInfo->pTableMeta->id.uid;
if (pExpr->colInfo.colIndex >= 0) {
int32_t index = pExpr->colInfo.colIndex;
if ((TSDB_COL_IS_NORMAL_COL(pExpr->colInfo.flag) && index >= numOfCols) ||
(TSDB_COL_IS_TAG(pExpr->colInfo.flag) && (index < 0 || index >= numOfTags))) {
return pSql->retryReason;
}
if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
if ((pTagSchema[pExpr->colInfo.colIndex].colId != pExpr->colInfo.colId) &&
strcasecmp(pExpr->colInfo.name, pTagSchema[pExpr->colInfo.colIndex].name) != 0) {
return pSql->retryReason;
}
} else if (TSDB_COL_IS_NORMAL_COL(pExpr->colInfo.flag)) {
if ((pSchema[pExpr->colInfo.colIndex].colId != pExpr->colInfo.colId) &&
strcasecmp(pExpr->colInfo.name, pSchema[pExpr->colInfo.colIndex].name) != 0) {
return pSql->retryReason;
}
} else { // do nothing for udc
}
}
}
// validate the table columns information
for (int32_t i = 0; i < taosArrayGetSize(pQueryInfo->colList); ++i) {
SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
if (pCol->columnIndex >= numOfCols) {
return pSql->retryReason;
}
}
} else {
// do nothing
}
return TSDB_CODE_SUCCESS;
}
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)param); SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)param);
if (pSql == NULL) return; if (pSql == NULL) return;
...@@ -391,7 +336,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -391,7 +336,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
pRes->code = code; pRes->code = code;
SSqlObj *sub = (SSqlObj*) res; SSqlObj *sub = (SSqlObj*) res;
const char* msg = (sub->cmd.command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"table-meta"; const char* msg = (sub->cmd.command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"multi-tableMeta";
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" get %s failed, code:%s", pSql->self, msg, tstrerror(code)); tscError("0x%"PRIx64" get %s failed, code:%s", pSql->self, msg, tstrerror(code));
goto _error; goto _error;
...@@ -401,85 +346,56 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -401,85 +346,56 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (pSql->pStream == NULL) { if (pSql->pStream == NULL) {
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
// check if it is a sub-query of super table query first, if true, enter another routine if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) {
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY | tscDebug("0x%" PRIx64 " continue parse sql after get table-meta", pSql->self);
TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) {
tscDebug("0x%" PRIx64 " update cached table-meta, continue to process sql and send the corresponding query", pSql->self);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS || code == TSDB_CODE_SUCCESS);
code = tsParseSql(pSql, false);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self); taosReleaseRef(tscObjRef, pSql->self);
return; return;
} } else if (code != TSDB_CODE_SUCCESS) {
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0));
code = updateMetaBeforeRetryQuery(pSql, pTableMetaInfo, pQueryInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
// tscBuildAndSendRequest can add error into async res if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_STMT_INSERT)) { // stmt insert
tscBuildAndSendRequest(pSql, NULL); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
taosReleaseRef(tscObjRef, pSql->self); code = tscGetTableMeta(pSql, pTableMetaInfo);
return;
} else { // continue to process normal async query
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) {
tscDebug("0x%" PRIx64 " continue parse sql after get table-meta", pSql->self);
code = tsParseSql(pSql, false);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self); taosReleaseRef(tscObjRef, pSql->self);
return; return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else {
assert(code == TSDB_CODE_SUCCESS);
}
(*pSql->fp)(pSql->param, pSql, code);
} else { } else {
if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_FILE_INSERT)) { assert(code == TSDB_CODE_SUCCESS);
tscImportDataFromFile(pSql);
} else {
tscHandleMultivnodeInsert(pSql);
}
}
} else {
if (pSql->retryReason != TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " update cached table-meta, re-validate sql statement and send query again",
pSql->self);
tscResetSqlCmd(pCmd, false);
pSql->retryReason = TSDB_CODE_SUCCESS;
} else {
tscDebug("0x%" PRIx64 " cached table-meta, continue validate sql statement and send query", pSql->self);
} }
code = tsParseSql(pSql, true); (*pSql->fp)(pSql->param, pSql, code);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { } else if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_FILE_INSERT)) { // file insert
taosReleaseRef(tscObjRef, pSql->self); tscImportDataFromFile(pSql);
return; } else { // sql string insert
} else if (code != TSDB_CODE_SUCCESS) { tscHandleMultivnodeInsert(pSql);
goto _error; }
} } else {
if (pSql->retryReason != TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " update cached table-meta, re-validate sql statement and send query again", pSql->self);
tscResetSqlCmd(pCmd, false);
pSql->retryReason = TSDB_CODE_SUCCESS;
} else {
tscDebug("0x%" PRIx64 " cached table-meta, continue validate sql statement and send query", pSql->self);
}
SQueryInfo *pQueryInfo1 = tscGetQueryInfo(pCmd); code = tsParseSql(pSql, true);
executeQuery(pSql, pQueryInfo1); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
} }
taosReleaseRef(tscObjRef, pSql->self); SQueryInfo *pQueryInfo1 = tscGetQueryInfo(pCmd);
return; executeQuery(pSql, pQueryInfo1);
} }
taosReleaseRef(tscObjRef, pSql->self);
return;
} else { // stream computing } else { // stream computing
tscDebug("0x%"PRIx64" stream:%p meta is updated, start new query, command:%d", pSql->self, pSql->pStream, pCmd->command); tscDebug("0x%"PRIx64" stream:%p meta is updated, start new query, command:%d", pSql->self, pSql->pStream, pCmd->command);
......
...@@ -920,7 +920,8 @@ int tscProcessLocalCmd(SSqlObj *pSql) { ...@@ -920,7 +920,8 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
} else if (pCmd->command == TSDB_SQL_SHOW_CREATE_DATABASE) { } else if (pCmd->command == TSDB_SQL_SHOW_CREATE_DATABASE) {
pRes->code = tscProcessShowCreateDatabase(pSql); pRes->code = tscProcessShowCreateDatabase(pSql);
} else if (pCmd->command == TSDB_SQL_RESET_CACHE) { } else if (pCmd->command == TSDB_SQL_RESET_CACHE) {
taosHashClear(tscTableMetaInfo); taosHashClear(tscTableMetaMap);
taosCacheEmpty(tscVgroupListBuf);
pRes->code = TSDB_CODE_SUCCESS; pRes->code = TSDB_CODE_SUCCESS;
} else if (pCmd->command == TSDB_SQL_SERV_VERSION) { } else if (pCmd->command == TSDB_SQL_SERV_VERSION) {
pRes->code = tscProcessServerVer(pSql); pRes->code = tscProcessServerVer(pSql);
......
...@@ -457,7 +457,7 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema, SSm ...@@ -457,7 +457,7 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema, SSm
uint32_t size = tscGetTableMetaMaxSize(); uint32_t size = tscGetTableMetaMaxSize();
STableMeta* tableMeta = calloc(1, size); STableMeta* tableMeta = calloc(1, size);
taosHashGetClone(tscTableMetaInfo, fullTableName, strlen(fullTableName), NULL, tableMeta, -1); taosHashGetClone(tscTableMetaMap, fullTableName, strlen(fullTableName), NULL, tableMeta);
tstrncpy(schema->sTableName, tableName, strlen(tableName)+1); tstrncpy(schema->sTableName, tableName, strlen(tableName)+1);
schema->precision = tableMeta->tableInfo.precision; schema->precision = tableMeta->tableInfo.precision;
......
...@@ -80,8 +80,8 @@ static void getColumnName(tSqlExprItem* pItem, char* resultFieldName, char* rawN ...@@ -80,8 +80,8 @@ static void getColumnName(tSqlExprItem* pItem, char* resultFieldName, char* rawN
static int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t colIndex, tSqlExprItem* pItem, static int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t colIndex, tSqlExprItem* pItem,
bool finalResult, SUdfInfo* pUdfInfo); bool finalResult, SUdfInfo* pUdfInfo);
static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes, static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pColList, int16_t bytes,
int8_t type, char* fieldName, SExprInfo* pSqlExpr); int8_t type, char* fieldName, SExprInfo* pSqlExpr);
static uint8_t convertRelationalOperator(SStrToken *pToken); static uint8_t convertRelationalOperator(SStrToken *pToken);
...@@ -7247,7 +7247,7 @@ void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex) { ...@@ -7247,7 +7247,7 @@ void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex) {
} }
tmpLen = tmpLen =
sprintf(tmpBuf, "%s(uid:%" PRId64 ", %d)", name, pExpr->base.uid, pExpr->base.colInfo.colId); sprintf(tmpBuf, "%s(uid:%" PRIu64 ", %d)", name, pExpr->base.uid, pExpr->base.colInfo.colId);
if (tmpLen + offset >= totalBufSize - 1) break; if (tmpLen + offset >= totalBufSize - 1) break;
...@@ -8123,6 +8123,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -8123,6 +8123,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
} }
pTableMeta = calloc(1, maxSize); pTableMeta = calloc(1, maxSize);
plist = taosArrayInit(4, POINTER_BYTES); plist = taosArrayInit(4, POINTER_BYTES);
...@@ -8138,9 +8139,13 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -8138,9 +8139,13 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
size_t len = strlen(name); size_t len = strlen(name);
memset(pTableMeta, 0, maxSize); memset(pTableMeta, 0, maxSize);
taosHashGetClone(tscTableMetaInfo, name, len, NULL, pTableMeta, -1); taosHashGetClone(tscTableMetaMap, name, len, NULL, pTableMeta);
if (pTableMeta->id.uid > 0) { if (pTableMeta->id.uid > 0) {
tscDebug("0x%"PRIx64" retrieve table meta %s from local buf", pSql->self, name);
// avoid mem leak, may should update pTableMeta
void* pVgroupIdList = NULL;
if (pTableMeta->tableType == TSDB_CHILD_TABLE) { if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
code = tscCreateTableMetaFromSTableMeta(pTableMeta, name, pSql->pBuf); code = tscCreateTableMetaFromSTableMeta(pTableMeta, name, pSql->pBuf);
...@@ -8152,23 +8157,34 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -8152,23 +8157,34 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
} }
} else if (pTableMeta->tableType == TSDB_SUPER_TABLE) { } else if (pTableMeta->tableType == TSDB_SUPER_TABLE) {
// the vgroup list of super table is not kept in local buffer, so here need retrieve it from the mnode each time // the vgroup list of super table is not kept in local buffer, so here need retrieve it from the mnode each time
char* t = strdup(name); tscDebug("0x%"PRIx64" try to acquire cached super table %s vgroup id list", pSql->self, name);
taosArrayPush(pVgroupList, &t); void* pv = taosCacheAcquireByKey(tscVgroupListBuf, name, len);
} if (pv == NULL) {
char* t = strdup(name);
taosArrayPush(pVgroupList, &t);
tscDebug("0x%"PRIx64" failed to retrieve stable %s vgroup id list in cache, try fetch from mnode", pSql->self, name);
} else {
tFilePage* pdata = (tFilePage*) pv;
pVgroupIdList = taosArrayInit((size_t) pdata->num, sizeof(int32_t));
if (pVgroupIdList == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
//STableMeta* pMeta = tscTableMetaDup(pTableMeta); taosArrayAddBatch(pVgroupIdList, pdata->data, (int32_t) pdata->num);
//STableMetaVgroupInfo p = { .pTableMeta = pMeta }; taosCacheRelease(tscVgroupListBuf, &pv, false);
}
}
//const char* px = tNameGetTableName(pname); if (taosHashGet(pCmd->pTableMetaMap, name, len) == NULL) {
//taosHashPut(pCmd->pTableMetaMap, px, strlen(px), &p, sizeof(STableMetaVgroupInfo));
// avoid mem leak, may should update pTableMeta
const char* px = tNameGetTableName(pname);
if (taosHashGet(pCmd->pTableMetaMap, px, strlen(px)) == NULL) {
STableMeta* pMeta = tscTableMetaDup(pTableMeta); STableMeta* pMeta = tscTableMetaDup(pTableMeta);
STableMetaVgroupInfo p = { .pTableMeta = pMeta, .pVgroupInfo = NULL}; STableMetaVgroupInfo tvi = { .pTableMeta = pMeta, .vgroupIdList = pVgroupIdList};
taosHashPut(pCmd->pTableMetaMap, px, strlen(px), &p, sizeof(STableMetaVgroupInfo)); taosHashPut(pCmd->pTableMetaMap, name, len, &tvi, sizeof(STableMetaVgroupInfo));
} }
} else { // add to the retrieve table meta array list. } else {
// Add to the retrieve table meta array list.
// If the tableMeta is missing, the cached vgroup list for the corresponding super table will be ignored.
tscDebug("0x%"PRIx64" failed to retrieve table meta %s from local buf", pSql->self, name);
char* t = strdup(name); char* t = strdup(name);
taosArrayPush(plist, &t); taosArrayPush(plist, &t);
} }
...@@ -8282,22 +8298,44 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod ...@@ -8282,22 +8298,44 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod
strncpy(pTableMetaInfo->aliasName, tNameGetTableName(&pTableMetaInfo->name), tListLen(pTableMetaInfo->aliasName)); strncpy(pTableMetaInfo->aliasName, tNameGetTableName(&pTableMetaInfo->name), tListLen(pTableMetaInfo->aliasName));
} }
const char* name = tNameGetTableName(&pTableMetaInfo->name); char fname[TSDB_TABLE_FNAME_LEN] = {0};
STableMetaVgroupInfo* p = taosHashGet(pCmd->pTableMetaMap, name, strlen(name)); tNameExtractFullName(&pTableMetaInfo->name, fname);
STableMetaVgroupInfo* p = taosHashGet(pCmd->pTableMetaMap, fname, strnlen(fname, TSDB_TABLE_FNAME_LEN));
pTableMetaInfo->pTableMeta = tscTableMetaDup(p->pTableMeta); pTableMetaInfo->pTableMeta = tscTableMetaDup(p->pTableMeta);
assert(pTableMetaInfo->pTableMeta != NULL); assert(pTableMetaInfo->pTableMeta != NULL);
if (p->pVgroupInfo != NULL) { if (p->vgroupIdList != NULL) {
pTableMetaInfo->vgroupList = tscVgroupsInfoDup(p->pVgroupInfo); size_t s = taosArrayGetSize(p->vgroupIdList);
}
if (code != TSDB_CODE_SUCCESS) { size_t vgroupsz = sizeof(SVgroupInfo) * s + sizeof(SVgroupsInfo);
return code; pTableMetaInfo->vgroupList = calloc(1, vgroupsz);
if (pTableMetaInfo->vgroupList == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pTableMetaInfo->vgroupList->numOfVgroups = (int32_t) s;
for(int32_t j = 0; j < s; ++j) {
int32_t* id = taosArrayGet(p->vgroupIdList, j);
// check if current buffer contains the vgroup info. If not, add it
SNewVgroupInfo existVgroupInfo = {.inUse = -1,};
taosHashGetClone(tscVgroupMap, id, sizeof(*id), NULL, &existVgroupInfo);
assert(existVgroupInfo.inUse >= 0);
SVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[j];
pVgroup->numOfEps = existVgroupInfo.numOfEps;
pVgroup->vgId = existVgroupInfo.vgId;
for (int32_t k = 0; k < existVgroupInfo.numOfEps; ++k) {
pVgroup->epAddr[k].port = existVgroupInfo.ep[k].port;
pVgroup->epAddr[k].fqdn = strndup(existVgroupInfo.ep[k].fqdn, TSDB_FQDN_LEN);
}
}
} }
} }
return TSDB_CODE_SUCCESS; return code;
} }
static STableMeta* extractTempTableMetaFromSubquery(SQueryInfo* pUpstream) { static STableMeta* extractTempTableMetaFromSubquery(SQueryInfo* pUpstream) {
......
此差异已折叠。
...@@ -206,7 +206,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf ...@@ -206,7 +206,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
char name[TSDB_TABLE_FNAME_LEN] = {0}; char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pTableMetaInfo->name, name); tNameExtractFullName(&pTableMetaInfo->name, name);
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
tfree(pTableMetaInfo->pTableMeta); tfree(pTableMetaInfo->pTableMeta);
......
...@@ -2704,8 +2704,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO ...@@ -2704,8 +2704,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
tstrerror(pParentSql->res.code)); tstrerror(pParentSql->res.code));
// release allocated resource // release allocated resource
tscDestroyGlobalMergerEnv(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, tscDestroyGlobalMergerEnv(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, pState->numOfSub);
pState->numOfSub);
tscFreeRetrieveSup(pSql); tscFreeRetrieveSup(pSql);
...@@ -2713,7 +2712,35 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO ...@@ -2713,7 +2712,35 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd); SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd);
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
(*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code);
int32_t code = pParentSql->res.code;
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry) {
// remove the cached tableMeta and vgroup id list, and then parse the sql again
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentSql->cmd, 0);
tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self);
tscResetSqlCmd(&pParentSql->cmd, true);
pParentSql->res.code = TSDB_CODE_SUCCESS;
pParentSql->retry++;
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self,
tstrerror(code), pParentSql->retry);
code = tsParseSql(pParentSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return;
}
if (code != TSDB_CODE_SUCCESS) {
pParentSql->res.code = code;
tscAsyncResultOnError(pParentSql);
return;
}
executeQuery(pParentSql, pQueryInfo);
} else {
(*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code);
}
} else { // regular super table query } else { // regular super table query
if (pParentSql->res.code != TSDB_CODE_SUCCESS) { if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(pParentSql);
...@@ -2996,7 +3023,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { ...@@ -2996,7 +3023,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
assert(code == taos_errno(pSql)); assert(code == taos_errno(pSql));
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) { if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && (code != TSDB_CODE_TDB_INVALID_TABLE_ID)) {
tscError("0x%"PRIx64" sub:0x%"PRIx64" failed code:%s, retry:%d", pParentSql->self, pSql->self, tstrerror(code), trsupport->numOfRetry); tscError("0x%"PRIx64" sub:0x%"PRIx64" failed code:%s, retry:%d", pParentSql->self, pSql->self, tstrerror(code), trsupport->numOfRetry);
int32_t sent = 0; int32_t sent = 0;
...@@ -3005,7 +3032,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { ...@@ -3005,7 +3032,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
return; return;
} }
} else { } else {
tscError("0x%"PRIx64" sub:0x%"PRIx64" reach the max retry times, set global code:%s", pParentSql->self, pSql->self, tstrerror(code)); tscError("0x%"PRIx64" sub:0x%"PRIx64" reach the max retry times or no need to retry, set global code:%s", pParentSql->self, pSql->self, tstrerror(code));
atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code); // set global code and abort atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code); // set global code and abort
} }
...@@ -3125,12 +3152,10 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -3125,12 +3152,10 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
for(int32_t i = 0; i < pParentObj->cmd.insertParam.numOfTables; ++i) { for(int32_t i = 0; i < pParentObj->cmd.insertParam.numOfTables; ++i) {
char name[TSDB_TABLE_FNAME_LEN] = {0}; char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(pParentObj->cmd.insertParam.pTableNameList[i], name); tNameExtractFullName(pParentObj->cmd.insertParam.pTableNameList[i], name);
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
} }
pParentObj->res.code = TSDB_CODE_SUCCESS; pParentObj->res.code = TSDB_CODE_SUCCESS;
// pParentObj->cmd.parseFinished = false;
tscResetSqlCmd(&pParentObj->cmd, false); tscResetSqlCmd(&pParentObj->cmd, false);
// in case of insert, redo parsing the sql string and build new submit data block for two reasons: // in case of insert, redo parsing the sql string and build new submit data block for two reasons:
......
...@@ -19,15 +19,12 @@ ...@@ -19,15 +19,12 @@
#include "trpc.h" #include "trpc.h"
#include "tnote.h" #include "tnote.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h"
#include "tsched.h" #include "tsched.h"
#include "tscLog.h" #include "tscLog.h"
#include "tscUtil.h"
#include "tsclient.h" #include "tsclient.h"
#include "tglobal.h" #include "tglobal.h"
#include "tconfig.h" #include "tconfig.h"
#include "ttimezone.h" #include "ttimezone.h"
#include "tlocale.h"
#include "qScript.h" #include "qScript.h"
// global, not configurable // global, not configurable
...@@ -36,8 +33,10 @@ ...@@ -36,8 +33,10 @@
int32_t sentinel = TSC_VAR_NOT_RELEASE; int32_t sentinel = TSC_VAR_NOT_RELEASE;
SHashObj *tscVgroupMap; // hash map to keep the global vgroup info SHashObj *tscVgroupMap; // hash map to keep the vgroup info from mnode
SHashObj *tscTableMetaInfo; // table meta info SHashObj *tscTableMetaMap; // table meta info buffer
SCacheObj *tscVgroupListBuf; // super table vgroup list information, only survives 5 seconds for each super table vgroup list
int32_t tscObjRef = -1; int32_t tscObjRef = -1;
void *tscTmr; void *tscTmr;
void *tscQhandle; void *tscQhandle;
...@@ -45,17 +44,21 @@ int32_t tscRefId = -1; ...@@ -45,17 +44,21 @@ int32_t tscRefId = -1;
int32_t tscNumOfObj = 0; // number of sqlObj in current process. int32_t tscNumOfObj = 0; // number of sqlObj in current process.
static void *tscCheckDiskUsageTmr; static void *tscCheckDiskUsageTmr;
void *tscRpcCache; // cache to keep rpc obj void *tscRpcCache; // cache to keep rpc obj
int32_t tscNumOfThreads = 1; // num of rpc threads int32_t tscNumOfThreads = 1; // num of rpc threads
char tscLogFileName[12] = "taoslog"; char tscLogFileName[12] = "taoslog";
int tscLogFileNum = 10; int tscLogFileNum = 10;
static pthread_mutex_t rpcObjMutex; // mutex to protect open the rpc obj concurrently
static pthread_once_t tscinit = PTHREAD_ONCE_INIT; static pthread_mutex_t rpcObjMutex; // mutex to protect open the rpc obj concurrently
static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
// pthread_once can not return result code, so result code is set to a global variable.
static volatile int tscInitRes = 0; static volatile int tscInitRes = 0;
void tscCheckDiskUsage(void *UNUSED_PARAM(para), void *UNUSED_PARAM(param)) { void tscCheckDiskUsage(void *UNUSED_PARAM(para), void *UNUSED_PARAM(param)) {
taosGetDisk(); taosGetDisk();
taosTmrReset(tscCheckDiskUsage, 20 * 1000, NULL, tscTmr, &tscCheckDiskUsageTmr); taosTmrReset(tscCheckDiskUsage, 20 * 1000, NULL, tscTmr, &tscCheckDiskUsageTmr);
} }
void tscFreeRpcObj(void *param) { void tscFreeRpcObj(void *param) {
assert(param); assert(param);
SRpcObj *pRpcObj = (SRpcObj *)(param); SRpcObj *pRpcObj = (SRpcObj *)(param);
...@@ -67,10 +70,9 @@ void tscReleaseRpc(void *param) { ...@@ -67,10 +70,9 @@ void tscReleaseRpc(void *param) {
if (param == NULL) { if (param == NULL) {
return; return;
} }
pthread_mutex_lock(&rpcObjMutex);
taosCacheRelease(tscRpcCache, (void *)&param, false); taosCacheRelease(tscRpcCache, (void *)&param, false);
pthread_mutex_unlock(&rpcObjMutex); }
}
int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncrypt, void **ppRpcObj) { int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncrypt, void **ppRpcObj) {
pthread_mutex_lock(&rpcObjMutex); pthread_mutex_lock(&rpcObjMutex);
...@@ -80,7 +82,7 @@ int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncry ...@@ -80,7 +82,7 @@ int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncry
*ppRpcObj = pRpcObj; *ppRpcObj = pRpcObj;
pthread_mutex_unlock(&rpcObjMutex); pthread_mutex_unlock(&rpcObjMutex);
return 0; return 0;
} }
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
...@@ -104,7 +106,8 @@ int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncry ...@@ -104,7 +106,8 @@ int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncry
pthread_mutex_unlock(&rpcObjMutex); pthread_mutex_unlock(&rpcObjMutex);
tscError("failed to init connection to TDengine"); tscError("failed to init connection to TDengine");
return -1; return -1;
} }
pRpcObj = taosCachePut(tscRpcCache, rpcObj.key, strlen(rpcObj.key), &rpcObj, sizeof(rpcObj), 1000*5); pRpcObj = taosCachePut(tscRpcCache, rpcObj.key, strlen(rpcObj.key), &rpcObj, sizeof(rpcObj), 1000*5);
if (pRpcObj == NULL) { if (pRpcObj == NULL) {
rpcClose(rpcObj.pDnodeConn); rpcClose(rpcObj.pDnodeConn);
...@@ -118,7 +121,7 @@ int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncry ...@@ -118,7 +121,7 @@ int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncry
} }
void taos_init_imp(void) { void taos_init_imp(void) {
char temp[128] = {0}; char temp[128] = {0};
errno = TSDB_CODE_SUCCESS; errno = TSDB_CODE_SUCCESS;
srand(taosGetTimestampSec()); srand(taosGetTimestampSec());
...@@ -151,36 +154,41 @@ void taos_init_imp(void) { ...@@ -151,36 +154,41 @@ void taos_init_imp(void) {
rpcInit(); rpcInit();
scriptEnvPoolInit(); scriptEnvPoolInit();
tscDebug("starting to initialize TAOS client ..."); tscDebug("starting to initialize TAOS client ...");
tscDebug("Local End Point is:%s", tsLocalEp); tscDebug("Local End Point is:%s", tsLocalEp);
} }
taosSetCoreDump(); taosSetCoreDump();
tscInitMsgsFp(); tscInitMsgsFp();
int queueSize = tsMaxConnections*2;
double factor = (tscEmbedded == 0)? 2.0:4.0; double factor = (tscEmbedded == 0)? 2.0:4.0;
tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor); tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor);
if (tscNumOfThreads < 2) { if (tscNumOfThreads < 2) {
tscNumOfThreads = 2; tscNumOfThreads = 2;
} }
int32_t queueSize = tsMaxConnections*2;
tscQhandle = taosInitScheduler(queueSize, tscNumOfThreads, "tsc"); tscQhandle = taosInitScheduler(queueSize, tscNumOfThreads, "tsc");
if (NULL == tscQhandle) { if (NULL == tscQhandle) {
tscError("failed to init scheduler"); tscError("failed to init task queue");
tscInitRes = -1; tscInitRes = -1;
return; return;
} }
tscDebug("client task queue is initialized, numOfWorkers: %d", tscNumOfThreads);
tscTmr = taosTmrInit(tsMaxConnections * 2, 200, 60000, "TSC"); tscTmr = taosTmrInit(tsMaxConnections * 2, 200, 60000, "TSC");
if(0 == tscEmbedded){ if(0 == tscEmbedded){
taosTmrReset(tscCheckDiskUsage, 20 * 1000, NULL, tscTmr, &tscCheckDiskUsageTmr); taosTmrReset(tscCheckDiskUsage, 20 * 1000, NULL, tscTmr, &tscCheckDiskUsageTmr);
} }
if (tscTableMetaInfo == NULL) { if (tscTableMetaMap == NULL) {
tscObjRef = taosOpenRef(40960, tscFreeRegisteredSqlObj); tscObjRef = taosOpenRef(40960, tscFreeRegisteredSqlObj);
tscVgroupMap = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); tscVgroupMap = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
tscTableMetaInfo = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); tscTableMetaMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
tscDebug("TableMeta:%p", tscTableMetaInfo); tscVgroupListBuf = taosCacheInit(TSDB_DATA_TYPE_BINARY, 5, false, NULL, "stable-vgroup-list");
tscDebug("TableMeta:%p, vgroup:%p is initialized", tscTableMetaMap, tscVgroupMap);
} }
int refreshTime = 5; int refreshTime = 5;
...@@ -189,14 +197,17 @@ void taos_init_imp(void) { ...@@ -189,14 +197,17 @@ void taos_init_imp(void) {
tscRefId = taosOpenRef(200, tscCloseTscObj); tscRefId = taosOpenRef(200, tscCloseTscObj);
// in other language APIs, taos_cleanup is not available yet. // In the APIs of other program language, taos_cleanup is not available yet.
// So, to make sure taos_cleanup will be invoked to clean up the allocated // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
// resource to suppress the valgrind warning.
atexit(taos_cleanup); atexit(taos_cleanup);
tscDebug("client is initialized successfully"); tscDebug("client is initialized successfully");
} }
int taos_init() { pthread_once(&tscinit, taos_init_imp); return tscInitRes;} int taos_init() {
pthread_once(&tscinit, taos_init_imp);
return tscInitRes;
}
// this function may be called by user or system, or by both simultaneously. // this function may be called by user or system, or by both simultaneously.
void taos_cleanup(void) { void taos_cleanup(void) {
...@@ -205,11 +216,13 @@ void taos_cleanup(void) { ...@@ -205,11 +216,13 @@ void taos_cleanup(void) {
if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) { if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) {
return; return;
} }
if (tscEmbedded == 0) { if (tscEmbedded == 0) {
scriptEnvPoolCleanup(); scriptEnvPoolCleanup();
} }
taosHashCleanup(tscTableMetaInfo);
tscTableMetaInfo = NULL; taosHashCleanup(tscTableMetaMap);
tscTableMetaMap = NULL;
taosHashCleanup(tscVgroupMap); taosHashCleanup(tscVgroupMap);
tscVgroupMap = NULL; tscVgroupMap = NULL;
...@@ -236,6 +249,9 @@ void taos_cleanup(void) { ...@@ -236,6 +249,9 @@ void taos_cleanup(void) {
pthread_mutex_destroy(&rpcObjMutex); pthread_mutex_destroy(&rpcObjMutex);
} }
taosCacheCleanup(tscVgroupListBuf);
tscVgroupListBuf = NULL;
if (tscEmbedded == 0) { if (tscEmbedded == 0) {
rpcCleanup(); rpcCleanup();
taosCloseLog(); taosCloseLog();
......
...@@ -1388,7 +1388,7 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta) { ...@@ -1388,7 +1388,7 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta) {
if (pCmd->pTableMetaMap != NULL) { if (pCmd->pTableMetaMap != NULL) {
STableMetaVgroupInfo* p = taosHashIterate(pCmd->pTableMetaMap, NULL); STableMetaVgroupInfo* p = taosHashIterate(pCmd->pTableMetaMap, NULL);
while (p) { while (p) {
tscVgroupInfoClear(p->pVgroupInfo); taosArrayDestroy(p->vgroupIdList);
tfree(p->pTableMeta); tfree(p->pTableMeta);
p = taosHashIterate(pCmd->pTableMetaMap, p); p = taosHashIterate(pCmd->pTableMetaMap, p);
} }
...@@ -1398,6 +1398,22 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta) { ...@@ -1398,6 +1398,22 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta) {
} }
} }
void* tscCleanupTableMetaMap(SHashObj* pTableMetaMap) {
if (pTableMetaMap == NULL) {
return NULL;
}
STableMetaVgroupInfo* p = taosHashIterate(pTableMetaMap, NULL);
while (p) {
taosArrayDestroy(p->vgroupIdList);
tfree(p->pTableMeta);
p = taosHashIterate(pTableMetaMap, p);
}
taosHashCleanup(pTableMetaMap);
return NULL;
}
void tscFreeSqlResult(SSqlObj* pSql) { void tscFreeSqlResult(SSqlObj* pSql) {
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
...@@ -1522,7 +1538,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) { ...@@ -1522,7 +1538,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) {
char name[TSDB_TABLE_FNAME_LEN] = {0}; char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pDataBlock->tableName, name); tNameExtractFullName(&pDataBlock->tableName, name);
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
} }
if (!pDataBlock->cloned) { if (!pDataBlock->cloned) {
...@@ -3365,7 +3381,7 @@ void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta) { ...@@ -3365,7 +3381,7 @@ void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta) {
if (removeMeta) { if (removeMeta) {
char name[TSDB_TABLE_FNAME_LEN] = {0}; char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pTableMetaInfo->name, name); tNameExtractFullName(&pTableMetaInfo->name, name);
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
} }
tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables); tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables);
...@@ -3481,11 +3497,9 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in ...@@ -3481,11 +3497,9 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in
SSqlCmd* pCmd = &pNew->cmd; SSqlCmd* pCmd = &pNew->cmd;
pCmd->command = cmd; pCmd->command = cmd;
tsem_init(&pNew->rspSem, 0 ,0);
if (tscAddQueryInfo(pCmd) != TSDB_CODE_SUCCESS) { if (tscAddQueryInfo(pCmd) != TSDB_CODE_SUCCESS) {
#ifdef __APPLE__
// to satisfy later tsem_destroy in taos_free_result
tsem_init(&pNew->rspSem, 0, 0);
#endif // __APPLE__
tscFreeSqlObj(pNew); tscFreeSqlObj(pNew);
return NULL; return NULL;
} }
...@@ -4360,7 +4374,7 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta* pChild, const char* name, v ...@@ -4360,7 +4374,7 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta* pChild, const char* name, v
assert(pChild != NULL && buf != NULL); assert(pChild != NULL && buf != NULL);
STableMeta* p = buf; STableMeta* p = buf;
taosHashGetClone(tscTableMetaInfo, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, p, -1); taosHashGetClone(tscTableMetaMap, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, p);
// tableMeta exists, build child table meta according to the super table meta // tableMeta exists, build child table meta according to the super table meta
// the uid need to be checked in addition to the general name of the super table. // the uid need to be checked in addition to the general name of the super table.
...@@ -4374,7 +4388,7 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta* pChild, const char* name, v ...@@ -4374,7 +4388,7 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta* pChild, const char* name, v
memcpy(pChild->schema, p->schema, sizeof(SSchema) *total); memcpy(pChild->schema, p->schema, sizeof(SSchema) *total);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { // super table has been removed, current tableMeta is also expired. remove it here } else { // super table has been removed, current tableMeta is also expired. remove it here
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
return -1; return -1;
} }
} }
...@@ -4873,3 +4887,19 @@ SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) { ...@@ -4873,3 +4887,19 @@ SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) {
return info; return info;
} }
void tscRemoveTableMetaBuf(STableMetaInfo* pTableMetaInfo, uint64_t id) {
char fname[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pTableMetaInfo->name, fname);
int32_t len = (int32_t) strnlen(fname, TSDB_TABLE_FNAME_LEN);
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
void* pv = taosCacheAcquireByKey(tscVgroupListBuf, fname, len);
if (pv != NULL) {
taosCacheRelease(tscVgroupListBuf, &pv, true);
}
}
taosHashRemove(tscTableMetaMap, fname, len);
tscDebug("0x%"PRIx64" remove table meta %s, numOfRemain:%d", id, fname, (int32_t) taosHashGetSize(tscTableMetaMap));
}
\ No newline at end of file
...@@ -88,10 +88,7 @@ enum { ...@@ -88,10 +88,7 @@ enum {
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SHOW_CREATE_STABLE, "show-create-stable") TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SHOW_CREATE_STABLE, "show-create-stable")
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SHOW_CREATE_DATABASE, "show-create-database") TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SHOW_CREATE_DATABASE, "show-create-database")
/* // build empty result instead of accessing dnode to fetch result reset the client cache
* build empty result instead of accessing dnode to fetch result
* reset the client cache
*/
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE_EMPTY_RESULT, "retrieve-empty-result" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE_EMPTY_RESULT, "retrieve-empty-result" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RESET_CACHE, "reset-cache" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RESET_CACHE, "reset-cache" )
......
...@@ -245,7 +245,7 @@ static void* telemetryThread(void* param) { ...@@ -245,7 +245,7 @@ static void* telemetryThread(void* param) {
clock_gettime(CLOCK_REALTIME, &end); clock_gettime(CLOCK_REALTIME, &end);
end.tv_sec += 300; // wait 5 minutes before send first report end.tv_sec += 300; // wait 5 minutes before send first report
setThreadName("telemetryThrd"); setThreadName("telemetry");
while (!tsExit) { while (!tsExit) {
int r = 0; int r = 0;
......
...@@ -118,10 +118,11 @@ static void *dnodeProcessReadQueue(void *wparam) { ...@@ -118,10 +118,11 @@ static void *dnodeProcessReadQueue(void *wparam) {
SVReadMsg * pRead; SVReadMsg * pRead;
int32_t qtype; int32_t qtype;
void * pVnode; void * pVnode;
char name[16];
memset(name, 0, 16); char* threadname = strcmp(pPool->name, "vquery") == 0? "dnodeQueryQ":"dnodeFetchQ";
snprintf(name, 16, "%s-dnReadQ", pPool->name);
char name[16] = {0};
snprintf(name, tListLen(name), "%s", threadname);
setThreadName(name); setThreadName(name);
while (1) { while (1) {
......
...@@ -90,7 +90,6 @@ static void *dnodeOpenVnode(void *param) { ...@@ -90,7 +90,6 @@ static void *dnodeOpenVnode(void *param) {
char stepDesc[TSDB_STEP_DESC_LEN] = {0}; char stepDesc[TSDB_STEP_DESC_LEN] = {0};
dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum); dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
setThreadName("dnodeOpenVnode"); setThreadName("dnodeOpenVnode");
for (int32_t v = 0; v < pThread->vnodeNum; ++v) { for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
......
...@@ -809,7 +809,7 @@ typedef struct SMultiTableMeta { ...@@ -809,7 +809,7 @@ typedef struct SMultiTableMeta {
int32_t contLen; int32_t contLen;
uint8_t compressed; // denote if compressed or not uint8_t compressed; // denote if compressed or not
uint32_t rawLen; // size before compress uint32_t rawLen; // size before compress
uint8_t metaClone; // make meta clone after retrieve meta from mnode uint8_t metaClone; // make meta clone after retrieve meta from mnode
char meta[]; char meta[];
} SMultiTableMeta; } SMultiTableMeta;
......
...@@ -1812,12 +1812,8 @@ static int32_t getVgroupInfoLength(SSTableVgroupMsg* pInfo, int32_t numOfTable) ...@@ -1812,12 +1812,8 @@ static int32_t getVgroupInfoLength(SSTableVgroupMsg* pInfo, int32_t numOfTable)
} }
static char* serializeVgroupInfo(SSTableObj *pTable, char* name, char* msg, SMnodeMsg* pMsgBody, void* handle) { static char* serializeVgroupInfo(SSTableObj *pTable, char* name, char* msg, SMnodeMsg* pMsgBody, void* handle) {
SName sn = {0}; strncpy(msg, name, TSDB_TABLE_FNAME_LEN);
tNameFromString(&sn, name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); msg += TSDB_TABLE_FNAME_LEN;
const char* tableName = tNameGetTableName(&sn);
strncpy(msg, tableName, TSDB_TABLE_NAME_LEN);
msg += TSDB_TABLE_NAME_LEN;
if (pTable->vgHash == NULL) { if (pTable->vgHash == NULL) {
mDebug("msg:%p, app:%p stable:%s, no vgroup exist while get stable vgroup info", pMsgBody, handle, name); mDebug("msg:%p, app:%p stable:%s, no vgroup exist while get stable vgroup info", pMsgBody, handle, name);
......
...@@ -114,7 +114,7 @@ int32_t monStartSystem() { ...@@ -114,7 +114,7 @@ int32_t monStartSystem() {
static void *monThreadFunc(void *param) { static void *monThreadFunc(void *param) {
monDebug("starting to initialize monitor module ..."); monDebug("starting to initialize monitor module ...");
setThreadName("monThrd"); setThreadName("monitor");
while (1) { while (1) {
static int32_t accessTimes = 0; static int32_t accessTimes = 0;
......
...@@ -6599,9 +6599,11 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { ...@@ -6599,9 +6599,11 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
return NULL; return NULL;
} }
SDistinctOperatorInfo* pInfo = pOperator->info; SDistinctOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->pRes; SSDataBlock* pRes = pInfo->pRes;
pRes->info.rows = 0; pRes->info.rows = 0;
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
while(1) { while(1) {
......
...@@ -342,6 +342,7 @@ int32_t scriptEnvPoolInit() { ...@@ -342,6 +342,7 @@ int32_t scriptEnvPoolInit() {
env->lua_state = createLuaEnv(); env->lua_state = createLuaEnv();
tdListAppend(pool->scriptEnvs, (void *)(&env)); tdListAppend(pool->scriptEnvs, (void *)(&env));
} }
pool->mSize = size; pool->mSize = size;
pool->cSize = size; pool->cSize = size;
return 0; return 0;
......
...@@ -529,10 +529,9 @@ static void *taosProcessTcpData(void *param) { ...@@ -529,10 +529,9 @@ static void *taosProcessTcpData(void *param) {
SFdObj *pFdObj; SFdObj *pFdObj;
struct epoll_event events[maxEvents]; struct epoll_event events[maxEvents];
SRecvInfo recvInfo; SRecvInfo recvInfo;
char name[16];
memset(name, 0, sizeof(name)); char name[16] = {0};
snprintf(name, 16, "%s-tcpData", pThreadObj->label); snprintf(name, tListLen(name), "%s-tcp", pThreadObj->label);
setThreadName(name); setThreadName(name);
while (1) { while (1) {
......
...@@ -48,8 +48,6 @@ static void *sendRequest(void *param) { ...@@ -48,8 +48,6 @@ static void *sendRequest(void *param) {
SInfo *pInfo = (SInfo *)param; SInfo *pInfo = (SInfo *)param;
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
setThreadName("sendCliReq");
tDebug("thread:%d, start to send request", pInfo->index); tDebug("thread:%d, start to send request", pInfo->index);
while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
......
...@@ -40,9 +40,7 @@ static int terror = 0; ...@@ -40,9 +40,7 @@ static int terror = 0;
static void *sendRequest(void *param) { static void *sendRequest(void *param) {
SInfo *pInfo = (SInfo *)param; SInfo *pInfo = (SInfo *)param;
SRpcMsg rpcMsg, rspMsg; SRpcMsg rpcMsg, rspMsg;
setThreadName("sendSrvReq");
tDebug("thread:%d, start to send request", pInfo->index); tDebug("thread:%d, start to send request", pInfo->index);
while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
......
...@@ -415,7 +415,6 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) { ...@@ -415,7 +415,6 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
} }
void *syncRetrieveData(void *param) { void *syncRetrieveData(void *param) {
setThreadName("syncRetrievData");
int64_t rid = (int64_t)param; int64_t rid = (int64_t)param;
SSyncPeer *pPeer = syncAcquirePeer(rid); SSyncPeer *pPeer = syncAcquirePeer(rid);
if (pPeer == NULL) { if (pPeer == NULL) {
......
...@@ -48,8 +48,6 @@ void *sendRequest(void *param) { ...@@ -48,8 +48,6 @@ void *sendRequest(void *param) {
SInfo * pInfo = (SInfo *)param; SInfo * pInfo = (SInfo *)param;
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
setThreadName("sendCliReq");
uDebug("thread:%d, start to send request", pInfo->index); uDebug("thread:%d, start to send request", pInfo->index);
while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
......
...@@ -178,7 +178,7 @@ void *processWriteQueue(void *param) { ...@@ -178,7 +178,7 @@ void *processWriteQueue(void *param) {
int type; int type;
void *item; void *item;
setThreadName("writeQ"); setThreadName("syncWrite");
while (1) { while (1) {
int ret = taosReadQitem(qhandle, &type, &item); int ret = taosReadQitem(qhandle, &type, &item);
......
...@@ -123,10 +123,9 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen); ...@@ -123,10 +123,9 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen);
* @param keyLen * @param keyLen
* @param fp * @param fp
* @param d * @param d
* @param dsize
* @return * @return
*/ */
void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d, size_t dsize); void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d);
/** /**
* remove item with the specified key * remove item with the specified key
......
...@@ -294,10 +294,10 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da ...@@ -294,10 +294,10 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
} }
void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) { void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) {
return taosHashGetClone(pHashObj, key, keyLen, NULL, NULL, 0); return taosHashGetClone(pHashObj, key, keyLen, NULL, NULL);
} }
void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d, size_t dsize) { void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d) {
if (taosHashTableEmpty(pHashObj) || keyLen == 0 || key == NULL) { if (taosHashTableEmpty(pHashObj) || keyLen == 0 || key == NULL) {
return NULL; return NULL;
} }
......
...@@ -132,11 +132,11 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo ...@@ -132,11 +132,11 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo
return; return;
} }
pCacheObj->totalSize -= pNode->size; atomic_sub_fetch_64(&pCacheObj->totalSize, pNode->size);
int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable); int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable);
assert(size > 0); assert(size > 0);
uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, totalNum:%d size:%" PRId64 "bytes", uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, total num:%d size:%" PRId64 "bytes",
pCacheObj->name, pNode->key, pNode->data, pNode->size, size - 1, pCacheObj->totalSize); pCacheObj->name, pNode->key, pNode->data, pNode->size, size - 1, pCacheObj->totalSize);
if (pCacheObj->freeFp) { if (pCacheObj->freeFp) {
...@@ -252,6 +252,7 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v ...@@ -252,6 +252,7 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
pCacheObj->freeFp(p->data); pCacheObj->freeFp(p->data);
} }
atomic_sub_fetch_64(&pCacheObj->totalSize, p->size);
tfree(p); tfree(p);
} else { } else {
taosAddToTrashcan(pCacheObj, p); taosAddToTrashcan(pCacheObj, p);
...@@ -302,7 +303,7 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen ...@@ -302,7 +303,7 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
} }
SCacheDataNode* ptNode = NULL; SCacheDataNode* ptNode = NULL;
taosHashGetClone(pCacheObj->pHashTable, key, keyLen, incRefFn, &ptNode, sizeof(void*)); taosHashGetClone(pCacheObj->pHashTable, key, keyLen, incRefFn, &ptNode);
void* pData = (ptNode != NULL)? ptNode->data:NULL; void* pData = (ptNode != NULL)? ptNode->data:NULL;
...@@ -679,7 +680,7 @@ void* taosCacheTimedRefresh(void *handle) { ...@@ -679,7 +680,7 @@ void* taosCacheTimedRefresh(void *handle) {
assert(pCacheArrayList != NULL); assert(pCacheArrayList != NULL);
uDebug("cache refresh thread starts"); uDebug("cache refresh thread starts");
setThreadName("cacheTimedRefre"); setThreadName("cacheRefresh");
const int32_t SLEEP_DURATION = 500; //500 ms const int32_t SLEEP_DURATION = 500; //500 ms
int64_t count = 0; int64_t count = 0;
......
...@@ -178,8 +178,6 @@ static void *taosThreadToOpenNewFile(void *param) { ...@@ -178,8 +178,6 @@ static void *taosThreadToOpenNewFile(void *param) {
char keepName[LOG_FILE_NAME_LEN + 20]; char keepName[LOG_FILE_NAME_LEN + 20];
sprintf(keepName, "%s.%d", tsLogObj.logName, tsLogObj.flag); sprintf(keepName, "%s.%d", tsLogObj.logName, tsLogObj.flag);
setThreadName("openNewFile");
tsLogObj.flag ^= 1; tsLogObj.flag ^= 1;
tsLogObj.lines = 0; tsLogObj.lines = 0;
char name[LOG_FILE_NAME_LEN + 20]; char name[LOG_FILE_NAME_LEN + 20];
...@@ -689,12 +687,9 @@ static void taosWriteLog(SLogBuff *tLogBuff) { ...@@ -689,12 +687,9 @@ static void taosWriteLog(SLogBuff *tLogBuff) {
static void *taosAsyncOutputLog(void *param) { static void *taosAsyncOutputLog(void *param) {
SLogBuff *tLogBuff = (SLogBuff *)param; SLogBuff *tLogBuff = (SLogBuff *)param;
setThreadName("log");
setThreadName("asyncOutputLog");
while (1) { while (1) {
//tsem_wait(&(tLogBuff->buffNotEmpty));
taosMsleep(writeInterval); taosMsleep(writeInterval);
// Polling the buffer // Polling the buffer
......
...@@ -122,7 +122,9 @@ void *taosProcessSchedQueue(void *scheduler) { ...@@ -122,7 +122,9 @@ void *taosProcessSchedQueue(void *scheduler) {
SSchedQueue *pSched = (SSchedQueue *)scheduler; SSchedQueue *pSched = (SSchedQueue *)scheduler;
int ret = 0; int ret = 0;
setThreadName("schedQ"); char name[16] = {0};
snprintf(name, tListLen(name), "%s-taskQ", pSched->label);
setThreadName(name);
while (1) { while (1) {
if ((ret = tsem_wait(&pSched->fullSem)) != 0) { if ((ret = tsem_wait(&pSched->fullSem)) != 0) {
......
...@@ -35,8 +35,6 @@ void *addRef(void *param) { ...@@ -35,8 +35,6 @@ void *addRef(void *param) {
SRefSpace *pSpace = (SRefSpace *)param; SRefSpace *pSpace = (SRefSpace *)param;
int id; int id;
setThreadName("addRef");
for (int i=0; i < pSpace->steps; ++i) { for (int i=0; i < pSpace->steps; ++i) {
printf("a"); printf("a");
id = random() % pSpace->refNum; id = random() % pSpace->refNum;
...@@ -54,8 +52,6 @@ void *removeRef(void *param) { ...@@ -54,8 +52,6 @@ void *removeRef(void *param) {
SRefSpace *pSpace = (SRefSpace *)param; SRefSpace *pSpace = (SRefSpace *)param;
int id, code; int id, code;
setThreadName("removeRef");
for (int i=0; i < pSpace->steps; ++i) { for (int i=0; i < pSpace->steps; ++i) {
printf("d"); printf("d");
id = random() % pSpace->refNum; id = random() % pSpace->refNum;
...@@ -74,8 +70,6 @@ void *acquireRelease(void *param) { ...@@ -74,8 +70,6 @@ void *acquireRelease(void *param) {
SRefSpace *pSpace = (SRefSpace *)param; SRefSpace *pSpace = (SRefSpace *)param;
int id; int id;
setThreadName("acquireRelease");
for (int i=0; i < pSpace->steps; ++i) { for (int i=0; i < pSpace->steps; ++i) {
printf("a"); printf("a");
...@@ -97,8 +91,6 @@ void myfree(void *p) { ...@@ -97,8 +91,6 @@ void myfree(void *p) {
void *openRefSpace(void *param) { void *openRefSpace(void *param) {
SRefSpace *pSpace = (SRefSpace *)param; SRefSpace *pSpace = (SRefSpace *)param;
setThreadName("openRefSpace");
printf("c"); printf("c");
pSpace->rsetId = taosOpenRef(50, myfree); pSpace->rsetId = taosOpenRef(50, myfree);
......
...@@ -93,7 +93,7 @@ static void vnodeIncRef(void *ptNode) { ...@@ -93,7 +93,7 @@ static void vnodeIncRef(void *ptNode) {
void *vnodeAcquire(int32_t vgId) { void *vnodeAcquire(int32_t vgId) {
SVnodeObj *pVnode = NULL; SVnodeObj *pVnode = NULL;
if (tsVnodesHash != NULL) { if (tsVnodesHash != NULL) {
taosHashGetClone(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, &pVnode, sizeof(void *)); taosHashGetClone(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, &pVnode);
} }
if (pVnode == NULL) { if (pVnode == NULL) {
......
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
typedef enum { typedef enum {
VNODE_WORKER_ACTION_CLEANUP, VNODE_WORKER_ACTION_CLEANUP,
VNODE_WORKER_ACTION_DESTROUY VNODE_WORKER_ACTION_DESTROY
} EVMWorkerAction; } EVMWorkerAction;
typedef struct { typedef struct {
...@@ -155,7 +155,7 @@ int32_t vnodeCleanupInMWorker(SVnodeObj *pVnode) { ...@@ -155,7 +155,7 @@ int32_t vnodeCleanupInMWorker(SVnodeObj *pVnode) {
int32_t vnodeDestroyInMWorker(SVnodeObj *pVnode) { int32_t vnodeDestroyInMWorker(SVnodeObj *pVnode) {
vTrace("vgId:%d, will destroy in vmworker", pVnode->vgId); vTrace("vgId:%d, will destroy in vmworker", pVnode->vgId);
return vnodeWriteIntoMWorker(pVnode, VNODE_WORKER_ACTION_DESTROUY, NULL); return vnodeWriteIntoMWorker(pVnode, VNODE_WORKER_ACTION_DESTROY, NULL);
} }
static void vnodeFreeMWorkerMsg(SVMWorkerMsg *pMsg) { static void vnodeFreeMWorkerMsg(SVMWorkerMsg *pMsg) {
...@@ -179,7 +179,7 @@ static void vnodeProcessMWorkerMsg(SVMWorkerMsg *pMsg) { ...@@ -179,7 +179,7 @@ static void vnodeProcessMWorkerMsg(SVMWorkerMsg *pMsg) {
case VNODE_WORKER_ACTION_CLEANUP: case VNODE_WORKER_ACTION_CLEANUP:
vnodeCleanUp(pMsg->pVnode); vnodeCleanUp(pMsg->pVnode);
break; break;
case VNODE_WORKER_ACTION_DESTROUY: case VNODE_WORKER_ACTION_DESTROY:
vnodeDestroy(pMsg->pVnode); vnodeDestroy(pMsg->pVnode);
break; break;
default: default:
......
...@@ -192,7 +192,7 @@ static void walFsyncAll() { ...@@ -192,7 +192,7 @@ static void walFsyncAll() {
static void *walThreadFunc(void *param) { static void *walThreadFunc(void *param) {
int stop = 0; int stop = 0;
setThreadName("walThrd"); setThreadName("wal");
while (1) { while (1) {
walUpdateSeq(); walUpdateSeq();
walFsyncAll(); walFsyncAll();
......
...@@ -1030,8 +1030,8 @@ int main(int argc, char *argv[]) { ...@@ -1030,8 +1030,8 @@ int main(int argc, char *argv[]) {
printf("server info: %s\n", info); printf("server info: %s\n", info);
info = taos_get_client_info(taos); info = taos_get_client_info(taos);
printf("client info: %s\n", info); printf("client info: %s\n", info);
printf("************ verify shemaless *************\n"); printf("************ verify schema-less *************\n");
verify_schema_less(taos); verify_schema_less(taos);
......
...@@ -77,6 +77,8 @@ class TDTestCase: ...@@ -77,6 +77,8 @@ class TDTestCase:
"sth,t1=4i64,t2=5f64,t4=5f64,ID=\"childtable\" c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641ms", "sth,t1=4i64,t2=5f64,t4=5f64,ID=\"childtable\" c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641ms",
"sth,t1=4i64,t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933654ms" "sth,t1=4i64,t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933654ms"
]) ])
tdSql.execute('reset query cache')
tdSql.query('select tbname, * from sth') tdSql.query('select tbname, * from sth')
tdSql.checkRows(2) tdSql.checkRows(2)
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
"password": "taosdata", "password": "taosdata",
"confirm_parameter_prompt": "no", "confirm_parameter_prompt": "no",
"databases": "db", "databases": "db",
"query_times":3, "query_times": 3,
"specified_table_query": { "specified_table_query": {
"query_interval": 0, "query_interval": 0,
"concurrent": 1, "concurrent": 1,
...@@ -34,4 +34,4 @@ ...@@ -34,4 +34,4 @@
] ]
} }
} }
\ No newline at end of file
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
"password": "taosdata", "password": "taosdata",
"confirm_parameter_prompt": "no", "confirm_parameter_prompt": "no",
"databases": "db", "databases": "db",
"query_times":3, "query_times": 3,
"specified_table_query": { "specified_table_query": {
"query_interval": 0, "query_interval": 0,
"concurrent": 1, "concurrent": 1,
...@@ -34,4 +34,4 @@ ...@@ -34,4 +34,4 @@
] ]
} }
} }
\ No newline at end of file
...@@ -14,7 +14,7 @@ while [ -n "$PID" ]; do ...@@ -14,7 +14,7 @@ while [ -n "$PID" ]; do
echo kill -9 $PID echo kill -9 $PID
pkill -9 taosd pkill -9 taosd
echo "Killing processes locking on port 6030" echo "Killing processes locking on port 6030"
if [[ "$OS_TYPE" != "Darwin" ]]; then if [ "$OS_TYPE" != "Darwin" ]; then
fuser -k -n tcp 6030 fuser -k -n tcp 6030
else else
lsof -nti:6030 | xargs kill -9 lsof -nti:6030 | xargs kill -9
...@@ -26,7 +26,7 @@ PID=`ps -ef|grep -w tarbitrator | grep -v grep | awk '{print $2}'` ...@@ -26,7 +26,7 @@ PID=`ps -ef|grep -w tarbitrator | grep -v grep | awk '{print $2}'`
while [ -n "$PID" ]; do while [ -n "$PID" ]; do
echo kill -9 $PID echo kill -9 $PID
pkill -9 tarbitrator pkill -9 tarbitrator
if [[ "$OS_TYPE" != "Darwin" ]]; then if [ "$OS_TYPE" != "Darwin" ]; then
fuser -k -n tcp 6040 fuser -k -n tcp 6040
else else
lsof -nti:6040 | xargs kill -9 lsof -nti:6040 | xargs kill -9
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册