提交 b3efb97e 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/feature/query' into feature/platform

......@@ -60,15 +60,14 @@ typedef struct SCMCorVgroupInfo {
} SCMCorVgroupInfo;
typedef struct STableMeta {
STableComInfo tableInfo;
uint8_t tableType;
int16_t sversion;
int16_t tversion;
STableComInfo tableInfo;
uint8_t tableType;
int16_t sversion;
int16_t tversion;
SCMVgroupInfo vgroupInfo;
SCMCorVgroupInfo corVgroupInfo;
int32_t sid; // the index of one table in a virtual node
uint64_t uid; // unique id of a table
SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
STableId id;
SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
} STableMeta;
typedef struct STableMetaInfo {
......
......@@ -629,8 +629,8 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int3
}
static void tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) {
pBlocks->tid = pTableMeta->sid;
pBlocks->uid = pTableMeta->uid;
pBlocks->tid = pTableMeta->id.tid;
pBlocks->uid = pTableMeta->id.uid;
pBlocks->sversion = pTableMeta->sversion;
pBlocks->numOfRows += numOfRows;
}
......@@ -686,7 +686,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **st
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
STableDataBlocks *dataBuf = NULL;
int32_t ret = tscGetDataBlockFromList(pTableList, pCmd->pDataBlocks, pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
int32_t ret = tscGetDataBlockFromList(pTableList, pCmd->pDataBlocks, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), tinfo.rowSize, pTableMetaInfo->name,
pTableMeta, &dataBuf);
if (ret != TSDB_CODE_SUCCESS) {
......
......@@ -635,7 +635,7 @@ int32_t parseIntervalClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQ
int32_t tableIndex = COLUMN_INDEX_INITIAL_VAL;
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
if (pTableMetaInfo->pTableMeta->uid == uid) {
if (pTableMetaInfo->pTableMeta->id.uid == uid) {
tableIndex = i;
break;
}
......@@ -3053,7 +3053,7 @@ static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSQLExpr*
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
SSchema* pTagSchema1 = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
pLeft->uid = pTableMetaInfo->pTableMeta->uid;
pLeft->uid = pTableMetaInfo->pTableMeta->id.uid;
pLeft->tagColId = pTagSchema1->colId;
strcpy(pLeft->tableId, pTableMetaInfo->name);
......@@ -3065,7 +3065,7 @@ static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSQLExpr*
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
SSchema* pTagSchema2 = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
pRight->uid = pTableMetaInfo->pTableMeta->uid;
pRight->uid = pTableMetaInfo->pTableMeta->id.uid;
pRight->tagColId = pTagSchema2->colId;
strcpy(pRight->tableId, pTableMetaInfo->name);
......@@ -3603,7 +3603,7 @@ static int32_t setTableCondForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo,
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableCondIndex);
STagCond* pTagCond = &pQueryInfo->tagCond;
pTagCond->tbnameCond.uid = pTableMetaInfo->pTableMeta->uid;
pTagCond->tbnameCond.uid = pTableMetaInfo->pTableMeta->id.uid;
assert(pExpr->nSQLOptr == TK_LIKE || pExpr->nSQLOptr == TK_IN);
......@@ -3840,7 +3840,7 @@ static int32_t getTagQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondE
// add to source column list
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
int64_t uid = pTableMetaInfo->pTableMeta->uid;
int64_t uid = pTableMetaInfo->pTableMeta->id.uid;
int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
size_t num = taosArrayGetSize(colList);
......@@ -4506,8 +4506,8 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload;
pUpdateMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId);
pUpdateMsg->tid = htonl(pTableMeta->sid);
pUpdateMsg->uid = htobe64(pTableMeta->uid);
pUpdateMsg->tid = htonl(pTableMeta->id.tid);
pUpdateMsg->uid = htobe64(pTableMeta->id.uid);
pUpdateMsg->colId = htons(pTagsSchema->colId);
pUpdateMsg->type = pTagsSchema->type;
pUpdateMsg->bytes = htons(pTagsSchema->bytes);
......@@ -5045,7 +5045,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClau
if (pExpr->functionId != TSDB_FUNC_TAG) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
int16_t columnInfo = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid);
int16_t columnInfo = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
SColumnIndex index = {.tableIndex = 0, .columnIndex = columnInfo};
SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
......
......@@ -162,8 +162,8 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
.numOfColumns = pTableMetaMsg->numOfColumns,
};
pTableMeta->sid = pTableMetaMsg->sid;
pTableMeta->uid = pTableMetaMsg->uid;
pTableMeta->id.tid = pTableMetaMsg->sid;
pTableMeta->id.uid = pTableMetaMsg->uid;
pTableMeta->vgroupInfo = pTableMetaMsg->vgroup;
tscInitCorVgroupInfo(&pTableMeta->corVgroupInfo, &pTableMeta->vgroupInfo);
......
......@@ -605,9 +605,9 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
}
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->tid = htonl(pTableMeta->sid);
pTableIdInfo->uid = htobe64(pTableMeta->uid);
pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid, dfltKey));
pTableIdInfo->tid = htonl(pTableMeta->id.tid);
pTableIdInfo->uid = htobe64(pTableMeta->id.uid);
pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->id.uid, dfltKey));
pQueryMsg->numOfTables = htonl(1); // set the number of tables
pMsg += sizeof(STableIdInfo);
......@@ -640,7 +640,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
}
tscDebug("%p vgId:%d, query on table:%s, tid:%d, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name,
pTableMeta->sid, pTableMeta->uid);
pTableMeta->id.tid, pTableMeta->id.uid);
return pMsg;
}
......@@ -714,8 +714,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
tscError("%p sid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex,
tscError("%p tid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex,
pColSchema->name);
return TSDB_CODE_TSC_INVALID_SQL;
......@@ -833,8 +833,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
(pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) {
tscError("%p sid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, total, numOfTagColumns,
tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, total, numOfTagColumns,
pCol->colIndex.columnIndex, pColSchema->name);
return TSDB_CODE_TSC_INVALID_SQL;
......@@ -855,7 +855,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
STagCond* pTagCond = &pQueryInfo->tagCond;
SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->uid);
SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid);
if (pCond != NULL && pCond->cond != NULL) {
pQueryMsg->tagCondLen = htons(pCond->len);
memcpy(pMsg, pCond->cond, pCond->len);
......@@ -1739,7 +1739,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->uid, pTableMeta->sid, pTableMetaInfo->name);
tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->id.uid, pTableMeta->id.tid, pTableMetaInfo->name);
free(pTableMeta);
return TSDB_CODE_SUCCESS;
......@@ -2215,7 +2215,7 @@ int tscRenewTableMeta(SSqlObj *pSql, char *tableId) {
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
if (pTableMetaInfo->pTableMeta) {
tscDebug("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->uid, pTableMeta);
tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid, pTableMeta);
}
taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
......
......@@ -241,7 +241,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) {
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SSubscriptionProgress target = {.uid = pTableMeta->uid, .key = 0};
SSubscriptionProgress target = {.uid = pTableMeta->id.uid, .key = 0};
SSubscriptionProgress* p = taosArraySearch(pSub->progress, &target, tscCompareSubscriptionProgress);
if (p == NULL) {
taosArrayClear(pSub->progress);
......
......@@ -180,7 +180,7 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in
pSupporter->limit = pQueryInfo->limit;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, index);
pSupporter->uid = pTableMetaInfo->pTableMeta->uid;
pSupporter->uid = pTableMetaInfo->pTableMeta->id.uid;
assert (pSupporter->uid != 0);
getTmpfilePath("join-", pSupporter->path);
......@@ -355,7 +355,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
// set the join condition tag column info, to do extract method
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
assert(pQueryInfo->tagCond.joinInfo.hasJoin);
int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid);
int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
pExpr->param[0].i64Key = colId;
pExpr->numOfParams = 1;
......@@ -499,7 +499,7 @@ static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj*
// set the tags value for ts_comp function
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, 0);
int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid);
int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid);
pExpr->param->i64Key = tagColId;
pExpr->numOfParams = 1;
}
......@@ -560,7 +560,7 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
qsort(p2->pIdTagList, p2->num, p2->tagSize, tscCompareTidTags);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int16_t tagColId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid);
int16_t tagColId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
SSchema* pColSchema = tscGetTableColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
......@@ -1034,7 +1034,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
int32_t tableIndexOfSub = -1;
for (int32_t j = 0; j < pQueryInfo->numOfTables; ++j) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, j);
if (pTableMetaInfo->pTableMeta->uid == pExpr->uid) {
if (pTableMetaInfo->pTableMeta->id.uid == pExpr->uid) {
tableIndexOfSub = j;
break;
}
......@@ -1205,7 +1205,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
STagCond* pTagCond = &pSupporter->tagCond;
assert(pTagCond->joinInfo.hasJoin);
int32_t tagColId = tscGetJoinTagColIdByUid(pTagCond, pTableMetaInfo->pTableMeta->uid);
int32_t tagColId = tscGetJoinTagColIdByUid(pTagCond, pTableMetaInfo->pTableMeta->id.uid);
SSchema* s = tscGetTableColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
int16_t bytes = 0;
......@@ -1237,7 +1237,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0);
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid);
int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid);
pExpr->param->i64Key = tagColId;
pExpr->numOfParams = 1;
}
......
......@@ -955,7 +955,7 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol
pExpr->interBytes = interSize;
if (pTableMetaInfo->pTableMeta) {
pExpr->uid = pTableMetaInfo->pTableMeta->uid;
pExpr->uid = pTableMetaInfo->pTableMeta->id.uid;
}
return pExpr;
......@@ -1482,7 +1482,7 @@ STableMetaInfo* tscGetTableMetaInfoByUid(SQueryInfo* pQueryInfo, uint64_t uid, i
int32_t k = -1;
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
if (pQueryInfo->pTableMetaInfo[i]->pTableMeta->uid == uid) {
if (pQueryInfo->pTableMetaInfo[i]->pTableMeta->id.uid == uid) {
k = i;
break;
}
......@@ -1760,7 +1760,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY);// it must be the subquery
}
uint64_t uid = pTableMetaInfo->pTableMeta->uid;
uint64_t uid = pTableMetaInfo->pTableMeta->id.uid;
tscSqlExprCopy(pNewQueryInfo->exprList, pQueryInfo->exprList, uid, true);
int32_t numOfOutput = tscSqlExprNumOfExprs(pNewQueryInfo);
......
......@@ -6287,10 +6287,17 @@ void qDestroyQueryInfo(qinfo_t qHandle) {
freeQInfo(pQInfo);
}
static void setQueryResultReady(SQInfo* pQInfo) {
static bool doBuildResCheck(SQInfo* pQInfo) {
bool buildRes = false;
pthread_mutex_lock(&pQInfo->lock);
pQInfo->dataReady = QUERY_RESULT_READY;
buildRes = (pQInfo->rspContext != NULL);
pthread_mutex_unlock(&pQInfo->lock);
return buildRes;
}
bool qTableQuery(qinfo_t qinfo) {
......@@ -6303,16 +6310,13 @@ bool qTableQuery(qinfo_t qinfo) {
if (IS_QUERY_KILLED(pQInfo)) {
qDebug("QInfo:%p it is already killed, abort", pQInfo);
setQueryResultReady(pQInfo);
return false;
return doBuildResCheck(pQInfo);
}
if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) {
setQueryStatus(pQInfo->runtimeEnv.pQuery, QUERY_COMPLETED);
setQueryResultReady(pQInfo);
qDebug("QInfo:%p no table exists for query, abort", pQInfo);
return false;
setQueryStatus(pQInfo->runtimeEnv.pQuery, QUERY_COMPLETED);
return doBuildResCheck(pQInfo);
}
// error occurs, record the error code and return to client
......@@ -6320,9 +6324,7 @@ bool qTableQuery(qinfo_t qinfo) {
if (ret != TSDB_CODE_SUCCESS) {
pQInfo->code = ret;
qDebug("QInfo:%p query abort due to error/cancel occurs, code:%s", pQInfo, tstrerror(pQInfo->code));
setQueryResultReady(pQInfo);
return false;
return doBuildResCheck(pQInfo);
}
qDebug("QInfo:%p query task is launched", pQInfo);
......@@ -6347,17 +6349,7 @@ bool qTableQuery(qinfo_t qinfo) {
pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
}
bool buildRes = false;
pthread_mutex_lock(&pQInfo->lock);
pQInfo->dataReady = QUERY_RESULT_READY;
if (pQInfo->rspContext != NULL) {
buildRes = true;
}
pthread_mutex_unlock(&pQInfo->lock);
return buildRes;
return doBuildResCheck(pQInfo);
}
int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContext) {
......@@ -6484,7 +6476,6 @@ int32_t qKillQuery(qinfo_t qinfo) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
// sem_post(&pQInfo->dataReady);
setQueryKilled(pQInfo);
return TSDB_CODE_SUCCESS;
}
......
......@@ -20,8 +20,6 @@
extern "C" {
#endif
#include "os.h"
#define TD_EQ 0x1
#define TD_GT 0x2
#define TD_LT 0x4
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册