提交 9031c50c 编写于 作者: H Haojun Liao

[td-1103] fix bugs

上级 2238b29f
...@@ -175,7 +175,17 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) { ...@@ -175,7 +175,17 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) {
} }
int32_t tVariantCompare(const tVariant* p1, const tVariant* p2) { int32_t tVariantCompare(const tVariant* p1, const tVariant* p2) {
assert((p1->nType != TSDB_DATA_TYPE_NULL) || (p2->nType != TSDB_DATA_TYPE_NULL)); if (p1->nType == TSDB_DATA_TYPE_NULL && p2->nType == TSDB_DATA_TYPE_NULL) {
return 0;
}
if (p1->nType == TSDB_DATA_TYPE_NULL) {
return -1;
}
if (p2->nType == TSDB_DATA_TYPE_NULL) {
return 1;
}
switch (p1->nType) { switch (p1->nType) {
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
......
...@@ -118,7 +118,7 @@ typedef struct SQueryCostInfo { ...@@ -118,7 +118,7 @@ typedef struct SQueryCostInfo {
uint32_t loadBlockStatis; uint32_t loadBlockStatis;
uint32_t discardBlocks; uint32_t discardBlocks;
uint64_t elapsedTime; uint64_t elapsedTime;
uint64_t computTime; uint64_t firstStageMergeTime;
uint64_t internalSupSize; uint64_t internalSupSize;
uint64_t numOfTimeWindows; uint64_t numOfTimeWindows;
} SQueryCostInfo; } SQueryCostInfo;
......
...@@ -38,8 +38,8 @@ static FORCE_INLINE SWindowResult *getWindowResult(SWindowResInfo *pWindowResInf ...@@ -38,8 +38,8 @@ static FORCE_INLINE SWindowResult *getWindowResult(SWindowResInfo *pWindowResInf
return &pWindowResInfo->pResult[slot]; return &pWindowResInfo->pResult[slot];
} }
#define curTimeWindowIndex(_winres) ((_winres)->curIndex) #define curTimeWindowIndex(_winres) ((_winres)->curIndex)
#define GET_TIMEWINDOW(_winresInfo, _win) (STimeWindow) {(_win)->skey, ((_win)->skey + (_winresInfo)->interval)} #define GET_TIMEWINDOW(_winresInfo, _win) (STimeWindow) {(_win)->skey, ((_win)->skey + (_winresInfo)->interval - 1)}
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pSelectExpr[1].base.arg->argValue.i64:1) #define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pSelectExpr[1].base.arg->argValue.i64:1)
bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot); bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot);
......
...@@ -138,11 +138,11 @@ typedef struct SInterpInfoDetail { ...@@ -138,11 +138,11 @@ typedef struct SInterpInfoDetail {
typedef struct SResultInfo { typedef struct SResultInfo {
int8_t hasResult; // result generated, not NULL value int8_t hasResult; // result generated, not NULL value
bool initialized:1; // output buffer has been initialized bool initialized; // output buffer has been initialized
bool complete:1; // query has completed bool complete; // query has completed
bool superTableQ:1; // is super table query bool superTableQ; // is super table query
int16_t numOfRes; // num of output result in current buffer uint32_t bufLen; // buffer size
uint32_t bufLen; // buffer size uint64_t numOfRes; // num of output result in current buffer
void* interResultBuf; // output result buffer void* interResultBuf; // output result buffer
} SResultInfo; } SResultInfo;
......
...@@ -970,7 +970,6 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * ...@@ -970,7 +970,6 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
int32_t index = pWindowResInfo->curIndex; int32_t index = pWindowResInfo->curIndex;
STimeWindow nextWin = win; STimeWindow nextWin = win;
assert(tsCols != NULL);
while (1) { while (1) {
int32_t prevEndPos = (forwardStep - 1) * step + startPos; int32_t prevEndPos = (forwardStep - 1) * step + startPos;
...@@ -2668,7 +2667,7 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) ...@@ -2668,7 +2667,7 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param)
} }
int32_t mergeIntoGroupResult(SQInfo *pQInfo) { int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampUs();
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pQInfo)); int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pQInfo));
...@@ -2695,9 +2694,11 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) { ...@@ -2695,9 +2694,11 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
SET_STABLE_QUERY_OVER(pQInfo); SET_STABLE_QUERY_OVER(pQInfo);
} }
qDebug("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "ms", pQInfo, int64_t elapsedTime = taosGetTimestampUs() - st;
pQInfo->groupIndex - 1, numOfGroups, taosGetTimestampMs() - st); qDebug("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", pQInfo,
pQInfo->groupIndex - 1, numOfGroups, elapsedTime);
pQInfo->runtimeEnv.summary.firstStageMergeTime += elapsedTime;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2830,6 +2831,13 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { ...@@ -2830,6 +2831,13 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
while (1) { while (1) {
if (IS_QUERY_KILLED(pQInfo)) { if (IS_QUERY_KILLED(pQInfo)) {
qDebug("QInfo:%p it is already killed, abort", pQInfo); qDebug("QInfo:%p it is already killed, abort", pQInfo);
taosTFree(pTableList);
taosTFree(posList);
taosTFree(pTree);
taosTFree(pResultInfo);
taosTFree(buf);
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
...@@ -3996,9 +4004,9 @@ static void queryCostStatis(SQInfo *pQInfo) { ...@@ -3996,9 +4004,9 @@ static void queryCostStatis(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryCostInfo *pSummary = &pRuntimeEnv->summary; SQueryCostInfo *pSummary = &pRuntimeEnv->summary;
qDebug("QInfo:%p :cost summary: elapsed time:%"PRId64" us, total blocks:%d, load block statis:%d," qDebug("QInfo:%p :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, "
" load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64, "load block statis:%d, load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64,
pQInfo, pSummary->elapsedTime, pSummary->totalBlocks, pSummary->loadBlockStatis, pQInfo, pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks, pSummary->loadBlockStatis,
pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows); pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows);
qDebug("QInfo:%p :cost summary: internal size:%"PRId64", numOfWin:%"PRId64, pQInfo, pSummary->internalSupSize, qDebug("QInfo:%p :cost summary: internal size:%"PRId64", numOfWin:%"PRId64, pQInfo, pSummary->internalSupSize,
......
...@@ -232,7 +232,7 @@ static void writeDataToDisk(STSBuf* pTSBuf) { ...@@ -232,7 +232,7 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize); TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);
int64_t r = fseek(pTSBuf->f, pTSBuf->fileSize, SEEK_SET); int64_t r = fseek(pTSBuf->f, pTSBuf->fileSize, SEEK_SET);
UNUSED(r); assert(r == 0);
/* /*
* format for output data: * format for output data:
...@@ -241,13 +241,14 @@ static void writeDataToDisk(STSBuf* pTSBuf) { ...@@ -241,13 +241,14 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
* *
* both side has the compressed length is used to support load data forwards/backwords. * both side has the compressed length is used to support load data forwards/backwords.
*/ */
fwrite(&pBlock->tag.nType, sizeof(pBlock->tag.nType), 1, pTSBuf->f); int32_t metaLen = 0;
fwrite(&pBlock->tag.nLen, sizeof(pBlock->tag.nLen), 1, pTSBuf->f); metaLen += fwrite(&pBlock->tag.nType, 1, sizeof(pBlock->tag.nType), pTSBuf->f);
metaLen += fwrite(&pBlock->tag.nLen, 1, sizeof(pBlock->tag.nLen), pTSBuf->f);
if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) { if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) {
fwrite(pBlock->tag.pz, (size_t)pBlock->tag.nLen, 1, pTSBuf->f); metaLen += fwrite(pBlock->tag.pz, 1, (size_t)pBlock->tag.nLen, pTSBuf->f);
} else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) { } else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) {
fwrite(&pBlock->tag.i64Key, sizeof(int64_t), 1, pTSBuf->f); metaLen += fwrite(&pBlock->tag.i64Key, 1, sizeof(int64_t), pTSBuf->f);
} }
fwrite(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f); fwrite(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f);
...@@ -255,7 +256,7 @@ static void writeDataToDisk(STSBuf* pTSBuf) { ...@@ -255,7 +256,7 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
fwrite(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f); fwrite(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f);
fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f); fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f);
int32_t blockSize = sizeof(pBlock->tag) + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen; int32_t blockSize = metaLen + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen;
pTSBuf->fileSize += blockSize; pTSBuf->fileSize += blockSize;
pTSBuf->tsData.len = 0; pTSBuf->tsData.len = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册