提交 71319327 编写于 作者: H Haojun Liao

[td-225] fix bug found by regression test.

上级 644bed2d
......@@ -302,8 +302,9 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corEpSet);
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupList);
int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length);
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, __async_cb_func_t fp);
int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length, SArray* pNameArray);
bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx);
......
......@@ -492,7 +492,7 @@ char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* sql);
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
int32_t tscValidateSqlInfo(SSqlObj *pSql, struct SSqlInfo *pInfo);
extern int32_t sentinel;
extern SHashObj *tscVgroupMap;
......
......@@ -482,28 +482,9 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
return;
}
} else { // stream computing
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
tscDebug("0x%"PRIx64" stream:%p meta is updated, start new query, command:%d", pSql->self, pSql->pStream, pCmd->command);
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscGetSTableVgroupInfo(pSql, pQueryInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
tscDebug("0x%"PRIx64" stream:%p meta is updated, start new query, command:%d", pSql->self, pSql->pStream, pSql->cmd.command);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (tscNumOfExprs(pQueryInfo) == 0) {
tsParseSql(pSql, false);
}
......
......@@ -1014,12 +1014,16 @@ int validateTableName(char *tblName, int len, SStrToken* psTblToken) {
}
static int32_t validateDataSource(SSqlCmd *pCmd, int32_t type, const char *sql) {
if (pCmd->insertParam.insertType != 0 && !TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, type)) {
return tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sql);
uint32_t *insertType = &pCmd->insertParam.insertType;
if (*insertType == TSDB_QUERY_TYPE_STMT_INSERT && type == TSDB_QUERY_TYPE_INSERT) {
return TSDB_CODE_SUCCESS;
}
if ((*insertType) != 0 && (*insertType) != type) {
return tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mixed up", sql);
}
pCmd->insertParam.insertType = type;
*insertType = type;
return TSDB_CODE_SUCCESS;
}
......@@ -1368,18 +1372,18 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
}
}
} else {
SSqlInfo SQLInfo = qSqlParse(pSql->sqlstr);
ret = tscToSQLCmd(pSql, &SQLInfo);
if (ret == TSDB_CODE_TSC_INVALID_OPERATION && pSql->parseRetry < 1 && SQLInfo.type == TSDB_SQL_SELECT) {
SSqlInfo sqlInfo = qSqlParse(pSql->sqlstr);
ret = tscValidateSqlInfo(pSql, &sqlInfo);
if (ret == TSDB_CODE_TSC_INVALID_OPERATION && pSql->parseRetry < 1 && sqlInfo.type == TSDB_SQL_SELECT) {
tscDebug("0x%"PRIx64 " parse query sql statement failed, code:%s, clear meta cache and retry ", pSql->self, tstrerror(ret));
tscResetSqlCmd(pCmd, true);
pSql->parseRetry++;
ret = tscToSQLCmd(pSql, &SQLInfo);
ret = tscValidateSqlInfo(pSql, &sqlInfo);
}
SqlInfoDestroy(&SQLInfo);
SqlInfoDestroy(&sqlInfo);
}
/*
......
......@@ -1219,6 +1219,7 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
pStmt->taos = pObj;
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
if (pSql == NULL) {
free(pStmt);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -1613,9 +1614,10 @@ int taos_stmt_execute(TAOS_STMT* stmt) {
ret = TSDB_CODE_TSC_OUT_OF_MEMORY;
} else {
if (pStmt->pSql != NULL) {
taos_free_result(pStmt->pSql);
tscFreeSqlObj(pStmt->pSql);
pStmt->pSql = NULL;
}
pStmt->pSql = taos_query((TAOS*)pStmt->taos, sql);
ret = taos_errno(pStmt->pSql);
free(sql);
......
......@@ -279,7 +279,7 @@ static int32_t normalizeVarDataTypeLength(SSqlCmd* pCmd) {
return TSDB_CODE_SUCCESS;
}
int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if (pInfo == NULL || pSql == NULL) {
return TSDB_CODE_TSC_APP_ERROR;
}
......@@ -7152,6 +7152,7 @@ static int32_t getTableNameFromSubquery(SSqlNode* pSqlNode, SArray* tableNameLis
return TSDB_CODE_SUCCESS;
}
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
static void freeElem(void* p) {
tfree(*(char**)p);
}
......@@ -7244,7 +7245,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
// load the table meta for a given table name list
if (taosArrayGetSize(plist) > 0 || taosArrayGetSize(pVgroupList) > 0) {
code = getMultiTableMetaFromMnode(pSql, plist, pVgroupList);
code = getMultiTableMetaFromMnode(pSql, plist, pVgroupList, tscTableMetaCallBack);
}
_end:
......@@ -7253,7 +7254,7 @@ _end:
}
if (pVgroupList != NULL) {
taosArrayDestroy(pVgroupList);
taosArrayDestroyEx(pVgroupList, freeElem);
}
if (tableNameList != NULL) {
......@@ -7421,10 +7422,6 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
const char* msg1 = "point interpolation query needs timestamp";
const char* msg2 = "too many tables in from clause";
const char* msg3 = "start(end) time of query range required or time range too large";
// const char* msg5 = "too many columns in selection clause";
// const char* msg6 = "too many tables in from clause";
// const char* msg7 = "invalid table alias name";
// const char* msg8 = "alias name too long";
const char* msg9 = "only tag query not compatible with normal column filter";
int32_t code = TSDB_CODE_SUCCESS;
......
......@@ -2061,11 +2061,22 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
char *pMsg = pRes->pRsp + sizeof(SSTableVgroupRspMsg);
SSqlCmd* pCmd = &parent->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) {
STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, i);
char* name = pMsg;
pMsg += TSDB_TABLE_NAME_LEN;
SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *) pMsg;
pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups);
STableMetaInfo *pInfo = NULL;
for(int32_t j = 0; j < pQueryInfo->numOfTables; ++j) {
STableMetaInfo *pInfo1 = tscGetTableMetaInfoFromCmd(pCmd, j);
if (strcmp(name, tNameGetTableName(&pInfo1->name)) != 0) {
continue;
}
pInfo = pInfo1;
break;
}
int32_t size = 0;
pInfo->vgroupList = createVgroupInfoFromMsg(pMsg, &size, pSql->self);
......@@ -2407,7 +2418,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
return code;
}
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList) {
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, __async_cb_func_t fp) {
SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
if (NULL == pNew) {
tscError("0x%"PRIx64" failed to allocate sqlobj to get multiple table meta", pSql->self);
......@@ -2463,7 +2474,7 @@ int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVg
tscDebug("0x%"PRIx64" new pSqlObj:0x%"PRIx64" to get %d tableMeta, vgroupInfo:%d, msg size:%d", pSql->self,
pNew->self, numOfTable, numOfVgroupList, pNew->cmd.payloadLen);
pNew->fp = tscTableMetaCallBack;
pNew->fp = fp;
pNew->param = (void *)pSql->self;
tscDebug("0x%"PRIx64" metaRid from %" PRId64 " to %" PRId64 , pSql->self, pSql->metaRid, pNew->self);
......
......@@ -874,13 +874,9 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
pSql->pTscObj = taos;
pSql->signature = pSql;
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
pRes->numOfTotal = 0;
pRes->numOfClauseTotal = 0;
pCmd->resColumnId = TSDB_RES_COL_ID;
tscDebug("0x%"PRIx64" Valid SQL: %s pObj:%p", pSql->self, sql, pObj);
......@@ -900,7 +896,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
strtolower(pSql->sqlstr, sql);
pCmd->curSql = NULL;
// pCmd->curSql = NULL;
if (NULL != pCmd->insertParam.pTableBlockHashList) {
taosHashCleanup(pCmd->insertParam.pTableBlockHashList);
pCmd->insertParam.pTableBlockHashList = NULL;
......@@ -925,6 +921,21 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
return code;
}
void loadMultiTableMetaCallback(void *param, TAOS_RES *res, int code) {
SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)param);
if (pSql == NULL) {
return;
}
taosReleaseRef(tscObjRef, pSql->self);
pSql->res.code = code;
tsem_post(&pSql->rspSem);
}
static void freeElem(void* p) {
tfree(*(char**)p);
}
int taos_load_table_info(TAOS *taos, const char *tableNameList) {
const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list
......@@ -937,7 +948,9 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
pSql->pTscObj = taos;
pSql->signature = pSql;
pSql->fp = NULL; // todo set the correct callback function pointer
pSql->cmd.pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
int32_t length = (int32_t)strlen(tableNameList);
if (length > MAX_TABLE_NAME_LENGTH) {
......@@ -954,9 +967,12 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
}
strtolower(str, tableNameList);
int32_t code = (uint8_t) tscTransferTableNameList(pSql, str, length);
SArray* plist = taosArrayInit(4, POINTER_BYTES);
SArray* vgroupList = taosArrayInit(4, POINTER_BYTES);
int32_t code = (uint8_t) tscTransferTableNameList(pSql, str, length, plist);
free(str);
if (code != TSDB_CODE_SUCCESS) {
tscFreeSqlObj(pSql);
return code;
......@@ -964,6 +980,21 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
registerSqlObj(pSql);
tscDebug("0x%"PRIx64" load multiple table meta, tableNameList: %s pObj:%p", pSql->self, tableNameList, pObj);
executeQuery(pSql, NULL);
code = getMultiTableMetaFromMnode(pSql, plist, vgroupList, loadMultiTableMetaCallback);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
code = TSDB_CODE_SUCCESS;
}
taosArrayDestroyEx(plist, freeElem);
taosArrayDestroyEx(vgroupList, freeElem);
if (code != TSDB_CODE_SUCCESS) {
tscFreeRegisteredSqlObj(pSql);
return code;
}
tsem_wait(&pSql->rspSem);
tscFreeRegisteredSqlObj(pSql);
return code;
}
......@@ -94,7 +94,7 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == 0 && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscGetSTableVgroupInfo(pSql, 0);
code = tscGetSTableVgroupInfo(pSql, pQueryInfo);
}
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
......@@ -619,10 +619,10 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
pStream->callback = callback;
pStream->param = param;
pStream->pSql = pSql;
pSql->pStream = pStream;
pSql->param = pStream;
pSql->maxRetry = TSDB_MAX_REPLICA;
pSql->sqlstr = calloc(1, strlen(sqlstr) + 1);
if (pSql->sqlstr == NULL) {
tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self);
......@@ -632,14 +632,14 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
}
strtolower(pSql->sqlstr, sqlstr);
pSql->fp = tscCreateStream;
pSql->fetchFp = tscCreateStream;
pSql->cmd.resColumnId = TSDB_RES_COL_ID;
tsem_init(&pSql->rspSem, 0, 0);
registerSqlObj(pSql);
tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
tsem_init(&pSql->rspSem, 0, 0);
pSql->fp = tscCreateStream;
pSql->fetchFp = tscCreateStream;
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_SUCCESS) {
......
......@@ -151,6 +151,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
strtolower(pSql->sqlstr, pSql->sqlstr);
pRes->qId = 0;
pRes->numOfRows = 1;
pCmd->resColumnId = TSDB_RES_COL_ID;
code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (code != TSDB_CODE_SUCCESS) {
......
......@@ -2289,15 +2289,20 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
SSqlObj *pNew = createSubqueryObj(pSql, 0, tscFirstRoundCallback, pSup, TSDB_SQL_SELECT, NULL);
SSqlCmd *pCmd = &pNew->cmd;
tscClearSubqueryInfo(pCmd);
tscFreeSqlResult(pSql);
SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pCmd);
assert(pQueryInfo->numOfTables == 1);
SArray* pColList = pNewQueryInfo->colList;
pNewQueryInfo->colList = NULL;
tscClearSubqueryInfo(pCmd);
tscFreeSqlResult(pSql);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);
tscInitQueryInfo(pNewQueryInfo);
// add the group cond
pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr;
if (pQueryInfo->groupbyExpr.columnInfo != NULL) {
pNewQueryInfo->groupbyExpr.columnInfo = taosArrayDup(pQueryInfo->groupbyExpr.columnInfo);
......@@ -2307,12 +2312,15 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
}
}
// add the tag filter cond
if (tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond) != 0) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
pNewQueryInfo->window = pQueryInfo->window;
pNewQueryInfo->interval = pQueryInfo->interval;
pNewQueryInfo->sessionWindow = pQueryInfo->sessionWindow;
pCmd->command = TSDB_SQL_SELECT;
pNew->fp = tscFirstRoundCallback;
......@@ -2374,6 +2382,21 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
}
}
// add the normal column filter cond
if (pColList != NULL) {
size_t s = taosArrayGetSize(pColList);
for (int32_t i = 0; i < s; ++i) {
SColumn *pCol = taosArrayGetP(pColList, i);
if (pCol->info.flist.numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered.
SColumn *p = tscColumnClone(pCol);
taosArrayPush(pNewQueryInfo->colList, &p);
}
}
tscColumnListDestroy(pColList);
}
tscInsertPrimaryTsSourceColumn(pNewQueryInfo, pTableMetaInfo->pTableMeta->id.uid);
tscTansformFuncForSTableQuery(pNewQueryInfo);
......
......@@ -648,6 +648,8 @@ static SColumnInfo* extractColumnInfoFromResult(SArray* pTableCols) {
typedef struct SDummyInputInfo {
SSDataBlock *block;
SSqlObj *pSql; // refactor: remove it
int32_t numOfFilterCols;
SSingleColumnFilterInfo *pFilterInfo;
} SDummyInputInfo;
typedef struct SJoinStatus {
......@@ -696,6 +698,18 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) {
pBlock->info.rows = pRes->numOfRows;
if (pRes->numOfRows != 0) {
doSetupSDataBlock(pRes, pBlock);
if (pInput->numOfFilterCols > 0) {
doSetFilterColumnInfo(pInput->pFilterInfo, pInput->numOfFilterCols, pBlock);
int8_t* p = calloc(pBlock->info.rows, sizeof(int8_t));
bool all = doFilterDataBlock(pInput->pFilterInfo, pInput->numOfFilterCols, pBlock->info.rows, p);
if (!all) {
doCompactSDataBlock(pBlock, pBlock->info.rows, p);
}
tfree(p);
}
*newgroup = false;
return pBlock;
}
......@@ -865,11 +879,14 @@ static void destroyDummyInputOperator(void* param, int32_t numOfOutput) {
}
// todo this operator servers as the adapter for Operator tree and SqlRes result, remove it later
SOperatorInfo* createDummyInputOperator(SSqlObj* pSql, SSchema* pSchema, int32_t numOfCols) {
SOperatorInfo* createDummyInputOperator(SSqlObj* pSql, SSchema* pSchema, int32_t numOfCols, SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols) {
assert(numOfCols > 0);
SDummyInputInfo* pInfo = calloc(1, sizeof(SDummyInputInfo));
pInfo->pSql = pSql;
pInfo->pFilterInfo = pFilterInfo;
pInfo->numOfFilterCols = numOfFilterCols;
pInfo->block = calloc(numOfCols, sizeof(SSDataBlock));
pInfo->block->info.numOfCols = numOfCols;
......@@ -901,7 +918,7 @@ static void destroyJoinOperator(void* param, int32_t numOfOutput) {
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
}
SOperatorInfo* createJoinOperator(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput) {
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput) {
SJoinOperatorInfo* pInfo = calloc(1, sizeof(SJoinOperatorInfo));
pInfo->numOfUpstream = numOfUpstream;
......@@ -985,7 +1002,24 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
// if it is a join query, create join operator here
int32_t numOfCol1 = px->pTableMetaInfo[0]->pTableMeta->tableInfo.numOfColumns;
SOperatorInfo* pSourceOperator = createDummyInputOperator(pSqlObjList[0], pSchema, numOfCol1);
int32_t numOfFilterCols = 0;
SColumnInfo* tableCols = calloc(numOfCol1, sizeof(SColumnInfo));
for(int32_t i = 0; i < numOfCol1; ++i) {
SColumn* pCol = taosArrayGetP(px->colList, i);
if (pCol->info.flist.numOfFilters > 0) {
numOfFilterCols += 1;
}
tableCols[i] = pCol->info;
}
SSingleColumnFilterInfo* pFilterInfo = NULL;
if (numOfFilterCols > 0) {
doCreateFilterInfo(tableCols, numOfCol1, numOfFilterCols, &pFilterInfo, 0);
}
SOperatorInfo* pSourceOperator = createDummyInputOperator(pSqlObjList[0], pSchema, numOfCol1, pFilterInfo, numOfFilterCols);
SSchema* schema = NULL;
if (px->numOfTables > 1) {
......@@ -1002,13 +1036,28 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
SSchema* pSchema1 = tscGetTableSchema(px->pTableMetaInfo[i]->pTableMeta);
int32_t n = px->pTableMetaInfo[i]->pTableMeta->tableInfo.numOfColumns;
p[i] = createDummyInputOperator(pSqlObjList[i], pSchema1, n);
int32_t numOfFilterCols1 = 0;
SColumnInfo* tableCols1 = calloc(numOfCol1, sizeof(SColumnInfo));
for(int32_t j = 0; j < numOfCol1; ++j) {
SColumn* pCol = taosArrayGetP(px->colList, j);
if (pCol->info.flist.numOfFilters > 0) {
numOfFilterCols += 1;
}
tableCols1[j] = pCol->info;
}
SSingleColumnFilterInfo* pFilterInfo1 = NULL;
if (numOfFilterCols1 > 0) {
doCreateFilterInfo(tableCols1, numOfCol1, numOfFilterCols1, &pFilterInfo1, 0);
}
p[i] = createDummyInputOperator(pSqlObjList[i], pSchema1, n, pFilterInfo1, numOfFilterCols1);
memcpy(&schema[offset], pSchema1, n * sizeof(SSchema));
offset += n;
}
pSourceOperator = createJoinOperator(p, px->numOfTables, schema, num);
pSourceOperator = createJoinOperatorInfo(p, px->numOfTables, schema, num);
tfree(p);
} else {
size_t num = taosArrayGetSize(px->colList);
......@@ -2669,6 +2718,7 @@ STableMetaInfo* tscGetTableMetaInfoByUid(SQueryInfo* pQueryInfo, uint64_t uid, i
return tscGetMetaInfo(pQueryInfo, k);
}
// todo refactor
void tscInitQueryInfo(SQueryInfo* pQueryInfo) {
assert(pQueryInfo->fieldsInfo.internalField == NULL);
pQueryInfo->fieldsInfo.internalField = taosArrayInit(4, sizeof(SInternalField));
......@@ -4215,11 +4265,53 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
return TSDB_CODE_SUCCESS;
}
int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length) {
static int32_t doAddTableName(char* nextStr, char** str, SArray* pNameArray, SSqlObj* pSql) {
int32_t code = TSDB_CODE_SUCCESS;
SSqlCmd* pCmd = &pSql->cmd;
char tablename[TSDB_TABLE_FNAME_LEN] = {0};
int32_t len = 0;
if (nextStr == NULL) {
strncpy(tablename, *str, TSDB_TABLE_FNAME_LEN);
len = strlen(tablename);
} else {
memcpy(tablename, *str, nextStr - (*str));
len = (int32_t)(nextStr - (*str));
tablename[len] = '\0';
}
(*str) = nextStr + 1;
len = (int32_t)strtrim(tablename);
SStrToken sToken = {.n = len, .type = TK_ID, .z = tablename};
tGetToken(tablename, &sToken.type);
// Check if the table name available or not
if (tscValidateName(&sToken) != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
sprintf(pCmd->payload, "table name is invalid");
return code;
}
SName name = {0};
if ((code = tscSetTableFullName(&name, &sToken, pSql)) != TSDB_CODE_SUCCESS) {
return code;
}
memset(tablename, 0, tListLen(tablename));
tNameExtractFullName(&name, tablename);
char* p = strdup(tablename);
taosArrayPush(pNameArray, &p);
return TSDB_CODE_SUCCESS;
}
int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length, SArray* pNameArray) {
SSqlCmd *pCmd = &pSql->cmd;
pCmd->command = TSDB_SQL_MULTI_META;
pCmd->count = 0;
pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
int code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
char *str = (char *)pNameList;
......@@ -4230,71 +4322,32 @@ int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t lengt
return terrno;
}
STableMetaInfo *pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo);
if ((code = tscAllocPayload(pCmd, length + 16)) != TSDB_CODE_SUCCESS) {
return code;
}
char *nextStr;
char tblName[TSDB_TABLE_FNAME_LEN];
int payloadLen = 0;
char *pMsg = pCmd->payload;
while (1) {
nextStr = strchr(str, ',');
if (nextStr == NULL) {
code = doAddTableName(nextStr, &str, pNameArray, pSql);
break;
}
memcpy(tblName, str, nextStr - str);
int32_t len = (int32_t)(nextStr - str);
tblName[len] = '\0';
str = nextStr + 1;
len = (int32_t)strtrim(tblName);
SStrToken sToken = {.n = len, .type = TK_ID, .z = tblName};
tGetToken(tblName, &sToken.type);
// Check if the table name available or not
if (tscValidateName(&sToken) != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
sprintf(pCmd->payload, "table name is invalid");
return code;
}
if ((code = tscSetTableFullName(&pTableMetaInfo->name, &sToken, pSql)) != TSDB_CODE_SUCCESS) {
code = doAddTableName(nextStr, &str, pNameArray, pSql);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (++pCmd->count > TSDB_MULTI_TABLEMETA_MAX_NUM) {
if (taosArrayGetSize(pNameArray) > TSDB_MULTI_TABLEMETA_MAX_NUM) {
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
sprintf(pCmd->payload, "tables over the max number");
return code;
}
int32_t xlen = tNameLen(&pTableMetaInfo->name);
if (payloadLen + xlen + 128 >= pCmd->allocSize) {
char *pNewMem = realloc(pCmd->payload, pCmd->allocSize + length);
if (pNewMem == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
sprintf(pCmd->payload, "failed to allocate memory");
return code;
}
pCmd->payload = pNewMem;
pCmd->allocSize = pCmd->allocSize + length;
pMsg = pCmd->payload;
}
char n[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pTableMetaInfo->name, n);
payloadLen += sprintf(pMsg + payloadLen, "%s,", n);
if (taosArrayGetSize(pNameArray) > TSDB_MULTI_TABLEMETA_MAX_NUM) {
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
sprintf(pCmd->payload, "tables over the max number");
return code;
}
*(pMsg + payloadLen) = '\0';
pCmd->payloadLen = payloadLen + 1;
return TSDB_CODE_SUCCESS;
}
......@@ -2878,6 +2878,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
for (; t < pInfo->numOfTables; ++t) {
char *fullName = nameList[t];
pMsg->pVgroup = NULL;
pMsg->pTable = mnodeGetTable(fullName);
if (pMsg->pTable == NULL) {
mError("msg:%p, app:%p table:%s, failed to get table meta, table not exist", pMsg, pMsg->rpcMsg.ahandle, fullName);
......
......@@ -521,12 +521,17 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter);
SOperatorInfo* createJoinOperator(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput);
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput);
SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
SSDataBlock* doSLimit(void* param, bool* newgroup);
int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId);
void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock);
bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t numOfRows, int8_t* p);
void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p);
SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows);
void* destroyOutputBuf(SSDataBlock* pBlock);
......
......@@ -178,8 +178,6 @@ static STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* w
static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo);
static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream);
static int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols,
SSingleColumnFilterInfo** pFilterInfo, uint64_t qId);
static void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols);
static int32_t getNumOfScanTimes(SQueryAttr* pQueryAttr);
......@@ -1742,7 +1740,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_Aggregate: {
pRuntimeEnv->proot =
createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot->upstream[0]->operatorType != OP_DummyInput) {
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput && opType != OP_Join) {
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
}
break;
......@@ -2099,7 +2099,7 @@ static int32_t updateBlockLoadStatus(SQueryAttr *pQuery, int32_t status) {
if (hasFirstLastFunc && status == BLK_DATA_NO_NEEDED) {
if(!hasOtherFunc) {
return BLK_DATA_DISCARD;
} else{
} else {
return BLK_DATA_ALL_NEEDED;
}
}
......@@ -2374,38 +2374,9 @@ static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key, bool asc
return TS_JOIN_TS_EQUAL;
}
void filterRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols,
SSDataBlock* pBlock, bool ascQuery) {
int32_t numOfRows = pBlock->info.rows;
int8_t *p = calloc(numOfRows, sizeof(int8_t));
bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t numOfRows, int8_t* p) {
bool all = true;
if (pRuntimeEnv->pTsBuf != NULL) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
TSKEY* k = (TSKEY*) pColInfoData->pData;
for (int32_t i = 0; i < numOfRows; ++i) {
int32_t offset = ascQuery? i:(numOfRows - i - 1);
int32_t ret = doTSJoinFilter(pRuntimeEnv, k[offset], ascQuery);
if (ret == TS_JOIN_TAG_NOT_EQUALS) {
break;
} else if (ret == TS_JOIN_TS_NOT_EQUALS) {
all = false;
continue;
} else {
assert(ret == TS_JOIN_TS_EQUAL);
p[offset] = true;
}
if (!tsBufNextPos(pRuntimeEnv->pTsBuf)) {
break;
}
}
// save the cursor status
pRuntimeEnv->current->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf);
} else {
for (int32_t i = 0; i < numOfRows; ++i) {
bool qualified = false;
......@@ -2449,11 +2420,13 @@ void filterRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInf
all = false;
}
}
}
if (!all) {
int32_t start = 0;
return all;
}
void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) {
int32_t len = 0;
int32_t start = 0;
for (int32_t j = 0; j < numOfRows; ++j) {
if (p[j] == 1) {
len++;
......@@ -2461,10 +2434,11 @@ void filterRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInf
if (len > 0) {
int32_t cstart = j - len;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData *pColumnInfoData = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pColumnInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColumnInfoData->info.bytes;
memmove(((char*)pColumnInfoData->pData) + start * bytes, pColumnInfoData->pData + cstart * bytes, len * bytes);
memmove(((char*)pColumnInfoData->pData) + start * bytes, pColumnInfoData->pData + cstart * bytes,
len * bytes);
}
start += len;
......@@ -2476,7 +2450,7 @@ void filterRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInf
if (len > 0) {
int32_t cstart = numOfRows - len;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData *pColumnInfoData = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pColumnInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColumnInfoData->info.bytes;
memmove(pColumnInfoData->pData + start * bytes, pColumnInfoData->pData + cstart * bytes, len * bytes);
......@@ -2497,6 +2471,45 @@ void filterRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInf
pBlock->info.window.ekey = *(int64_t*)(pColumnInfoData->pData + TSDB_KEYSIZE * (start - 1));
}
}
}
void filterRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols,
SSDataBlock* pBlock, bool ascQuery) {
int32_t numOfRows = pBlock->info.rows;
int8_t *p = calloc(numOfRows, sizeof(int8_t));
bool all = true;
if (pRuntimeEnv->pTsBuf != NULL) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
TSKEY* k = (TSKEY*) pColInfoData->pData;
for (int32_t i = 0; i < numOfRows; ++i) {
int32_t offset = ascQuery? i:(numOfRows - i - 1);
int32_t ret = doTSJoinFilter(pRuntimeEnv, k[offset], ascQuery);
if (ret == TS_JOIN_TAG_NOT_EQUALS) {
break;
} else if (ret == TS_JOIN_TS_NOT_EQUALS) {
all = false;
continue;
} else {
assert(ret == TS_JOIN_TS_EQUAL);
p[offset] = true;
}
if (!tsBufNextPos(pRuntimeEnv->pTsBuf)) {
break;
}
}
// save the cursor status
pRuntimeEnv->current->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf);
} else {
all = doFilterDataBlock(pFilterInfo, numOfFilterCols, numOfRows, p);
}
if (!all) {
doCompactSDataBlock(pBlock, numOfRows, p);
}
tfree(p);
......@@ -2524,7 +2537,7 @@ static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSData
return status;
}
static void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock) {
void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock) {
if (numOfFilterCols > 0 && pFilterInfo[0].pData != NULL) {
return;
}
......@@ -4920,27 +4933,6 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) {
return pBlock;
}
bool doFilterData(SColumnInfoData* p, int32_t rid, SColumnFilterElem *filterElem, __filter_func_t fp) {
char* input = p->pData + p->info.bytes * rid;
bool isnull = isNull(input, p->info.type);
if (isnull) {
return (fp == isNullOperator) ? true : false;
} else {
if (fp == notNullOperator) {
return true;
} else if (fp == isNullOperator) {
return false;
}
}
if (fp(filterElem, input, input, p->info.type)) {
return true;
}
return false;
}
static SSDataBlock* doFilter(void* param, bool* newgroup) {
SOperatorInfo *pOperator = (SOperatorInfo *)param;
if (pOperator->status == OP_EXEC_DONE) {
......@@ -5445,7 +5437,7 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
return pOperator;
}
static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols) {
SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols) {
SColumnInfo* pCols = calloc(numOfOutput, sizeof(SColumnInfo));
int32_t numOfFilter = 0;
......@@ -6663,8 +6655,7 @@ SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pCo
return pGroupbyExpr;
}
static int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols,
SSingleColumnFilterInfo** pFilterInfo, uint64_t qId) {
int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId) {
*pFilterInfo = calloc(1, sizeof(SSingleColumnFilterInfo) * numOfFilterCols);
if (pFilterInfo == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
......
......@@ -7,7 +7,6 @@
#include <taos.h>
#include <unistd.h>
static void prepare_data(TAOS* taos) {
TAOS_RES *result;
result = taos_query(taos, "drop database if exists test;");
......@@ -69,7 +68,6 @@ static void prepare_data(TAOS* taos) {
usleep(1000000);
}
static int print_result(TAOS_RES* res, int blockFetch) {
TAOS_ROW row = NULL;
int num_fields = taos_num_fields(res);
......@@ -99,7 +97,6 @@ static int print_result(TAOS_RES* res, int blockFetch) {
return nRows;
}
static void check_row_count(int line, TAOS_RES* res, int expected) {
int actual = print_result(res, expected % 2);
if (actual != expected) {
......@@ -109,7 +106,6 @@ static void check_row_count(int line, TAOS_RES* res, int expected) {
}
}
static void verify_query(TAOS* taos) {
prepare_data(taos);
......@@ -153,7 +149,6 @@ static void verify_query(TAOS* taos) {
taos_free_result(res);
}
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
int rows = print_result(res, *(int*)param);
printf("%d rows consumed in subscribe_callback\n", rows);
......@@ -235,10 +230,10 @@ static void verify_subscribe(TAOS* taos) {
taos_unsubscribe(tsub, 0);
}
void verify_prepare(TAOS* taos) {
TAOS_RES* result = taos_query(taos, "drop database if exists test;");
taos_free_result(result);
usleep(100000);
result = taos_query(taos, "create database test;");
......@@ -248,6 +243,7 @@ void verify_prepare(TAOS* taos) {
taos_free_result(result);
return;
}
taos_free_result(result);
usleep(100000);
......@@ -369,6 +365,7 @@ void verify_prepare(TAOS* taos) {
taos_stmt_add_batch(stmt);
}
if (taos_stmt_execute(stmt) != 0) {
taos_stmt_close(stmt);
printf("\033[31mfailed to execute insert statement.\033[0m\n");
return;
}
......@@ -381,6 +378,7 @@ void verify_prepare(TAOS* taos) {
v.v2 = 15;
taos_stmt_bind_param(stmt, params + 2);
if (taos_stmt_execute(stmt) != 0) {
taos_stmt_close(stmt);
printf("\033[31mfailed to execute select statement.\033[0m\n");
return;
}
......
......@@ -809,3 +809,5 @@ endi
if $data00 != 1 then
return -1
endi
print ====================> TODO stddev + normal column filter
......@@ -120,22 +120,88 @@ if $data00 != 167 then
endi
print ================>master query + filter
sql select * from (select count(*) a from nest_tb0 interval(10h)) where a <= 520;
sql select t.* from (select count(*) a from nest_tb0 interval(10h)) t where t.a <= 520;
if $rows != 2 then
return -1
endi
print ==================> nest query join
print ===================> nest query interval
print ===================> nest query interval
print ===================> complex query
print ===================> complex query
print ===================> group by + having
print =========================> nest query join
sql select a.ts,a.k,b.ts from (select count(*) k from nest_tb0 interval(30a)) a, (select count(*) f from nest_tb1 interval(30a)) b where a.ts = b.ts ;
if $rows != 10000 then
return -1
endi
if $data00 != @20-09-15 00:00:00.000@ then
return -1
endi
if $data01 != 1 then
return -1
endi
if $data02 != @20-09-15 00:00:00.000@ then
return -1
endi
if $data10 != @20-09-15 00:01:00.000@ then
return -1
endi
if $data11 != 1 then
return -1
endi
if $data12 != @20-09-15 00:01:00.000@ then
return -1
endi
sql select sum(a.k), sum(b.f) from (select count(*) k from nest_tb0 interval(30a)) a, (select count(*) f from nest_tb1 interval(30a)) b where a.ts = b.ts ;
if $rows != 1 then
return -1
endi
if $data00 != 10000 then
return -1
endi
if $data01 != 10000 then
return -1
endi
sql select a.ts,a.k,b.ts,c.ts,c.ts,c.x from (select count(*) k from nest_tb0 interval(30a)) a, (select count(*) f from nest_tb1 interval(30a)) b, (select count(*) x from nest_tb2 interval(30a)) c where a.ts = b.ts and a.ts = c.ts
if $rows != 10000 then
return -1
endi
if $data00 != @20-09-15 00:00:00.000@ then
return -1
endi
if $data01 != 1 then
return -1
endi
if $data02 != @20-09-15 00:00:00.000@ then
return -1
endi
if $data03 != @20-09-15 00:00:00.000@ then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册