未验证 提交 8cf197fa 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #6882 from taosdata/feature/query

[td-5250]<enhance>: improve the performance of multi-table meta bulk …
......@@ -309,10 +309,9 @@ bool hasMoreClauseToTry(SSqlObj* pSql);
void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta);
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corEpSet);
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, SArray* pUdfList, __async_cb_func_t fp);
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, SArray* pUdfList, __async_cb_func_t fp, bool metaClone);
int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length, SArray* pNameArray);
......
......@@ -3106,7 +3106,7 @@ int32_t doGetColumnIndexByName(SStrToken* pToken, SQueryInfo* pQueryInfo, SColum
if (isTablenameToken(pToken)) {
pIndex->columnIndex = TSDB_TBNAME_COLUMN_INDEX;
} else if (strlen(DEFAULT_PRIMARY_TIMESTAMP_COL_NAME) == pToken->n &&
} else if (strlen(DEFAULT_PRIMARY_TIMESTAMP_COL_NAME) == pToken->n &&
strncasecmp(pToken->z, DEFAULT_PRIMARY_TIMESTAMP_COL_NAME, pToken->n) == 0) {
pIndex->columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX; // just make runtime happy, need fix java test case InsertSpecialCharacterJniTest
} else if (pToken->n == 0) {
......@@ -3708,9 +3708,9 @@ static int32_t doExtractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo,
* make memory sanitizer happy;
*/
if (pRight->value.nLen == 0) {
bufLen = pRight->value.nLen + 2;
bufLen = pRight->value.nLen + 2;
} else {
bufLen = pRight->value.nLen + 1;
bufLen = pRight->value.nLen + 1;
}
}
......@@ -5150,7 +5150,7 @@ int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSq
int32_t type = 0;
if ((ret = getQueryCondExpr(&pSql->cmd, pQueryInfo, pExpr, &condExpr, &type, (*pExpr)->tokenId)) != TSDB_CODE_SUCCESS) {
goto PARSE_WHERE_EXIT;
goto PARSE_WHERE_EXIT;
}
tSqlExprCompact(pExpr);
......@@ -5160,7 +5160,7 @@ int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSq
// 1. check if it is a join query
if ((ret = validateJoinExpr(&pSql->cmd, pQueryInfo, &condExpr)) != TSDB_CODE_SUCCESS) {
goto PARSE_WHERE_EXIT;
goto PARSE_WHERE_EXIT;
}
// 2. get the query time range
......@@ -8089,7 +8089,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
// avoid mem leak, may should update pTableMeta
const char* px = tNameGetTableName(pname);
if (taosHashGet(pCmd->pTableMetaMap, px, strlen(px)) == NULL) {
STableMeta* pMeta = tscTableMetaDup(pTableMeta);
STableMeta* pMeta = tscTableMetaDup(pTableMeta);
STableMetaVgroupInfo p = { .pTableMeta = pMeta, .pVgroupInfo = NULL};
taosHashPut(pCmd->pTableMetaMap, px, strlen(px), &p, sizeof(STableMetaVgroupInfo));
}
......@@ -8133,7 +8133,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
// load the table meta for a given table name list
if (taosArrayGetSize(plist) > 0 || taosArrayGetSize(pVgroupList) > 0 || (pQueryInfo->pUdfInfo && taosArrayGetSize(pQueryInfo->pUdfInfo) > 0)) {
code = getMultiTableMetaFromMnode(pSql, plist, pVgroupList, pQueryInfo->pUdfInfo, tscTableMetaCallBack);
code = getMultiTableMetaFromMnode(pSql, plist, pVgroupList, pQueryInfo->pUdfInfo, tscTableMetaCallBack, true);
}
_end:
......@@ -8630,7 +8630,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
}
taosArrayAddBatch(pQueryInfo->exprList1, (void*) p, numOfExpr);
tfree(p);
tfree(p);
}
#if 0
......
......@@ -1955,7 +1955,6 @@ static int32_t tableMetaMsgConvert(STableMetaMsg* pMetaMsg) {
pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);
pMetaMsg->uid = htobe64(pMetaMsg->uid);
// pMetaMsg->contLen = htonl(pMetaMsg->contLen);
pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
if ((pMetaMsg->tableType != TSDB_SUPER_TABLE) &&
......@@ -2020,13 +2019,14 @@ static void doAddTableMetaToLocalBuf(STableMeta* pTableMeta, STableMetaMsg* pMet
int32_t len = (int32_t) strnlen(pTableMeta->sTableName, TSDB_TABLE_FNAME_LEN);
// The super tableMeta already exists, create it according to tableMeta and add it to hash map
STableMeta* pSupTableMeta = createSuperTableMeta(pMetaMsg);
if (updateSTable) {
STableMeta* pSupTableMeta = createSuperTableMeta(pMetaMsg);
uint32_t size = tscGetTableMetaSize(pSupTableMeta);
int32_t code = taosHashPut(tscTableMetaInfo, pTableMeta->sTableName, len, pSupTableMeta, size);
assert(code == TSDB_CODE_SUCCESS);
uint32_t size = tscGetTableMetaSize(pSupTableMeta);
int32_t code = taosHashPut(tscTableMetaInfo, pTableMeta->sTableName, len, pSupTableMeta, size);
assert(code == TSDB_CODE_SUCCESS);
tfree(pSupTableMeta);
tfree(pSupTableMeta);
}
CChildTableMeta* cMeta = tscCreateChildMeta(pTableMeta);
taosHashPut(tscTableMetaInfo, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), cMeta, sizeof(CChildTableMeta));
......@@ -2183,10 +2183,10 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
}
SSqlCmd *pParentCmd = &pParentSql->cmd;
SHashObj *pSet = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
SHashObj *pSet = taosHashInit(pMultiMeta->numOfVgroup, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
char* pMsg = pMultiMeta->meta;
char* buf = NULL;
char* pMsg = pMultiMeta->meta;
if (pMultiMeta->compressed) {
buf = malloc(pMultiMeta->rawLen - sizeof(SMultiTableMeta));
int32_t len = tsDecompressString(pMultiMeta->meta, pMultiMeta->contLen - sizeof(SMultiTableMeta), 1,
......@@ -2207,6 +2207,7 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
return code;
}
bool freeMeta = false;
STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg);
if (!tIsValidSchema(pTableMeta->schema, pTableMeta->tableInfo.numOfColumns, pTableMeta->tableInfo.numOfTags)) {
tscError("0x%"PRIx64" invalid table meta from mnode, name:%s", pSql->self, pMetaMsg->tableFname);
......@@ -2221,22 +2222,27 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
SName sn = {0};
tNameFromString(&sn, pMetaMsg->tableFname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
const char* tableName = tNameGetTableName(&sn);
size_t keyLen = strlen(tableName);
if (pMultiMeta->metaClone == 1 || pTableMeta->tableType == TSDB_SUPER_TABLE) {
STableMetaVgroupInfo p = {.pTableMeta = pTableMeta,};
STableMetaVgroupInfo p = {.pTableMeta = pTableMeta,};
taosHashPut(pParentCmd->pTableMetaMap, tableName, keyLen, &p, sizeof(STableMetaVgroupInfo));
const char* tableName = tNameGetTableName(&sn);
size_t keyLen = strlen(tableName);
taosHashPut(pParentCmd->pTableMetaMap, tableName, keyLen, &p, sizeof(STableMetaVgroupInfo));
} else {
freeMeta = true;
}
bool addToBuf = false;
if (taosHashGet(pSet, &pMetaMsg->uid, sizeof(pMetaMsg->uid)) == NULL) {
addToBuf = true;
taosHashPut(pSet, &pMetaMsg->uid, sizeof(pMetaMsg->uid), "", 0);
// for each super table, only update meta information once
bool updateStableMeta = false;
if (pTableMeta->tableType == TSDB_CHILD_TABLE && taosHashGet(pSet, &pMetaMsg->suid, sizeof(pMetaMsg->suid)) == NULL) {
updateStableMeta = true;
taosHashPut(pSet, &pTableMeta->suid, sizeof(pMetaMsg->suid), "", 0);
}
// create the tableMeta and add it into the TableMeta map
doAddTableMetaToLocalBuf(pTableMeta, pMetaMsg, addToBuf);
doAddTableMetaToLocalBuf(pTableMeta, pMetaMsg, updateStableMeta);
// if the vgroup is not updated in current process, update it.
// for each vgroup, only update the information once.
int64_t vgId = pMetaMsg->vgroup.vgId;
if (pTableMeta->tableType != TSDB_SUPER_TABLE && taosHashGet(pSet, &vgId, sizeof(vgId)) == NULL) {
doUpdateVgroupInfo(pTableMeta, &pMetaMsg->vgroup);
......@@ -2244,6 +2250,9 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
}
pMsg += pMetaMsg->contLen;
if (freeMeta) {
tfree(pTableMeta);
}
}
for(int32_t i = 0; i < pMultiMeta->numOfVgroup; ++i) {
......@@ -2683,7 +2692,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
return code;
}
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, SArray* pUdfList, __async_cb_func_t fp) {
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, SArray* pUdfList, __async_cb_func_t fp, bool metaClone) {
SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
if (NULL == pNew) {
tscError("0x%"PRIx64" failed to allocate sqlobj to get multiple table meta", pSql->self);
......@@ -2706,6 +2715,7 @@ int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVg
}
SMultiTableInfoMsg* pInfo = (SMultiTableInfoMsg*) pNew->cmd.payload;
pInfo->metaClone = metaClone? 1:0;
pInfo->numOfTables = htonl((uint32_t) taosArrayGetSize(pNameList));
pInfo->numOfVgroups = htonl((uint32_t) taosArrayGetSize(pVgroupNameList));
pInfo->numOfUdfs = htonl(numOfUdf);
......
......@@ -945,28 +945,35 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return TSDB_CODE_TSC_DISCONNECTED;
}
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
pSql->pTscObj = taos;
pSql->signature = pSql;
pSql->cmd.pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
int32_t length = (int32_t)strlen(tableNameList);
if (length == 0) {
return TSDB_CODE_SUCCESS;
}
if (length > MAX_TABLE_NAME_LENGTH) {
tscError("0x%"PRIx64" tableNameList too long, length:%d, maximum allowed:%d", pSql->self, length, MAX_TABLE_NAME_LENGTH);
tscFreeSqlObj(pSql);
tscError("tableNameList too long, length:%d, maximum allowed:%d", length, MAX_TABLE_NAME_LENGTH);
return TSDB_CODE_TSC_INVALID_OPERATION;
}
char *str = calloc(1, length + 1);
if (str == NULL) {
tscError("0x%"PRIx64" failed to allocate sql string buffer", pSql->self);
tscFreeSqlObj(pSql);
tscError("failed to allocate sql string buffer, size:%d", length);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
strtolower(str, tableNameList);
SArray* plist = taosArrayInit(4, POINTER_BYTES);
SArray* vgroupList = taosArrayInit(4, POINTER_BYTES);
if (plist == NULL || vgroupList == NULL) {
tfree(str);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
tscAllocPayload(&pSql->cmd, 1024);
pSql->pTscObj = taos;
pSql->signature = pSql;
int32_t code = (uint8_t) tscTransferTableNameList(pSql, str, length, plist);
free(str);
......@@ -976,10 +983,11 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return code;
}
pSql->cmd.pTableMetaMap = taosHashInit(taosArrayGetSize(plist), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
registerSqlObj(pSql);
tscDebug("0x%"PRIx64" load multiple table meta, tableNameList: %s pObj:%p", pSql->self, tableNameList, pObj);
code = getMultiTableMetaFromMnode(pSql, plist, vgroupList, NULL, loadMultiTableMetaCallback);
code = getMultiTableMetaFromMnode(pSql, plist, vgroupList, NULL, loadMultiTableMetaCallback, false);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
code = TSDB_CODE_SUCCESS;
}
......
......@@ -2296,6 +2296,7 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
SArray* pColList = pNewQueryInfo->colList;
pNewQueryInfo->colList = NULL;
pNewQueryInfo->fillType = TSDB_FILL_NONE;
tscClearSubqueryInfo(pCmd);
tscFreeSqlResult(pSql);
......
......@@ -4574,8 +4574,13 @@ static int32_t doAddTableName(char* nextStr, char** str, SArray* pNameArray, SSq
strncpy(tablename, *str, TSDB_TABLE_FNAME_LEN);
len = (int32_t) strlen(tablename);
} else {
memcpy(tablename, *str, nextStr - (*str));
len = (int32_t)(nextStr - (*str));
if (len >= TSDB_TABLE_NAME_LEN) {
sprintf(pCmd->payload, "table name too long");
return TSDB_CODE_TSC_INVALID_OPERATION;
}
memcpy(tablename, *str, nextStr - (*str));
tablename[len] = '\0';
}
......@@ -4587,9 +4592,8 @@ static int32_t doAddTableName(char* nextStr, char** str, SArray* pNameArray, SSq
// Check if the table name available or not
if (tscValidateName(&sToken) != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
sprintf(pCmd->payload, "table name is invalid");
return code;
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
SName name = {0};
......@@ -4605,6 +4609,15 @@ static int32_t doAddTableName(char* nextStr, char** str, SArray* pNameArray, SSq
return TSDB_CODE_SUCCESS;
}
int32_t nameComparFn(const void* n1, const void* n2) {
int32_t ret = strcmp(*(char**)n1, *(char**)n2);
if (ret == 0) {
return 0;
} else {
return ret > 0? 1:-1;
}
}
int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length, SArray* pNameArray) {
SSqlCmd *pCmd = &pSql->cmd;
......@@ -4640,12 +4653,44 @@ int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t lengt
}
}
if (taosArrayGetSize(pNameArray) > TSDB_MULTI_TABLEMETA_MAX_NUM) {
size_t len = taosArrayGetSize(pNameArray);
if (len == 1) {
return TSDB_CODE_SUCCESS;
}
if (len > TSDB_MULTI_TABLEMETA_MAX_NUM) {
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
sprintf(pCmd->payload, "tables over the max number");
return code;
}
taosArraySort(pNameArray, nameComparFn);
int32_t pos = 0;
for(int32_t i = 1; i < len; ++i) {
char** p1 = taosArrayGet(pNameArray, pos);
char** p2 = taosArrayGet(pNameArray, i);
if (strcmp(*p1, *p2) == 0) {
// do nothing
} else {
if (pos + 1 != i) {
char* p = taosArrayGetP(pNameArray, pos + 1);
tfree(p);
taosArraySet(pNameArray, pos + 1, p2);
pos += 1;
} else {
pos += 1;
}
}
}
for(int32_t i = pos + 1; i < pNameArray->size; ++i) {
char* p = taosArrayGetP(pNameArray, i);
tfree(p);
}
pNameArray->size = pos + 1;
return TSDB_CODE_SUCCESS;
}
......
Subproject commit 7a26c432f8b4203e42344ff3290b9b9b01b983d5
Subproject commit b8f76da4a708d158ec3cc4b844571dc4414e36b4
Subproject commit 23cf0b933510af5e87b61bace0f6ff2f06d8eaac
Subproject commit 3530c6df097134a410bacec6b3cd013ef38a61aa
Subproject commit b62a26ecc164a310104df57691691b237e091c89
Subproject commit ce5201014136503d34fecbd56494b67b4961056c
......@@ -751,6 +751,7 @@ typedef struct {
} STableInfoMsg;
typedef struct {
uint8_t metaClone; // create local clone of the cached table meta
int32_t numOfVgroups;
int32_t numOfTables;
int32_t numOfUdfs;
......@@ -808,6 +809,7 @@ typedef struct SMultiTableMeta {
int32_t contLen;
uint8_t compressed; // denote if compressed or not
uint32_t rawLen; // size before compress
uint8_t metaClone; // make meta clone after retrieve meta from mnode
char meta[];
} SMultiTableMeta;
......
......@@ -3091,6 +3091,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
int32_t len = tsCompressString(pMultiMeta->meta, dataLen, 1, tmp + sizeof(SMultiTableMeta), (int32_t)dataLen + 2,
ONE_STAGE_COMP, NULL, 0);
pMultiMeta->metaClone = pInfo->metaClone;
pMultiMeta->rawLen = pMultiMeta->contLen;
if (len == -1 || len >= dataLen + 2) { // compress failed, do not compress this binary data
pMultiMeta->compressed = 0;
......
......@@ -160,7 +160,8 @@ tSqlExpr *tSqlExprCreateIdValue(SStrToken *pToken, int32_t optrType) {
pSqlExpr->type = SQL_NODE_VALUE;
pSqlExpr->flags |= 1 << EXPR_FLAG_NS_TIMESTAMP;
} else if (optrType == TK_VARIABLE) {
// use nanosecond by default TODO set value after getting database precision
// use nanosecond by default
// TODO set value after getting database precision
int32_t ret = parseAbsoluteDuration(pToken->z, pToken->n, &pSqlExpr->value.i64, TSDB_TIME_PRECISION_NANO);
if (ret != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
......
......@@ -1132,4 +1132,19 @@ if $data92 != t1 then
return -1
endi
print =========================>TD-5190
sql select stddev(f1) from st1 where ts>'2021-07-01 1:1:1' and ts<'2021-07-30 00:00:00' interval(1d) fill(NULL);
if $rows != 29 then
return -1
endi
if $data00 != @21-07-01 00:00:00.000@ then
return -1
endi
if $data01 != NULL then
return -1
endi
sql select derivative(test_column_alias_name, 1s, 0) from (select avg(k) test_column_alias_name from t1 interval(1s));
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册