提交 a8e86c9a 编写于 作者: H Haojun Liao

[TD-1870]

上级 b8bead9a
......@@ -70,7 +70,8 @@ typedef struct SResultRec {
typedef struct SWindowResInfo {
SWindowResult* pResult; // result list
SHashObj* hashList; // hash list for quick access
// uint64_t uid; // table uid, in order to identify the result from global hash table
// SHashObj* hashList; // hash list for quick access
int16_t type; // data type for hash key
int32_t capacity; // max capacity
int32_t curIndex; // current start active index
......@@ -177,6 +178,8 @@ typedef struct SQueryRuntimeEnv {
int32_t interBufSize; // intermediate buffer sizse
int32_t prevGroupId; // previous executed group id
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
SHashObj* pWindowHashTable; // quick locate the window object for each result
char* keyBuf; // window key buffer
} SQueryRuntimeEnv;
enum {
......
......@@ -15,6 +15,15 @@
#ifndef TDENGINE_QUERYUTIL_H
#define TDENGINE_QUERYUTIL_H
#define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \
do { \
assert(sizeof(_uid) == sizeof(uint64_t)); \
*(uint64_t *)(_k) = (_uid); \
memcpy((_k) + sizeof(uint64_t), (_ori), (_len)); \
} while (0)
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
int32_t getOutputInterResultBufSize(SQuery* pQuery);
void clearTimeWindowResBuf(SQueryRuntimeEnv* pRuntimeEnv, SWindowResult* pOneOutputRes);
......
......@@ -448,10 +448,11 @@ static bool hasNullValue(SColIndex* pColIndex, SDataStatis *pStatis, SDataStatis
}
static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, char *pData,
int16_t bytes, bool masterscan) {
int16_t bytes, bool masterscan, uint64_t uid) {
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t *p1 = (int32_t *) taosHashGet(pWindowResInfo->hashList, pData, bytes);
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid);
int32_t *p1 = (int32_t *) taosHashGet(pRuntimeEnv->pWindowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
if (p1 != NULL) {
pWindowResInfo->curIndex = *p1;
} else {
......@@ -495,7 +496,7 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
// add a new result set for a new group
pWindowResInfo->curIndex = pWindowResInfo->size++;
taosHashPut(pWindowResInfo->hashList, pData, bytes, (char *)&pWindowResInfo->curIndex, sizeof(int32_t));
taosHashPut(pRuntimeEnv->pWindowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), (char *)&pWindowResInfo->curIndex, sizeof(int32_t));
}
// too many time window in query
......@@ -555,7 +556,7 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t
return w;
}
static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResultBuf *pResultBuf, int32_t sid,
static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResultBuf *pResultBuf, int32_t tid,
int32_t numOfRowsPerPage) {
if (pWindowRes->pageId != -1) {
return 0;
......@@ -565,10 +566,10 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult
// in the first scan, new space needed for results
int32_t pageId = -1;
SIDList list = getDataBufPagesIdList(pResultBuf, sid);
SIDList list = getDataBufPagesIdList(pResultBuf, tid);
if (taosArrayGetSize(list) == 0) {
pData = getNewDataBuf(pResultBuf, sid, &pageId);
pData = getNewDataBuf(pResultBuf, tid, &pageId);
} else {
SPageInfo* pi = getLastPageInfo(list);
pData = getResBufPage(pResultBuf, pi->pageId);
......@@ -578,7 +579,7 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult
// release current page first, and prepare the next one
releaseResBufPageInfo(pResultBuf, pi);
pData = getNewDataBuf(pResultBuf, sid, &pageId);
pData = getNewDataBuf(pResultBuf, tid, &pageId);
if (pData != NULL) {
assert(pData->num == 0); // number of elements must be 0 for new allocated buffer
}
......@@ -600,13 +601,12 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult
return 0;
}
static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, int32_t sid,
static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, SDataBlockInfo* pBockInfo,
STimeWindow *win, bool masterscan, bool* newWind) {
assert(win->skey <= win->ekey);
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win->skey,
TSDB_KEYSIZE, masterscan);
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, pBockInfo->uid);
if (pWindowRes == NULL) {
*newWind = false;
......@@ -617,7 +617,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes
// not assign result buffer yet, add new result buffer
if (pWindowRes->pageId == -1) {
int32_t ret = addNewWindowResultBuf(pWindowRes, pResultBuf, sid, pRuntimeEnv->numOfRowsPerPage);
int32_t ret = addNewWindowResultBuf(pWindowRes, pResultBuf, pBockInfo->tid, pRuntimeEnv->numOfRowsPerPage);
if (ret != TSDB_CODE_SUCCESS) {
return -1;
}
......@@ -1024,8 +1024,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
bool hasTimeWindow = false;
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win, masterScan, &hasTimeWindow) !=
TSDB_CODE_SUCCESS) {
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) {
taosTFree(sasArray);
return;
}
......@@ -1053,8 +1052,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
// null data, failed to allocate more memory buffer
hasTimeWindow = false;
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &nextWin, masterScan,
&hasTimeWindow) != TSDB_CODE_SUCCESS) {
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &nextWin, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) {
break;
}
......@@ -1112,12 +1110,13 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
len = varDataLen(pData);
} else if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
qError("QInfo:%p group by not supported on double/float/binary/nchar columns, abort", pQInfo);
qError("QInfo:%p group by not supported on double/float columns, abort", pQInfo);
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, d, len, true);
uint64_t uid = 0; // uid is always set to be 0.
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, d, len, true, uid);
if (pWindowRes == NULL) {
return -1;
}
......@@ -1335,7 +1334,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
bool hasTimeWindow = false;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win, masterScan, &hasTimeWindow);
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
continue;
}
......@@ -1363,7 +1362,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
// null data, failed to allocate more memory buffer
hasTimeWindow = false;
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &nextWin, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) {
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &nextWin, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) {
break;
}
......@@ -1764,6 +1763,10 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
pRuntimeEnv->pTSBuf = tsBufDestroy(pRuntimeEnv->pTSBuf);
taosTFree(pRuntimeEnv->keyBuf);
taosHashCleanup(pRuntimeEnv->pWindowHashTable);
pRuntimeEnv->pWindowHashTable = NULL;
}
#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
......@@ -2262,7 +2265,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo * pW
TSKEY k = QUERY_IS_ASC_QUERY(pQuery)? pBlockInfo->window.skey:pBlockInfo->window.ekey;
STimeWindow win = getActiveTimeWindow(pWindowResInfo, k, pQuery);
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pBlockInfo->tid, &win, masterScan, &hasTimeWindow) !=
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pBlockInfo, &win, masterScan, &hasTimeWindow) !=
TSDB_CODE_SUCCESS) {
// todo handle error in set result for timewindow
}
......@@ -3787,8 +3790,9 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
return;
}
uint64_t uid = 0; // uid is always set to be 0
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex,
sizeof(groupIndex), true);
sizeof(groupIndex), true, uid);
if (pWindowRes == NULL) {
return;
}
......@@ -4246,7 +4250,7 @@ static void queryCostStatis(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryCostInfo *pSummary = &pRuntimeEnv->summary;
uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.windowResInfo.hashList);
uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.pWindowHashTable);
hashSize += taosHashGetMemSize(pQInfo->tableqinfoGroupInfo.map);
int32_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo);
......@@ -4255,9 +4259,9 @@ static void queryCostStatis(SQInfo *pQInfo) {
int32_t numOfTables = taosArrayGetSize(pa);
for(int32_t j = 0; j < numOfTables; ++j) {
STableQueryInfo* pTableQueryInfo = taosArrayGetP(pa, j);
// STableQueryInfo* pTableQueryInfo = taosArrayGetP(pa, j);
hashSize += taosHashGetMemSize(pTableQueryInfo->windowResInfo.hashList);
// hashSize += taosHashGetMemSize(pTableQueryInfo->windowResInfo.hashList);
}
}
......@@ -6323,7 +6327,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
pQInfo->tableqinfoGroupInfo.pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES);
pQInfo->tableqinfoGroupInfo.numOfTables = pTableGroupInfo->numOfTables;
pQInfo->tableqinfoGroupInfo.map = taosHashInit(pTableGroupInfo->numOfTables,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
}
int tableIndex = 0;
......@@ -6331,6 +6335,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
pQInfo->runtimeEnv.interBufSize = getOutputInterResultBufSize(pQuery);
pQInfo->runtimeEnv.summary.tableInfoSize += (pTableGroupInfo->numOfTables * sizeof(STableQueryInfo));
pQInfo->runtimeEnv.pWindowHashTable = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pQInfo->runtimeEnv.keyBuf = malloc(TSDB_MAX_BYTES_PER_ROW);
pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
if (pQInfo->pBuf == NULL) {
goto _cleanup;
......
......@@ -407,14 +407,14 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf) {
}
if (pResultBuf->file != NULL) {
qDebug("QInfo:%p res output buffer closed, total:%" PRId64 " B, inmem size:%dbytes, file size:%"PRId64" bytes",
pResultBuf->handle, pResultBuf->totalBufSize, listNEles(pResultBuf->lruList) * pResultBuf->pageSize,
qDebug("QInfo:%p res output buffer closed, total:%.2f Kb, inmem size:%dbytes, file size:%"PRId64" bytes",
pResultBuf->handle, pResultBuf->totalBufSize/1024.0, listNEles(pResultBuf->lruList) * pResultBuf->pageSize,
pResultBuf->fileSize);
fclose(pResultBuf->file);
} else {
qDebug("QInfo:%p res output buffer closed, total:%" PRId64 " bytes, no file created", pResultBuf->handle,
pResultBuf->totalBufSize);
qDebug("QInfo:%p res output buffer closed, total:%.2f Kb, no file created", pResultBuf->handle,
pResultBuf->totalBufSize/1024.0);
}
unlink(pResultBuf->path);
......
......@@ -36,12 +36,6 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
pWindowResInfo->threshold = threshold;
pWindowResInfo->type = type;
_hash_fn_t fn = taosGetDefaultHashFunction(type);
pWindowResInfo->hashList = taosHashInit(threshold, fn, true, false);
if (pWindowResInfo->hashList == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
pWindowResInfo->curIndex = -1;
pWindowResInfo->size = 0;
pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
......@@ -83,7 +77,7 @@ void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo) {
return;
}
if (pWindowResInfo->capacity == 0) {
assert(pWindowResInfo->hashList == NULL && pWindowResInfo->pResult == NULL);
assert(/*pWindowResInfo->hashList == NULL && */pWindowResInfo->pResult == NULL);
return;
}
......@@ -93,7 +87,7 @@ void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo) {
}
}
taosHashCleanup(pWindowResInfo->hashList);
// taosHashCleanup(pWindowResInfo->hashList);
taosTFree(pWindowResInfo->pResult);
}
......@@ -108,11 +102,11 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowR
}
pWindowResInfo->curIndex = -1;
taosHashCleanup(pWindowResInfo->hashList);
// taosHashCleanup(pWindowResInfo->hashList);
pWindowResInfo->size = 0;
_hash_fn_t fn = taosGetDefaultHashFunction(pWindowResInfo->type);
pWindowResInfo->hashList = taosHashInit(pWindowResInfo->capacity, fn, true, false);
// _hash_fn_t fn = taosGetDefaultHashFunction(pWindowResInfo->type);
// pWindowResInfo->hashList = taosHashInit(pWindowResInfo->capacity, fn, true, false);
pWindowResInfo->startTime = TSKEY_INITIAL_VAL;
pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
......@@ -127,10 +121,10 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
int32_t numOfClosed = numOfClosedTimeWindow(pWindowResInfo);
assert(num >= 0 && num <= numOfClosed);
int16_t type = pWindowResInfo->type;
char *key = NULL;
int16_t bytes = -1;
int16_t type = pWindowResInfo->type;
uint64_t uid = 0; // uid is always set to be 0.
char *key = NULL;
int16_t bytes = -1;
for (int32_t i = 0; i < num; ++i) {
SWindowResult *pResult = &pWindowResInfo->pResult[i];
......@@ -145,7 +139,8 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
bytes = tDataTypeDesc[pWindowResInfo->type].nSize;
}
taosHashRemove(pWindowResInfo->hashList, (const char *)key, bytes);
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid);
taosHashRemove(pRuntimeEnv->pWindowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
} else {
break;
}
......@@ -177,12 +172,15 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
bytes = tDataTypeDesc[pWindowResInfo->type].nSize;
}
int32_t *p = (int32_t *)taosHashGet(pWindowResInfo->hashList, (const char *)key, bytes);
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid);
int32_t *p = (int32_t *)taosHashGet(pRuntimeEnv->pWindowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
assert(p != NULL);
int32_t v = (*p - num);
assert(v >= 0 && v <= pWindowResInfo->size);
taosHashPut(pWindowResInfo->hashList, (char *)key, bytes, (char *)&v, sizeof(int32_t));
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid);
taosHashPut(pRuntimeEnv->pWindowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), (char *)&v, sizeof(int32_t));
}
pWindowResInfo->curIndex = -1;
......
......@@ -804,5 +804,5 @@ size_t taosHashGetMemSize(const SHashObj *pHashObj) {
return 0;
}
return (pHashObj->capacity * sizeof(SHashEntry) + POINTER_BYTES) + sizeof(SHashNode) * taosHashGetSize(pHashObj);
return (pHashObj->capacity * (sizeof(SHashEntry) + POINTER_BYTES)) + sizeof(SHashNode) * taosHashGetSize(pHashObj) + sizeof(SHashObj);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册