提交 5495b2c3 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/data_format

...@@ -219,6 +219,16 @@ typedef struct { ...@@ -219,6 +219,16 @@ typedef struct {
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP) #define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
#define SORT_QSORT_T 0x1
#define SORT_SPILLED_MERGE_SORT_T 0x2
typedef struct SSortExecInfo {
int32_t sortMethod;
int32_t sortBuffer;
int32_t loops; // loop count
int32_t writeBytes; // write io bytes
int32_t readBytes; // read io bytes
} SSortExecInfo;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -198,7 +198,7 @@ void colDataTrim(SColumnInfoData* pColumnInfoData); ...@@ -198,7 +198,7 @@ void colDataTrim(SColumnInfoData* pColumnInfoData);
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock); size_t blockDataGetNumOfCols(const SSDataBlock* pBlock);
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock); size_t blockDataGetNumOfRows(const SSDataBlock* pBlock);
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pIndexMap); int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc);
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex,
int32_t pageSize); int32_t pageSize);
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock); int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock);
......
...@@ -660,8 +660,7 @@ typedef struct { ...@@ -660,8 +660,7 @@ typedef struct {
int32_t tz; // query client timezone int32_t tz; // query client timezone
char intervalUnit; char intervalUnit;
char slidingUnit; char slidingUnit;
char char offsetUnit;
offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration.
int8_t precision; int8_t precision;
int64_t interval; int64_t interval;
int64_t sliding; int64_t sliding;
......
...@@ -361,19 +361,13 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) ...@@ -361,19 +361,13 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex)
return 0; return 0;
} }
// if pIndexMap = NULL, merger one column by on column int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) {
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pIndexMap) {
assert(pSrc != NULL && pDest != NULL); assert(pSrc != NULL && pDest != NULL);
int32_t capacity = pDest->info.capacity; int32_t capacity = pDest->info.capacity;
for (int32_t i = 0; i < pDest->info.numOfCols; ++i) { for (int32_t i = 0; i < pDest->info.numOfCols; ++i) {
int32_t mapIndex = i;
// if (pIndexMap) {
// mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i);
// }
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, mapIndex); SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i);
capacity = pDest->info.capacity; capacity = pDest->info.capacity;
colDataMergeCol(pCol2, pDest->info.rows, &capacity, pCol1, pSrc->info.rows); colDataMergeCol(pCol2, pDest->info.rows, &capacity, pCol1, pSrc->info.rows);
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#include "commandInt.h" #include "commandInt.h"
#include "plannodes.h" #include "plannodes.h"
#include "query.h" #include "query.h"
#include "tcommon.h"
int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplainResNode **pRes); int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplainResNode **pRes);
int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level); int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level);
...@@ -637,13 +638,48 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -637,13 +638,48 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
} }
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pSortNode->pSortKeys->length);
SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc;
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, nodesGetOutputNumFromSlotList(pDescNode->pSlots));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pSortNode->node.pOutputDataBlockDesc->totalRowSize); EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pDescNode->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END(); EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
if (EXPLAIN_MODE_ANALYZE == ctx->mode) {
// sort key
EXPLAIN_ROW_NEW(level, "Sort Key: ");
if (pResNode->pExecInfo) {
for (int32_t i = 0; i < LIST_LENGTH(pSortNode->pSortKeys); ++i) {
SOrderByExprNode *ptn = nodesListGetNode(pSortNode->pSortKeys, i);
EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr));
}
}
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
// sort method
EXPLAIN_ROW_NEW(level, "Sort Method: ");
int32_t nodeNum = taosArrayGetSize(pResNode->pExecInfo);
SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, 0);
SSortExecInfo * pExecInfo = (SSortExecInfo *)execInfo->verboseInfo;
EXPLAIN_ROW_APPEND("%s", pExecInfo->sortMethod == SORT_QSORT_T ? "quicksort" : "merge sort");
if (pExecInfo->sortBuffer > 1024 * 1024) {
EXPLAIN_ROW_APPEND(" Buffers:%.2f Mb", pExecInfo->sortBuffer / (1024 * 1024.0));
} else if (pExecInfo->sortBuffer > 1024) {
EXPLAIN_ROW_APPEND(" Buffers:%.2f Kb", pExecInfo->sortBuffer / (1024.0));
} else {
EXPLAIN_ROW_APPEND(" Buffers:%d b", pExecInfo->sortBuffer);
}
EXPLAIN_ROW_APPEND(" loops:%d", pExecInfo->loops);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
}
if (verbose) { if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT); EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
...@@ -792,13 +828,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -792,13 +828,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
} }
// EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pPartNode->length);
// EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pPartNode->node.pOutputDataBlockDesc->totalRowSize); EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pPartNode->node.pOutputDataBlockDesc->totalRowSize);
// if (pPartNode->pGroupKeys) {
// EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
// EXPLAIN_ROW_APPEND(EXPLAIN_GROUPS_FORMAT, pPartNode->pGroupKeys->length);
// }
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END(); EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
......
...@@ -622,18 +622,14 @@ typedef struct SSortedMergeOperatorInfo { ...@@ -622,18 +622,14 @@ typedef struct SSortedMergeOperatorInfo {
typedef struct SSortOperatorInfo { typedef struct SSortOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
uint32_t sortBufSize; // max buffer size for in-memory sort uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* pSortInfo; SArray* pSortInfo;
SSortHandle* pSortHandle; SSortHandle* pSortHandle;
SArray* pColMatchInfo; // for index map from table scan output SArray* pColMatchInfo; // for index map from table scan output
int32_t bufPageSize; int32_t bufPageSize;
// TODO extact struct int64_t startTs; // sort start time
int64_t startTs; // sort start time uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
uint64_t totalSize; // total load bytes from remote
uint64_t totalRows; // total number of rows
uint64_t totalElapsed; // total elapsed time
} SSortOperatorInfo; } SSortOperatorInfo;
typedef struct STagFilterOperatorInfo { typedef struct STagFilterOperatorInfo {
......
...@@ -137,6 +137,14 @@ void* tsortGetValue(STupleHandle* pVHandle, int32_t colId); ...@@ -137,6 +137,14 @@ void* tsortGetValue(STupleHandle* pVHandle, int32_t colId);
*/ */
SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle); SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle);
/**
* return the sort execution information.
*
* @param pHandle
* @return
*/
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -806,7 +806,7 @@ static SSDataBlock* getDataFromCatch(SStreamBlockScanInfo* pInfo) { ...@@ -806,7 +806,7 @@ static SSDataBlock* getDataFromCatch(SStreamBlockScanInfo* pInfo) {
SSDataBlock* pDB = createOneDataBlock(pInfo->pRes, false); SSDataBlock* pDB = createOneDataBlock(pInfo->pRes, false);
blockDataFromBuf(pDB, buf); blockDataFromBuf(pDB, buf);
SSDataBlock* pSub = blockDataExtractBlock(pDB, pos->rowId, 1); SSDataBlock* pSub = blockDataExtractBlock(pDB, pos->rowId, 1);
blockDataMerge(pInfo->pRes, pSub, NULL); blockDataMerge(pInfo->pRes, pSub);
blockDataDestroy(pDB); blockDataDestroy(pDB);
blockDataDestroy(pSub); blockDataDestroy(pSub);
} }
...@@ -1046,8 +1046,9 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) { ...@@ -1046,8 +1046,9 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) {
blockDataDestroy(pInfo->pRes); blockDataDestroy(pInfo->pRes);
const char* name = tNameGetTableName(&pInfo->name); const char* name = tNameGetTableName(&pInfo->name);
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) { if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
metaCloseTbCursor(pInfo->pCur); metaCloseTbCursor(pInfo->pCur);
pInfo->pCur = NULL;
} }
taosArrayDestroy(pInfo->scanCols); taosArrayDestroy(pInfo->scanCols);
......
...@@ -2,6 +2,9 @@ ...@@ -2,6 +2,9 @@
#include "executorimpl.h" #include "executorimpl.h"
static SSDataBlock* doSort(SOperatorInfo* pOperator); static SSDataBlock* doSort(SOperatorInfo* pOperator);
static int32_t doOpenSortOperator(SOperatorInfo* pOperator);
static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols,
...@@ -35,7 +38,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR ...@@ -35,7 +38,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, NULL); createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, getExplainExecInfo);
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
...@@ -121,20 +124,17 @@ void applyScalarFunction(SSDataBlock* pBlock, void* param) { ...@@ -121,20 +124,17 @@ void applyScalarFunction(SSDataBlock* pBlock, void* param) {
} }
} }
SSDataBlock* doSort(SOperatorInfo* pOperator) { int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSortOperatorInfo* pInfo = pOperator->info; SSortOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_RES_TO_RETURN) { if (OPTR_IS_OPENED(pOperator)) {
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->pColMatchInfo); return TSDB_CODE_SUCCESS;
} }
// pInfo->binfo.pRes is not equalled to the input datablock. pInfo->startTs = taosGetTimestampUs();
// int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
// pInfo->binfo.pRes is not equalled to the input datablock.
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_SINGLESOURCE_SORT, pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_SINGLESOURCE_SORT,
-1, -1, NULL, pTaskInfo->id.str); -1, -1, NULL, pTaskInfo->id.str);
...@@ -146,12 +146,39 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { ...@@ -146,12 +146,39 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
int32_t code = tsortOpen(pInfo->pSortHandle); int32_t code = tsortOpen(pInfo->pSortHandle);
taosMemoryFreeClear(ps); taosMemoryFreeClear(ps);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, terrno); longjmp(pTaskInfo->env, terrno);
} }
pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs)/1000.0;
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->pColMatchInfo);
OPTR_SET_OPENED(pOperator);
return TSDB_CODE_SUCCESS;
}
SSDataBlock* doSort(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSortOperatorInfo* pInfo = pOperator->info;
int32_t code = pOperator->fpSet._openFn(pOperator);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
SSDataBlock* pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->pColMatchInfo);
if (pBlock != NULL) {
pOperator->resultInfo.totalRows += pBlock->info.rows;
} else {
doSetOperatorCompleted(pOperator);
}
return pBlock;
} }
void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
...@@ -161,3 +188,15 @@ void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -161,3 +188,15 @@ void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pSortInfo); taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->pColMatchInfo); taosArrayDestroy(pInfo->pColMatchInfo);
} }
int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
ASSERT(pOptr != NULL);
SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
SSortOperatorInfo *pOperatorInfo = (SSortOperatorInfo*)pOptr->info;
*pInfo = tsortGetSortExecInfo(pOperatorInfo->pSortHandle);
*pOptrExplain = pInfo;
*len = sizeof(SSortExecInfo);
return TSDB_CODE_SUCCESS;
}
...@@ -31,20 +31,16 @@ struct STupleHandle { ...@@ -31,20 +31,16 @@ struct STupleHandle {
struct SSortHandle { struct SSortHandle {
int32_t type; int32_t type;
int32_t pageSize; int32_t pageSize;
int32_t numOfPages; int32_t numOfPages;
SDiskbasedBuf *pBuf; SDiskbasedBuf *pBuf;
SArray *pSortInfo; SArray *pSortInfo;
SArray *pIndexMap;
SArray *pOrderedSource; SArray *pOrderedSource;
_sort_fetch_block_fn_t fetchfp; int32_t loops;
_sort_merge_compar_fn_t comparFn;
SMultiwayMergeTreeInfo *pMergeTree;
int64_t startTs;
uint64_t sortElapsed; uint64_t sortElapsed;
int64_t startTs;
uint64_t totalElapsed; uint64_t totalElapsed;
int32_t sourceId; int32_t sourceId;
...@@ -53,13 +49,15 @@ struct SSortHandle { ...@@ -53,13 +49,15 @@ struct SSortHandle {
int32_t numOfCompletedSources; int32_t numOfCompletedSources;
bool opened; bool opened;
const char *idStr; const char *idStr;
bool inMemSort; bool inMemSort;
bool needAdjust; bool needAdjust;
STupleHandle tupleHandle; STupleHandle tupleHandle;
void *param; void *param;
void (*beforeFp)(SSDataBlock* pBlock, void* param); void (*beforeFp)(SSDataBlock* pBlock, void* param);
_sort_fetch_block_fn_t fetchfp;
_sort_merge_compar_fn_t comparFn;
SMultiwayMergeTreeInfo *pMergeTree;
}; };
static int32_t msortComparFn(const void *pLeft, const void *pRight, void *param); static int32_t msortComparFn(const void *pLeft, const void *pRight, void *param);
...@@ -80,7 +78,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, SArray* pIndexMap, int32_t ...@@ -80,7 +78,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, SArray* pIndexMap, int32_t
pSortHandle->pageSize = pageSize; pSortHandle->pageSize = pageSize;
pSortHandle->numOfPages = numOfPages; pSortHandle->numOfPages = numOfPages;
pSortHandle->pSortInfo = pSortInfo; pSortHandle->pSortInfo = pSortInfo;
pSortHandle->pIndexMap = pIndexMap; pSortHandle->loops = 0;
if (pBlock != NULL) { if (pBlock != NULL) {
pSortHandle->pDataBlock = createOneDataBlock(pBlock, false); pSortHandle->pDataBlock = createOneDataBlock(pBlock, false);
...@@ -415,6 +413,9 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { ...@@ -415,6 +413,9 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize); int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize);
blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows); blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows);
// the initial pass + sortPass + final mergePass
pHandle->loops = sortPass + 2;
size_t numOfSorted = taosArrayGetSize(pHandle->pOrderedSource); size_t numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);
for(int32_t t = 0; t < sortPass; ++t) { for(int32_t t = 0; t < sortPass; ++t) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
...@@ -502,12 +503,13 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { ...@@ -502,12 +503,13 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
return 0; return 0;
} }
static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) { static int32_t createInitialSources(SSortHandle* pHandle) {
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize; size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
if (pHandle->type == SORT_SINGLESOURCE_SORT) { if (pHandle->type == SORT_SINGLESOURCE_SORT) {
SSortSource* source = taosArrayGetP(pHandle->pOrderedSource, 0); SSortSource* source = taosArrayGetP(pHandle->pOrderedSource, 0);
taosArrayClear(pHandle->pOrderedSource); taosArrayClear(pHandle->pOrderedSource);
while (1) { while (1) {
SSDataBlock* pBlock = pHandle->fetchfp(source->param); SSDataBlock* pBlock = pHandle->fetchfp(source->param);
if (pBlock == NULL) { if (pBlock == NULL) {
...@@ -524,6 +526,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) { ...@@ -524,6 +526,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
} else { } else {
pHandle->pageSize = 4096; pHandle->pageSize = 4096;
} }
// todo!! // todo!!
pHandle->numOfPages = 1024; pHandle->numOfPages = 1024;
sortBufSize = pHandle->numOfPages * pHandle->pageSize; sortBufSize = pHandle->numOfPages * pHandle->pageSize;
...@@ -535,7 +538,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) { ...@@ -535,7 +538,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
} }
// todo relocate the columns // todo relocate the columns
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock, pHandle->pIndexMap); int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock);
if (code != 0) { if (code != 0) {
return code; return code;
} }
...@@ -569,6 +572,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) { ...@@ -569,6 +572,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
pHandle->cmpParam.numOfSources = 1; pHandle->cmpParam.numOfSources = 1;
pHandle->inMemSort = true; pHandle->inMemSort = true;
pHandle->loops = 1;
pHandle->tupleHandle.rowIndex = -1; pHandle->tupleHandle.rowIndex = -1;
pHandle->tupleHandle.pBlock = pHandle->pDataBlock; pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
return 0; return 0;
...@@ -592,7 +596,7 @@ int32_t tsortOpen(SSortHandle* pHandle) { ...@@ -592,7 +596,7 @@ int32_t tsortOpen(SSortHandle* pHandle) {
pHandle->opened = true; pHandle->opened = true;
int32_t code = createInitialSortedMultiSources(pHandle); int32_t code = createInitialSources(pHandle);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -692,3 +696,20 @@ void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) { ...@@ -692,3 +696,20 @@ void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) {
SColumnInfoData* pColInfo = TARRAY_GET_ELEM(pVHandle->pBlock->pDataBlock, colIndex); SColumnInfoData* pColInfo = TARRAY_GET_ELEM(pVHandle->pBlock->pDataBlock, colIndex);
return colDataGetData(pColInfo, pVHandle->rowIndex); return colDataGetData(pColInfo, pVHandle->rowIndex);
} }
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) {
SSortExecInfo info = {0};
info.sortBuffer = pHandle->pageSize * pHandle->numOfPages;
info.sortMethod = pHandle->inMemSort? SORT_QSORT_T:SORT_SPILLED_MERGE_SORT_T;
info.loops = pHandle->loops;
if (pHandle->pBuf != NULL) {
SDiskbasedBufStatis st = getDBufStatis(pHandle->pBuf);
info.writeBytes = st.flushBytes;
info.readBytes = st.loadBytes;
}
return info;
}
...@@ -189,6 +189,7 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con ...@@ -189,6 +189,7 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con
const char* msg1 = "name too long"; const char* msg1 = "name too long";
const char* msg2 = "invalid database name"; const char* msg2 = "invalid database name";
const char* msg3 = "db is not specified"; const char* msg3 = "db is not specified";
const char* msg4 = "invalid table name";
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
char* p = strnchr(pTableName->z, TS_PATH_DELIMITER[0], pTableName->n, true); char* p = strnchr(pTableName->z, TS_PATH_DELIMITER[0], pTableName->n, true);
...@@ -207,6 +208,10 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con ...@@ -207,6 +208,10 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con
} }
int32_t tbLen = pTableName->n - dbLen - 1; int32_t tbLen = pTableName->n - dbLen - 1;
if (tbLen <= 0) {
return buildInvalidOperationMsg(pMsgBuf, msg4);
}
char tbname[TSDB_TABLE_FNAME_LEN] = {0}; char tbname[TSDB_TABLE_FNAME_LEN] = {0};
strncpy(tbname, p + 1, tbLen); strncpy(tbname, p + 1, tbLen);
/*tbLen = */ strdequote(tbname); /*tbLen = */ strdequote(tbname);
......
...@@ -549,11 +549,16 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) { ...@@ -549,11 +549,16 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
// print the statistics information // print the statistics information
{ {
SDiskbasedBufStatis* ps = &pBuf->statis; SDiskbasedBufStatis* ps = &pBuf->statis;
uDebug( if (ps->loadPages == 0) {
"Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f " uDebug(
"Kb\n", "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages)",
ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, ps->loadPages);
ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages)); } else {
uDebug(
"Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f Kb",
ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f,
ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages));
}
} }
taosRemoveFile(pBuf->path); taosRemoveFile(pBuf->path);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册