提交 5bbd7dd9 编写于 作者: H Haojun Liao

[TD-1870]

上级 2314b195
...@@ -66,7 +66,7 @@ typedef struct SLocalReducer { ...@@ -66,7 +66,7 @@ typedef struct SLocalReducer {
SFillInfo* pFillInfo; // interpolation support structure SFillInfo* pFillInfo; // interpolation support structure
char * pFinalRes; // result data after interpo char * pFinalRes; // result data after interpo
tFilePage * discardData; tFilePage * discardData;
SResultInfo * pResInfo; SResultRowCellInfo * pResInfo;
bool discard; bool discard;
int32_t offset; // limit offset value int32_t offset; // limit offset value
bool orderPrjOnSTable; // projection query on stable bool orderPrjOnSTable; // projection query on stable
......
此差异已折叠。
...@@ -99,12 +99,12 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc ...@@ -99,12 +99,12 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc
pCtx->param[1].i64Key = pQueryInfo->order.orderColId; pCtx->param[1].i64Key = pQueryInfo->order.orderColId;
} }
SResultInfo *pResInfo = &pReducer->pResInfo[i]; // SResultRowCellInfo *pResInfo = &pReducer->pResInfo[i];
pResInfo->bufLen = pExpr->interBytes; pCtx->interBufBytes = pExpr->interBytes;
pResInfo->interResultBuf = calloc(1, (size_t) pResInfo->bufLen); // pResInfo->interResultBuf = calloc(1, (size_t) pCtx->interBufBytes);
pCtx->resultInfo = &pReducer->pResInfo[i]; pCtx->resultInfo = &pReducer->pResInfo[i];
pCtx->resultInfo->superTableQ = true; pCtx->stableQuery = true;
} }
int16_t n = 0; int16_t n = 0;
...@@ -345,7 +345,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -345,7 +345,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
size_t numOfCols = tscSqlExprNumOfExprs(pQueryInfo); size_t numOfCols = tscSqlExprNumOfExprs(pQueryInfo);
pReducer->pTempBuffer->num = 0; pReducer->pTempBuffer->num = 0;
pReducer->pResInfo = calloc(numOfCols, sizeof(SResultInfo)); pReducer->pResInfo = calloc(numOfCols, sizeof(SResultRowCellInfo));
tscCreateResPointerInfo(pRes, pQueryInfo); tscCreateResPointerInfo(pRes, pQueryInfo);
tscInitSqlContext(pCmd, pReducer, pDesc); tscInitSqlContext(pCmd, pReducer, pDesc);
...@@ -512,7 +512,7 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { ...@@ -512,7 +512,7 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
if (pLocalReducer->pResInfo != NULL) { if (pLocalReducer->pResInfo != NULL) {
size_t num = tscSqlExprNumOfExprs(pQueryInfo); size_t num = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
taosTFree(pLocalReducer->pResInfo[i].interResultBuf); // taosTFree(pLocalReducer->pResInfo[i].interResultBuf);
} }
taosTFree(pLocalReducer->pResInfo); taosTFree(pLocalReducer->pResInfo);
...@@ -1072,7 +1072,7 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx) ...@@ -1072,7 +1072,7 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx)
continue; continue;
} }
SResultInfo* pResInfo = GET_RES_INFO(&pCtx[j]); SResultRowCellInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
if (maxOutput < pResInfo->numOfRes) { if (maxOutput < pResInfo->numOfRes) {
maxOutput = pResInfo->numOfRes; maxOutput = pResInfo->numOfRes;
} }
......
...@@ -61,14 +61,14 @@ typedef struct SSqlGroupbyExpr { ...@@ -61,14 +61,14 @@ typedef struct SSqlGroupbyExpr {
int16_t orderType; // order by type: asc/desc int16_t orderType; // order by type: asc/desc
} SSqlGroupbyExpr; } SSqlGroupbyExpr;
typedef struct SWindowResult { typedef struct SResultRow {
int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer
int32_t rowId:15; int32_t rowId:15;
bool closed:1; // this result status: closed or opened bool closed:1; // this result status: closed or opened
uint16_t numOfRows; // number of rows of current time window uint16_t numOfRows; // number of rows of current time window
SResultInfo* resultInfo; // For each result column, there is a resultInfo SResultRowCellInfo* pCellInfo; // For each result column, there is a resultInfo
union {STimeWindow win; char* key;}; // start key of current time window union {STimeWindow win; char* key;}; // start key of current time window
} SWindowResult; } SResultRow;
/** /**
* If the number of generated results is greater than this value, * If the number of generated results is greater than this value,
...@@ -82,7 +82,7 @@ typedef struct SResultRec { ...@@ -82,7 +82,7 @@ typedef struct SResultRec {
} SResultRec; } SResultRec;
typedef struct SWindowResInfo { typedef struct SWindowResInfo {
SWindowResult** pResult; // result list SResultRow** pResult; // result list
int16_t type; // data type for hash key int16_t type; // data type for hash key
int32_t capacity; // max capacity int32_t capacity; // max capacity
int32_t curIndex; // current start active index int32_t curIndex; // current start active index
...@@ -169,11 +169,11 @@ typedef struct SQuery { ...@@ -169,11 +169,11 @@ typedef struct SQuery {
typedef struct SQueryRuntimeEnv { typedef struct SQueryRuntimeEnv {
jmp_buf env; jmp_buf env;
SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo SResultRowCellInfo* resultInfo; // todo refactor to merge with SWindowResInfo
SQuery* pQuery; SQuery* pQuery;
SQLFunctionCtx* pCtx; SQLFunctionCtx* pCtx;
int32_t numOfRowsPerPage; int32_t numOfRowsPerPage;
int16_t offset[TSDB_MAX_COLUMNS]; uint16_t offset[TSDB_MAX_COLUMNS];
uint16_t scanFlag; // denotes reversed scan of data or not uint16_t scanFlag; // denotes reversed scan of data or not
SFillInfo* pFillInfo; SFillInfo* pFillInfo;
SWindowResInfo windowResInfo; SWindowResInfo windowResInfo;
...@@ -192,6 +192,8 @@ typedef struct SQueryRuntimeEnv { ...@@ -192,6 +192,8 @@ typedef struct SQueryRuntimeEnv {
SHashObj* pWindowHashTable; // quick locate the window object for each result SHashObj* pWindowHashTable; // quick locate the window object for each result
char* keyBuf; // window key buffer char* keyBuf; // window key buffer
SWindowResultPool* pool; // window result object pool SWindowResultPool* pool; // window result object pool
int32_t* rowCellInfoOffset;// offset value for each row result cell info
} SQueryRuntimeEnv; } SQueryRuntimeEnv;
enum { enum {
......
...@@ -26,8 +26,9 @@ ...@@ -26,8 +26,9 @@
int32_t getOutputInterResultBufSize(SQuery* pQuery); int32_t getOutputInterResultBufSize(SQuery* pQuery);
void clearTimeWindowResBuf(SQueryRuntimeEnv* pRuntimeEnv, SWindowResult* pOneOutputRes); void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pRow);
void copyTimeWindowResBuf(SQueryRuntimeEnv* pRuntimeEnv, SWindowResult* dst, const SWindowResult* src); void copyResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* dst, const SResultRow* src);
SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index);
int32_t initWindowResInfo(SWindowResInfo* pWindowResInfo, SQueryRuntimeEnv* pRuntimeEnv, int32_t size, int32_t initWindowResInfo(SWindowResInfo* pWindowResInfo, SQueryRuntimeEnv* pRuntimeEnv, int32_t size,
int32_t threshold, int16_t type); int32_t threshold, int16_t type);
...@@ -42,7 +43,7 @@ void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot); ...@@ -42,7 +43,7 @@ void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot);
void closeAllTimeWindow(SWindowResInfo* pWindowResInfo); void closeAllTimeWindow(SWindowResInfo* pWindowResInfo);
void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order); void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order);
static FORCE_INLINE SWindowResult *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot) { static FORCE_INLINE SResultRow *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot) {
assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size); assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size);
return pWindowResInfo->pResult[slot]; return pWindowResInfo->pResult[slot];
} }
...@@ -52,9 +53,9 @@ static FORCE_INLINE SWindowResult *getWindowResult(SWindowResInfo *pWindowResInf ...@@ -52,9 +53,9 @@ static FORCE_INLINE SWindowResult *getWindowResult(SWindowResInfo *pWindowResInf
bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot); bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot);
int32_t createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, size_t interBufSize); int32_t createQueryResultInfo(SQuery *pQuery, SResultRow *pResultRow);
static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWindowResult *pResult, static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SResultRow *pResult,
tFilePage* page) { tFilePage* page) {
assert(pResult != NULL && pRuntimeEnv != NULL); assert(pResult != NULL && pRuntimeEnv != NULL);
...@@ -74,7 +75,7 @@ __filter_func_t *getValueFilterFuncArray(int32_t type); ...@@ -74,7 +75,7 @@ __filter_func_t *getValueFilterFuncArray(int32_t type);
size_t getWindowResultSize(SQueryRuntimeEnv* pRuntimeEnv); size_t getWindowResultSize(SQueryRuntimeEnv* pRuntimeEnv);
SWindowResultPool* initWindowResultPool(size_t size); SWindowResultPool* initWindowResultPool(size_t size);
SWindowResult* getNewWindowResult(SWindowResultPool* p); SResultRow* getNewWindowResult(SWindowResultPool* p);
int64_t getWindowResultPoolMemSize(SWindowResultPool* p); int64_t getWindowResultPoolMemSize(SWindowResultPool* p);
void* destroyWindowResultPool(SWindowResultPool* p); void* destroyWindowResultPool(SWindowResultPool* p);
int32_t getNumOfAllocatedWindowResult(SWindowResultPool* p); int32_t getNumOfAllocatedWindowResult(SWindowResultPool* p);
......
...@@ -145,15 +145,14 @@ typedef struct SInterpInfoDetail { ...@@ -145,15 +145,14 @@ typedef struct SInterpInfoDetail {
int8_t primaryCol; int8_t primaryCol;
} SInterpInfoDetail; } SInterpInfoDetail;
typedef struct SResultInfo { typedef struct SResultRowCellInfo {
int8_t hasResult; // result generated, not NULL value int8_t hasResult; // result generated, not NULL value
bool initialized; // output buffer has been initialized bool initialized; // output buffer has been initialized
bool complete; // query has completed bool complete; // query has completed
bool superTableQ; // is super table query uint16_t numOfRes; // num of output result in current buffer
uint32_t bufLen; // buffer size } SResultRowCellInfo;
uint64_t numOfRes; // num of output result in current buffer
void* interResultBuf; // output result buffer #define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowCellInfo)))
} SResultInfo;
struct SQLFunctionCtx; struct SQLFunctionCtx;
...@@ -175,9 +174,11 @@ typedef struct SQLFunctionCtx { ...@@ -175,9 +174,11 @@ typedef struct SQLFunctionCtx {
int16_t inputBytes; int16_t inputBytes;
int16_t outputType; int16_t outputType;
int16_t outputBytes; // size of results, determined by function and input column data type int16_t outputBytes; // size of results, determined by function and input column data type
bool hasNull; // null value exist in current block int32_t interBufBytes; // internal buffer size
bool hasNull; // null value exist in current block
bool requireNull; // require null in some function bool requireNull; // require null in some function
bool stableQuery;
int16_t functionId; // function id int16_t functionId; // function id
void * aInputElemBuf; void * aInputElemBuf;
char * aOutputBuf; // final result output buffer, point to sdata->data char * aOutputBuf; // final result output buffer, point to sdata->data
...@@ -189,7 +190,8 @@ typedef struct SQLFunctionCtx { ...@@ -189,7 +190,8 @@ typedef struct SQLFunctionCtx {
void * ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ void * ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
SQLPreAggVal preAggVals; SQLPreAggVal preAggVals;
tVariant tag; tVariant tag;
SResultInfo *resultInfo;
SResultRowCellInfo *resultInfo;
SExtTagsInfo tagInfo; SExtTagsInfo tagInfo;
} SQLFunctionCtx; } SQLFunctionCtx;
...@@ -274,16 +276,16 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const cha ...@@ -274,16 +276,16 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const cha
(_r)->initialized = false; \ (_r)->initialized = false; \
} while (0) } while (0)
void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable, char* buf); //void setResultInfoBuf(SResultRowCellInfo *pResInfo, char* buf);
static FORCE_INLINE void initResultInfo(SResultInfo *pResInfo) { static FORCE_INLINE void initResultInfo(SResultRowCellInfo *pResInfo, uint32_t bufLen) {
pResInfo->initialized = true; // the this struct has been initialized flag pResInfo->initialized = true; // the this struct has been initialized flag
pResInfo->complete = false; pResInfo->complete = false;
pResInfo->hasResult = false; pResInfo->hasResult = false;
pResInfo->numOfRes = 0; pResInfo->numOfRes = 0;
memset(pResInfo->interResultBuf, 0, (size_t)pResInfo->bufLen); memset(GET_ROWCELL_INTERBUF(pResInfo), 0, (size_t)bufLen);
} }
#ifdef __cplusplus #ifdef __cplusplus
......
此差异已折叠。
...@@ -407,9 +407,9 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf) { ...@@ -407,9 +407,9 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf) {
} }
if (pResultBuf->file != NULL) { if (pResultBuf->file != NULL) {
qDebug("QInfo:%p res output buffer closed, total:%.2f Kb, inmem size:%dbytes, file size:%"PRId64" bytes", qDebug("QInfo:%p res output buffer closed, total:%.2f Kb, inmem size:%.2f Kb, file size:%.2f",
pResultBuf->handle, pResultBuf->totalBufSize/1024.0, listNEles(pResultBuf->lruList) * pResultBuf->pageSize, pResultBuf->handle, pResultBuf->totalBufSize/1024.0, listNEles(pResultBuf->lruList) * pResultBuf->pageSize / 1024.0,
pResultBuf->fileSize); pResultBuf->fileSize/1024.0);
fclose(pResultBuf->file); fclose(pResultBuf->file);
} else { } else {
......
...@@ -27,7 +27,7 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery) { ...@@ -27,7 +27,7 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery) {
size += pQuery->pSelectExpr[i].interBytes; size += pQuery->pSelectExpr[i].interBytes;
} }
assert(size > 0); assert(size >= 0);
return size; return size;
} }
...@@ -41,26 +41,12 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun ...@@ -41,26 +41,12 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
pWindowResInfo->size = 0; pWindowResInfo->size = 0;
pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL; pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
// SQueryCostInfo* pSummary = &pRuntimeEnv->summary;
pWindowResInfo->pResult = calloc(pWindowResInfo->capacity, POINTER_BYTES); pWindowResInfo->pResult = calloc(pWindowResInfo->capacity, POINTER_BYTES);
if (pWindowResInfo->pResult == NULL) { if (pWindowResInfo->pResult == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
pWindowResInfo->interval = pRuntimeEnv->pQuery->interval.interval; pWindowResInfo->interval = pRuntimeEnv->pQuery->interval.interval;
// pSummary->winInfoSize += POINTER_BYTES * pWindowResInfo->capacity;
// pSummary->winInfoSize += (pRuntimeEnv->pQuery->numOfOutput * sizeof(SResultInfo) + pRuntimeEnv->interBufSize) * pWindowResInfo->capacity;
// pSummary->numOfTimeWindows = pWindowResInfo->capacity;
// for (int32_t i = 0; i < pWindowResInfo->capacity; ++i) {
// int32_t code = createQueryResultInfo(pRuntimeEnv->pQuery, pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, pRuntimeEnv->interBufSize);
// if (code != TSDB_CODE_SUCCESS) {
// return code;
// }
// }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -82,8 +68,8 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowR ...@@ -82,8 +68,8 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowR
} }
for (int32_t i = 0; i < pWindowResInfo->size; ++i) { for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SWindowResult *pWindowRes = pWindowResInfo->pResult[i]; SResultRow *pWindowRes = pWindowResInfo->pResult[i];
clearTimeWindowResBuf(pRuntimeEnv, pWindowRes); clearResultRow(pRuntimeEnv, pWindowRes);
} }
pWindowResInfo->curIndex = -1; pWindowResInfo->curIndex = -1;
...@@ -108,7 +94,7 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { ...@@ -108,7 +94,7 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
int16_t bytes = -1; int16_t bytes = -1;
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SWindowResult *pResult = pWindowResInfo->pResult[i]; SResultRow *pResult = pWindowResInfo->pResult[i];
if (pResult->closed) { // remove the window slot from hash table if (pResult->closed) { // remove the window slot from hash table
// todo refactor // todo refactor
...@@ -131,19 +117,19 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { ...@@ -131,19 +117,19 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
// clear all the closed windows from the window list // clear all the closed windows from the window list
for (int32_t k = 0; k < remain; ++k) { for (int32_t k = 0; k < remain; ++k) {
copyTimeWindowResBuf(pRuntimeEnv, pWindowResInfo->pResult[k], pWindowResInfo->pResult[num + k]); copyResultRow(pRuntimeEnv, pWindowResInfo->pResult[k], pWindowResInfo->pResult[num + k]);
} }
// move the unclosed window in the front of the window list // move the unclosed window in the front of the window list
for (int32_t k = remain; k < pWindowResInfo->size; ++k) { for (int32_t k = remain; k < pWindowResInfo->size; ++k) {
SWindowResult *pWindowRes = pWindowResInfo->pResult[k]; SResultRow *pWindowRes = pWindowResInfo->pResult[k];
clearTimeWindowResBuf(pRuntimeEnv, pWindowRes); clearResultRow(pRuntimeEnv, pWindowRes);
} }
pWindowResInfo->size = remain; pWindowResInfo->size = remain;
for (int32_t k = 0; k < pWindowResInfo->size; ++k) { for (int32_t k = 0; k < pWindowResInfo->size; ++k) {
SWindowResult *pResult = pWindowResInfo->pResult[k]; SResultRow *pResult = pWindowResInfo->pResult[k];
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
key = varDataVal(pResult->key); key = varDataVal(pResult->key);
...@@ -240,7 +226,7 @@ void closeTimeWindow(SWindowResInfo *pWindowResInfo, int32_t slot) { ...@@ -240,7 +226,7 @@ void closeTimeWindow(SWindowResInfo *pWindowResInfo, int32_t slot) {
getWindowResult(pWindowResInfo, slot)->closed = true; getWindowResult(pWindowResInfo, slot)->closed = true;
} }
void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindowRes) { void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pWindowRes) {
if (pWindowRes == NULL) { if (pWindowRes == NULL) {
return; return;
} }
...@@ -248,7 +234,7 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow ...@@ -248,7 +234,7 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId); tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId);
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutput; ++i) {
SResultInfo *pResultInfo = &pWindowRes->resultInfo[i]; SResultRowCellInfo *pResultInfo = &pWindowRes->pCellInfo[i];
char * s = getPosInResultPage(pRuntimeEnv, i, pWindowRes, page); char * s = getPosInResultPage(pRuntimeEnv, i, pWindowRes, page);
size_t size = pRuntimeEnv->pQuery->pSelectExpr[i].bytes; size_t size = pRuntimeEnv->pQuery->pSelectExpr[i].bytes;
...@@ -269,7 +255,7 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow ...@@ -269,7 +255,7 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
* since the attribute of "Pos" is bound to each window result when the window result is created in the * since the attribute of "Pos" is bound to each window result when the window result is created in the
* disk-based result buffer. * disk-based result buffer.
*/ */
void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, const SWindowResult *src) { void copyResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *dst, const SResultRow *src) {
dst->numOfRows = src->numOfRows; dst->numOfRows = src->numOfRows;
dst->win = src->win; dst->win = src->win;
dst->closed = src->closed; dst->closed = src->closed;
...@@ -277,30 +263,35 @@ void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, con ...@@ -277,30 +263,35 @@ void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, con
int32_t nOutputCols = pRuntimeEnv->pQuery->numOfOutput; int32_t nOutputCols = pRuntimeEnv->pQuery->numOfOutput;
for (int32_t i = 0; i < nOutputCols; ++i) { for (int32_t i = 0; i < nOutputCols; ++i) {
SResultInfo *pDst = &dst->resultInfo[i]; SResultRowCellInfo *pDst = getResultCell(pRuntimeEnv, dst, i);
SResultInfo *pSrc = &src->resultInfo[i]; SResultRowCellInfo *pSrc = getResultCell(pRuntimeEnv, src, i);
char *buf = pDst->interResultBuf; // char *buf = pDst->interResultBuf;
memcpy(pDst, pSrc, sizeof(SResultInfo)); memcpy(pDst, pSrc, sizeof(SResultRowCellInfo) + pRuntimeEnv->pCtx[i].interBufBytes);
pDst->interResultBuf = buf; // restore the allocated buffer // pDst->interResultBuf = buf; // restore the allocated buffer
// copy the result info struct // copy the result info struct
memcpy(pDst->interResultBuf, pSrc->interResultBuf, pDst->bufLen); // memcpy(pDst->interResultBuf, pSrc->interResultBuf, pRuntimeEnv->pCtx[i].interBufBytes);
// copy the output buffer data from src to dst, the position info keep unchanged // copy the output buffer data from src to dst, the position info keep unchanged
tFilePage *dstpage = getResBufPage(pRuntimeEnv->pResultBuf, dst->pageId); tFilePage *dstpage = getResBufPage(pRuntimeEnv->pResultBuf, dst->pageId);
char * dstBuf = getPosInResultPage(pRuntimeEnv, i, dst, dstpage); char * dstBuf = getPosInResultPage(pRuntimeEnv, i, dst, dstpage);
tFilePage *srcpage = getResBufPage(pRuntimeEnv->pResultBuf, src->pageId); tFilePage *srcpage = getResBufPage(pRuntimeEnv->pResultBuf, src->pageId);
char * srcBuf = getPosInResultPage(pRuntimeEnv, i, (SWindowResult *)src, srcpage); char * srcBuf = getPosInResultPage(pRuntimeEnv, i, (SResultRow *)src, srcpage);
size_t s = pRuntimeEnv->pQuery->pSelectExpr[i].bytes; size_t s = pRuntimeEnv->pQuery->pSelectExpr[i].bytes;
memcpy(dstBuf, srcBuf, s); memcpy(dstBuf, srcBuf, s);
} }
} }
SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index) {
assert(index >= 0 && index < pRuntimeEnv->pQuery->numOfOutput);
return (SResultRowCellInfo*)((char*) pRow->pCellInfo + pRuntimeEnv->rowCellInfoOffset[index]);
}
size_t getWindowResultSize(SQueryRuntimeEnv* pRuntimeEnv) { size_t getWindowResultSize(SQueryRuntimeEnv* pRuntimeEnv) {
return (pRuntimeEnv->pQuery->numOfOutput * sizeof(SResultInfo)) + pRuntimeEnv->interBufSize + sizeof(SWindowResult); return (pRuntimeEnv->pQuery->numOfOutput * sizeof(SResultRowCellInfo)) + pRuntimeEnv->interBufSize + sizeof(SResultRow);
} }
SWindowResultPool* initWindowResultPool(size_t size) { SWindowResultPool* initWindowResultPool(size_t size) {
...@@ -320,7 +311,7 @@ SWindowResultPool* initWindowResultPool(size_t size) { ...@@ -320,7 +311,7 @@ SWindowResultPool* initWindowResultPool(size_t size) {
return p; return p;
} }
SWindowResult* getNewWindowResult(SWindowResultPool* p) { SResultRow* getNewWindowResult(SWindowResultPool* p) {
if (p == NULL) { if (p == NULL) {
return NULL; return NULL;
} }
......
...@@ -31,13 +31,16 @@ extern "C" { ...@@ -31,13 +31,16 @@ extern "C" {
typedef void (*_hash_free_fn_t)(void *param); typedef void (*_hash_free_fn_t)(void *param);
typedef struct SHashNode { typedef struct SHashNode {
char *key; // char *key;
struct SHashNode *next; struct SHashNode *next;
uint32_t hashVal; // the hash value of key, if hashVal == HASH_VALUE_IN_TRASH, this node is moved to trash uint32_t hashVal; // the hash value of key
uint32_t keyLen; // length of the key uint32_t keyLen; // length of the key
char *data; // char *data;
} SHashNode; } SHashNode;
#define GET_HASH_NODE_KEY(_n) ((char*)(_n) + sizeof(SHashNode))
#define GET_HASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SHashNode) + (_n)->keyLen)
typedef enum SHashLockTypeE { typedef enum SHashLockTypeE {
HASH_NO_LOCK = 0, HASH_NO_LOCK = 0,
HASH_ENTRY_LOCK = 1, HASH_ENTRY_LOCK = 1,
......
...@@ -22,14 +22,13 @@ ...@@ -22,14 +22,13 @@
#define DO_FREE_HASH_NODE(_n) \ #define DO_FREE_HASH_NODE(_n) \
do { \ do { \
taosTFree((_n)->data); \
taosTFree(_n); \ taosTFree(_n); \
} while (0) } while (0)
#define FREE_HASH_NODE(_h, _n) \ #define FREE_HASH_NODE(_h, _n) \
do { \ do { \
if ((_h)->freeFp) { \ if ((_h)->freeFp) { \
(_h)->freeFp((_n)->data); \ (_h)->freeFp(GET_HASH_NODE_DATA(_n)); \
} \ } \
\ \
DO_FREE_HASH_NODE(_n); \ DO_FREE_HASH_NODE(_n); \
...@@ -77,7 +76,7 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) { ...@@ -77,7 +76,7 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
static FORCE_INLINE SHashNode *doSearchInEntryList(SHashEntry *pe, const void *key, size_t keyLen, uint32_t hashVal) { static FORCE_INLINE SHashNode *doSearchInEntryList(SHashEntry *pe, const void *key, size_t keyLen, uint32_t hashVal) {
SHashNode *pNode = pe->next; SHashNode *pNode = pe->next;
while (pNode) { while (pNode) {
if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) { if ((pNode->keyLen == keyLen) && (memcmp(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0)) {
assert(pNode->hashVal == hashVal); assert(pNode->hashVal == hashVal);
break; break;
} }
...@@ -115,11 +114,13 @@ static SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *p ...@@ -115,11 +114,13 @@ static SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *p
* @param dsize size of actual data * @param dsize size of actual data
* @return hash node * @return hash node
*/ */
static FORCE_INLINE SHashNode *doUpdateHashNode(SHashNode *pNode, SHashNode *pNewNode) { static FORCE_INLINE SHashNode *doUpdateHashNode(SHashNode* prev, SHashNode *pNode, SHashNode *pNewNode) {
assert(pNode->keyLen == pNewNode->keyLen); assert(pNode->keyLen == pNewNode->keyLen);
SWAP(pNode->key, pNewNode->key, void *); if (prev != NULL) {
SWAP(pNode->data, pNewNode->data, void *); prev->next = pNewNode;
}
pNewNode->next = pNode->next;
return pNewNode; return pNewNode;
} }
...@@ -208,12 +209,14 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da ...@@ -208,12 +209,14 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
assert(pNode == NULL); assert(pNode == NULL);
} }
SHashNode* prev = NULL;
while (pNode) { while (pNode) {
if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) { if ((pNode->keyLen == keyLen) && (memcmp(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0)) {
assert(pNode->hashVal == hashVal); assert(pNode->hashVal == hashVal);
break; break;
} }
prev = pNode;
pNode = pNode->next; pNode = pNode->next;
} }
...@@ -239,7 +242,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da ...@@ -239,7 +242,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
} else { } else {
// not support the update operation, return error // not support the update operation, return error
if (pHashObj->enableUpdate) { if (pHashObj->enableUpdate) {
doUpdateHashNode(pNode, pNewNode); doUpdateHashNode(prev, pNode, pNewNode);
} }
if (pHashObj->type == HASH_ENTRY_LOCK) { if (pHashObj->type == HASH_ENTRY_LOCK) {
...@@ -293,13 +296,13 @@ void* taosHashGetCB(SHashObj *pHashObj, const void *key, size_t keyLen, void (*f ...@@ -293,13 +296,13 @@ void* taosHashGetCB(SHashObj *pHashObj, const void *key, size_t keyLen, void (*f
SHashNode *pNode = doSearchInEntryList(pe, key, keyLen, hashVal); SHashNode *pNode = doSearchInEntryList(pe, key, keyLen, hashVal);
if (pNode != NULL) { if (pNode != NULL) {
if (fp != NULL) { if (fp != NULL) {
fp(pNode->data); fp(GET_HASH_NODE_DATA(pNode));
} }
if (d != NULL) { if (d != NULL) {
memcpy(d, pNode->data, dsize); memcpy(d, GET_HASH_NODE_DATA(pNode), dsize);
} else { } else {
data = pNode->data; data = GET_HASH_NODE_DATA(pNode);
} }
} }
...@@ -357,13 +360,13 @@ int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLe ...@@ -357,13 +360,13 @@ int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLe
SHashNode *pRes = NULL; SHashNode *pRes = NULL;
// remove it // remove it
if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) { if ((pNode->keyLen == keyLen) && (memcmp(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0)) {
pe->num -= 1; pe->num -= 1;
pRes = pNode; pRes = pNode;
pe->next = pNode->next; pe->next = pNode->next;
} else { } else {
while (pNode->next != NULL) { while (pNode->next != NULL) {
if (((pNode->next)->keyLen == keyLen) && (memcmp((pNode->next)->key, key, keyLen) == 0)) { if (((pNode->next)->keyLen == keyLen) && (memcmp(GET_HASH_NODE_KEY((pNode->next)), key, keyLen) == 0)) {
assert((pNode->next)->hashVal == hashVal); assert((pNode->next)->hashVal == hashVal);
break; break;
} }
...@@ -392,7 +395,7 @@ int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLe ...@@ -392,7 +395,7 @@ int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLe
__rd_unlock(&pHashObj->lock, pHashObj->type); __rd_unlock(&pHashObj->lock, pHashObj->type);
if (data != NULL && pRes != NULL) { if (data != NULL && pRes != NULL) {
memcpy(data, pRes->data, dsize); memcpy(data, GET_HASH_NODE_DATA(pRes), dsize);
} }
if (pRes != NULL) { if (pRes != NULL) {
...@@ -426,7 +429,7 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi ...@@ -426,7 +429,7 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi
// todo remove the first node // todo remove the first node
SHashNode *pNode = NULL; SHashNode *pNode = NULL;
while((pNode = pEntry->next) != NULL) { while((pNode = pEntry->next) != NULL) {
if (fp && (!fp(param, pNode->data))) { if (fp && (!fp(param, GET_HASH_NODE_DATA(pNode)))) {
pEntry->num -= 1; pEntry->num -= 1;
atomic_sub_fetch_64(&pHashObj->size, 1); atomic_sub_fetch_64(&pHashObj->size, 1);
...@@ -451,7 +454,7 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi ...@@ -451,7 +454,7 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi
while ((pNext = pNode->next) != NULL) { while ((pNext = pNode->next) != NULL) {
// not qualified, remove it // not qualified, remove it
if (fp && (!fp(param, pNext->data))) { if (fp && (!fp(param, GET_HASH_NODE_DATA(pNext)))) {
pNode->next = pNext->next; pNode->next = pNext->next;
pEntry->num -= 1; pEntry->num -= 1;
atomic_sub_fetch_64(&pHashObj->size, 1); atomic_sub_fetch_64(&pHashObj->size, 1);
...@@ -605,7 +608,7 @@ bool taosHashIterNext(SHashMutableIterator *pIter) { ...@@ -605,7 +608,7 @@ bool taosHashIterNext(SHashMutableIterator *pIter) {
} }
} }
void *taosHashIterGet(SHashMutableIterator *iter) { return (iter == NULL) ? NULL : iter->pCur->data; } void *taosHashIterGet(SHashMutableIterator *iter) { return (iter == NULL) ? NULL : GET_HASH_NODE_DATA(iter->pCur); }
void *taosHashDestroyIter(SHashMutableIterator *iter) { void *taosHashDestroyIter(SHashMutableIterator *iter) {
if (iter == NULL) { if (iter == NULL) {
...@@ -743,21 +746,19 @@ void taosHashTableResize(SHashObj *pHashObj) { ...@@ -743,21 +746,19 @@ void taosHashTableResize(SHashObj *pHashObj) {
} }
SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) { SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) {
SHashNode *pNewNode = calloc(1, sizeof(SHashNode)); SHashNode *pNewNode = calloc(1, sizeof(SHashNode) + keyLen + dsize);
if (pNewNode == NULL) { if (pNewNode == NULL) {
uError("failed to allocate memory, reason:%s", strerror(errno)); uError("failed to allocate memory, reason:%s", strerror(errno));
return NULL; return NULL;
} }
pNewNode->data = malloc(dsize + keyLen);
memcpy(pNewNode->data, pData, dsize);
pNewNode->key = pNewNode->data + dsize;
memcpy(pNewNode->key, key, keyLen);
pNewNode->keyLen = (uint32_t)keyLen; pNewNode->keyLen = (uint32_t)keyLen;
pNewNode->hashVal = hashVal; pNewNode->hashVal = hashVal;
memcpy(GET_HASH_NODE_DATA(pNewNode), pData, dsize);
memcpy(GET_HASH_NODE_KEY(pNewNode), key, keyLen);
return pNewNode; return pNewNode;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册