未验证 提交 9ca99fa7 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2798 from taosdata/feature/query

[td-225] fix deadlock for client during query.
...@@ -60,15 +60,14 @@ typedef struct SCMCorVgroupInfo { ...@@ -60,15 +60,14 @@ typedef struct SCMCorVgroupInfo {
} SCMCorVgroupInfo; } SCMCorVgroupInfo;
typedef struct STableMeta { typedef struct STableMeta {
STableComInfo tableInfo; STableComInfo tableInfo;
uint8_t tableType; uint8_t tableType;
int16_t sversion; int16_t sversion;
int16_t tversion; int16_t tversion;
SCMVgroupInfo vgroupInfo; SCMVgroupInfo vgroupInfo;
SCMCorVgroupInfo corVgroupInfo; SCMCorVgroupInfo corVgroupInfo;
int32_t sid; // the index of one table in a virtual node STableId id;
uint64_t uid; // unique id of a table SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
} STableMeta; } STableMeta;
typedef struct STableMetaInfo { typedef struct STableMetaInfo {
......
...@@ -629,8 +629,8 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int3 ...@@ -629,8 +629,8 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int3
} }
static void tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) { static void tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) {
pBlocks->tid = pTableMeta->sid; pBlocks->tid = pTableMeta->id.tid;
pBlocks->uid = pTableMeta->uid; pBlocks->uid = pTableMeta->id.uid;
pBlocks->sversion = pTableMeta->sversion; pBlocks->sversion = pTableMeta->sversion;
pBlocks->numOfRows += numOfRows; pBlocks->numOfRows += numOfRows;
} }
...@@ -686,7 +686,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **st ...@@ -686,7 +686,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **st
STableComInfo tinfo = tscGetTableInfo(pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMeta);
STableDataBlocks *dataBuf = NULL; STableDataBlocks *dataBuf = NULL;
int32_t ret = tscGetDataBlockFromList(pTableList, pCmd->pDataBlocks, pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE, int32_t ret = tscGetDataBlockFromList(pTableList, pCmd->pDataBlocks, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), tinfo.rowSize, pTableMetaInfo->name, sizeof(SSubmitBlk), tinfo.rowSize, pTableMetaInfo->name,
pTableMeta, &dataBuf); pTableMeta, &dataBuf);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
......
...@@ -635,7 +635,7 @@ int32_t parseIntervalClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQ ...@@ -635,7 +635,7 @@ int32_t parseIntervalClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQ
int32_t tableIndex = COLUMN_INDEX_INITIAL_VAL; int32_t tableIndex = COLUMN_INDEX_INITIAL_VAL;
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i); pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
if (pTableMetaInfo->pTableMeta->uid == uid) { if (pTableMetaInfo->pTableMeta->id.uid == uid) {
tableIndex = i; tableIndex = i;
break; break;
} }
...@@ -3053,7 +3053,7 @@ static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSQLExpr* ...@@ -3053,7 +3053,7 @@ static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSQLExpr*
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
SSchema* pTagSchema1 = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex); SSchema* pTagSchema1 = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
pLeft->uid = pTableMetaInfo->pTableMeta->uid; pLeft->uid = pTableMetaInfo->pTableMeta->id.uid;
pLeft->tagColId = pTagSchema1->colId; pLeft->tagColId = pTagSchema1->colId;
strcpy(pLeft->tableId, pTableMetaInfo->name); strcpy(pLeft->tableId, pTableMetaInfo->name);
...@@ -3065,7 +3065,7 @@ static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSQLExpr* ...@@ -3065,7 +3065,7 @@ static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSQLExpr*
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
SSchema* pTagSchema2 = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex); SSchema* pTagSchema2 = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
pRight->uid = pTableMetaInfo->pTableMeta->uid; pRight->uid = pTableMetaInfo->pTableMeta->id.uid;
pRight->tagColId = pTagSchema2->colId; pRight->tagColId = pTagSchema2->colId;
strcpy(pRight->tableId, pTableMetaInfo->name); strcpy(pRight->tableId, pTableMetaInfo->name);
...@@ -3603,7 +3603,7 @@ static int32_t setTableCondForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, ...@@ -3603,7 +3603,7 @@ static int32_t setTableCondForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo,
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableCondIndex); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableCondIndex);
STagCond* pTagCond = &pQueryInfo->tagCond; STagCond* pTagCond = &pQueryInfo->tagCond;
pTagCond->tbnameCond.uid = pTableMetaInfo->pTableMeta->uid; pTagCond->tbnameCond.uid = pTableMetaInfo->pTableMeta->id.uid;
assert(pExpr->nSQLOptr == TK_LIKE || pExpr->nSQLOptr == TK_IN); assert(pExpr->nSQLOptr == TK_LIKE || pExpr->nSQLOptr == TK_IN);
...@@ -3840,7 +3840,7 @@ static int32_t getTagQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondE ...@@ -3840,7 +3840,7 @@ static int32_t getTagQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondE
// add to source column list // add to source column list
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
int64_t uid = pTableMetaInfo->pTableMeta->uid; int64_t uid = pTableMetaInfo->pTableMeta->id.uid;
int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta); int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
size_t num = taosArrayGetSize(colList); size_t num = taosArrayGetSize(colList);
...@@ -4506,8 +4506,8 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -4506,8 +4506,8 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload; SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload;
pUpdateMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId); pUpdateMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId);
pUpdateMsg->tid = htonl(pTableMeta->sid); pUpdateMsg->tid = htonl(pTableMeta->id.tid);
pUpdateMsg->uid = htobe64(pTableMeta->uid); pUpdateMsg->uid = htobe64(pTableMeta->id.uid);
pUpdateMsg->colId = htons(pTagsSchema->colId); pUpdateMsg->colId = htons(pTagsSchema->colId);
pUpdateMsg->type = pTagsSchema->type; pUpdateMsg->type = pTagsSchema->type;
pUpdateMsg->bytes = htons(pTagsSchema->bytes); pUpdateMsg->bytes = htons(pTagsSchema->bytes);
...@@ -5045,7 +5045,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClau ...@@ -5045,7 +5045,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClau
if (pExpr->functionId != TSDB_FUNC_TAG) { if (pExpr->functionId != TSDB_FUNC_TAG) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
int16_t columnInfo = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid); int16_t columnInfo = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
SColumnIndex index = {.tableIndex = 0, .columnIndex = columnInfo}; SColumnIndex index = {.tableIndex = 0, .columnIndex = columnInfo};
SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta); SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
......
...@@ -162,8 +162,8 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size ...@@ -162,8 +162,8 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
.numOfColumns = pTableMetaMsg->numOfColumns, .numOfColumns = pTableMetaMsg->numOfColumns,
}; };
pTableMeta->sid = pTableMetaMsg->sid; pTableMeta->id.tid = pTableMetaMsg->sid;
pTableMeta->uid = pTableMetaMsg->uid; pTableMeta->id.uid = pTableMetaMsg->uid;
pTableMeta->vgroupInfo = pTableMetaMsg->vgroup; pTableMeta->vgroupInfo = pTableMetaMsg->vgroup;
tscInitCorVgroupInfo(&pTableMeta->corVgroupInfo, &pTableMeta->vgroupInfo); tscInitCorVgroupInfo(&pTableMeta->corVgroupInfo, &pTableMeta->vgroupInfo);
......
...@@ -605,9 +605,9 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char ...@@ -605,9 +605,9 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
} }
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->tid = htonl(pTableMeta->sid); pTableIdInfo->tid = htonl(pTableMeta->id.tid);
pTableIdInfo->uid = htobe64(pTableMeta->uid); pTableIdInfo->uid = htobe64(pTableMeta->id.uid);
pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid, dfltKey)); pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->id.uid, dfltKey));
pQueryMsg->numOfTables = htonl(1); // set the number of tables pQueryMsg->numOfTables = htonl(1); // set the number of tables
pMsg += sizeof(STableIdInfo); pMsg += sizeof(STableIdInfo);
...@@ -640,7 +640,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char ...@@ -640,7 +640,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
} }
tscDebug("%p vgId:%d, query on table:%s, tid:%d, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name, tscDebug("%p vgId:%d, query on table:%s, tid:%d, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name,
pTableMeta->sid, pTableMeta->uid); pTableMeta->id.tid, pTableMeta->id.uid);
return pMsg; return pMsg;
} }
...@@ -714,8 +714,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -714,8 +714,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL || if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
pColSchema->type > TSDB_DATA_TYPE_NCHAR) { pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
tscError("%p sid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s", tscError("%p tid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex, pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex,
pColSchema->name); pColSchema->name);
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
...@@ -833,8 +833,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -833,8 +833,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) || if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
(pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) { (pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) {
tscError("%p sid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s", tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, total, numOfTagColumns, pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, total, numOfTagColumns,
pCol->colIndex.columnIndex, pColSchema->name); pCol->colIndex.columnIndex, pColSchema->name);
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
...@@ -855,7 +855,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -855,7 +855,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) { if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
STagCond* pTagCond = &pQueryInfo->tagCond; STagCond* pTagCond = &pQueryInfo->tagCond;
SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->uid); SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid);
if (pCond != NULL && pCond->cond != NULL) { if (pCond != NULL && pCond->cond != NULL) {
pQueryMsg->tagCondLen = htons(pCond->len); pQueryMsg->tagCondLen = htons(pCond->len);
memcpy(pMsg, pCond->cond, pCond->len); memcpy(pMsg, pCond->cond, pCond->len);
...@@ -1739,7 +1739,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { ...@@ -1739,7 +1739,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->uid, pTableMeta->sid, pTableMetaInfo->name); tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->id.uid, pTableMeta->id.tid, pTableMetaInfo->name);
free(pTableMeta); free(pTableMeta);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2215,7 +2215,7 @@ int tscRenewTableMeta(SSqlObj *pSql, char *tableId) { ...@@ -2215,7 +2215,7 @@ int tscRenewTableMeta(SSqlObj *pSql, char *tableId) {
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
if (pTableMetaInfo->pTableMeta) { if (pTableMetaInfo->pTableMeta) {
tscDebug("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql, tscDebug("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->uid, pTableMeta); tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid, pTableMeta);
} }
taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true); taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
......
...@@ -241,7 +241,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { ...@@ -241,7 +241,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) {
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SSubscriptionProgress target = {.uid = pTableMeta->uid, .key = 0}; SSubscriptionProgress target = {.uid = pTableMeta->id.uid, .key = 0};
SSubscriptionProgress* p = taosArraySearch(pSub->progress, &target, tscCompareSubscriptionProgress); SSubscriptionProgress* p = taosArraySearch(pSub->progress, &target, tscCompareSubscriptionProgress);
if (p == NULL) { if (p == NULL) {
taosArrayClear(pSub->progress); taosArrayClear(pSub->progress);
......
...@@ -180,7 +180,7 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in ...@@ -180,7 +180,7 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in
pSupporter->limit = pQueryInfo->limit; pSupporter->limit = pQueryInfo->limit;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, index); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, index);
pSupporter->uid = pTableMetaInfo->pTableMeta->uid; pSupporter->uid = pTableMetaInfo->pTableMeta->id.uid;
assert (pSupporter->uid != 0); assert (pSupporter->uid != 0);
getTmpfilePath("join-", pSupporter->path); getTmpfilePath("join-", pSupporter->path);
...@@ -355,7 +355,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -355,7 +355,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
// set the join condition tag column info, to do extract method // set the join condition tag column info, to do extract method
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
assert(pQueryInfo->tagCond.joinInfo.hasJoin); assert(pQueryInfo->tagCond.joinInfo.hasJoin);
int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid); int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
pExpr->param[0].i64Key = colId; pExpr->param[0].i64Key = colId;
pExpr->numOfParams = 1; pExpr->numOfParams = 1;
...@@ -499,7 +499,7 @@ static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* ...@@ -499,7 +499,7 @@ static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj*
// set the tags value for ts_comp function // set the tags value for ts_comp function
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, 0); SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, 0);
int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid); int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid);
pExpr->param->i64Key = tagColId; pExpr->param->i64Key = tagColId;
pExpr->numOfParams = 1; pExpr->numOfParams = 1;
} }
...@@ -560,7 +560,7 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar ...@@ -560,7 +560,7 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
qsort(p2->pIdTagList, p2->num, p2->tagSize, tscCompareTidTags); qsort(p2->pIdTagList, p2->num, p2->tagSize, tscCompareTidTags);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int16_t tagColId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid); int16_t tagColId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
SSchema* pColSchema = tscGetTableColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId); SSchema* pColSchema = tscGetTableColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
...@@ -1034,7 +1034,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { ...@@ -1034,7 +1034,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
int32_t tableIndexOfSub = -1; int32_t tableIndexOfSub = -1;
for (int32_t j = 0; j < pQueryInfo->numOfTables; ++j) { for (int32_t j = 0; j < pQueryInfo->numOfTables; ++j) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, j); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, j);
if (pTableMetaInfo->pTableMeta->uid == pExpr->uid) { if (pTableMetaInfo->pTableMeta->id.uid == pExpr->uid) {
tableIndexOfSub = j; tableIndexOfSub = j;
break; break;
} }
...@@ -1205,7 +1205,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter ...@@ -1205,7 +1205,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
STagCond* pTagCond = &pSupporter->tagCond; STagCond* pTagCond = &pSupporter->tagCond;
assert(pTagCond->joinInfo.hasJoin); assert(pTagCond->joinInfo.hasJoin);
int32_t tagColId = tscGetJoinTagColIdByUid(pTagCond, pTableMetaInfo->pTableMeta->uid); int32_t tagColId = tscGetJoinTagColIdByUid(pTagCond, pTableMetaInfo->pTableMeta->id.uid);
SSchema* s = tscGetTableColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId); SSchema* s = tscGetTableColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
int16_t bytes = 0; int16_t bytes = 0;
...@@ -1237,7 +1237,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter ...@@ -1237,7 +1237,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0); SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0);
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid); int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid);
pExpr->param->i64Key = tagColId; pExpr->param->i64Key = tagColId;
pExpr->numOfParams = 1; pExpr->numOfParams = 1;
} }
......
...@@ -955,7 +955,7 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol ...@@ -955,7 +955,7 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol
pExpr->interBytes = interSize; pExpr->interBytes = interSize;
if (pTableMetaInfo->pTableMeta) { if (pTableMetaInfo->pTableMeta) {
pExpr->uid = pTableMetaInfo->pTableMeta->uid; pExpr->uid = pTableMetaInfo->pTableMeta->id.uid;
} }
return pExpr; return pExpr;
...@@ -1482,7 +1482,7 @@ STableMetaInfo* tscGetTableMetaInfoByUid(SQueryInfo* pQueryInfo, uint64_t uid, i ...@@ -1482,7 +1482,7 @@ STableMetaInfo* tscGetTableMetaInfoByUid(SQueryInfo* pQueryInfo, uint64_t uid, i
int32_t k = -1; int32_t k = -1;
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
if (pQueryInfo->pTableMetaInfo[i]->pTableMeta->uid == uid) { if (pQueryInfo->pTableMetaInfo[i]->pTableMeta->id.uid == uid) {
k = i; k = i;
break; break;
} }
...@@ -1760,7 +1760,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -1760,7 +1760,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY);// it must be the subquery TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY);// it must be the subquery
} }
uint64_t uid = pTableMetaInfo->pTableMeta->uid; uint64_t uid = pTableMetaInfo->pTableMeta->id.uid;
tscSqlExprCopy(pNewQueryInfo->exprList, pQueryInfo->exprList, uid, true); tscSqlExprCopy(pNewQueryInfo->exprList, pQueryInfo->exprList, uid, true);
int32_t numOfOutput = tscSqlExprNumOfExprs(pNewQueryInfo); int32_t numOfOutput = tscSqlExprNumOfExprs(pNewQueryInfo);
......
...@@ -6287,10 +6287,17 @@ void qDestroyQueryInfo(qinfo_t qHandle) { ...@@ -6287,10 +6287,17 @@ void qDestroyQueryInfo(qinfo_t qHandle) {
freeQInfo(pQInfo); freeQInfo(pQInfo);
} }
static void setQueryResultReady(SQInfo* pQInfo) { static bool doBuildResCheck(SQInfo* pQInfo) {
bool buildRes = false;
pthread_mutex_lock(&pQInfo->lock); pthread_mutex_lock(&pQInfo->lock);
pQInfo->dataReady = QUERY_RESULT_READY; pQInfo->dataReady = QUERY_RESULT_READY;
buildRes = (pQInfo->rspContext != NULL);
pthread_mutex_unlock(&pQInfo->lock); pthread_mutex_unlock(&pQInfo->lock);
return buildRes;
} }
bool qTableQuery(qinfo_t qinfo) { bool qTableQuery(qinfo_t qinfo) {
...@@ -6303,16 +6310,13 @@ bool qTableQuery(qinfo_t qinfo) { ...@@ -6303,16 +6310,13 @@ bool qTableQuery(qinfo_t qinfo) {
if (IS_QUERY_KILLED(pQInfo)) { if (IS_QUERY_KILLED(pQInfo)) {
qDebug("QInfo:%p it is already killed, abort", pQInfo); qDebug("QInfo:%p it is already killed, abort", pQInfo);
setQueryResultReady(pQInfo); return doBuildResCheck(pQInfo);
return false;
} }
if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) { if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) {
setQueryStatus(pQInfo->runtimeEnv.pQuery, QUERY_COMPLETED);
setQueryResultReady(pQInfo);
qDebug("QInfo:%p no table exists for query, abort", pQInfo); qDebug("QInfo:%p no table exists for query, abort", pQInfo);
return false; setQueryStatus(pQInfo->runtimeEnv.pQuery, QUERY_COMPLETED);
return doBuildResCheck(pQInfo);
} }
// error occurs, record the error code and return to client // error occurs, record the error code and return to client
...@@ -6320,9 +6324,7 @@ bool qTableQuery(qinfo_t qinfo) { ...@@ -6320,9 +6324,7 @@ bool qTableQuery(qinfo_t qinfo) {
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
pQInfo->code = ret; pQInfo->code = ret;
qDebug("QInfo:%p query abort due to error/cancel occurs, code:%s", pQInfo, tstrerror(pQInfo->code)); qDebug("QInfo:%p query abort due to error/cancel occurs, code:%s", pQInfo, tstrerror(pQInfo->code));
return doBuildResCheck(pQInfo);
setQueryResultReady(pQInfo);
return false;
} }
qDebug("QInfo:%p query task is launched", pQInfo); qDebug("QInfo:%p query task is launched", pQInfo);
...@@ -6347,17 +6349,7 @@ bool qTableQuery(qinfo_t qinfo) { ...@@ -6347,17 +6349,7 @@ bool qTableQuery(qinfo_t qinfo) {
pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows); pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
} }
bool buildRes = false; return doBuildResCheck(pQInfo);
pthread_mutex_lock(&pQInfo->lock);
pQInfo->dataReady = QUERY_RESULT_READY;
if (pQInfo->rspContext != NULL) {
buildRes = true;
}
pthread_mutex_unlock(&pQInfo->lock);
return buildRes;
} }
int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContext) { int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContext) {
...@@ -6484,7 +6476,6 @@ int32_t qKillQuery(qinfo_t qinfo) { ...@@ -6484,7 +6476,6 @@ int32_t qKillQuery(qinfo_t qinfo) {
return TSDB_CODE_QRY_INVALID_QHANDLE; return TSDB_CODE_QRY_INVALID_QHANDLE;
} }
// sem_post(&pQInfo->dataReady);
setQueryKilled(pQInfo); setQueryKilled(pQInfo);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -20,8 +20,6 @@ ...@@ -20,8 +20,6 @@
extern "C" { extern "C" {
#endif #endif
#include "os.h"
#define TD_EQ 0x1 #define TD_EQ 0x1
#define TD_GT 0x2 #define TD_GT 0x2
#define TD_LT 0x4 #define TD_LT 0x4
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册