未验证 提交 99aca608 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #6816 from taosdata/feature/query

[td-5126]<enhance>:improve the nest query performance.
...@@ -277,6 +277,7 @@ typedef struct SQueryRuntimeEnv { ...@@ -277,6 +277,7 @@ typedef struct SQueryRuntimeEnv {
bool enableGroupData; bool enableGroupData;
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
SHashObj* pResultRowHashTable; // quick locate the window object for each result SHashObj* pResultRowHashTable; // quick locate the window object for each result
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
char* keyBuf; // window key buffer char* keyBuf; // window key buffer
SResultRowPool* pool; // window result object pool SResultRowPool* pool; // window result object pool
char** prevRow; char** prevRow;
......
...@@ -410,25 +410,25 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, SQueryRuntim ...@@ -410,25 +410,25 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, SQueryRuntim
pResultRowInfo->capacity = (int32_t)newCapacity; pResultRowInfo->capacity = (int32_t)newCapacity;
} }
static int32_t ascResultRowCompareFn(const void* p1, const void* p2) { //static int32_t ascResultRowCompareFn(const void* p1, const void* p2) {
SResultRow* pRow1 = *(SResultRow**)p1; // SResultRow* pRow1 = *(SResultRow**)p1;
SResultRow* pRow2 = *(SResultRow**)p2; // SResultRow* pRow2 = *(SResultRow**)p2;
//
if (pRow1 == pRow2) { // if (pRow1 == pRow2) {
return 0; // return 0;
} else { // } else {
return pRow1->win.skey < pRow2->win.skey? -1:1; // return pRow1->win.skey < pRow2->win.skey? -1:1;
} // }
} //}
static int32_t descResultRowCompareFn(const void* p1, const void* p2) { //static int32_t descResultRowCompareFn(const void* p1, const void* p2) {
return -ascResultRowCompareFn(p1, p2); // return -ascResultRowCompareFn(p1, p2);
} //}
static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, char *pData, static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int64_t tid, char *pData,
int16_t bytes, bool masterscan, uint64_t uid) { int16_t bytes, bool masterscan, uint64_t tableGroupId) {
bool existed = false; bool existed = false;
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid); SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tableGroupId);
SResultRow **p1 = SResultRow **p1 =
(SResultRow **)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); (SResultRow **)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
...@@ -446,16 +446,20 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes ...@@ -446,16 +446,20 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes
existed = false; existed = false;
} else if (pResultRowInfo->size == 1) { } else if (pResultRowInfo->size == 1) {
existed = (pResultRowInfo->pResult[0] == (*p1)); existed = (pResultRowInfo->pResult[0] == (*p1));
} else { } else { // check if current pResultRowInfo contains the existed pResultRow
__compar_fn_t fn = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQueryAttr)? ascResultRowCompareFn:descResultRowCompareFn; SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tid);
void* ptr = taosbsearch(p1, pResultRowInfo->pResult, pResultRowInfo->size, POINTER_BYTES, fn, TD_EQ); void* ptr = taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
if (ptr != NULL) { existed = (ptr != NULL);
existed = true; // __compar_fn_t fn = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQueryAttr)? ascResultRowCompareFn:descResultRowCompareFn;
} // void* ptr = taosbsearch(p1, pResultRowInfo->pResult, pResultRowInfo->size, POINTER_BYTES, fn, TD_EQ);
// if (ptr != NULL) {
// existed = true;
// }
} }
} }
} else { } else {
if (p1 != NULL) { // group by column query // In case of group by column query, the required SResultRow object must be existed in the pResultRowInfo object.
if (p1 != NULL) {
return *p1; return *p1;
} }
} }
...@@ -479,6 +483,10 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes ...@@ -479,6 +483,10 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes
pResultRowInfo->pResult[pResultRowInfo->size++] = pResult; pResultRowInfo->pResult[pResultRowInfo->size++] = pResult;
pResultRowInfo->current = pResult; pResultRowInfo->current = pResult;
int64_t dummyVal = 0;
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tid);
taosHashPut(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &dummyVal, POINTER_BYTES);
} }
// too many time window in query // too many time window in query
...@@ -614,13 +622,13 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf ...@@ -614,13 +622,13 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf
return 0; return 0;
} }
static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, STimeWindow *win, static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int64_t tid, STimeWindow *win,
bool masterscan, SResultRow **pResult, int64_t groupId, SQLFunctionCtx* pCtx, bool masterscan, SResultRow **pResult, int64_t tableGroupId, SQLFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowCellInfoOffset) { int32_t numOfOutput, int32_t* rowCellInfoOffset) {
assert(win->skey <= win->ekey); assert(win->skey <= win->ekey);
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, groupId); SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, tid, (char *)&win->skey, TSDB_KEYSIZE, masterscan, tableGroupId);
if (pResultRow == NULL) { if (pResultRow == NULL) {
*pResult = NULL; *pResult = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -628,7 +636,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRow ...@@ -628,7 +636,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRow
// not assign result buffer yet, add new result buffer // not assign result buffer yet, add new result buffer
if (pResultRow->pageId == -1) { if (pResultRow->pageId == -1) {
int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, (int32_t) groupId, pRuntimeEnv->pQueryAttr->intermediateResultRowSize); int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, (int32_t) tableGroupId, pRuntimeEnv->pQueryAttr->intermediateResultRowSize);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return -1; return -1;
} }
...@@ -1247,7 +1255,7 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc ...@@ -1247,7 +1255,7 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc
} }
} }
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t groupId) { static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) {
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) pOperatorInfo->info; STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) pOperatorInfo->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
...@@ -1275,7 +1283,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -1275,7 +1283,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, groupId, pInfo->pCtx, int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx,
numOfOutput, pInfo->rowCellInfoOffset); numOfOutput, pInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
...@@ -1304,7 +1312,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -1304,7 +1312,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
} }
STimeWindow w = pRes->win; STimeWindow w = pRes->win;
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &w, masterScan, &pResult, groupId, pInfo->pCtx, ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &w, masterScan, &pResult, tableGroupId, pInfo->pCtx,
numOfOutput, pInfo->rowCellInfoOffset); numOfOutput, pInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
...@@ -1322,7 +1330,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -1322,7 +1330,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
} }
// restore current time window // restore current time window
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, groupId, pInfo->pCtx, ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx,
numOfOutput, pInfo->rowCellInfoOffset); numOfOutput, pInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
...@@ -1342,7 +1350,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -1342,7 +1350,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
} }
// null data, failed to allocate more memory buffer // null data, failed to allocate more memory buffer
int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &nextWin, masterScan, &pResult, groupId, int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &nextWin, masterScan, &pResult, tableGroupId,
pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) { if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
...@@ -1483,7 +1491,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf ...@@ -1483,7 +1491,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
pInfo->curWindow.ekey = pInfo->curWindow.skey; pInfo->curWindow.ekey = pInfo->curWindow.skey;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan, int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput, &pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset); pBInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
...@@ -1504,7 +1512,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf ...@@ -1504,7 +1512,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
pInfo->curWindow.ekey = pInfo->curWindow.skey; pInfo->curWindow.ekey = pInfo->curWindow.skey;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan, int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput, &pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset); pBInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
...@@ -1547,7 +1555,8 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasic ...@@ -1547,7 +1555,8 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasic
len = varDataLen(pData); len = varDataLen(pData);
} }
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, d, len, true, groupIndex); int64_t tid = 0;
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, tid, d, len, true, groupIndex);
assert (pResultRow != NULL); assert (pResultRow != NULL);
setResultRowKey(pResultRow, pData, type); setResultRowKey(pResultRow, pData, type);
...@@ -1811,6 +1820,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -1811,6 +1820,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pRuntimeEnv->pQueryAttr = pQueryAttr; pRuntimeEnv->pQueryAttr = pQueryAttr;
pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pRuntimeEnv->pResultRowListSet = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pRuntimeEnv->keyBuf = malloc(pQueryAttr->maxTableColumnWidth + sizeof(int64_t)); pRuntimeEnv->keyBuf = malloc(pQueryAttr->maxTableColumnWidth + sizeof(int64_t));
pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv)); pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv));
...@@ -2060,6 +2070,9 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2060,6 +2070,9 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
taosHashCleanup(pRuntimeEnv->pTableRetrieveTsMap); taosHashCleanup(pRuntimeEnv->pTableRetrieveTsMap);
pRuntimeEnv->pTableRetrieveTsMap = NULL; pRuntimeEnv->pTableRetrieveTsMap = NULL;
taosHashCleanup(pRuntimeEnv->pResultRowListSet);
pRuntimeEnv->pResultRowListSet = NULL;
destroyOperatorInfo(pRuntimeEnv->proot); destroyOperatorInfo(pRuntimeEnv->proot);
pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool); pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool);
...@@ -2790,7 +2803,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa ...@@ -2790,7 +2803,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey; TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr); STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId, if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.tid, &win, masterScan, &pResult, groupId,
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) { pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
...@@ -2836,7 +2849,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa ...@@ -2836,7 +2849,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey; TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr); STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId, if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.tid, &win, masterScan, &pResult, groupId,
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) { pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
...@@ -3250,8 +3263,8 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i ...@@ -3250,8 +3263,8 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i
int32_t* rowCellInfoOffset = pInfo->rowCellInfoOffset; int32_t* rowCellInfoOffset = pInfo->rowCellInfoOffset;
SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo; SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo;
int32_t tid = 0; int64_t tid = 0;
SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&tid, sizeof(tid), true, uid); SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, uid);
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i); SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i);
...@@ -3482,10 +3495,13 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe ...@@ -3482,10 +3495,13 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
} }
void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, SQLFunctionCtx* pCtx, void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, SQLFunctionCtx* pCtx,
int32_t* rowCellInfoOffset, int32_t numOfOutput, int32_t groupIndex) { int32_t* rowCellInfoOffset, int32_t numOfOutput, int32_t tableGroupId) {
// for simple group by query without interval, all the tables belong to one group result.
int64_t uid = 0; int64_t uid = 0;
int64_t tid = 0;
SResultRow* pResultRow = SResultRow* pResultRow =
doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char*)&groupIndex, sizeof(groupIndex), true, uid); doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, tid, (char*)&tableGroupId, sizeof(tableGroupId), true, uid);
assert (pResultRow != NULL); assert (pResultRow != NULL);
/* /*
...@@ -3493,7 +3509,7 @@ void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pRe ...@@ -3493,7 +3509,7 @@ void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pRe
* all group belong to one result set, and each group result has different group id so set the id to be one * all group belong to one result set, and each group result has different group id so set the id to be one
*/ */
if (pResultRow->pageId == -1) { if (pResultRow->pageId == -1) {
int32_t ret = addNewWindowResultBuf(pResultRow, pRuntimeEnv->pResultBuf, groupIndex, pRuntimeEnv->pQueryAttr->resultRowSize); int32_t ret = addNewWindowResultBuf(pResultRow, pRuntimeEnv->pResultBuf, tableGroupId, pRuntimeEnv->pQueryAttr->resultRowSize);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return; return;
} }
...@@ -3502,20 +3518,20 @@ void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pRe ...@@ -3502,20 +3518,20 @@ void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pRe
setResultRowOutputBufInitCtx(pRuntimeEnv, pResultRow, pCtx, numOfOutput, rowCellInfoOffset); setResultRowOutputBufInitCtx(pRuntimeEnv, pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
} }
void setExecutionContext(SQueryRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, int32_t numOfOutput, int32_t groupIndex, void setExecutionContext(SQueryRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, int32_t numOfOutput, int32_t tableGroupId,
TSKEY nextKey) { TSKEY nextKey) {
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current; STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
// lastKey needs to be updated // lastKey needs to be updated
pTableQueryInfo->lastKey = nextKey; pTableQueryInfo->lastKey = nextKey;
if (pRuntimeEnv->prevGroupId != INT32_MIN && pRuntimeEnv->prevGroupId == groupIndex) { if (pRuntimeEnv->prevGroupId != INT32_MIN && pRuntimeEnv->prevGroupId == tableGroupId) {
return; return;
} }
doSetTableGroupOutputBuf(pRuntimeEnv, &pInfo->resultRowInfo, pInfo->pCtx, pInfo->rowCellInfoOffset, numOfOutput, groupIndex); doSetTableGroupOutputBuf(pRuntimeEnv, &pInfo->resultRowInfo, pInfo->pCtx, pInfo->rowCellInfoOffset, numOfOutput, tableGroupId);
// record the current active group id // record the current active group id
pRuntimeEnv->prevGroupId = groupIndex; pRuntimeEnv->prevGroupId = tableGroupId;
} }
void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx,
...@@ -5500,7 +5516,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI ...@@ -5500,7 +5516,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
} else { } else {
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
pInfo->curWindow.ekey = pInfo->curWindow.skey; pInfo->curWindow.ekey = pInfo->curWindow.skey;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan, int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput, &pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset); pBInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
...@@ -5520,7 +5536,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI ...@@ -5520,7 +5536,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
pInfo->curWindow.ekey = pInfo->curWindow.skey; pInfo->curWindow.ekey = pInfo->curWindow.skey;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan, int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput, &pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset); pBInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/exec.sh -n dnode1 -s start
sleep 100
sql connect
sleep 100
print ========== sub_in_from.sim
$i = 0
$dbPrefix = subdb
$tbPrefix = sub_tb
$stbPrefix = sub_stb
$tbNum = 10
$rowNum = 1000
$totalNum = $tbNum * $rowNum
$loops = 200000
$log = 10000
$ts0 = 1537146000000
$delta = 600000
$i = 0
$db = $dbPrefix . $i
$stb = $stbPrefix . $i
sql drop database $db -x step1
step1:
sql create database $db cache 16 maxrows 4096 keep 36500
print ====== create tables
sql use $db
sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 int)
$i = 0
$ts = $ts0
$halfNum = $tbNum / 2
while $i < $halfNum
$tbId = $i + $halfNum
$tb = $tbPrefix . $i
$tb1 = $tbPrefix . $tbId
sql create table $tb using $stb tags( $i )
sql create table $tb1 using $stb tags( $tbId )
$x = 0
while $x < $rowNum
$xs = $x * $delta
$ts = $ts0 + $xs
$c = $x / 10
$c = $c * 10
$c = $x - $c
$binary = 'binary . $c
$binary = $binary . '
$nchar = 'nchar . $c
$nchar = $nchar . '
sql insert into $tb values ( $ts , $c , $c , $c , $c , $c , $c , true, $binary , $nchar )
sql insert into $tb1 values ( $ts , $c , NULL , $c , NULL , $c , $c , true, $binary , $nchar )
$x = $x + 1
endw
$i = $i + 1
endw
print ====== tables created
sql_error select count(*) from (select count(*) from abc.sub_stb0)
sql_error select val + 20 from (select count(*) from sub_stb0 interval(10h))
sql_error select abc+20 from (select count(*) from sub_stb0 interval(1s))
sql select count(*) from (select count(*) from sub_stb0 interval(10h))
if $rows != 1 then
return -1
endi
if $data00 != 18 then
print expect 18, actual: $data00
return -1
endi
sql select ts from (select count(*) from sub_stb0 interval(10h))
if $rows != 18 then
return -1
endi
if $data00 != @18-09-17 04:00:00.000@ then
return -1
endi
if $data01 != @18-09-17 14:00:00.000@ then
return -1
endi
sql select val + 20, val from (select count(*) as val from sub_stb0 interval(10h))
if $rows != 18 then
return -1
endi
if $data00 != 320.000000 then
return -1
endi
if $data01 != 300 then
return -1
endi
if $data10 != 620 then
return -1
endi
if $data11 != 600 then
return -1
endi
if $data20 != 620 then
return -1
endi
if $data21 != 600 then
return -1
endi
sql select max(val), min(val), max(val) - min(val) from (select count(*) val from sub_stb0 interval(10h))
if $rows != 1 then
return -1
endi
if $data00 != 600 then
return -1
endi
if $data01 != 100 then
return -1
endi
if $data02 != 500.000000 then
return -1
endi
sql select first(ts,val),last(ts,val) from (select count(*) val from sub_stb0 interval(10h))
sql select top(val, 5) from (select count(*) val from sub_stb0 interval(10h))
sql select diff(val) from (select count(*) val from sub_stb0 interval(10h))
sql select apercentile(val, 50) from (select count(*) val from sub_stb0 interval(10h))
# not support yet
sql select percentile(val, 50) from (select count(*) val from sub_stb0 interval(10h))
sql select stddev(val) from (select count(*) val from sub_stb0 interval(10h))
print ====================>complex query
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册