diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 804e8a2920a1a45c8d24edebfa9f4e91eefd9355..6de9e8f15a1888ee92490c642ffa96825d1680cf 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -302,8 +302,9 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp); void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows); void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp); int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corEpSet); -int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupList); -int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length); +int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, __async_cb_func_t fp); + +int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length, SArray* pNameArray); bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 3e203bb7155ce8261e99a0550c53a53bcc79a33d..c1a85b6d27aebbc34da29e8ea207b2b62c50e728 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -492,7 +492,7 @@ char *tscGetErrorMsgPayload(SSqlCmd *pCmd); int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql); int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* sql); -int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo); +int32_t tscValidateSqlInfo(SSqlObj *pSql, struct SSqlInfo *pInfo); extern int32_t sentinel; extern SHashObj *tscVgroupMap; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 68e41310c6576465d2f75fe39c989f5a86a43239..47798f4e339b6f84231ba4f83ef7dfe702be1677 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -482,28 +482,9 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { return; } } else { // stream computing - SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); - STableMetaInfo *pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; + tscDebug("0x%"PRIx64" stream:%p meta is updated, start new query, command:%d", pSql->self, pSql->pStream, pCmd->command); - code = tscGetTableMeta(pSql, pTableMetaInfo); - if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - taosReleaseRef(tscObjRef, pSql->self); - return; - } else if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { - code = tscGetSTableVgroupInfo(pSql, pQueryInfo); - if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - taosReleaseRef(tscObjRef, pSql->self); - return; - } else if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - } - - tscDebug("0x%"PRIx64" stream:%p meta is updated, start new query, command:%d", pSql->self, pSql->pStream, pSql->cmd.command); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); if (tscNumOfExprs(pQueryInfo) == 0) { tsParseSql(pSql, false); } diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index a305c3cf82923c8495a1491c023fa9997085f189..cc89fd6220cfc24faf36157ba7541aa497f2e523 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1014,12 +1014,16 @@ int validateTableName(char *tblName, int len, SStrToken* psTblToken) { } static int32_t validateDataSource(SSqlCmd *pCmd, int32_t type, const char *sql) { - if (pCmd->insertParam.insertType != 0 && !TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, type)) { - return tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sql); + uint32_t *insertType = &pCmd->insertParam.insertType; + if (*insertType == TSDB_QUERY_TYPE_STMT_INSERT && type == TSDB_QUERY_TYPE_INSERT) { + return TSDB_CODE_SUCCESS; } + if ((*insertType) != 0 && (*insertType) != type) { + return tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mixed up", sql); + } - pCmd->insertParam.insertType = type; + *insertType = type; return TSDB_CODE_SUCCESS; } @@ -1368,18 +1372,18 @@ int tsParseSql(SSqlObj *pSql, bool initial) { } } } else { - SSqlInfo SQLInfo = qSqlParse(pSql->sqlstr); - ret = tscToSQLCmd(pSql, &SQLInfo); - if (ret == TSDB_CODE_TSC_INVALID_OPERATION && pSql->parseRetry < 1 && SQLInfo.type == TSDB_SQL_SELECT) { + SSqlInfo sqlInfo = qSqlParse(pSql->sqlstr); + ret = tscValidateSqlInfo(pSql, &sqlInfo); + if (ret == TSDB_CODE_TSC_INVALID_OPERATION && pSql->parseRetry < 1 && sqlInfo.type == TSDB_SQL_SELECT) { tscDebug("0x%"PRIx64 " parse query sql statement failed, code:%s, clear meta cache and retry ", pSql->self, tstrerror(ret)); tscResetSqlCmd(pCmd, true); pSql->parseRetry++; - ret = tscToSQLCmd(pSql, &SQLInfo); + ret = tscValidateSqlInfo(pSql, &sqlInfo); } - SqlInfoDestroy(&SQLInfo); + SqlInfoDestroy(&sqlInfo); } /* diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index b9950b0776284f202830459b46a0c650a8e81768..4e6285c844ded6dee59658783f7e4dc5c9fe0ecb 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -1219,6 +1219,7 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) { pStmt->taos = pObj; SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); + if (pSql == NULL) { free(pStmt); terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -1613,9 +1614,10 @@ int taos_stmt_execute(TAOS_STMT* stmt) { ret = TSDB_CODE_TSC_OUT_OF_MEMORY; } else { if (pStmt->pSql != NULL) { - taos_free_result(pStmt->pSql); + tscFreeSqlObj(pStmt->pSql); pStmt->pSql = NULL; } + pStmt->pSql = taos_query((TAOS*)pStmt->taos, sql); ret = taos_errno(pStmt->pSql); free(sql); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index b3ece963bdacf628101c40a1bebb143b135d8f52..d9be43b412342ec810cb07b1a29d3beaaa75cd89 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -279,7 +279,7 @@ static int32_t normalizeVarDataTypeLength(SSqlCmd* pCmd) { return TSDB_CODE_SUCCESS; } -int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { +int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { if (pInfo == NULL || pSql == NULL) { return TSDB_CODE_TSC_APP_ERROR; } @@ -7152,6 +7152,7 @@ static int32_t getTableNameFromSubquery(SSqlNode* pSqlNode, SArray* tableNameLis return TSDB_CODE_SUCCESS; } +void tscTableMetaCallBack(void *param, TAOS_RES *res, int code); static void freeElem(void* p) { tfree(*(char**)p); } @@ -7244,7 +7245,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { // load the table meta for a given table name list if (taosArrayGetSize(plist) > 0 || taosArrayGetSize(pVgroupList) > 0) { - code = getMultiTableMetaFromMnode(pSql, plist, pVgroupList); + code = getMultiTableMetaFromMnode(pSql, plist, pVgroupList, tscTableMetaCallBack); } _end: @@ -7253,7 +7254,7 @@ _end: } if (pVgroupList != NULL) { - taosArrayDestroy(pVgroupList); + taosArrayDestroyEx(pVgroupList, freeElem); } if (tableNameList != NULL) { @@ -7421,10 +7422,6 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf const char* msg1 = "point interpolation query needs timestamp"; const char* msg2 = "too many tables in from clause"; const char* msg3 = "start(end) time of query range required or time range too large"; - // const char* msg5 = "too many columns in selection clause"; - // const char* msg6 = "too many tables in from clause"; - // const char* msg7 = "invalid table alias name"; - // const char* msg8 = "alias name too long"; const char* msg9 = "only tag query not compatible with normal column filter"; int32_t code = TSDB_CODE_SUCCESS; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ddac9f3ba4c19efd71836cc3b02201fb79941b1e..a3032a342e1d97278e38033092f05441209c3eb1 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2061,11 +2061,22 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { char *pMsg = pRes->pRsp + sizeof(SSTableVgroupRspMsg); SSqlCmd* pCmd = &parent->cmd; + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); + for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) { - STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, i); + char* name = pMsg; + pMsg += TSDB_TABLE_NAME_LEN; - SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *) pMsg; - pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups); + STableMetaInfo *pInfo = NULL; + for(int32_t j = 0; j < pQueryInfo->numOfTables; ++j) { + STableMetaInfo *pInfo1 = tscGetTableMetaInfoFromCmd(pCmd, j); + if (strcmp(name, tNameGetTableName(&pInfo1->name)) != 0) { + continue; + } + + pInfo = pInfo1; + break; + } int32_t size = 0; pInfo->vgroupList = createVgroupInfoFromMsg(pMsg, &size, pSql->self); @@ -2407,7 +2418,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn return code; } -int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList) { +int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, __async_cb_func_t fp) { SSqlObj *pNew = calloc(1, sizeof(SSqlObj)); if (NULL == pNew) { tscError("0x%"PRIx64" failed to allocate sqlobj to get multiple table meta", pSql->self); @@ -2463,7 +2474,7 @@ int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVg tscDebug("0x%"PRIx64" new pSqlObj:0x%"PRIx64" to get %d tableMeta, vgroupInfo:%d, msg size:%d", pSql->self, pNew->self, numOfTable, numOfVgroupList, pNew->cmd.payloadLen); - pNew->fp = tscTableMetaCallBack; + pNew->fp = fp; pNew->param = (void *)pSql->self; tscDebug("0x%"PRIx64" metaRid from %" PRId64 " to %" PRId64 , pSql->self, pSql->metaRid, pNew->self); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 8fe46d9b65beeb977ac4d88789018fe52c488891..1ffa6416b221fdcaf7d8f1ba2a13d6cce14ca027 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -872,15 +872,11 @@ int taos_validate_sql(TAOS *taos, const char *sql) { SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); - pSql->pTscObj = taos; + pSql->pTscObj = taos; pSql->signature = pSql; - - SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; - pRes->numOfTotal = 0; - pRes->numOfClauseTotal = 0; - + pCmd->resColumnId = TSDB_RES_COL_ID; tscDebug("0x%"PRIx64" Valid SQL: %s pObj:%p", pSql->self, sql, pObj); @@ -900,7 +896,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) { strtolower(pSql->sqlstr, sql); - pCmd->curSql = NULL; +// pCmd->curSql = NULL; if (NULL != pCmd->insertParam.pTableBlockHashList) { taosHashCleanup(pCmd->insertParam.pTableBlockHashList); pCmd->insertParam.pTableBlockHashList = NULL; @@ -925,6 +921,21 @@ int taos_validate_sql(TAOS *taos, const char *sql) { return code; } +void loadMultiTableMetaCallback(void *param, TAOS_RES *res, int code) { + SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)param); + if (pSql == NULL) { + return; + } + + taosReleaseRef(tscObjRef, pSql->self); + pSql->res.code = code; + tsem_post(&pSql->rspSem); +} + +static void freeElem(void* p) { + tfree(*(char**)p); +} + int taos_load_table_info(TAOS *taos, const char *tableNameList) { const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list @@ -937,7 +948,9 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); pSql->pTscObj = taos; pSql->signature = pSql; + pSql->fp = NULL; // todo set the correct callback function pointer + pSql->cmd.pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); int32_t length = (int32_t)strlen(tableNameList); if (length > MAX_TABLE_NAME_LENGTH) { @@ -954,9 +967,12 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { } strtolower(str, tableNameList); - int32_t code = (uint8_t) tscTransferTableNameList(pSql, str, length); + SArray* plist = taosArrayInit(4, POINTER_BYTES); + SArray* vgroupList = taosArrayInit(4, POINTER_BYTES); + int32_t code = (uint8_t) tscTransferTableNameList(pSql, str, length, plist); free(str); + if (code != TSDB_CODE_SUCCESS) { tscFreeSqlObj(pSql); return code; @@ -964,6 +980,21 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { registerSqlObj(pSql); tscDebug("0x%"PRIx64" load multiple table meta, tableNameList: %s pObj:%p", pSql->self, tableNameList, pObj); - executeQuery(pSql, NULL); + + code = getMultiTableMetaFromMnode(pSql, plist, vgroupList, loadMultiTableMetaCallback); + if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + code = TSDB_CODE_SUCCESS; + } + + taosArrayDestroyEx(plist, freeElem); + taosArrayDestroyEx(vgroupList, freeElem); + + if (code != TSDB_CODE_SUCCESS) { + tscFreeRegisteredSqlObj(pSql); + return code; + } + + tsem_wait(&pSql->rspSem); + tscFreeRegisteredSqlObj(pSql); return code; } diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 00be5b6e3f862ef957928d5925e2b3cdcdbd951c..8e11fd0cfb435e0eadc24ecdcd175163f81e797b 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -94,7 +94,7 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) { code = tscGetTableMeta(pSql, pTableMetaInfo); if (code == 0 && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { - code = tscGetSTableVgroupInfo(pSql, 0); + code = tscGetSTableVgroupInfo(pSql, pQueryInfo); } if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { @@ -614,16 +614,16 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p return NULL; } - pStream->stime = stime; - pStream->fp = fp; + pStream->stime = stime; + pStream->fp = fp; pStream->callback = callback; - pStream->param = param; - pStream->pSql = pSql; - pSql->pStream = pStream; - pSql->param = pStream; - pSql->maxRetry = TSDB_MAX_REPLICA; + pStream->param = param; + pStream->pSql = pSql; - pSql->sqlstr = calloc(1, strlen(sqlstr) + 1); + pSql->pStream = pStream; + pSql->param = pStream; + pSql->maxRetry = TSDB_MAX_REPLICA; + pSql->sqlstr = calloc(1, strlen(sqlstr) + 1); if (pSql->sqlstr == NULL) { tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self); tscFreeSqlObj(pSql); @@ -632,14 +632,14 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p } strtolower(pSql->sqlstr, sqlstr); + pSql->fp = tscCreateStream; + pSql->fetchFp = tscCreateStream; + pSql->cmd.resColumnId = TSDB_RES_COL_ID; + tsem_init(&pSql->rspSem, 0, 0); registerSqlObj(pSql); tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr); - tsem_init(&pSql->rspSem, 0, 0); - - pSql->fp = tscCreateStream; - pSql->fetchFp = tscCreateStream; int32_t code = tsParseSql(pSql, true); if (code == TSDB_CODE_SUCCESS) { diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 539ed85ebea96bace57bef3a547e5927d5e1d105..42330aa18e48eb60389ef3056c96a2aab386a6ca 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -151,6 +151,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* strtolower(pSql->sqlstr, pSql->sqlstr); pRes->qId = 0; pRes->numOfRows = 1; + pCmd->resColumnId = TSDB_RES_COL_ID; code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE); if (code != TSDB_CODE_SUCCESS) { diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index eb2e1757d78453830c6c2b96122b952edbbae4fa..325281dce59d765e3432cb8732d76ca8067e278b 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2289,15 +2289,20 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { SSqlObj *pNew = createSubqueryObj(pSql, 0, tscFirstRoundCallback, pSup, TSDB_SQL_SELECT, NULL); SSqlCmd *pCmd = &pNew->cmd; - tscClearSubqueryInfo(pCmd); - tscFreeSqlResult(pSql); - SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pCmd); assert(pQueryInfo->numOfTables == 1); + SArray* pColList = pNewQueryInfo->colList; + pNewQueryInfo->colList = NULL; + + tscClearSubqueryInfo(pCmd); + tscFreeSqlResult(pSql); + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0); tscInitQueryInfo(pNewQueryInfo); + + // add the group cond pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr; if (pQueryInfo->groupbyExpr.columnInfo != NULL) { pNewQueryInfo->groupbyExpr.columnInfo = taosArrayDup(pQueryInfo->groupbyExpr.columnInfo); @@ -2307,12 +2312,15 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { } } + // add the tag filter cond if (tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond) != 0) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; goto _error; } + pNewQueryInfo->window = pQueryInfo->window; pNewQueryInfo->interval = pQueryInfo->interval; + pNewQueryInfo->sessionWindow = pQueryInfo->sessionWindow; pCmd->command = TSDB_SQL_SELECT; pNew->fp = tscFirstRoundCallback; @@ -2374,6 +2382,21 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { } } + // add the normal column filter cond + if (pColList != NULL) { + size_t s = taosArrayGetSize(pColList); + for (int32_t i = 0; i < s; ++i) { + SColumn *pCol = taosArrayGetP(pColList, i); + + if (pCol->info.flist.numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered. + SColumn *p = tscColumnClone(pCol); + taosArrayPush(pNewQueryInfo->colList, &p); + } + } + + tscColumnListDestroy(pColList); + } + tscInsertPrimaryTsSourceColumn(pNewQueryInfo, pTableMetaInfo->pTableMeta->id.uid); tscTansformFuncForSTableQuery(pNewQueryInfo); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 1b04b8201e442c62935804e27bd68701c6d2d326..88615b074027045172ac05044049ddcf9cdbab76 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -646,8 +646,10 @@ static SColumnInfo* extractColumnInfoFromResult(SArray* pTableCols) { } typedef struct SDummyInputInfo { - SSDataBlock *block; - SSqlObj *pSql; // refactor: remove it + SSDataBlock *block; + SSqlObj *pSql; // refactor: remove it + int32_t numOfFilterCols; + SSingleColumnFilterInfo *pFilterInfo; } SDummyInputInfo; typedef struct SJoinStatus { @@ -696,6 +698,18 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) { pBlock->info.rows = pRes->numOfRows; if (pRes->numOfRows != 0) { doSetupSDataBlock(pRes, pBlock); + + if (pInput->numOfFilterCols > 0) { + doSetFilterColumnInfo(pInput->pFilterInfo, pInput->numOfFilterCols, pBlock); + int8_t* p = calloc(pBlock->info.rows, sizeof(int8_t)); + bool all = doFilterDataBlock(pInput->pFilterInfo, pInput->numOfFilterCols, pBlock->info.rows, p); + if (!all) { + doCompactSDataBlock(pBlock, pBlock->info.rows, p); + } + + tfree(p); + } + *newgroup = false; return pBlock; } @@ -865,11 +879,14 @@ static void destroyDummyInputOperator(void* param, int32_t numOfOutput) { } // todo this operator servers as the adapter for Operator tree and SqlRes result, remove it later -SOperatorInfo* createDummyInputOperator(SSqlObj* pSql, SSchema* pSchema, int32_t numOfCols) { +SOperatorInfo* createDummyInputOperator(SSqlObj* pSql, SSchema* pSchema, int32_t numOfCols, SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols) { assert(numOfCols > 0); SDummyInputInfo* pInfo = calloc(1, sizeof(SDummyInputInfo)); - pInfo->pSql = pSql; + pInfo->pSql = pSql; + pInfo->pFilterInfo = pFilterInfo; + pInfo->numOfFilterCols = numOfFilterCols; + pInfo->block = calloc(numOfCols, sizeof(SSDataBlock)); pInfo->block->info.numOfCols = numOfCols; @@ -901,7 +918,7 @@ static void destroyJoinOperator(void* param, int32_t numOfOutput) { pInfo->pRes = destroyOutputBuf(pInfo->pRes); } -SOperatorInfo* createJoinOperator(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput) { +SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput) { SJoinOperatorInfo* pInfo = calloc(1, sizeof(SJoinOperatorInfo)); pInfo->numOfUpstream = numOfUpstream; @@ -985,7 +1002,24 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue // if it is a join query, create join operator here int32_t numOfCol1 = px->pTableMetaInfo[0]->pTableMeta->tableInfo.numOfColumns; - SOperatorInfo* pSourceOperator = createDummyInputOperator(pSqlObjList[0], pSchema, numOfCol1); + + int32_t numOfFilterCols = 0; + SColumnInfo* tableCols = calloc(numOfCol1, sizeof(SColumnInfo)); + for(int32_t i = 0; i < numOfCol1; ++i) { + SColumn* pCol = taosArrayGetP(px->colList, i); + if (pCol->info.flist.numOfFilters > 0) { + numOfFilterCols += 1; + } + + tableCols[i] = pCol->info; + } + + SSingleColumnFilterInfo* pFilterInfo = NULL; + if (numOfFilterCols > 0) { + doCreateFilterInfo(tableCols, numOfCol1, numOfFilterCols, &pFilterInfo, 0); + } + + SOperatorInfo* pSourceOperator = createDummyInputOperator(pSqlObjList[0], pSchema, numOfCol1, pFilterInfo, numOfFilterCols); SSchema* schema = NULL; if (px->numOfTables > 1) { @@ -1002,13 +1036,28 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue SSchema* pSchema1 = tscGetTableSchema(px->pTableMetaInfo[i]->pTableMeta); int32_t n = px->pTableMetaInfo[i]->pTableMeta->tableInfo.numOfColumns; - p[i] = createDummyInputOperator(pSqlObjList[i], pSchema1, n); + int32_t numOfFilterCols1 = 0; + SColumnInfo* tableCols1 = calloc(numOfCol1, sizeof(SColumnInfo)); + for(int32_t j = 0; j < numOfCol1; ++j) { + SColumn* pCol = taosArrayGetP(px->colList, j); + if (pCol->info.flist.numOfFilters > 0) { + numOfFilterCols += 1; + } + + tableCols1[j] = pCol->info; + } + SSingleColumnFilterInfo* pFilterInfo1 = NULL; + if (numOfFilterCols1 > 0) { + doCreateFilterInfo(tableCols1, numOfCol1, numOfFilterCols1, &pFilterInfo1, 0); + } + + p[i] = createDummyInputOperator(pSqlObjList[i], pSchema1, n, pFilterInfo1, numOfFilterCols1); memcpy(&schema[offset], pSchema1, n * sizeof(SSchema)); offset += n; } - pSourceOperator = createJoinOperator(p, px->numOfTables, schema, num); + pSourceOperator = createJoinOperatorInfo(p, px->numOfTables, schema, num); tfree(p); } else { size_t num = taosArrayGetSize(px->colList); @@ -2669,6 +2718,7 @@ STableMetaInfo* tscGetTableMetaInfoByUid(SQueryInfo* pQueryInfo, uint64_t uid, i return tscGetMetaInfo(pQueryInfo, k); } +// todo refactor void tscInitQueryInfo(SQueryInfo* pQueryInfo) { assert(pQueryInfo->fieldsInfo.internalField == NULL); pQueryInfo->fieldsInfo.internalField = taosArrayInit(4, sizeof(SInternalField)); @@ -4215,11 +4265,53 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt return TSDB_CODE_SUCCESS; } -int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length) { +static int32_t doAddTableName(char* nextStr, char** str, SArray* pNameArray, SSqlObj* pSql) { + int32_t code = TSDB_CODE_SUCCESS; + SSqlCmd* pCmd = &pSql->cmd; + + char tablename[TSDB_TABLE_FNAME_LEN] = {0}; + int32_t len = 0; + + if (nextStr == NULL) { + strncpy(tablename, *str, TSDB_TABLE_FNAME_LEN); + len = strlen(tablename); + } else { + memcpy(tablename, *str, nextStr - (*str)); + len = (int32_t)(nextStr - (*str)); + tablename[len] = '\0'; + } + + (*str) = nextStr + 1; + len = (int32_t)strtrim(tablename); + + SStrToken sToken = {.n = len, .type = TK_ID, .z = tablename}; + tGetToken(tablename, &sToken.type); + + // Check if the table name available or not + if (tscValidateName(&sToken) != TSDB_CODE_SUCCESS) { + code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH; + sprintf(pCmd->payload, "table name is invalid"); + return code; + } + + SName name = {0}; + if ((code = tscSetTableFullName(&name, &sToken, pSql)) != TSDB_CODE_SUCCESS) { + return code; + } + + memset(tablename, 0, tListLen(tablename)); + tNameExtractFullName(&name, tablename); + + char* p = strdup(tablename); + taosArrayPush(pNameArray, &p); + return TSDB_CODE_SUCCESS; +} + +int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length, SArray* pNameArray) { SSqlCmd *pCmd = &pSql->cmd; pCmd->command = TSDB_SQL_MULTI_META; - pCmd->count = 0; + pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META; int code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH; char *str = (char *)pNameList; @@ -4230,70 +4322,31 @@ int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t lengt return terrno; } - STableMetaInfo *pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo); - - if ((code = tscAllocPayload(pCmd, length + 16)) != TSDB_CODE_SUCCESS) { - return code; - } - char *nextStr; - char tblName[TSDB_TABLE_FNAME_LEN]; - int payloadLen = 0; - char *pMsg = pCmd->payload; while (1) { nextStr = strchr(str, ','); if (nextStr == NULL) { + code = doAddTableName(nextStr, &str, pNameArray, pSql); break; } - memcpy(tblName, str, nextStr - str); - int32_t len = (int32_t)(nextStr - str); - tblName[len] = '\0'; - - str = nextStr + 1; - len = (int32_t)strtrim(tblName); - - SStrToken sToken = {.n = len, .type = TK_ID, .z = tblName}; - tGetToken(tblName, &sToken.type); - - // Check if the table name available or not - if (tscValidateName(&sToken) != TSDB_CODE_SUCCESS) { - code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH; - sprintf(pCmd->payload, "table name is invalid"); - return code; - } - - if ((code = tscSetTableFullName(&pTableMetaInfo->name, &sToken, pSql)) != TSDB_CODE_SUCCESS) { + code = doAddTableName(nextStr, &str, pNameArray, pSql); + if (code != TSDB_CODE_SUCCESS) { return code; } - if (++pCmd->count > TSDB_MULTI_TABLEMETA_MAX_NUM) { + if (taosArrayGetSize(pNameArray) > TSDB_MULTI_TABLEMETA_MAX_NUM) { code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH; sprintf(pCmd->payload, "tables over the max number"); return code; } - - int32_t xlen = tNameLen(&pTableMetaInfo->name); - if (payloadLen + xlen + 128 >= pCmd->allocSize) { - char *pNewMem = realloc(pCmd->payload, pCmd->allocSize + length); - if (pNewMem == NULL) { - code = TSDB_CODE_TSC_OUT_OF_MEMORY; - sprintf(pCmd->payload, "failed to allocate memory"); - return code; - } - - pCmd->payload = pNewMem; - pCmd->allocSize = pCmd->allocSize + length; - pMsg = pCmd->payload; - } - - char n[TSDB_TABLE_FNAME_LEN] = {0}; - tNameExtractFullName(&pTableMetaInfo->name, n); - payloadLen += sprintf(pMsg + payloadLen, "%s,", n); } - *(pMsg + payloadLen) = '\0'; - pCmd->payloadLen = payloadLen + 1; + if (taosArrayGetSize(pNameArray) > TSDB_MULTI_TABLEMETA_MAX_NUM) { + code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH; + sprintf(pCmd->payload, "tables over the max number"); + return code; + } return TSDB_CODE_SUCCESS; } diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 429a6a0b572702371082ebc4aaf90d8e018f7872..1584df16fe1b0af84ae1bcc5123a9d71be447ca0 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -2878,6 +2878,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { for (; t < pInfo->numOfTables; ++t) { char *fullName = nameList[t]; + pMsg->pVgroup = NULL; pMsg->pTable = mnodeGetTable(fullName); if (pMsg->pTable == NULL) { mError("msg:%p, app:%p table:%s, failed to get table meta, table not exist", pMsg, pMsg->rpcMsg.ahandle, fullName); diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index a39819cb5124cca46f3cd0a7ac6d9eace716fbe5..4ab718e09de962d74b792d12aed6bb5844008b9f 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -521,12 +521,17 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter); -SOperatorInfo* createJoinOperator(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput); +SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput); SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); SSDataBlock* doSLimit(void* param, bool* newgroup); +int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId); +void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock); +bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t numOfRows, int8_t* p); +void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p); + SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows); void* destroyOutputBuf(SSDataBlock* pBlock); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 8a7506cb9773c10f29050151afda5224ea23be6e..334edefe1ef891396e347ca4d9b882791d836800 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -178,8 +178,6 @@ static STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* w static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo); static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream); -static int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, - SSingleColumnFilterInfo** pFilterInfo, uint64_t qId); static void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols); static int32_t getNumOfScanTimes(SQueryAttr* pQueryAttr); @@ -1742,7 +1740,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf case OP_Aggregate: { pRuntimeEnv->proot = createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - if (pRuntimeEnv->proot->upstream[0]->operatorType != OP_DummyInput) { + + int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; + if (opType != OP_DummyInput && opType != OP_Join) { setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); } break; @@ -2099,7 +2099,7 @@ static int32_t updateBlockLoadStatus(SQueryAttr *pQuery, int32_t status) { if (hasFirstLastFunc && status == BLK_DATA_NO_NEEDED) { if(!hasOtherFunc) { return BLK_DATA_DISCARD; - } else{ + } else { return BLK_DATA_ALL_NEEDED; } } @@ -2374,6 +2374,105 @@ static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key, bool asc return TS_JOIN_TS_EQUAL; } +bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t numOfRows, int8_t* p) { + bool all = true; + + for (int32_t i = 0; i < numOfRows; ++i) { + bool qualified = false; + + for (int32_t k = 0; k < numOfFilterCols; ++k) { + char* pElem = (char*)pFilterInfo[k].pData + pFilterInfo[k].info.bytes * i; + + qualified = false; + for (int32_t j = 0; j < pFilterInfo[k].numOfFilters; ++j) { + SColumnFilterElem* pFilterElem = &pFilterInfo[k].pFilters[j]; + + bool isnull = isNull(pElem, pFilterInfo[k].info.type); + if (isnull) { + if (pFilterElem->fp == isNullOperator) { + qualified = true; + break; + } else { + continue; + } + } else { + if (pFilterElem->fp == notNullOperator) { + qualified = true; + break; + } else if (pFilterElem->fp == isNullOperator) { + continue; + } + } + + if (pFilterElem->fp(pFilterElem, pElem, pElem, pFilterInfo[k].info.type)) { + qualified = true; + break; + } + } + + if (!qualified) { + break; + } + } + + p[i] = qualified ? 1 : 0; + if (!qualified) { + all = false; + } + } + + return all; +} + +void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) { + int32_t len = 0; + int32_t start = 0; + for (int32_t j = 0; j < numOfRows; ++j) { + if (p[j] == 1) { + len++; + } else { + if (len > 0) { + int32_t cstart = j - len; + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData* pColumnInfoData = taosArrayGet(pBlock->pDataBlock, i); + + int16_t bytes = pColumnInfoData->info.bytes; + memmove(((char*)pColumnInfoData->pData) + start * bytes, pColumnInfoData->pData + cstart * bytes, + len * bytes); + } + + start += len; + len = 0; + } + } + } + + if (len > 0) { + int32_t cstart = numOfRows - len; + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData* pColumnInfoData = taosArrayGet(pBlock->pDataBlock, i); + + int16_t bytes = pColumnInfoData->info.bytes; + memmove(pColumnInfoData->pData + start * bytes, pColumnInfoData->pData + cstart * bytes, len * bytes); + } + + start += len; + len = 0; + } + + pBlock->info.rows = start; + pBlock->pBlockStatis = NULL; // clean the block statistics info + + if (start > 0) { + SColumnInfoData* pColumnInfoData = taosArrayGet(pBlock->pDataBlock, 0); + if (pColumnInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP && + pColumnInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + pBlock->info.window.skey = *(int64_t*)pColumnInfoData->pData; + pBlock->info.window.ekey = *(int64_t*)(pColumnInfoData->pData + TSDB_KEYSIZE * (start - 1)); + } + } +} + void filterRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock, bool ascQuery) { int32_t numOfRows = pBlock->info.rows; @@ -2406,97 +2505,11 @@ void filterRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInf // save the cursor status pRuntimeEnv->current->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf); } else { - for (int32_t i = 0; i < numOfRows; ++i) { - bool qualified = false; - - for (int32_t k = 0; k < numOfFilterCols; ++k) { - char* pElem = (char*)pFilterInfo[k].pData + pFilterInfo[k].info.bytes * i; - - qualified = false; - for (int32_t j = 0; j < pFilterInfo[k].numOfFilters; ++j) { - SColumnFilterElem* pFilterElem = &pFilterInfo[k].pFilters[j]; - - bool isnull = isNull(pElem, pFilterInfo[k].info.type); - if (isnull) { - if (pFilterElem->fp == isNullOperator) { - qualified = true; - break; - } else { - continue; - } - } else { - if (pFilterElem->fp == notNullOperator) { - qualified = true; - break; - } else if (pFilterElem->fp == isNullOperator) { - continue; - } - } - - if (pFilterElem->fp(pFilterElem, pElem, pElem, pFilterInfo[k].info.type)) { - qualified = true; - break; - } - } - - if (!qualified) { - break; - } - } - - p[i] = qualified ? 1 : 0; - if (!qualified) { - all = false; - } - } + all = doFilterDataBlock(pFilterInfo, numOfFilterCols, numOfRows, p); } if (!all) { - int32_t start = 0; - int32_t len = 0; - for (int32_t j = 0; j < numOfRows; ++j) { - if (p[j] == 1) { - len++; - } else { - if (len > 0) { - int32_t cstart = j - len; - for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - SColumnInfoData *pColumnInfoData = taosArrayGet(pBlock->pDataBlock, i); - - int16_t bytes = pColumnInfoData->info.bytes; - memmove(((char*)pColumnInfoData->pData) + start * bytes, pColumnInfoData->pData + cstart * bytes, len * bytes); - } - - start += len; - len = 0; - } - } - } - - if (len > 0) { - int32_t cstart = numOfRows - len; - for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - SColumnInfoData *pColumnInfoData = taosArrayGet(pBlock->pDataBlock, i); - - int16_t bytes = pColumnInfoData->info.bytes; - memmove(pColumnInfoData->pData + start * bytes, pColumnInfoData->pData + cstart * bytes, len * bytes); - } - - start += len; - len = 0; - } - - pBlock->info.rows = start; - pBlock->pBlockStatis = NULL; // clean the block statistics info - - if (start > 0) { - SColumnInfoData* pColumnInfoData = taosArrayGet(pBlock->pDataBlock, 0); - if (pColumnInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP && - pColumnInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { - pBlock->info.window.skey = *(int64_t*)pColumnInfoData->pData; - pBlock->info.window.ekey = *(int64_t*)(pColumnInfoData->pData + TSDB_KEYSIZE * (start - 1)); - } - } + doCompactSDataBlock(pBlock, numOfRows, p); } tfree(p); @@ -2524,7 +2537,7 @@ static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSData return status; } -static void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock) { +void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock) { if (numOfFilterCols > 0 && pFilterInfo[0].pData != NULL) { return; } @@ -4920,27 +4933,6 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) { return pBlock; } - -bool doFilterData(SColumnInfoData* p, int32_t rid, SColumnFilterElem *filterElem, __filter_func_t fp) { - char* input = p->pData + p->info.bytes * rid; - bool isnull = isNull(input, p->info.type); - if (isnull) { - return (fp == isNullOperator) ? true : false; - } else { - if (fp == notNullOperator) { - return true; - } else if (fp == isNullOperator) { - return false; - } - } - - if (fp(filterElem, input, input, p->info.type)) { - return true; - } - - return false; -} - static SSDataBlock* doFilter(void* param, bool* newgroup) { SOperatorInfo *pOperator = (SOperatorInfo *)param; if (pOperator->status == OP_EXEC_DONE) { @@ -5445,7 +5437,7 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI return pOperator; } -static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols) { +SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols) { SColumnInfo* pCols = calloc(numOfOutput, sizeof(SColumnInfo)); int32_t numOfFilter = 0; @@ -5454,7 +5446,7 @@ static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutpu numOfFilter += 1; } - pCols[i].type = pExpr[i].base.resType; + pCols[i].type = pExpr[i].base.resType; pCols[i].bytes = pExpr[i].base.resBytes; pCols[i].colId = pExpr[i].base.resColId; @@ -6663,8 +6655,7 @@ SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pCo return pGroupbyExpr; } -static int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, - SSingleColumnFilterInfo** pFilterInfo, uint64_t qId) { +int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId) { *pFilterInfo = calloc(1, sizeof(SSingleColumnFilterInfo) * numOfFilterCols); if (pFilterInfo == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; diff --git a/tests/examples/c/apitest.c b/tests/examples/c/apitest.c index f20c0321c455da9c430aa3a4eb1d32af7d71da8b..0ca92eaf1d018bc8b370706752cf0d139a9bfaf8 100644 --- a/tests/examples/c/apitest.c +++ b/tests/examples/c/apitest.c @@ -7,7 +7,6 @@ #include #include - static void prepare_data(TAOS* taos) { TAOS_RES *result; result = taos_query(taos, "drop database if exists test;"); @@ -69,7 +68,6 @@ static void prepare_data(TAOS* taos) { usleep(1000000); } - static int print_result(TAOS_RES* res, int blockFetch) { TAOS_ROW row = NULL; int num_fields = taos_num_fields(res); @@ -99,7 +97,6 @@ static int print_result(TAOS_RES* res, int blockFetch) { return nRows; } - static void check_row_count(int line, TAOS_RES* res, int expected) { int actual = print_result(res, expected % 2); if (actual != expected) { @@ -109,7 +106,6 @@ static void check_row_count(int line, TAOS_RES* res, int expected) { } } - static void verify_query(TAOS* taos) { prepare_data(taos); @@ -153,7 +149,6 @@ static void verify_query(TAOS* taos) { taos_free_result(res); } - void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) { int rows = print_result(res, *(int*)param); printf("%d rows consumed in subscribe_callback\n", rows); @@ -235,10 +230,10 @@ static void verify_subscribe(TAOS* taos) { taos_unsubscribe(tsub, 0); } - void verify_prepare(TAOS* taos) { TAOS_RES* result = taos_query(taos, "drop database if exists test;"); taos_free_result(result); + usleep(100000); result = taos_query(taos, "create database test;"); @@ -248,6 +243,7 @@ void verify_prepare(TAOS* taos) { taos_free_result(result); return; } + taos_free_result(result); usleep(100000); @@ -369,6 +365,7 @@ void verify_prepare(TAOS* taos) { taos_stmt_add_batch(stmt); } if (taos_stmt_execute(stmt) != 0) { + taos_stmt_close(stmt); printf("\033[31mfailed to execute insert statement.\033[0m\n"); return; } @@ -381,6 +378,7 @@ void verify_prepare(TAOS* taos) { v.v2 = 15; taos_stmt_bind_param(stmt, params + 2); if (taos_stmt_execute(stmt) != 0) { + taos_stmt_close(stmt); printf("\033[31mfailed to execute select statement.\033[0m\n"); return; } diff --git a/tests/script/general/parser/function.sim b/tests/script/general/parser/function.sim index 65058333fb6a6dea4d0dd583b86f2927bdcc7979..56ce15c36fbbe71753028f60a03d5cdf73a03571 100644 --- a/tests/script/general/parser/function.sim +++ b/tests/script/general/parser/function.sim @@ -809,3 +809,5 @@ endi if $data00 != 1 then return -1 endi + +print ====================> TODO stddev + normal column filter diff --git a/tests/script/general/parser/nestquery.sim b/tests/script/general/parser/nestquery.sim index eab1392d1c670f148c9754f48701536b03b1e71c..9e2736833f56cb6224cc1a1b351b2171213f5f8f 100644 --- a/tests/script/general/parser/nestquery.sim +++ b/tests/script/general/parser/nestquery.sim @@ -120,22 +120,88 @@ if $data00 != 167 then endi print ================>master query + filter -sql select * from (select count(*) a from nest_tb0 interval(10h)) where a <= 520; +sql select t.* from (select count(*) a from nest_tb0 interval(10h)) t where t.a <= 520; if $rows != 2 then return -1 endi -print ==================> nest query join +print ===================> nest query interval -print ===================> nest query interval +print ===================> complex query -print ===================> complex query +print ===================> group by + having + + + + + +print =========================> nest query join +sql select a.ts,a.k,b.ts from (select count(*) k from nest_tb0 interval(30a)) a, (select count(*) f from nest_tb1 interval(30a)) b where a.ts = b.ts ; +if $rows != 10000 then + return -1 +endi + +if $data00 != @20-09-15 00:00:00.000@ then + return -1 +endi + +if $data01 != 1 then + return -1 +endi + +if $data02 != @20-09-15 00:00:00.000@ then + return -1 +endi + +if $data10 != @20-09-15 00:01:00.000@ then + return -1 +endi + +if $data11 != 1 then + return -1 +endi +if $data12 != @20-09-15 00:01:00.000@ then + return -1 +endi + +sql select sum(a.k), sum(b.f) from (select count(*) k from nest_tb0 interval(30a)) a, (select count(*) f from nest_tb1 interval(30a)) b where a.ts = b.ts ; +if $rows != 1 then + return -1 +endi + +if $data00 != 10000 then + return -1 +endi +if $data01 != 10000 then + return -1 +endi + +sql select a.ts,a.k,b.ts,c.ts,c.ts,c.x from (select count(*) k from nest_tb0 interval(30a)) a, (select count(*) f from nest_tb1 interval(30a)) b, (select count(*) x from nest_tb2 interval(30a)) c where a.ts = b.ts and a.ts = c.ts +if $rows != 10000 then + return -1 +endi + +if $data00 != @20-09-15 00:00:00.000@ then + return -1 +endi + +if $data01 != 1 then + return -1 +endi + +if $data02 != @20-09-15 00:00:00.000@ then + return -1 +endi + +if $data03 != @20-09-15 00:00:00.000@ then + return -1 +endi system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file