提交 5389bdaa 编写于 作者: H hjxilinx

[td-186] fix bugs in table join.

上级 c03e84c1
......@@ -397,7 +397,7 @@ void tscFreeSqlResult(SSqlObj *pSql);
* Note: this function is multi-thread safe.
* @param pObj
*/
void tscFreeSqlObjPartial(SSqlObj *pObj);
void tscPartiallyFreeSqlObj(SSqlObj *pObj);
/**
* free sql object, release allocated resource
......
......@@ -1301,7 +1301,7 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) {
char* p = pSql->sqlstr;
pSql->sqlstr = NULL;
tscFreeSqlObjPartial(pSql);
tscPartiallyFreeSqlObj(pSql);
pSql->sqlstr = p;
} else {
tscTrace("continue parse sql: %s", pSql->cmd.curSql);
......
......@@ -455,7 +455,7 @@ static int insertStmtExecute(STscStmt* stmt) {
// tscTrace("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
if (pRes->code != TSDB_CODE_SUCCESS) {
tscFreeSqlObjPartial(pSql);
tscPartiallyFreeSqlObj(pSql);
}
return pRes->code;
......
......@@ -2363,6 +2363,7 @@ bool hasUnsupportFunctionsForSTableQuery(SQueryInfo* pQueryInfo) {
static bool functionCompatibleCheck(SQueryInfo* pQueryInfo) {
int32_t startIdx = 0;
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, startIdx);
int32_t functionID = pExpr->functionId;
......@@ -2378,14 +2379,14 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo) {
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = startIdx + 1; i < size; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
SSqlExpr* pExpr1 = tscSqlExprGet(pQueryInfo, i);
int16_t functionId = pExpr->functionId;
int16_t functionId = pExpr1->functionId;
if (functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS) {
continue;
}
if (functionId == TSDB_FUNC_PRJ && pExpr->colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
if (functionId == TSDB_FUNC_PRJ && pExpr1->colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
continue;
}
......@@ -3531,7 +3532,7 @@ static int32_t setTableCondForSTableQuery(SQueryInfo* pQueryInfo, const char* ac
return TSDB_CODE_SUCCESS;
}
SStringBuilder sb1 = { 0 };
SStringBuilder sb1 = {0};
taosStringBuilderAppendStringLen(&sb1, QUERY_COND_REL_PREFIX_IN, QUERY_COND_REL_PREFIX_IN_LEN);
char db[TSDB_TABLE_ID_LEN] = {0};
......@@ -4030,9 +4031,7 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) {
return invalidSqlErrMsg(pQueryInfo->msg, msg);
}
}
size_t size = taosArrayGetSize(pQueryInfo->exprList);
if ((pFillToken->nExpr < size) ||
((pFillToken->nExpr - 1 < size) && (tscIsPointInterpQuery(pQueryInfo)))) {
tVariantListItem* lastItem = &pFillToken->a[pFillToken->nExpr - 1];
......@@ -4228,7 +4227,24 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
const char* msg4 = "set tag value only available for table";
const char* msg5 = "only support add one tag";
const char* msg6 = "column can only be modified by super table";
const char* msg7 = "no tags can be dropped";
const char* msg8 = "only support one tag";
const char* msg9 = "tag name too long";
const char* msg10 = "invalid tag name";
const char* msg11 = "primary tag cannot be dropped";
const char* msg12 = "update normal column not supported";
const char* msg13 = "invalid tag value";
const char* msg14 = "tag value too long";
const char* msg15 = "no columns can be dropped";
const char* msg16 = "only support one column";
const char* msg17 = "invalid column name";
const char* msg18 = "primary timestamp column cannot be dropped";
SSqlCmd* pCmd = &pSql->cmd;
SAlterTableSQL* pAlterSQL = pInfo->pAlterInfo;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
......@@ -4274,24 +4290,18 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &pFieldList->p[0]);
} else if (pAlterSQL->type == TSDB_ALTER_TABLE_DROP_TAG_COLUMN) {
const char* msg1 = "no tags can be dropped";
const char* msg2 = "only support one tag";
const char* msg3 = "tag name too long";
const char* msg4 = "illegal tag name";
const char* msg5 = "primary tag cannot be dropped";
if (tscGetNumOfTags(pTableMeta) == 1) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
return invalidSqlErrMsg(pQueryInfo->msg, msg7);
}
// numOfTags == 1
if (pAlterSQL->varList->nExpr > 1) {
return invalidSqlErrMsg(pQueryInfo->msg, msg2);
return invalidSqlErrMsg(pQueryInfo->msg, msg8);
}
tVariantListItem* pItem = &pAlterSQL->varList->a[0];
if (pItem->pVar.nLen > TSDB_COL_NAME_LEN) {
return invalidSqlErrMsg(pQueryInfo->msg, msg3);
return invalidSqlErrMsg(pQueryInfo->msg, msg9);
}
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
......@@ -4302,9 +4312,9 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
if (index.columnIndex < tscGetNumOfColumns(pTableMeta)) {
return invalidSqlErrMsg(pQueryInfo->msg, msg4);
return invalidSqlErrMsg(pQueryInfo->msg, msg10);
} else if (index.columnIndex == 0) {
return invalidSqlErrMsg(pQueryInfo->msg, msg5);
return invalidSqlErrMsg(pQueryInfo->msg, msg11);
}
char name1[128] = {0};
......@@ -4313,9 +4323,6 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
TAOS_FIELD f = tscCreateField(TSDB_DATA_TYPE_INT, name1, tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize);
tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
} else if (pAlterSQL->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) {
const char* msg1 = "tag name too long";
const char* msg2 = "invalid tag name";
tVariantList* pVarList = pAlterSQL->varList;
if (pVarList->nExpr > 2) {
return TSDB_CODE_INVALID_SQL;
......@@ -4325,11 +4332,11 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
tVariantListItem* pDstItem = &pAlterSQL->varList->a[1];
if (pSrcItem->pVar.nLen >= TSDB_COL_NAME_LEN || pDstItem->pVar.nLen >= TSDB_COL_NAME_LEN) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
return invalidSqlErrMsg(pQueryInfo->msg, msg9);
}
if (pSrcItem->pVar.nType != TSDB_DATA_TYPE_BINARY || pDstItem->pVar.nType != TSDB_DATA_TYPE_BINARY) {
return invalidSqlErrMsg(pQueryInfo->msg, msg2);
return invalidSqlErrMsg(pQueryInfo->msg, msg10);
}
SColumnIndex srcIndex = COLUMN_INDEX_INITIALIZER;
......@@ -4355,10 +4362,6 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
f = tscCreateField(TSDB_DATA_TYPE_INT, name, tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize);
tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
} else if (pAlterSQL->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) {
const char* msg1 = "invalid tag value";
const char* msg2 = "update normal column not supported";
const char* msg3 = "tag value too long";
// Note: update can only be applied to table not super table.
// the following is handle display tags value for meters created according to super table
tVariantList* pVarList = pAlterSQL->varList;
......@@ -4371,19 +4374,19 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
if (columnIndex.columnIndex < tscGetNumOfColumns(pTableMeta)) {
return invalidSqlErrMsg(pQueryInfo->msg, msg2);
return invalidSqlErrMsg(pQueryInfo->msg, msg12);
}
SSchema* pTagsSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, columnIndex.columnIndex);
if (tVariantDump(&pVarList->a[1].pVar, pAlterSQL->tagData.data /*pCmd->payload*/, pTagsSchema->type) !=
TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
return invalidSqlErrMsg(pQueryInfo->msg, msg13);
}
// validate the length of binary
if ((pTagsSchema->type == TSDB_DATA_TYPE_BINARY || pTagsSchema->type == TSDB_DATA_TYPE_NCHAR) &&
pVarList->a[1].pVar.nLen > pTagsSchema->bytes) {
return invalidSqlErrMsg(pQueryInfo->msg, msg3);
return invalidSqlErrMsg(pQueryInfo->msg, msg14);
}
char name1[128] = {0};
......@@ -4405,17 +4408,12 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &pFieldList->p[0]);
} else if (pAlterSQL->type == TSDB_ALTER_TABLE_DROP_COLUMN) {
const char* msg1 = "no columns can be dropped";
const char* msg2 = "only support one column";
const char* msg4 = "illegal column name";
const char* msg3 = "primary timestamp column cannot be dropped";
if (tscGetNumOfColumns(pTableMeta) == TSDB_MIN_COLUMNS) { //
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg15);
}
if (pAlterSQL->varList->nExpr > 1) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg16);
}
tVariantListItem* pItem = &pAlterSQL->varList->a[0];
......@@ -4423,11 +4421,11 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
SColumnIndex columnIndex = COLUMN_INDEX_INITIALIZER;
SSQLToken name = {.type = TK_STRING, .z = pItem->pVar.pz, .n = pItem->pVar.nLen};
if (getColumnIndexByName(&name, pQueryInfo, &columnIndex) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(pQueryInfo->msg, msg4);
return invalidSqlErrMsg(pQueryInfo->msg, msg17);
}
if (columnIndex.columnIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return invalidSqlErrMsg(pQueryInfo->msg, msg3);
return invalidSqlErrMsg(pQueryInfo->msg, msg18);
}
char name1[TSDB_COL_NAME_LEN + 1] = {0};
......
......@@ -502,7 +502,7 @@ void tscKillSTableQuery(SSqlObj *pSql) {
}
}
tscTrace("%p metric query is cancelled", pSql);
tscTrace("%p super table query cancelled", pSql);
}
int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
......@@ -649,7 +649,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
numOfTables = 1;
tscSetDnodeIpList(pSql, pTableMeta);
pQueryMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId);
tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name);
tscTrace("%p queried tables:%d, table name: %s", pSql, 1, pTableMetaInfo->name);
} else { // query super table
int32_t index = pTableMetaInfo->vgroupIndex;
if (index < 0) {
......
......@@ -270,7 +270,7 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
}
if (pRes->code != TSDB_CODE_SUCCESS) {
tscFreeSqlObjPartial(pSql);
tscPartiallyFreeSqlObj(pSql);
}
return pRes->code;
......@@ -576,7 +576,7 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
if (keepCmd) {
tscFreeSqlResult(pSql);
} else {
tscFreeSqlObjPartial(pSql);
tscPartiallyFreeSqlObj(pSql);
}
}
......@@ -586,7 +586,7 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
// set freeFlag to 1 in retrieve message if there are un-retrieved results
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (pQueryInfo == NULL) {
tscFreeSqlObjPartial(pSql);
tscPartiallyFreeSqlObj(pSql);
return;
}
......@@ -638,7 +638,7 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
tscFreeSqlResult(pSql);
tscTrace("%p sql result is freed by app while sql command is kept", pSql);
} else {
tscFreeSqlObjPartial(pSql);
tscPartiallyFreeSqlObj(pSql);
tscTrace("%p sql result is freed by app", pSql);
}
} else { // for async release, remove its link
......@@ -659,7 +659,7 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
tscFreeSqlResult(pSql);
tscTrace("%p sql result is freed while sql command is kept", pSql);
} else {
tscFreeSqlObjPartial(pSql);
tscPartiallyFreeSqlObj(pSql);
tscTrace("%p sql result is freed by app", pSql);
}
}
......@@ -1004,7 +1004,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
tscTrace("%p load multi metermeta result:%d %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
if (pRes->code != TSDB_CODE_SUCCESS) {
tscFreeSqlObjPartial(pSql);
tscPartiallyFreeSqlObj(pSql);
}
return pRes->code;
......
......@@ -237,8 +237,8 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
SJoinSubquerySupporter* pSupporter = NULL;
/*
* If the columns are not involved in the final select clause, the secondary query will not be launched
* for the subquery.
* If the columns are not involved in the final select clause,
* the corresponding query will not be issued.
*/
SSubqueryState* pState = NULL;
......@@ -269,7 +269,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
pSupporter = pPrevSub->param;
if (taosArrayGetSize(pSupporter->exprList) == 0) {
tscTrace("%p subIndex: %d, not need to launch query, ignore it", pSql, i);
tscTrace("%p subIndex: %d, no need to launch query, ignore it", pSql, i);
tscDestroyJoinSupporter(pSupporter);
tscFreeSqlObj(pPrevSub);
......@@ -314,15 +314,6 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
pSupporter->exprList = NULL;
pSupporter->colList = NULL;
memset(&pSupporter->fieldsInfo, 0, sizeof(SFieldInfo));
/*
* if the first column of the secondary query is not ts function, add this function.
* Because this column is required to filter with timestamp after intersecting.
*/
// SSqlExpr* pExpr = taosArrayGet(pQueryInfo->exprList, 0);
// if (pExpr->functionId != TSDB_FUNC_TS) {
// tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0);
// }
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
assert(pNew->numOfSubs == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
......@@ -348,8 +339,6 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
pExpr->numOfParams = 1;
}
tscPrintSelectClause(pNew, 0);
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
tscTrace("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type,
......@@ -917,7 +906,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
tscTrace("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
"exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type,
tscSqlExprNumOfExprs(pNewQueryInfo), numOfCols,
......@@ -928,7 +917,6 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
}
tscPrintSelectClause(pNew, 0);
return tscProcessSql(pNew);
}
......@@ -938,9 +926,9 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) {
assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0);
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
pState->numOfTotal = pQueryInfo->numOfTables;
tscTrace("%p start launched subquery, total:%d", pSql, pQueryInfo->numOfTables);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SJoinSubquerySupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i);
......
......@@ -301,12 +301,10 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
}
void tscDestroyResPointerInfo(SSqlRes* pRes) {
if (pRes->buffer != NULL) {
// free all buffers containing the multibyte string
if (pRes->buffer != NULL) { // free all buffers containing the multibyte string
for (int i = 0; i < pRes->numOfCols; i++) {
tfree(pRes->buffer[i]);
}
pRes->numOfCols = 0;
}
......@@ -335,50 +333,15 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd) {
tscFreeQueryInfo(pCmd);
}
/*
* this function must not change the pRes->code value, since it may be used later.
*/
void tscFreeResData(SSqlObj* pSql) {
SSqlRes* pRes = &pSql->res;
pRes->row = 0;
pRes->rspType = 0;
pRes->rspLen = 0;
pRes->row = 0;
pRes->numOfRows = 0;
pRes->numOfTotal = 0;
pRes->numOfTotalInCurrentClause = 0;
pRes->numOfGroups = 0;
pRes->precision = 0;
pRes->qhandle = 0;
pRes->offset = 0;
pRes->useconds = 0;
void tscFreeSqlResult(SSqlObj* pSql) {
tscDestroyLocalReducer(pSql);
SSqlRes* pRes = &pSql->res;
tscDestroyResPointerInfo(pRes);
memset(&pSql->res, 0, sizeof(SSqlRes));
}
void tscFreeSqlResult(SSqlObj* pSql) {
tfree(pSql->res.pRsp);
pSql->res.row = 0;
pSql->res.numOfRows = 0;
pSql->res.numOfTotal = 0;
pSql->res.numOfGroups = 0;
tfree(pSql->res.pGroupRec);
tscDestroyLocalReducer(pSql);
tscDestroyResPointerInfo(&pSql->res);
tfree(pSql->res.pColumnIndex);
}
void tscFreeSqlObjPartial(SSqlObj* pSql) {
void tscPartiallyFreeSqlObj(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) {
return;
}
......@@ -412,7 +375,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) return;
tscTrace("%p start to free sql object", pSql);
tscFreeSqlObjPartial(pSql);
tscPartiallyFreeSqlObj(pSql);
pSql->signature = NULL;
pSql->fp = NULL;
......@@ -1897,7 +1860,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
size_t size = taosArrayGetSize(pNewQueryInfo->colList);
tscTrace(
"%p new subquery: %p, tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d,"
"%p new subquery:%p, tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%d, colList:%d,"
"fieldInfo:%d, name:%s, qrang:%" PRId64 " - %" PRId64 " order:%d, limit:%" PRId64,
pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo),
size, pNewQueryInfo->fieldsInfo.numOfOutput, pFinalInfo->name, pNewQueryInfo->window.skey,
......@@ -2107,7 +2070,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) {
//backup the total number of result first
int64_t num = pRes->numOfTotal + pRes->numOfTotalInCurrentClause;
tscFreeResData(pSql);
tscFreeSqlResult(pSql);
pRes->numOfTotal = num;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册