提交 aea35d6b 编写于 作者: H Haojun Liao

[td-225] fix bugs.

上级 9f3c8150
...@@ -256,6 +256,7 @@ typedef struct SResRec { ...@@ -256,6 +256,7 @@ typedef struct SResRec {
typedef struct { typedef struct {
int64_t numOfRows; // num of results in current retrieved int64_t numOfRows; // num of results in current retrieved
int64_t numOfRowsGroup; // num of results of current group
int64_t numOfTotal; // num of total results int64_t numOfTotal; // num of total results
int64_t numOfClauseTotal; // num of total result in current subclause int64_t numOfClauseTotal; // num of total result in current subclause
char * pRsp; char * pRsp;
......
...@@ -856,24 +856,6 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo ...@@ -856,24 +856,6 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo
tColModelAppend(pModel, pLocalReducer->discardData, pLocalReducer->prevRowOfInput, 0, 1, 1); tColModelAppend(pModel, pLocalReducer->discardData, pLocalReducer->prevRowOfInput, 0, 1, 1);
} }
// todo merge with following function
// static void reversedCopyResultToDstBuf(SQueryInfo* pQueryInfo, SSqlRes *pRes, tFilePage *pFinalDataPage) {
//
// for (int32_t i = 0; i < pQueryInfo->exprList.numOfExprs; ++i) {
// TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
//
// int32_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
// char * src = pFinalDataPage->data + (pRes->numOfRows - 1) * pField->bytes + pRes->numOfRows * offset;
// char * dst = pRes->data + pRes->numOfRows * offset;
//
// for (int32_t j = 0; j < pRes->numOfRows; ++j) {
// memcpy(dst, src, (size_t)pField->bytes);
// dst += pField->bytes;
// src -= pField->bytes;
// }
// }
//}
static void reversedCopyFromInterpolationToDstBuf(SQueryInfo *pQueryInfo, SSqlRes *pRes, tFilePage **pResPages, static void reversedCopyFromInterpolationToDstBuf(SQueryInfo *pQueryInfo, SSqlRes *pRes, tFilePage **pResPages,
SLocalReducer *pLocalReducer) { SLocalReducer *pLocalReducer) {
assert(0); assert(0);
...@@ -907,20 +889,10 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO ...@@ -907,20 +889,10 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
tFilePage * pFinalDataPage = pLocalReducer->pResultBuf; tFilePage * pFinalDataPage = pLocalReducer->pResultBuf;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
// if (pRes->pLocalReducer != pLocalReducer) {
// /*
// * Release the SSqlObj is called, and it is int destroying function invoked by other thread.
// * However, the other thread will WAIT until current process fully completes.
// * Since the flag of release struct is set by doLocalReduce function
// */
// assert(pRes->pLocalReducer == NULL);
// }
// no interval query, no fill operation // no interval query, no fill operation
if (pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) { if (pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) {
pRes->data = pLocalReducer->pFinalRes; pRes->data = pLocalReducer->pFinalRes;
pRes->numOfRows = pFinalDataPage->num; pRes->numOfRows = pFinalDataPage->num;
pRes->numOfClauseTotal += pRes->numOfRows;
if (pQueryInfo->limit.offset > 0) { if (pQueryInfo->limit.offset > 0) {
if (pQueryInfo->limit.offset < pRes->numOfRows) { if (pQueryInfo->limit.offset < pRes->numOfRows) {
...@@ -931,22 +903,22 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO ...@@ -931,22 +903,22 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize); tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize);
pRes->numOfRows -= pQueryInfo->limit.offset; pRes->numOfRows -= pQueryInfo->limit.offset;
pRes->numOfClauseTotal -= pQueryInfo->limit.offset;
pQueryInfo->limit.offset = 0; pQueryInfo->limit.offset = 0;
} else { } else {
pQueryInfo->limit.offset -= pRes->numOfRows; pQueryInfo->limit.offset -= pRes->numOfRows;
pRes->numOfRows = 0; pRes->numOfRows = 0;
pRes->numOfClauseTotal = 0;
} }
} }
if (pQueryInfo->limit.limit >= 0 && pRes->numOfClauseTotal > pQueryInfo->limit.limit) { pRes->numOfRowsGroup += pRes->numOfRows;
if (pQueryInfo->limit.limit >= 0 && pRes->numOfRowsGroup > pQueryInfo->limit.limit) {
/* impose the limitation of output rows on the final result */ /* impose the limitation of output rows on the final result */
int32_t prevSize = pFinalDataPage->num; int32_t prevSize = pFinalDataPage->num;
int32_t overflow = pRes->numOfClauseTotal - pQueryInfo->limit.limit; int32_t overflow = pRes->numOfRowsGroup - pQueryInfo->limit.limit;
assert(overflow < pRes->numOfRows); assert(overflow < pRes->numOfRows);
pRes->numOfClauseTotal = pQueryInfo->limit.limit; pRes->numOfRowsGroup = pQueryInfo->limit.limit;
pRes->numOfRows -= overflow; pRes->numOfRows -= overflow;
pFinalDataPage->num -= overflow; pFinalDataPage->num -= overflow;
...@@ -957,6 +929,8 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO ...@@ -957,6 +929,8 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
} }
memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * pLocalReducer->finalRowSize); memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * pLocalReducer->finalRowSize);
pRes->numOfClauseTotal += pRes->numOfRows;
pFinalDataPage->num = 0; pFinalDataPage->num = 0;
return; return;
} }
...@@ -969,7 +943,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO ...@@ -969,7 +943,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
pResPages[i] = calloc(1, sizeof(tFilePage) + pField->bytes * pLocalReducer->resColModel->capacity); pResPages[i] = calloc(1, sizeof(tFilePage) + pField->bytes * pLocalReducer->resColModel->capacity);
} }
while (1) { while (1) {
int64_t newRows = taosGenerateDataBlock(pFillInfo, pResPages, pLocalReducer->resColModel->capacity); int64_t newRows = taosGenerateDataBlock(pFillInfo, pResPages, pLocalReducer->resColModel->capacity);
...@@ -986,7 +960,6 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO ...@@ -986,7 +960,6 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
pRes->data = pLocalReducer->pFinalRes; pRes->data = pLocalReducer->pFinalRes;
pRes->numOfRows = newRows; pRes->numOfRows = newRows;
pRes->numOfClauseTotal += newRows;
pQueryInfo->limit.offset = 0; pQueryInfo->limit.offset = 0;
break; break;
...@@ -1010,15 +983,13 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO ...@@ -1010,15 +983,13 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
} }
if (pRes->numOfRows > 0) { if (pRes->numOfRows > 0) {
if (pQueryInfo->limit.limit >= 0 && pRes->numOfClauseTotal > pQueryInfo->limit.limit) { if (pQueryInfo->limit.limit >= 0 && pRes->numOfRows > pQueryInfo->limit.limit) {
int32_t overflow = pRes->numOfClauseTotal - pQueryInfo->limit.limit; int32_t overflow = pRes->numOfRows - pQueryInfo->limit.limit;
pRes->numOfRows -= overflow; pRes->numOfRows -= overflow;
assert(pRes->numOfRows >= 0);
pRes->numOfClauseTotal = pQueryInfo->limit.limit;
pFinalDataPage->num -= overflow; pFinalDataPage->num -= overflow;
assert(pRes->numOfRows >= 0 && pFinalDataPage->num > 0);
/* set remain data to be discarded, and reset the interpolation information */ /* set remain data to be discarded, and reset the interpolation information */
savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pFillInfo); savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pFillInfo);
} }
...@@ -1032,6 +1003,9 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO ...@@ -1032,6 +1003,9 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
} else { // todo bug?? } else { // todo bug??
reversedCopyFromInterpolationToDstBuf(pQueryInfo, pRes, pResPages, pLocalReducer); reversedCopyFromInterpolationToDstBuf(pQueryInfo, pRes, pResPages, pLocalReducer);
} }
pRes->numOfRowsGroup += pRes->numOfRows;
pRes->numOfClauseTotal += pRes->numOfRows;
} }
pFinalDataPage->num = 0; pFinalDataPage->num = 0;
...@@ -1227,7 +1201,10 @@ static bool saveGroupResultInfo(SSqlObj *pSql) { ...@@ -1227,7 +1201,10 @@ static bool saveGroupResultInfo(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
pRes->numOfGroups += 1;
if (pRes->numOfRowsGroup > 0) {
pRes->numOfGroups += 1;
}
// the output group is limited by the slimit clause // the output group is limited by the slimit clause
if (reachGroupResultLimit(pQueryInfo, pRes)) { if (reachGroupResultLimit(pQueryInfo, pRes)) {
...@@ -1266,7 +1243,12 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no ...@@ -1266,7 +1243,12 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no
pRes->numOfRows = 0; pRes->numOfRows = 0;
pQueryInfo->slimit.offset -= 1; pQueryInfo->slimit.offset -= 1;
pLocalReducer->discard = !noMoreCurrentGroupRes; pLocalReducer->discard = !noMoreCurrentGroupRes;
if (pLocalReducer->discard) {
SColumnModel *pInternModel = pLocalReducer->pDesc->pColumnModel;
tColModelAppend(pInternModel, pLocalReducer->discardData, pLocalReducer->pTempBuffer->data, 0, 1, 1);
}
return false; return false;
} }
...@@ -1299,7 +1281,7 @@ void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) { // ...@@ -1299,7 +1281,7 @@ void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) { //
static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer *pLocalReducer) { static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer *pLocalReducer) {
// In handling data in other groups, we need to reset the interpolation information for a new group data // In handling data in other groups, we need to reset the interpolation information for a new group data
pRes->numOfRows = 0; pRes->numOfRows = 0;
pRes->numOfClauseTotal = 0; pRes->numOfRowsGroup = 0;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
...@@ -1363,7 +1345,8 @@ static bool doHandleLastRemainData(SSqlObj *pSql) { ...@@ -1363,7 +1345,8 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
if ((isAllSourcesCompleted(pLocalReducer) && !pLocalReducer->hasPrevRow) || pLocalReducer->pLocalDataSrc[0] == NULL || if ((isAllSourcesCompleted(pLocalReducer) && !pLocalReducer->hasPrevRow) || pLocalReducer->pLocalDataSrc[0] == NULL ||
prevGroupCompleted) { prevGroupCompleted) {
// if fillType == TSDB_FILL_NONE, return directly // if fillType == TSDB_FILL_NONE, return directly
if (pQueryInfo->fillType != TSDB_FILL_NONE) { if (pQueryInfo->fillType != TSDB_FILL_NONE &&
((pRes->numOfRowsGroup < pQueryInfo->limit.limit && pQueryInfo->limit.limit > 0) || (pQueryInfo->limit.limit < 0))) {
int64_t etime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.ekey : pQueryInfo->window.skey; int64_t etime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.ekey : pQueryInfo->window.skey;
assert(pFillInfo->numOfRows == 0); assert(pFillInfo->numOfRows == 0);
......
...@@ -5954,13 +5954,13 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { ...@@ -5954,13 +5954,13 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
return doLocalQueryProcess(pCmd, pQueryInfo, pQuerySql); return doLocalQueryProcess(pCmd, pQueryInfo, pQuerySql);
} }
if (pQuerySql->from->nExpr > TSDB_MAX_JOIN_TABLE_NUM) { if (pQuerySql->from->nExpr > TSDB_MAX_JOIN_TABLE_NUM * 2) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg7); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg7);
} }
pQueryInfo->command = TSDB_SQL_SELECT; pQueryInfo->command = TSDB_SQL_SELECT;
if (pQuerySql->from->nExpr > 2) { if (pQuerySql->from->nExpr > 4) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg10); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg10);
} }
...@@ -5983,7 +5983,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { ...@@ -5983,7 +5983,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
tscAddEmptyMetaInfo(pQueryInfo); tscAddEmptyMetaInfo(pQueryInfo);
} }
STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo, i); STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo, i/2);
SSQLToken t = {.type = TSDB_DATA_TYPE_BINARY, .n = pTableItem->nLen, .z = pTableItem->pz}; SSQLToken t = {.type = TSDB_DATA_TYPE_BINARY, .n = pTableItem->nLen, .z = pTableItem->pz};
if (tscSetTableFullName(pTableMetaInfo1, &t, pSql) != TSDB_CODE_SUCCESS) { if (tscSetTableFullName(pTableMetaInfo1, &t, pSql) != TSDB_CODE_SUCCESS) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册