提交 9fa4fa30 编写于 作者: H hjxilinx

[td-168] support the last_row query for super table,

上级 d226214f
...@@ -120,7 +120,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -120,7 +120,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
void tscDestroyLocalReducer(SSqlObj *pSql); void tscDestroyLocalReducer(SSqlObj *pSql);
int32_t tscDoLocalreduce(SSqlObj *pSql); int32_t tscDoLocalMerge(SSqlObj *pSql);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -2981,11 +2981,28 @@ static void tag_project_function_f(SQLFunctionCtx *pCtx, int32_t index) { ...@@ -2981,11 +2981,28 @@ static void tag_project_function_f(SQLFunctionCtx *pCtx, int32_t index) {
*/ */
static void tag_function(SQLFunctionCtx *pCtx) { static void tag_function(SQLFunctionCtx *pCtx) {
SET_VAL(pCtx, 1, 1); SET_VAL(pCtx, 1, 1);
tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->tag.nType);
char* output = pCtx->aOutputBuf;
// todo refactor to dump length presented string(var string)
if (pCtx->tag.nType == TSDB_DATA_TYPE_BINARY || pCtx->tag.nType == TSDB_DATA_TYPE_NCHAR) {
*(int16_t*) output = pCtx->tag.nLen;
output += VARSTR_HEADER_SIZE;
}
tVariantDump(&pCtx->tag, output, pCtx->tag.nType);
} }
static void tag_function_f(SQLFunctionCtx *pCtx, int32_t index) { static void tag_function_f(SQLFunctionCtx *pCtx, int32_t index) {
SET_VAL(pCtx, 1, 1); SET_VAL(pCtx, 1, 1);
char* output = pCtx->aOutputBuf;
// todo refactor to dump length presented string(var string)
if (pCtx->tag.nType == TSDB_DATA_TYPE_BINARY || pCtx->tag.nType == TSDB_DATA_TYPE_NCHAR) {
*(int16_t*) output = pCtx->tag.nLen;
output += VARSTR_HEADER_SIZE;
}
tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->tag.nType); tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->tag.nType);
} }
......
...@@ -1422,7 +1422,7 @@ static void doProcessResultInNextWindow(SSqlObj *pSql, int32_t numOfRes) { ...@@ -1422,7 +1422,7 @@ static void doProcessResultInNextWindow(SSqlObj *pSql, int32_t numOfRes) {
doExecuteSecondaryMerge(pCmd, pLocalReducer, true); doExecuteSecondaryMerge(pCmd, pLocalReducer, true);
} }
int32_t tscDoLocalreduce(SSqlObj *pSql) { int32_t tscDoLocalMerge(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
......
...@@ -1470,7 +1470,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { ...@@ -1470,7 +1470,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pRes->code = tscDoLocalreduce(pSql); pRes->code = tscDoLocalMerge(pSql);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) { if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
......
...@@ -198,7 +198,7 @@ void assignVal(char *val, const char *src, int32_t len, int32_t type) { ...@@ -198,7 +198,7 @@ void assignVal(char *val, const char *src, int32_t len, int32_t type) {
break; break;
}; };
case TSDB_DATA_TYPE_BINARY: { case TSDB_DATA_TYPE_BINARY: {
strncpy(val, src, len); varDataCopy(val, src);
break; break;
}; };
case TSDB_DATA_TYPE_NCHAR: { case TSDB_DATA_TYPE_NCHAR: {
......
...@@ -35,9 +35,11 @@ extern "C" { ...@@ -35,9 +35,11 @@ extern "C" {
// ----------------- For variable data types such as TSDB_DATA_TYPE_BINARY and TSDB_DATA_TYPE_NCHAR // ----------------- For variable data types such as TSDB_DATA_TYPE_BINARY and TSDB_DATA_TYPE_NCHAR
typedef int32_t VarDataOffsetT; typedef int32_t VarDataOffsetT;
typedef int16_t VarDataLenT; typedef int16_t VarDataLenT;
#define varDataLen(v) ((VarDataLenT *)(v))[0]
#define varDataTLen(v) (sizeof(VarDataLenT) + varDataLen(v)) #define varDataLen(v) ((VarDataLenT *)(v))[0]
#define varDataVal(v) ((void *)((char *)v + sizeof(VarDataLenT))) #define varDataTLen(v) (sizeof(VarDataLenT) + varDataLen(v))
#define varDataVal(v) ((void *)((char *)v + sizeof(VarDataLenT)))
#define varDataCopy(dst, v) memcpy((dst), (void*) (v), varDataTLen(v))
// this data type is internally used only in 'in' query to hold the values // this data type is internally used only in 'in' query to hold the values
#define TSDB_DATA_TYPE_ARRAY (TSDB_DATA_TYPE_NCHAR + 1) #define TSDB_DATA_TYPE_ARRAY (TSDB_DATA_TYPE_NCHAR + 1)
......
...@@ -409,13 +409,21 @@ static FORCE_INLINE int32_t columnValueAscendingComparator(char *f1, char *f2, i ...@@ -409,13 +409,21 @@ static FORCE_INLINE int32_t columnValueAscendingComparator(char *f1, char *f2, i
return (first < second) ? -1 : 1; return (first < second) ? -1 : 1;
}; };
case TSDB_DATA_TYPE_BINARY: { case TSDB_DATA_TYPE_BINARY: {
int32_t ret = strncmp(f1, f2, bytes); int32_t len1 = varDataLen(f1);
if (ret == 0) { int32_t len2 = varDataLen(f2);
return 0;
if (len1 != len2) {
return len1 > len2? 1:-1;
} else {
int32_t ret = strncmp(varDataVal(f1), varDataVal(f2), len1);
if (ret == 0) {
return 0;
}
return (ret < 0) ? -1 : 1;
} }
return (ret < 0) ? -1 : 1;
}; };
case TSDB_DATA_TYPE_NCHAR: { case TSDB_DATA_TYPE_NCHAR: { // todo handle the var string compare
int32_t ret = tasoUcs4Compare(f1, f2, bytes); int32_t ret = tasoUcs4Compare(f1, f2, bytes);
if (ret == 0) { if (ret == 0) {
return 0; return 0;
......
...@@ -2555,7 +2555,7 @@ static void doSetTagValueInParam(void *tsdb, STableId* pTableId, int32_t tagColI ...@@ -2555,7 +2555,7 @@ static void doSetTagValueInParam(void *tsdb, STableId* pTableId, int32_t tagColI
if (tagColId == TSDB_TBNAME_COLUMN_INDEX) { if (tagColId == TSDB_TBNAME_COLUMN_INDEX) {
tsdbGetTableName(tsdb, pTableId, &val); tsdbGetTableName(tsdb, pTableId, &val);
bytes = TSDB_TABLE_NAME_LEN; bytes = strnlen(val, TSDB_TABLE_NAME_LEN);
type = TSDB_DATA_TYPE_BINARY; type = TSDB_DATA_TYPE_BINARY;
} else { } else {
tsdbGetTableTagVal(tsdb, pTableId, tagColId, &type, &bytes, &val); tsdbGetTableTagVal(tsdb, pTableId, tagColId, &type, &bytes, &val);
...@@ -4232,7 +4232,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool ...@@ -4232,7 +4232,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
// normal query setup the queryhandle here // normal query setup the queryhandle here
if (isFirstLastRowQuery(pQuery)) { // in case of last_row query, invoke a different API. if (isFirstLastRowQuery(pQuery) && !isSTableQuery) { // in case of last_row query, invoke a different API.
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableIdGroupInfo); pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableIdGroupInfo);
} else if (!isSTableQuery || isIntervalQuery(pQuery) || isFixedOutputQuery(pQuery)) { } else if (!isSTableQuery || isIntervalQuery(pQuery) || isFixedOutputQuery(pQuery)) {
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableIdGroupInfo); pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableIdGroupInfo);
...@@ -4478,22 +4478,16 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { ...@@ -4478,22 +4478,16 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
return true; return true;
} }
static UNUSED_FUNC int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start) { static UNUSED_FUNC int64_t doCheckTables(SQInfo *pQInfo, SArray* pTableList) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
if (!multiTableMultioutputHelper(pQInfo, index)) { if (!multiTableMultioutputHelper(pQInfo, 0)) {
return 0; return 0;
} }
SPointInterpoSupporter pointInterpSupporter = {0}; SPointInterpoSupporter pointInterpSupporter = {0};
pointInterpSupporterInit(pQuery, &pointInterpSupporter); pointInterpSupporterInit(pQuery, &pointInterpSupporter);
assert(0);
// if (!normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &pointInterpSupporter, NULL)) {
// pointInterpSupporterDestroy(&pointInterpSupporter);
// return 0;
// }
/* /*
* here we set the value for before and after the specified time into the * here we set the value for before and after the specified time into the
...@@ -4537,62 +4531,51 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4537,62 +4531,51 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
resetCtxOutputBuf(pRuntimeEnv); resetCtxOutputBuf(pRuntimeEnv);
assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0); assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0);
#if 0
while (pQInfo->groupIndex < numOfGroups) { while (pQInfo->groupIndex < numOfGroups) {
SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->groupIndex); SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->groupIndex);
size_t numOfTable = taosArrayGetSize(group);
if (isFirstLastRowQuery(pQuery)) { if (isFirstLastRowQuery(pQuery)) {
qTrace("QInfo:%p last_row query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet, qTrace("QInfo:%p last_row query on group:%d, total group:%d, current group:%d", pQInfo, pQInfo->groupIndex,
pQInfo->groupIndex); numOfGroups);
TSKEY key = -1; STsdbQueryCond cond = {
int32_t index = -1; .twindow = pQuery->window,
.colList = pQuery->colList,
// choose the last key for one group .order = pQuery->order.order,
pQInfo->tableIndex = 0; .numOfCols = pQuery->numOfCols,
};
SArray *g1 = taosArrayInit(1, POINTER_BYTES);
SArray *tx = taosArrayClone(group);
taosArrayPush(g1, &tx);
for (int32_t k = 0; k < numOfTable; ++k, pQInfo->tableIndex++) { STableGroupInfo gp = {.numOfTables = taosArrayGetSize(tx), .pGroupList = g1};
if (isQueryKilled(pQInfo)) {
return; // include only current table
} if (pRuntimeEnv->pQueryHandle != NULL) {
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
pRuntimeEnv->pQueryHandle = NULL;
} }
pQuery->window.skey = key; pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(pQInfo->tsdb, &cond, &gp);
pQuery->window.ekey = key;
// int64_t num = doCheckMetersInGroup(pQInfo, index, start); initCtxOutputBuf(pRuntimeEnv);
// assert(num >= 0); setTagVal(pRuntimeEnv, (STableId*) taosArrayGet(tx, 0), pQInfo->tsdb);
} else { scanAllDataBlocks(pRuntimeEnv);
qTrace("QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet,
pQInfo->groupIndex);
for (int32_t k = start; k <= end; ++k) { int64_t numOfRes = getNumOfResult(pRuntimeEnv);
if (isQueryKilled(pQInfo)) { if (numOfRes > 0) {
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); pQuery->rec.rows += numOfRes;
return; forwardCtxOutputBuf(pRuntimeEnv, numOfRes);
}
pQuery->skey = pSupporter->rawSKey;
pQuery->ekey = pSupporter->rawEKey;
int64_t num = doCheckMetersInGroup(pQInfo, k, start);
if (num == 1) {
break;
}
} }
}
skipResults(pRuntimeEnv);
pSupporter->groupIndex++; pQInfo->groupIndex += 1;
// output buffer is full, return to client // enable execution for next table, when handling the projection query
if (pQuery->size >= pQuery->pointsToRead) { enableExecutionForNextTable(pRuntimeEnv);
break;
} }
} }
#endif
} else { } else {
createTableQueryInfo(pQInfo); createTableQueryInfo(pQInfo);
......
...@@ -122,7 +122,7 @@ typedef struct STsdbQueryHandle { ...@@ -122,7 +122,7 @@ typedef struct STsdbQueryHandle {
SRWHelper rhelper; SRWHelper rhelper;
} STsdbQueryHandle; } STsdbQueryHandle;
static void changeQueryHandleForQuery(TsdbQueryHandleT pqHandle); static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle);
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) { static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
pBlockLoadInfo->slot = -1; pBlockLoadInfo->slot = -1;
...@@ -207,7 +207,7 @@ TsdbQueryHandleT tsdbQueryLastRow(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STable ...@@ -207,7 +207,7 @@ TsdbQueryHandleT tsdbQueryLastRow(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STable
pQueryHandle->type = TSDB_QUERY_TYPE_LAST_ROW; pQueryHandle->type = TSDB_QUERY_TYPE_LAST_ROW;
pQueryHandle->order = TSDB_ORDER_DESC; pQueryHandle->order = TSDB_ORDER_DESC;
changeQueryHandleForQuery(pQueryHandle); changeQueryHandleForLastrowQuery(pQueryHandle);
return pQueryHandle; return pQueryHandle;
} }
...@@ -957,7 +957,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pqHandle) { ...@@ -957,7 +957,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pqHandle) {
} }
} }
void changeQueryHandleForQuery(TsdbQueryHandleT pqHandle) { void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pqHandle; STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pqHandle;
assert(!ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)); assert(!ASCENDING_ORDER_TRAVERSE(pQueryHandle->order));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册