提交 2a7e0171 编写于 作者: H Haojun Liao

[td-13039] support interval query.

上级 c440e553
...@@ -589,7 +589,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbQueryHandle, SQueryRunt ...@@ -589,7 +589,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbQueryHandle, SQueryRunt
SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult); SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult);
......
...@@ -2166,7 +2166,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2166,7 +2166,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
} }
case OP_TimeWindow: { case OP_TimeWindow: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput && opType != OP_Join) { if (opType != OP_DummyInput && opType != OP_Join) {
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
...@@ -6756,7 +6756,7 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI ...@@ -6756,7 +6756,7 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
return pOperator; return pOperator;
} }
SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
......
...@@ -32,7 +32,7 @@ typedef struct SDiskbasedBuf SDiskbasedBuf; ...@@ -32,7 +32,7 @@ typedef struct SDiskbasedBuf SDiskbasedBuf;
#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes #define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes
typedef struct SFilePage { typedef struct SFilePage {
int64_t num; int32_t num;
char data[]; char data[];
} SFilePage; } SFilePage;
......
...@@ -127,6 +127,7 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con ...@@ -127,6 +127,7 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
case TSDB_DATA_TYPE_USMALLINT: {*(int16_t*) p = *(int16_t*) pData;break;} case TSDB_DATA_TYPE_USMALLINT: {*(int16_t*) p = *(int16_t*) pData;break;}
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT: {*(int32_t*) p = *(int32_t*) pData;break;} case TSDB_DATA_TYPE_UINT: {*(int32_t*) p = *(int32_t*) pData;break;}
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT: {*(int64_t*) p = *(int64_t*) pData;break;} case TSDB_DATA_TYPE_UBIGINT: {*(int64_t*) p = *(int64_t*) pData;break;}
default: default:
......
...@@ -68,6 +68,7 @@ typedef struct SResultRow { ...@@ -68,6 +68,7 @@ typedef struct SResultRow {
} SResultRow; } SResultRow;
typedef struct SResultRowInfo { typedef struct SResultRowInfo {
SResultRow *pCurResult; // current active result row info
SResultRow** pResult; // result list SResultRow** pResult; // result list
// int16_t type:8; // data type for hash key // int16_t type:8; // data type for hash key
int32_t size; // number of result set int32_t size; // number of result set
......
...@@ -453,7 +453,19 @@ typedef struct SAggSupporter { ...@@ -453,7 +453,19 @@ typedef struct SAggSupporter {
SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object. SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object.
} SAggSupporter; } SAggSupporter;
typedef struct SOptrBasicInfo STableIntervalOperatorInfo; typedef struct STableIntervalOperatorInfo {
SOptrBasicInfo binfo;
SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file
SGroupResInfo groupResInfo;
SInterval interval;
STimeWindow win;
int32_t precision;
bool timeWindowInterpo;
char **pRow;
SAggSupporter aggSup;
STableQueryInfo *pCurrent;
int32_t order;
} STableIntervalOperatorInfo;
typedef struct SAggOperatorInfo { typedef struct SAggOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
...@@ -606,8 +618,8 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray ...@@ -606,8 +618,8 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray
SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput); int32_t numOfOutput);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
int32_t numOfOutput);
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput); SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
......
...@@ -54,7 +54,6 @@ int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) { ...@@ -54,7 +54,6 @@ int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) {
} }
int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size) { int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size) {
// pResultRowInfo->type = type;
pResultRowInfo->size = 0; pResultRowInfo->size = 0;
pResultRowInfo->curPos = -1; pResultRowInfo->curPos = -1;
pResultRowInfo->capacity = size; pResultRowInfo->capacity = size;
......
...@@ -141,19 +141,19 @@ static int32_t getExprFunctionId(SExprInfo *pExprInfo) { ...@@ -141,19 +141,19 @@ static int32_t getExprFunctionId(SExprInfo *pExprInfo) {
return 0; return 0;
} }
static void getNextTimeWindow(STaskAttr* pQueryAttr, STimeWindow* tw) { static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t order, STimeWindow* tw) {
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
if (pQueryAttr->interval.intervalUnit != 'n' && pQueryAttr->interval.intervalUnit != 'y') { if (pInterval->intervalUnit != 'n' && pInterval->intervalUnit != 'y') {
tw->skey += pQueryAttr->interval.sliding * factor; tw->skey += pInterval->sliding * factor;
tw->ekey = tw->skey + pQueryAttr->interval.interval - 1; tw->ekey = tw->skey + pInterval->interval - 1;
return; return;
} }
int64_t key = tw->skey, interval = pQueryAttr->interval.interval; int64_t key = tw->skey, interval = pInterval->interval;
//convert key to second //convert key to second
key = convertTimePrecision(key, pQueryAttr->precision, TSDB_TIME_PRECISION_MILLI) / 1000; key = convertTimePrecision(key, precision, TSDB_TIME_PRECISION_MILLI) / 1000;
if (pQueryAttr->interval.intervalUnit == 'y') { if (pInterval->intervalUnit == 'y') {
interval *= 12; interval *= 12;
} }
...@@ -164,12 +164,12 @@ static void getNextTimeWindow(STaskAttr* pQueryAttr, STimeWindow* tw) { ...@@ -164,12 +164,12 @@ static void getNextTimeWindow(STaskAttr* pQueryAttr, STimeWindow* tw) {
int mon = (int)(tm.tm_year * 12 + tm.tm_mon + interval * factor); int mon = (int)(tm.tm_year * 12 + tm.tm_mon + interval * factor);
tm.tm_year = mon / 12; tm.tm_year = mon / 12;
tm.tm_mon = mon % 12; tm.tm_mon = mon % 12;
tw->skey = convertTimePrecision((int64_t)mktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, pQueryAttr->precision); tw->skey = convertTimePrecision((int64_t)mktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, precision);
mon = (int)(mon + interval); mon = (int)(mon + interval);
tm.tm_year = mon / 12; tm.tm_year = mon / 12;
tm.tm_mon = mon % 12; tm.tm_mon = mon % 12;
tw->ekey = convertTimePrecision((int64_t)mktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, pQueryAttr->precision); tw->ekey = convertTimePrecision((int64_t)mktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, precision);
tw->ekey -= 1; tw->ekey -= 1;
} }
...@@ -179,7 +179,7 @@ static void setResultOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult ...@@ -179,7 +179,7 @@ static void setResultOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult
int32_t numOfCols, int32_t* rowCellInfoOffset); int32_t numOfCols, int32_t* rowCellInfoOffset);
void setResultRowOutputBufInitCtx(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset); void setResultRowOutputBufInitCtx(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
static bool functionNeedToExecute(STaskRuntimeEnv *pRuntimeEnv, SqlFunctionCtx *pCtx); static bool functionNeedToExecute(SqlFunctionCtx *pCtx);
static void setBlockStatisInfo(SqlFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColumn* pColumn); static void setBlockStatisInfo(SqlFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColumn* pColumn);
...@@ -222,7 +222,8 @@ static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pD ...@@ -222,7 +222,8 @@ static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pD
static int32_t setGroupResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binf, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex); static int32_t setGroupResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binf, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex);
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void getAlignQueryTimeWindow(STaskAttr *pQueryAttr, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); static void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win);
static void setResultBufSize(STaskAttr* pQueryAttr, SRspResultInfo* pResultInfo); static void setResultBufSize(STaskAttr* pQueryAttr, SRspResultInfo* pResultInfo);
static void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable); static void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable);
static void setParamForStableStddev(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr); static void setParamForStableStddev(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr);
...@@ -394,6 +395,10 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, jmp_buf env) ...@@ -394,6 +395,10 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, jmp_buf env)
newCapacity = (int64_t)(pResultRowInfo->capacity * 1.5); newCapacity = (int64_t)(pResultRowInfo->capacity * 1.5);
} }
if (newCapacity == pResultRowInfo->capacity) {
newCapacity += 4;
}
char *t = realloc(pResultRowInfo->pResult, (size_t)(newCapacity * POINTER_BYTES)); char *t = realloc(pResultRowInfo->pResult, (size_t)(newCapacity * POINTER_BYTES));
if (t == NULL) { if (t == NULL) {
longjmp(env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(env, TSDB_CODE_QRY_OUT_OF_MEMORY);
...@@ -593,19 +598,19 @@ static SResultRow* doSetResultOutBufByKey_rv(SResultRowInfo* pResultRowInfo, int ...@@ -593,19 +598,19 @@ static SResultRow* doSetResultOutBufByKey_rv(SResultRowInfo* pResultRowInfo, int
return pResultRowInfo->pResult[pResultRowInfo->curPos]; return pResultRowInfo->pResult[pResultRowInfo->curPos];
} }
static void getInitialStartTimeWindow(STaskAttr* pQueryAttr, TSKEY ts, STimeWindow* w) { static void getInitialStartTimeWindow(SInterval* pInterval, int32_t precision, TSKEY ts, STimeWindow* w, TSKEY ekey, bool ascQuery) {
if (QUERY_IS_ASC_QUERY(pQueryAttr)) { if (ascQuery) {
getAlignQueryTimeWindow(pQueryAttr, ts, ts, pQueryAttr->window.ekey, w); getAlignQueryTimeWindow(pInterval, precision, ts, ts, ekey, w);
} else { } else {
// the start position of the first time window in the endpoint that spreads beyond the queried last timestamp // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
getAlignQueryTimeWindow(pQueryAttr, ts, pQueryAttr->window.ekey, ts, w); getAlignQueryTimeWindow(pInterval, precision, ts, ekey, ts, w);
int64_t key = w->skey; int64_t key = w->skey;
while(key < ts) { // moving towards end while(key < ts) { // moving towards end
if (pQueryAttr->interval.intervalUnit == 'n' || pQueryAttr->interval.intervalUnit == 'y') { if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
key = taosTimeAdd(key, pQueryAttr->interval.sliding, pQueryAttr->interval.slidingUnit, pQueryAttr->precision); key = taosTimeAdd(key, pInterval->sliding, pInterval->slidingUnit, precision);
} else { } else {
key += pQueryAttr->interval.sliding; key += pInterval->sliding;
} }
if (key >= ts) { if (key >= ts) {
...@@ -618,39 +623,39 @@ static void getInitialStartTimeWindow(STaskAttr* pQueryAttr, TSKEY ts, STimeWind ...@@ -618,39 +623,39 @@ static void getInitialStartTimeWindow(STaskAttr* pQueryAttr, TSKEY ts, STimeWind
} }
// get the correct time window according to the handled timestamp // get the correct time window according to the handled timestamp
static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, STaskAttr *pQueryAttr) { static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, SInterval* pInterval, int32_t precision, STimeWindow* win) {
STimeWindow w = {0}; STimeWindow w = {0};
if (pResultRowInfo->curPos == -1) { // the first window, from the previous stored value if (pResultRowInfo->curPos == -1) { // the first window, from the previous stored value
getInitialStartTimeWindow(pQueryAttr, ts, &w); getInitialStartTimeWindow(pInterval, precision, ts, &w, win->ekey, true);
if (pQueryAttr->interval.intervalUnit == 'n' || pQueryAttr->interval.intervalUnit == 'y') { if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
w.ekey = taosTimeAdd(w.skey, pQueryAttr->interval.interval, pQueryAttr->interval.intervalUnit, pQueryAttr->precision) - 1; w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
} else { } else {
w.ekey = w.skey + pQueryAttr->interval.interval - 1; w.ekey = w.skey + pInterval->interval - 1;
} }
} else { } else {
w = getResultRow(pResultRowInfo, pResultRowInfo->curPos)->win; w = getResultRow(pResultRowInfo, pResultRowInfo->curPos)->win;
} }
if (w.skey > ts || w.ekey < ts) { if (w.skey > ts || w.ekey < ts) {
if (pQueryAttr->interval.intervalUnit == 'n' || pQueryAttr->interval.intervalUnit == 'y') { if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
w.skey = taosTimeTruncate(ts, &pQueryAttr->interval, pQueryAttr->precision); w.skey = taosTimeTruncate(ts, pInterval, precision);
w.ekey = taosTimeAdd(w.skey, pQueryAttr->interval.interval, pQueryAttr->interval.intervalUnit, pQueryAttr->precision) - 1; w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
} else { } else {
int64_t st = w.skey; int64_t st = w.skey;
if (st > ts) { if (st > ts) {
st -= ((st - ts + pQueryAttr->interval.sliding - 1) / pQueryAttr->interval.sliding) * pQueryAttr->interval.sliding; st -= ((st - ts + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
} }
int64_t et = st + pQueryAttr->interval.interval - 1; int64_t et = st + pInterval->interval - 1;
if (et < ts) { if (et < ts) {
st += ((ts - et + pQueryAttr->interval.sliding - 1) / pQueryAttr->interval.sliding) * pQueryAttr->interval.sliding; st += ((ts - et + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
} }
w.skey = st; w.skey = st;
w.ekey = w.skey + pQueryAttr->interval.interval - 1; w.ekey = w.skey + pInterval->interval - 1;
} }
} }
...@@ -658,10 +663,7 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ...@@ -658,10 +663,7 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t
* query border check, skey should not be bounded by the query time range, since the value skey will * query border check, skey should not be bounded by the query time range, since the value skey will
* be used as the time window index value. So we only change ekey of time window accordingly. * be used as the time window index value. So we only change ekey of time window accordingly.
*/ */
if (w.ekey > pQueryAttr->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) { // ASSERT(win->skey <= win->ekey); // todo no need this
w.ekey = pQueryAttr->window.ekey;
}
return w; return w;
} }
...@@ -670,7 +672,7 @@ static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo * pResultRowInfo, i ...@@ -670,7 +672,7 @@ static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo * pResultRowInfo, i
STimeWindow w = {0}; STimeWindow w = {0};
if (pResultRowInfo->curPos == -1) { // the first window, from the previous stored value if (pResultRowInfo->curPos == -1) { // the first window, from the previous stored value
getInitialStartTimeWindow(pQueryAttr, ts, &w); // getInitialStartTimeWindow(pQueryAttr, ts, &w);
if (pQueryAttr->interval.intervalUnit == 'n' || pQueryAttr->interval.intervalUnit == 'y') { if (pQueryAttr->interval.intervalUnit == 'n' || pQueryAttr->interval.intervalUnit == 'y') {
w.ekey = taosTimeAdd(w.skey, pQueryAttr->interval.interval, pQueryAttr->interval.intervalUnit, pQueryAttr->precision) - 1; w.ekey = taosTimeAdd(w.skey, pQueryAttr->interval.interval, pQueryAttr->interval.intervalUnit, pQueryAttr->precision) - 1;
...@@ -772,6 +774,37 @@ static int32_t setResultOutputBufByKey(STaskRuntimeEnv *pRuntimeEnv, SResultRowI ...@@ -772,6 +774,37 @@ static int32_t setResultOutputBufByKey(STaskRuntimeEnv *pRuntimeEnv, SResultRowI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf * pBuf, SResultRow *pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
static int32_t setResultOutputBufByKey_rv(SResultRowInfo *pResultRowInfo, int64_t id, STimeWindow *win,
bool masterscan, SResultRow **pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowCellInfoOffset, SDiskbasedBuf *pBuf, SAggSupporter *pAggSup, SExecTaskInfo* pTaskInfo) {
assert(win->skey <= win->ekey);
SResultRow *pResultRow = doSetResultOutBufByKey_rv(pResultRowInfo, id, (char *)&win->skey, TSDB_KEYSIZE, masterscan, tableGroupId,
pTaskInfo, true, pAggSup);
if (pResultRow == NULL) {
*pResult = NULL;
return TSDB_CODE_SUCCESS;
}
// not assign result buffer yet, add new result buffer
if (pResultRow->pageId == -1) { // todo intermediate result size
int32_t ret = addNewWindowResultBuf(pResultRow, pBuf, (int32_t) tableGroupId, 0);
if (ret != TSDB_CODE_SUCCESS) {
return -1;
}
}
// set time window for current result
pResultRow->win = (*win);
*pResult = pResultRow;
setResultRowOutputBufInitCtx_rv(pBuf, pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
return TSDB_CODE_SUCCESS;
}
static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) { static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
assert(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP)); assert(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP));
if (type == RESULT_ROW_START_INTERP) { if (type == RESULT_ROW_START_INTERP) {
...@@ -873,48 +906,44 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey, ...@@ -873,48 +906,44 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey,
} }
} }
static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, STaskAttr* pQueryAttr, TSKEY lastKey) { static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, const STimeWindow* pWin, TSKEY lastKey, bool ascQuery, bool interp) {
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); if ((lastKey > pWin->ekey && ascQuery) || (lastKey < pWin->ekey && (!ascQuery))) {
if ((lastKey > pQueryAttr->window.ekey && ascQuery) || (lastKey < pQueryAttr->window.ekey && (!ascQuery))) {
closeAllResultRows(pResultRowInfo); closeAllResultRows(pResultRowInfo);
pResultRowInfo->curPos = pResultRowInfo->size - 1; pResultRowInfo->curPos = pResultRowInfo->size - 1;
} else { } else {
int32_t step = ascQuery ? 1 : -1; int32_t step = ascQuery ? 1 : -1;
doUpdateResultRowIndex(pResultRowInfo, lastKey - step, ascQuery, pQueryAttr->timeWindowInterpo); doUpdateResultRowIndex(pResultRowInfo, lastKey - step, ascQuery, interp);
} }
} }
static int32_t getNumOfRowsInTimeWindow(STaskRuntimeEnv* pRuntimeEnv, SDataBlockInfo *pDataBlockInfo, TSKEY *pPrimaryColumn, static int32_t getNumOfRowsInTimeWindow(SDataBlockInfo *pDataBlockInfo, TSKEY *pPrimaryColumn,
int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, bool updateLastKey) { int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
assert(startPos >= 0 && startPos < pDataBlockInfo->rows); assert(startPos >= 0 && startPos < pDataBlockInfo->rows);
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STableQueryInfo* item = pRuntimeEnv->current;
int32_t num = -1; int32_t num = -1;
int32_t order = pQueryAttr->order.order;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
if (QUERY_IS_ASC_QUERY(pQueryAttr)) { if (order == TSDB_ORDER_ASC) {
if (ekey < pDataBlockInfo->window.ekey && pPrimaryColumn) { if (ekey < pDataBlockInfo->window.ekey && pPrimaryColumn) {
num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn); num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
if (updateLastKey) { // update the last key if (item != NULL) {
item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step; item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
} }
} else { } else {
num = pDataBlockInfo->rows - startPos; num = pDataBlockInfo->rows - startPos;
if (updateLastKey) { if (item != NULL) {
item->lastKey = pDataBlockInfo->window.ekey + step; item->lastKey = pDataBlockInfo->window.ekey + step;
} }
} }
} else { // desc } else { // desc
if (ekey > pDataBlockInfo->window.skey && pPrimaryColumn) { if (ekey > pDataBlockInfo->window.skey && pPrimaryColumn) {
num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn); num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
if (updateLastKey) { // update the last key if (item != NULL) {
item->lastKey = pPrimaryColumn[startPos - (num - 1)] + step; item->lastKey = pPrimaryColumn[startPos - (num - 1)] + step;
} }
} else { } else {
num = startPos + 1; num = startPos + 1;
if (updateLastKey) { if (item != NULL) {
item->lastKey = pDataBlockInfo->window.skey + step; item->lastKey = pDataBlockInfo->window.skey + step;
} }
} }
...@@ -924,22 +953,18 @@ static int32_t getNumOfRowsInTimeWindow(STaskRuntimeEnv* pRuntimeEnv, SDataBlock ...@@ -924,22 +953,18 @@ static int32_t getNumOfRowsInTimeWindow(STaskRuntimeEnv* pRuntimeEnv, SDataBlock
return num; return num;
} }
static void doApplyFunctions(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset, static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset, int32_t forwardStep, TSKEY* tsCol,
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput) { int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
bool hasAggregates = pCtx[0].isAggSet;
for (int32_t k = 0; k < numOfOutput; ++k) { for (int32_t k = 0; k < numOfOutput; ++k) {
pCtx[k].size = forwardStep; pCtx[k].size = forwardStep;
pCtx[k].startTs = pWin->skey; pCtx[k].startTs = pWin->skey;
// keep it temporarialy // keep it temporarialy
char* start = NULL;//pCtx[k].pInput; int32_t startOffset = pCtx[k].startRow;
bool hasAgg = pCtx[k].isAggSet;
int32_t pos = (QUERY_IS_ASC_QUERY(pQueryAttr)) ? offset : offset - (forwardStep - 1); int32_t pos = (order == TSDB_ORDER_ASC) ? offset : offset - (forwardStep - 1);
if (pCtx[k].pInput != NULL) { pCtx[k].startRow = pos;
// pCtx[k].pInput = (char *)pCtx[k].pInput + pos * pCtx[k].inputBytes;
}
if (tsCol != NULL) { if (tsCol != NULL) {
pCtx[k].ptsList = &tsCol[pos]; pCtx[k].ptsList = &tsCol[pos];
...@@ -951,88 +976,80 @@ static void doApplyFunctions(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, ...@@ -951,88 +976,80 @@ static void doApplyFunctions(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx,
pCtx[k].isAggSet = false; pCtx[k].isAggSet = false;
} }
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k])) { if (functionNeedToExecute(&pCtx[k])) {
pCtx[k].fpSet->addInput(&pCtx[k]); pCtx[k].fpSet->addInput(&pCtx[k]);
} }
// restore it // restore it
pCtx[k].isAggSet = hasAggregates; pCtx[k].isAggSet = hasAgg;
// pCtx[k].pInput = start; pCtx[k].startRow = startOffset;
} }
} }
static int32_t getNextQualifiedWindow(STaskAttr* pQueryAttr, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo, static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
TSKEY* primaryKeys, __block_search_fn_t searchFn, int32_t prevPosition) { TSKEY* primaryKeys, int32_t prevPosition, STableIntervalOperatorInfo* pInfo) {
getNextTimeWindow(pQueryAttr, pNext); int32_t order = pInfo->order;
bool ascQuery = (order == TSDB_ORDER_ASC);
int32_t precision = pInfo->precision;
getNextTimeWindow(pInterval, precision, order, pNext);
// next time window is not in current block // next time window is not in current block
if ((pNext->skey > pDataBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) || if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
(pNext->ekey < pDataBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQueryAttr))) { (pNext->ekey < pDataBlockInfo->window.skey && order == TSDB_ORDER_DESC)) {
return -1; return -1;
} }
TSKEY startKey = -1; TSKEY startKey = ascQuery? pNext->skey:pNext->ekey;
if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
startKey = pNext->skey;
if (startKey < pQueryAttr->window.skey) {
startKey = pQueryAttr->window.skey;
}
} else {
startKey = pNext->ekey;
if (startKey > pQueryAttr->window.skey) {
startKey = pQueryAttr->window.skey;
}
}
int32_t startPos = 0; int32_t startPos = 0;
// tumbling time window query, a special case of sliding time window query // tumbling time window query, a special case of sliding time window query
if (pQueryAttr->interval.sliding == pQueryAttr->interval.interval && prevPosition != -1) { if (pInterval->sliding == pInterval->interval && prevPosition != -1) {
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
startPos = prevPosition + factor; startPos = prevPosition + factor;
} else { } else {
if (startKey <= pDataBlockInfo->window.skey && QUERY_IS_ASC_QUERY(pQueryAttr)) { if (startKey <= pDataBlockInfo->window.skey && ascQuery) {
startPos = 0; startPos = 0;
} else if (startKey >= pDataBlockInfo->window.ekey && !QUERY_IS_ASC_QUERY(pQueryAttr)) { } else if (startKey >= pDataBlockInfo->window.ekey && !ascQuery) {
startPos = pDataBlockInfo->rows - 1; startPos = pDataBlockInfo->rows - 1;
} else { } else {
startPos = searchFn((char *)primaryKeys, pDataBlockInfo->rows, startKey, pQueryAttr->order.order); startPos = binarySearchForKey((char *)primaryKeys, pDataBlockInfo->rows, startKey, order);
} }
} }
/* interp query with fill should not skip time window */ /* interp query with fill should not skip time window */
if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) { // if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
return startPos; // return startPos;
} // }
/* /*
* This time window does not cover any data, try next time window, * This time window does not cover any data, try next time window,
* this case may happen when the time window is too small * this case may happen when the time window is too small
*/ */
if (primaryKeys == NULL) { if (primaryKeys == NULL) {
if (QUERY_IS_ASC_QUERY(pQueryAttr)) { if (ascQuery) {
assert(pDataBlockInfo->window.skey <= pNext->ekey); assert(pDataBlockInfo->window.skey <= pNext->ekey);
} else { } else {
assert(pDataBlockInfo->window.ekey >= pNext->skey); assert(pDataBlockInfo->window.ekey >= pNext->skey);
} }
} else { } else {
if (QUERY_IS_ASC_QUERY(pQueryAttr) && primaryKeys[startPos] > pNext->ekey) { if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
TSKEY next = primaryKeys[startPos]; TSKEY next = primaryKeys[startPos];
if (pQueryAttr->interval.intervalUnit == 'n' || pQueryAttr->interval.intervalUnit == 'y') { if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
pNext->skey = taosTimeTruncate(next, &pQueryAttr->interval, pQueryAttr->precision); pNext->skey = taosTimeTruncate(next, pInterval, precision);
pNext->ekey = taosTimeAdd(pNext->skey, pQueryAttr->interval.interval, pQueryAttr->interval.intervalUnit, pQueryAttr->precision) - 1; pNext->ekey = taosTimeAdd(pNext->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
} else { } else {
pNext->ekey += ((next - pNext->ekey + pQueryAttr->interval.sliding - 1)/pQueryAttr->interval.sliding) * pQueryAttr->interval.sliding; pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1)/pInterval->sliding) * pInterval->sliding;
pNext->skey = pNext->ekey - pQueryAttr->interval.interval + 1; pNext->skey = pNext->ekey - pInterval->interval + 1;
} }
} else if ((!QUERY_IS_ASC_QUERY(pQueryAttr)) && primaryKeys[startPos] < pNext->skey) { } else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
TSKEY next = primaryKeys[startPos]; TSKEY next = primaryKeys[startPos];
if (pQueryAttr->interval.intervalUnit == 'n' || pQueryAttr->interval.intervalUnit == 'y') { if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
pNext->skey = taosTimeTruncate(next, &pQueryAttr->interval, pQueryAttr->precision); pNext->skey = taosTimeTruncate(next, pInterval, precision);
pNext->ekey = taosTimeAdd(pNext->skey, pQueryAttr->interval.interval, pQueryAttr->interval.intervalUnit, pQueryAttr->precision) - 1; pNext->ekey = taosTimeAdd(pNext->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
} else { } else {
pNext->skey -= ((pNext->skey - next + pQueryAttr->interval.sliding - 1) / pQueryAttr->interval.sliding) * pQueryAttr->interval.sliding; pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
pNext->ekey = pNext->skey + pQueryAttr->interval.interval - 1; pNext->ekey = pNext->skey + pInterval->interval - 1;
} }
} }
} }
...@@ -1069,23 +1086,19 @@ static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, in ...@@ -1069,23 +1086,19 @@ static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, in
} }
} }
static void saveDataBlockLastRow(STaskRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pDataBlockInfo, SArray* pDataBlock, static void saveDataBlockLastRow(char** pRow, SArray* pDataBlock, int32_t rowIndex, int32_t numOfCols) {
int32_t rowIndex) {
if (pDataBlock == NULL) { if (pDataBlock == NULL) {
return; return;
} }
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; for (int32_t k = 0; k < numOfCols; ++k) {
for (int32_t k = 0; k < pQueryAttr->numOfCols; ++k) {
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k); SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k);
memcpy(pRuntimeEnv->prevRow[k], ((char*)pColInfo->pData) + (pColInfo->info.bytes * rowIndex), pColInfo->info.bytes); memcpy(pRow[k], ((char*)pColInfo->pData) + (pColInfo->info.bytes * rowIndex), pColInfo->info.bytes);
} }
} }
static TSKEY getStartTsKey(STaskAttr* pQueryAttr, STimeWindow* win, const TSKEY* tsCols, int32_t rows) { static TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols, int32_t rows, bool ascQuery) {
TSKEY ts = TSKEY_INITIAL_VAL; TSKEY ts = TSKEY_INITIAL_VAL;
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
if (tsCols == NULL) { if (tsCols == NULL) {
ts = ascQuery? win->skey : win->ekey; ts = ascQuery? win->skey : win->ekey;
} else { } else {
...@@ -1191,7 +1204,7 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, ...@@ -1191,7 +1204,7 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx,
static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx, SSDataBlock* pSDataBlock) { static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx, SSDataBlock* pSDataBlock) {
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
if (functionNeedToExecute(NULL, &pCtx[k])) { if (functionNeedToExecute(&pCtx[k])) {
pCtx[k].startTs = startTs;// this can be set during create the struct pCtx[k].startTs = startTs;// this can be set during create the struct
pCtx[k].fpSet->addInput(&pCtx[k]); pCtx[k].fpSet->addInput(&pCtx[k]);
} }
...@@ -1350,20 +1363,19 @@ static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SqlFun ...@@ -1350,20 +1363,19 @@ static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SqlFun
} }
static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlock, SqlFunctionCtx* pCtx, static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlock, SqlFunctionCtx* pCtx,
SResultRow* pResult, STimeWindow* win, int32_t startPos, int32_t forwardStep) { SResultRow* pResult, STimeWindow* win, int32_t startPos, int32_t forwardStep, int32_t order, bool timeWindowInterpo) {
STaskRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; if (!timeWindowInterpo) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
if (!pQueryAttr->timeWindowInterpo) {
return; return;
} }
assert(pBlock != NULL); assert(pBlock != NULL);
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
if (pBlock->pDataBlock == NULL){ if (pBlock->pDataBlock == NULL){
// tscError("pBlock->pDataBlock == NULL"); // tscError("pBlock->pDataBlock == NULL");
return; return;
} }
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, 0); SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
TSKEY *tsCols = (TSKEY *)(pColInfo->pData); TSKEY *tsCols = (TSKEY *)(pColInfo->pData);
...@@ -1376,38 +1388,37 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc ...@@ -1376,38 +1388,37 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
} }
} else { } else {
setNotInterpoWindowKey(pCtx, pQueryAttr->numOfOutput, RESULT_ROW_START_INTERP); setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP);
} }
// point interpolation does not require the end key time window interpolation. // point interpolation does not require the end key time window interpolation.
if (pQueryAttr->pointInterpQuery) { // if (pointInterpQuery) {
return; // return;
} // }
// interpolation query does not generate the time window end interpolation // interpolation query does not generate the time window end interpolation
done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP); done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
if (!done) { if (!done) {
int32_t endRowIndex = startPos + (forwardStep - 1) * step; int32_t endRowIndex = startPos + (forwardStep - 1) * step;
TSKEY endKey = QUERY_IS_ASC_QUERY(pQueryAttr)? pBlock->info.window.ekey:pBlock->info.window.skey; TSKEY endKey = (order == TSDB_ORDER_ASC)? pBlock->info.window.ekey:pBlock->info.window.skey;
bool interp = setTimeWindowInterpolationEndTs(pOperatorInfo, pCtx, endRowIndex, pBlock->pDataBlock, tsCols, endKey, win); bool interp = setTimeWindowInterpolationEndTs(pOperatorInfo, pCtx, endRowIndex, pBlock->pDataBlock, tsCols, endKey, win);
if (interp) { if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
} }
} else { } else {
setNotInterpoWindowKey(pCtx, pQueryAttr->numOfOutput, RESULT_ROW_END_INTERP); setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_END_INTERP);
} }
} }
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) { static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) {
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) pOperatorInfo->info; STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) pOperatorInfo->info;
STaskRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
int32_t numOfOutput = pOperatorInfo->numOfOutput; int32_t numOfOutput = pOperatorInfo->numOfOutput;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); int32_t step = 1;
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); bool ascQuery = true;
int32_t prevIndex = pResultRowInfo->curPos; int32_t prevIndex = pResultRowInfo->curPos;
...@@ -1420,26 +1431,26 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -1420,26 +1431,26 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
} }
int32_t startPos = ascQuery? 0 : (pSDataBlock->info.rows - 1); int32_t startPos = ascQuery? 0 : (pSDataBlock->info.rows - 1);
TSKEY ts = getStartTsKey(pQueryAttr, &pSDataBlock->info.window, tsCols, pSDataBlock->info.rows); TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols, pSDataBlock->info.rows, ascQuery);
STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, pQueryAttr); STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, &pInfo->interval, pInfo->precision, &pInfo->win);
bool masterScan = IS_MAIN_SCAN(pRuntimeEnv); bool masterScan = true;
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
int32_t ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx, int32_t ret = setResultOutputBufByKey_rv(pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, tableGroupId, pInfo->binfo.pCtx,
numOfOutput, pInfo->rowCellInfoOffset); numOfOutput, pInfo->binfo.rowCellInfoOffset, pInfo->pResultBuf, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
int32_t forwardStep = 0; int32_t forwardStep = 0;
TSKEY ekey = reviseWindowEkey(pQueryAttr, &win); TSKEY ekey = win.ekey;
forwardStep = forwardStep =
getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true); getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
// prev time window not interpolation yet. // prev time window not interpolation yet.
int32_t curIndex = pResultRowInfo->curPos; int32_t curIndex = pResultRowInfo->curPos;
if (prevIndex != -1 && prevIndex < curIndex && pQueryAttr->timeWindowInterpo) { if (prevIndex != -1 && prevIndex < curIndex && pInfo->timeWindowInterpo) {
for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already. for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already.
SResultRow* pRes = getResultRow(pResultRowInfo, j); SResultRow* pRes = getResultRow(pResultRowInfo, j);
if (pRes->closed) { if (pRes->closed) {
...@@ -1448,64 +1459,64 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -1448,64 +1459,64 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
} }
STimeWindow w = pRes->win; STimeWindow w = pRes->win;
ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &w, masterScan, &pResult, ret = setResultOutputBufByKey_rv(pResultRowInfo, pSDataBlock->info.uid, &w, masterScan, &pResult,
tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, pInfo->pResultBuf, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
doTimeWindowInterpolation(pOperatorInfo, pInfo, pSDataBlock->pDataBlock, *(TSKEY*)pRuntimeEnv->prevRow[0], -1, doTimeWindowInterpolation(pOperatorInfo, &pInfo->binfo, pSDataBlock->pDataBlock, *(TSKEY*)pInfo->pRow[0], -1,
tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP); tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pInfo->pCtx, pQueryAttr->numOfOutput, RESULT_ROW_START_INTERP); setNotInterpoWindowKey(pInfo->binfo.pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput); doApplyFunctions(pInfo->binfo.pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
} }
// restore current time window // restore current time window
ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx, ret = setResultOutputBufByKey_rv(pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, tableGroupId, pInfo->binfo.pCtx,
numOfOutput, pInfo->rowCellInfoOffset); numOfOutput, pInfo->binfo.rowCellInfoOffset, pInfo->pResultBuf, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
} }
// window start key interpolation // window start key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep); doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep, pInfo->order, false);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput); doApplyFunctions(pInfo->binfo.pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
STimeWindow nextWin = win; STimeWindow nextWin = win;
while (1) { while (1) {
int32_t prevEndPos = (forwardStep - 1) * step + startPos; int32_t prevEndPos = (forwardStep - 1) * step + startPos;
startPos = getNextQualifiedWindow(pQueryAttr, &nextWin, &pSDataBlock->info, tsCols, binarySearchForKey, prevEndPos); startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo);
if (startPos < 0) { if (startPos < 0) {
break; break;
} }
// null data, failed to allocate more memory buffer // null data, failed to allocate more memory buffer
int32_t code = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &nextWin, masterScan, &pResult, tableGroupId, int32_t code = setResultOutputBufByKey_rv(pResultRowInfo, pSDataBlock->info.uid, &nextWin, masterScan, &pResult, tableGroupId,
pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, pInfo->pResultBuf, &pInfo->aggSup, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) { if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
ekey = reviseWindowEkey(pQueryAttr, &nextWin); ekey = nextWin.ekey;//reviseWindowEkey(pQueryAttr, &nextWin);
forwardStep = getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true); forwardStep = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
// window start(end) key interpolation // window start(end) key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &nextWin, startPos, forwardStep); doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, pInfo->order, false);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &nextWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput); doApplyFunctions(pInfo->binfo.pCtx, &nextWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
} }
if (pQueryAttr->timeWindowInterpo) { if (pInfo->timeWindowInterpo) {
int32_t rowIndex = ascQuery? (pSDataBlock->info.rows-1):0; int32_t rowIndex = ascQuery? (pSDataBlock->info.rows-1):0;
saveDataBlockLastRow(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, rowIndex); saveDataBlockLastRow(pInfo->pRow, pSDataBlock->pDataBlock, rowIndex, pSDataBlock->info.numOfCols);
} }
updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey); // updateResultRowInfoActiveIndex(pResultRowInfo, &pInfo->win, pRuntimeEnv->current->lastKey, true, false);
} }
...@@ -1528,7 +1539,7 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe ...@@ -1528,7 +1539,7 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
} }
int32_t startPos = ascQuery? 0 : (pSDataBlock->info.rows - 1); int32_t startPos = ascQuery? 0 : (pSDataBlock->info.rows - 1);
TSKEY ts = getStartTsKey(pQueryAttr, &pSDataBlock->info.window, tsCols, pSDataBlock->info.rows); TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols, pSDataBlock->info.rows, ascQuery);
STimeWindow win = getCurrentActiveTimeWindow(pResultRowInfo, ts, pQueryAttr); STimeWindow win = getCurrentActiveTimeWindow(pResultRowInfo, ts, pQueryAttr);
bool masterScan = IS_MAIN_SCAN(pRuntimeEnv); bool masterScan = IS_MAIN_SCAN(pRuntimeEnv);
...@@ -1541,25 +1552,25 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe ...@@ -1541,25 +1552,25 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
while (1) { while (1) {
// null data, failed to allocate more memory buffer // null data, failed to allocate more memory buffer
ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult,
tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
TSKEY ekey = reviseWindowEkey(pQueryAttr, &win); TSKEY ekey = reviseWindowEkey(pQueryAttr, &win);
forwardStep = getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true); // forwardStep = getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true);
// window start(end) key interpolation // window start(end) key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep); // doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, ascQuery ? &win : &preWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput); // doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, ascQuery ? &win : &preWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
preWin = win; preWin = win;
int32_t prevEndPos = (forwardStep - 1) * step + startPos; int32_t prevEndPos = (forwardStep - 1) * step + startPos;
startPos = getNextQualifiedWindow(pQueryAttr, &win, &pSDataBlock->info, tsCols, binarySearchForKey, prevEndPos); // startPos = getNextQualifiedWindow(pQueryAttr, &win, &pSDataBlock->info, tsCols, binarySearchForKey, prevEndPos);
if (startPos < 0) { if (startPos < 0) {
if ((ascQuery && win.skey <= pQueryAttr->window.ekey) || ((!ascQuery) && win.ekey >= pQueryAttr->window.ekey)) { if ((ascQuery && win.skey <= pQueryAttr->window.ekey) || ((!ascQuery) && win.ekey >= pQueryAttr->window.ekey)) {
int32_t code = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, tableGroupId, int32_t code = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, tableGroupId,
pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) { if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
...@@ -1567,8 +1578,8 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe ...@@ -1567,8 +1578,8 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
startPos = pSDataBlock->info.rows - 1; startPos = pSDataBlock->info.rows - 1;
// window start(end) key interpolation // window start(end) key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep); // doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, ascQuery ? &win : &preWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput); // doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, ascQuery ? &win : &preWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
} }
break; break;
...@@ -1578,10 +1589,10 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe ...@@ -1578,10 +1589,10 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
if (pQueryAttr->timeWindowInterpo) { if (pQueryAttr->timeWindowInterpo) {
int32_t rowIndex = ascQuery? (pSDataBlock->info.rows-1):0; int32_t rowIndex = ascQuery? (pSDataBlock->info.rows-1):0;
saveDataBlockLastRow(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, rowIndex); // saveDataBlockLastRow(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, rowIndex);
} }
updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey); // updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey);
} }
...@@ -1643,7 +1654,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn ...@@ -1643,7 +1654,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
} }
doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, j - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput); // doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, j - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput);
num = 1; num = 1;
memcpy(pInfo->prevData, val, bytes); memcpy(pInfo->prevData, val, bytes);
...@@ -1662,7 +1673,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn ...@@ -1662,7 +1673,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
} }
doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, pSDataBlock->info.rows - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput); // doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, pSDataBlock->info.rows - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput);
} }
tfree(pInfo->prevData); tfree(pInfo->prevData);
...@@ -1712,8 +1723,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf ...@@ -1712,8 +1723,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
} }
doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList, // doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList,
pSDataBlock->info.rows, pOperator->numOfOutput); // pSDataBlock->info.rows, pOperator->numOfOutput);
pInfo->curWindow.skey = tsList[j]; pInfo->curWindow.skey = tsList[j];
pInfo->curWindow.ekey = tsList[j]; pInfo->curWindow.ekey = tsList[j];
...@@ -1733,8 +1744,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf ...@@ -1733,8 +1744,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
} }
doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList, // doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList,
pSDataBlock->info.rows, pOperator->numOfOutput); // pSDataBlock->info.rows, pOperator->numOfOutput);
} }
static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
...@@ -1808,7 +1819,7 @@ static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pD ...@@ -1808,7 +1819,7 @@ static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pD
return -1; return -1;
} }
static bool functionNeedToExecute(STaskRuntimeEnv *pRuntimeEnv, SqlFunctionCtx *pCtx) { static bool functionNeedToExecute(SqlFunctionCtx *pCtx) {
struct SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); struct SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
// in case of timestamp column, always generated results. // in case of timestamp column, always generated results.
...@@ -2048,6 +2059,10 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC ...@@ -2048,6 +2059,10 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC
pCtx->resDataInfo.type = pSqlExpr->resSchema.type; pCtx->resDataInfo.type = pSqlExpr->resSchema.type;
pCtx->order = TSDB_ORDER_ASC; pCtx->order = TSDB_ORDER_ASC;
if (i == 0) {
pCtx->functionId = FUNCTION_TS;
}
// pCtx->functionId = pSqlExpr->functionId; // pCtx->functionId = pSqlExpr->functionId;
// pCtx->stableQuery = pQueryAttr->stableQuery; // pCtx->stableQuery = pQueryAttr->stableQuery;
pCtx->resDataInfo.intermediateBytes = pSqlExpr->interBytes; pCtx->resDataInfo.intermediateBytes = pSqlExpr->interBytes;
...@@ -2343,22 +2358,22 @@ static bool isCachedLastQuery(STaskAttr *pQueryAttr) { ...@@ -2343,22 +2358,22 @@ static bool isCachedLastQuery(STaskAttr *pQueryAttr) {
} }
///////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////
//todo refactor : return window
void getAlignQueryTimeWindow(STaskAttr *pQueryAttr, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win) { void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win) {
assert(key >= keyFirst && key <= keyLast && pQueryAttr->interval.sliding <= pQueryAttr->interval.interval); assert(key >= keyFirst && key <= keyLast && pInterval->sliding <= pInterval->interval);
win->skey = taosTimeTruncate(key, &pQueryAttr->interval, pQueryAttr->precision); win->skey = taosTimeTruncate(key, pInterval, precision);
/* /*
* if the realSkey > INT64_MAX - pQueryAttr->interval.interval, the query duration between * if the realSkey > INT64_MAX - pInterval->interval, the query duration between
* realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges. * realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges.
*/ */
if (keyFirst > (INT64_MAX - pQueryAttr->interval.interval)) { if (keyFirst > (INT64_MAX - pInterval->interval)) {
assert(keyLast - keyFirst < pQueryAttr->interval.interval); assert(keyLast - keyFirst < pInterval->interval);
win->ekey = INT64_MAX; win->ekey = INT64_MAX;
} else if (pQueryAttr->interval.intervalUnit == 'n' || pQueryAttr->interval.intervalUnit == 'y') { } else if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
win->ekey = taosTimeAdd(win->skey, pQueryAttr->interval.interval, pQueryAttr->interval.intervalUnit, pQueryAttr->precision) - 1; win->ekey = taosTimeAdd(win->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
} else { } else {
win->ekey = win->skey + pQueryAttr->interval.interval - 1; win->ekey = win->skey + pInterval->interval - 1;
} }
} }
...@@ -2584,7 +2599,7 @@ static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockI ...@@ -2584,7 +2599,7 @@ static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockI
TSKEY ek = TMAX(pQueryAttr->window.skey, pQueryAttr->window.ekey); TSKEY ek = TMAX(pQueryAttr->window.skey, pQueryAttr->window.ekey);
if (QUERY_IS_ASC_QUERY(pQueryAttr)) { if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.skey, sk, ek, &w); // getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.skey, sk, ek, &w);
assert(w.ekey >= pBlockInfo->window.skey); assert(w.ekey >= pBlockInfo->window.skey);
if (w.ekey < pBlockInfo->window.ekey) { if (w.ekey < pBlockInfo->window.ekey) {
...@@ -2592,7 +2607,7 @@ static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockI ...@@ -2592,7 +2607,7 @@ static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockI
} }
while(1) { while(1) {
getNextTimeWindow(pQueryAttr, &w); // getNextTimeWindow(pQueryAttr, &w);
if (w.skey > pBlockInfo->window.ekey) { if (w.skey > pBlockInfo->window.ekey) {
break; break;
} }
...@@ -2603,7 +2618,7 @@ static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockI ...@@ -2603,7 +2618,7 @@ static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockI
} }
} }
} else { } else {
getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.ekey, sk, ek, &w); // getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.ekey, sk, ek, &w);
assert(w.skey <= pBlockInfo->window.ekey); assert(w.skey <= pBlockInfo->window.ekey);
if (w.skey > pBlockInfo->window.skey) { if (w.skey > pBlockInfo->window.skey) {
...@@ -2611,7 +2626,7 @@ static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockI ...@@ -2611,7 +2626,7 @@ static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockI
} }
while(1) { while(1) {
getNextTimeWindow(pQueryAttr, &w); // getNextTimeWindow(pQueryAttr, &w);
if (w.ekey < pBlockInfo->window.skey) { if (w.ekey < pBlockInfo->window.skey) {
break; break;
} }
...@@ -3513,9 +3528,6 @@ static void setupEnvForReverseScan(STableScanInfo *pTableScanInfo, SqlFunctionCt ...@@ -3513,9 +3528,6 @@ static void setupEnvForReverseScan(STableScanInfo *pTableScanInfo, SqlFunctionCt
} }
void finalizeQueryResult(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) { void finalizeQueryResult(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) {
STaskRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t numOfOutput = pOperator->numOfOutput; int32_t numOfOutput = pOperator->numOfOutput;
// if (pQueryAttr->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQueryAttr) || pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow) { // if (pQueryAttr->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQueryAttr) || pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow) {
// // for each group result, call the finalize function for each column // // for each group result, call the finalize function for each column
...@@ -3908,7 +3920,7 @@ void setIntervalQueryRange(STaskRuntimeEnv *pRuntimeEnv, TSKEY key) { ...@@ -3908,7 +3920,7 @@ void setIntervalQueryRange(STaskRuntimeEnv *pRuntimeEnv, TSKEY key) {
TSKEY sk = TMIN(win.skey, win.ekey); TSKEY sk = TMIN(win.skey, win.ekey);
TSKEY ek = TMAX(win.skey, win.ekey); TSKEY ek = TMAX(win.skey, win.ekey);
getAlignQueryTimeWindow(pQueryAttr, win.skey, sk, ek, &w); // getAlignQueryTimeWindow(pQueryAttr, win.skey, sk, ek, &w);
// if (pResultRowInfo->prevSKey == TSKEY_INITIAL_VAL) { // if (pResultRowInfo->prevSKey == TSKEY_INITIAL_VAL) {
// if (!QUERY_IS_ASC_QUERY(pQueryAttr)) { // if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
...@@ -6364,22 +6376,19 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { ...@@ -6364,22 +6376,19 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
return NULL; return NULL;
} }
STableIntervalOperatorInfo* pIntervalInfo = pOperator->info; STableIntervalOperatorInfo* pInfo = pOperator->info;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
// toSDatablock(pAggInfo->pGroupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity); // toSDatablock(pAggInfo->pGroupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
return pIntervalInfo->pRes; return pInfo->binfo.pRes;
} }
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; // int32_t order = pQueryAttr->order.order;
int32_t order = pQueryAttr->order.order; // STimeWindow win = pQueryAttr->window;
STimeWindow win = pQueryAttr->window;
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
while(1) { while(1) {
...@@ -6391,30 +6400,30 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { ...@@ -6391,30 +6400,30 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
break; break;
} }
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQueryAttr->order.order); setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC);
hashIntervalAgg(pOperator, &pIntervalInfo->resultRowInfo, pBlock, 0); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
} }
// restore the value // restore the value
pQueryAttr->order.order = order; // pQueryAttr->order.order = order;
pQueryAttr->window = win; // pQueryAttr->window = win;
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pIntervalInfo->resultRowInfo); closeAllResultRows(&pInfo->binfo.resultRowInfo);
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset); finalizeQueryResult(pOperator, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo); initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); toSDatablock(&pInfo->groupResInfo, pInfo->pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
return pIntervalInfo->pRes->info.rows == 0? NULL:pIntervalInfo->pRes; return pInfo->binfo.pRes->info.rows == 0? NULL:pInfo->binfo.pRes;
} }
static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
...@@ -6429,11 +6438,11 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { ...@@ -6429,11 +6438,11 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { if (pIntervalInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
return pIntervalInfo->pRes; return pIntervalInfo->binfo.pRes;
} }
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
...@@ -6454,8 +6463,8 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { ...@@ -6454,8 +6463,8 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); // setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQueryAttr->order.order); setInputDataBlock(pOperator, pIntervalInfo->binfo.pCtx, pBlock, pQueryAttr->order.order);
hashAllIntervalAgg(pOperator, &pIntervalInfo->resultRowInfo, pBlock, 0); hashAllIntervalAgg(pOperator, &pIntervalInfo->binfo.resultRowInfo, pBlock, 0);
} }
// restore the value // restore the value
...@@ -6463,18 +6472,18 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { ...@@ -6463,18 +6472,18 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
pQueryAttr->window = win; pQueryAttr->window = win;
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pIntervalInfo->resultRowInfo); closeAllResultRows(&pIntervalInfo->binfo.resultRowInfo);
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset); finalizeQueryResult(pOperator, pIntervalInfo->binfo.pCtx, &pIntervalInfo->binfo.resultRowInfo, pIntervalInfo->binfo.rowCellInfoOffset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->binfo.resultRowInfo);
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { if (pIntervalInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
return pIntervalInfo->pRes->info.rows == 0? NULL:pIntervalInfo->pRes; return pIntervalInfo->binfo.pRes->info.rows == 0? NULL:pIntervalInfo->binfo.pRes;
} }
static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
...@@ -6490,14 +6499,14 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { ...@@ -6490,14 +6499,14 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
// copyToSDataBlock(NULL, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); // copyToSDataBlock(NULL, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { if (pIntervalInfo->binfo.pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
SQInfo* pQInfo = pRuntimeEnv->qinfo; SQInfo* pQInfo = pRuntimeEnv->qinfo;
pQInfo->summary.firstStageMergeTime += (taosGetTimestampUs() - st); pQInfo->summary.firstStageMergeTime += (taosGetTimestampUs() - st);
return pIntervalInfo->pRes; return pIntervalInfo->binfo.pRes;
} }
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
...@@ -6518,7 +6527,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { ...@@ -6518,7 +6527,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
// setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); // setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQueryAttr->order.order); setInputDataBlock(pOperator, pIntervalInfo->binfo.pCtx, pBlock, pQueryAttr->order.order);
setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey); setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey);
hashIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex); hashIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex);
...@@ -6530,11 +6539,11 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { ...@@ -6530,11 +6539,11 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
// copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); // copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { if (pIntervalInfo->binfo.pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
return pIntervalInfo->pRes; return pIntervalInfo->binfo.pRes;
} }
static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) { static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) {
...@@ -6548,11 +6557,11 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) { ...@@ -6548,11 +6557,11 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) {
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
// copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); // copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { if (pIntervalInfo->binfo.pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
return pIntervalInfo->pRes; return pIntervalInfo->binfo.pRes;
} }
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
...@@ -6573,7 +6582,7 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) { ...@@ -6573,7 +6582,7 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) {
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
// setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); // setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQueryAttr->order.order); setInputDataBlock(pOperator, pIntervalInfo->binfo.pCtx, pBlock, pQueryAttr->order.order);
setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey); setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey);
hashAllIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex); hashAllIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex);
...@@ -6586,14 +6595,14 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) { ...@@ -6586,14 +6595,14 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
// copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); // copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { if (pIntervalInfo->binfo.pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
SQInfo* pQInfo = pRuntimeEnv->qinfo; SQInfo* pQInfo = pRuntimeEnv->qinfo;
pQInfo->summary.firstStageMergeTime += (taosGetTimestampUs() - st); pQInfo->summary.firstStageMergeTime += (taosGetTimestampUs() - st);
return pIntervalInfo->pRes; return pIntervalInfo->binfo.pRes;
} }
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
...@@ -6645,8 +6654,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI ...@@ -6645,8 +6654,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
} }
doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList, // doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList,
pSDataBlock->info.rows, pOperator->numOfOutput); // pSDataBlock->info.rows, pOperator->numOfOutput);
pInfo->curWindow.skey = tsList[j]; pInfo->curWindow.skey = tsList[j];
pInfo->curWindow.ekey = tsList[j]; pInfo->curWindow.ekey = tsList[j];
...@@ -6667,8 +6676,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI ...@@ -6667,8 +6676,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
} }
doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList, // doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList,
pSDataBlock->info.rows, pOperator->numOfOutput); // pSDataBlock->info.rows, pOperator->numOfOutput);
} }
static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) { static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
...@@ -7268,27 +7277,44 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn ...@@ -7268,27 +7277,44 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn
return pOperator; return pOperator;
} }
SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); initAggSup(&pInfo->aggSup, pExprInfo);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->resultRowInfo, 8); // todo:
pInfo->order = TSDB_ORDER_ASC;
pInfo->precision = TSDB_TIME_PRECISION_MICRO;
pInfo->win.skey = INT64_MIN;
pInfo->win.ekey = INT64_MAX;
pInfo->interval.intervalUnit = 's';
pInfo->interval.slidingUnit = 's';
pInfo->interval.interval = 1000;
pInfo->interval.sliding = 1000;
int32_t code = createDiskbasedBuf(&pInfo->pResultBuf, 4096, 4096 * 256, 0, "/tmp/");
int32_t numOfOutput = taosArrayGetSize(pExprInfo);
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset, &pInfo->binfo.resRowSize);
pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, pInfo->binfo.capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TimeIntervalAggOperator"; pOperator->name = "TimeIntervalAggOperator";
// pOperator->operatorType = OP_TimeWindow; pOperator->operatorType = OP_TimeWindow;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->pExpr = pExpr; pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->pTaskInfo = pTaskInfo;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doIntervalAgg; pOperator->exec = doIntervalAgg;
pOperator->cleanupFn = destroyBasicOperatorInfo; pOperator->cleanupFn = destroyBasicOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
} }
...@@ -7296,9 +7322,9 @@ SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOpe ...@@ -7296,9 +7322,9 @@ SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOpe
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
...@@ -7369,9 +7395,9 @@ SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator ...@@ -7369,9 +7395,9 @@ SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "MultiTableTimeIntervalOperator"; pOperator->name = "MultiTableTimeIntervalOperator";
...@@ -7393,9 +7419,9 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim ...@@ -7393,9 +7419,9 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim
SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "AllMultiTableTimeIntervalOperator"; pOperator->name = "AllMultiTableTimeIntervalOperator";
...@@ -7458,7 +7484,7 @@ SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInf ...@@ -7458,7 +7484,7 @@ SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInf
TSKEY sk = TMIN(pQueryAttr->window.skey, pQueryAttr->window.ekey); TSKEY sk = TMIN(pQueryAttr->window.skey, pQueryAttr->window.ekey);
TSKEY ek = TMAX(pQueryAttr->window.skey, pQueryAttr->window.ekey); TSKEY ek = TMAX(pQueryAttr->window.skey, pQueryAttr->window.ekey);
getAlignQueryTimeWindow(pQueryAttr, pQueryAttr->window.skey, sk, ek, &w); // getAlignQueryTimeWindow(pQueryAttr, pQueryAttr->window.skey, sk, ek, &w);
pInfo->pFillInfo = pInfo->pFillInfo =
taosCreateFillInfo(pQueryAttr->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfOutput, taosCreateFillInfo(pQueryAttr->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfOutput,
......
...@@ -44,8 +44,8 @@ typedef struct SLHashObj { ...@@ -44,8 +44,8 @@ typedef struct SLHashObj {
* +-----------+-------+--------+ * +-----------+-------+--------+
*/ */
typedef struct SLHashNode { typedef struct SLHashNode {
int32_t keyLen; uint16_t keyLen;
int32_t dataLen; uint16_t dataLen;
} SLHashNode; } SLHashNode;
#define GET_LHASH_NODE_KEY(_n) (((char*)(_n)) + sizeof(SLHashNode)) #define GET_LHASH_NODE_KEY(_n) (((char*)(_n)) + sizeof(SLHashNode))
...@@ -70,10 +70,10 @@ static int32_t doGetRelatedSplitBucketId(int32_t bucketId, int32_t bits) { ...@@ -70,10 +70,10 @@ static int32_t doGetRelatedSplitBucketId(int32_t bucketId, int32_t bits) {
} }
static void doCopyObject(char* p, const void* key, int32_t keyLen, const void* data, int32_t size) { static void doCopyObject(char* p, const void* key, int32_t keyLen, const void* data, int32_t size) {
*(int32_t*) p = keyLen; *(uint16_t*) p = keyLen;
p += sizeof(int32_t); p += sizeof(uint16_t);
*(int32_t*) p = size; *(uint16_t*) p = size;
p += sizeof(int32_t); p += sizeof(uint16_t);
memcpy(p, key, keyLen); memcpy(p, key, keyLen);
p += keyLen; p += keyLen;
...@@ -118,7 +118,7 @@ static int32_t doAddToBucket(SLHashObj* pHashObj, SLHashBucket* pBucket, int32_t ...@@ -118,7 +118,7 @@ static int32_t doAddToBucket(SLHashObj* pHashObj, SLHashBucket* pBucket, int32_t
} }
pBucket->size += 1; pBucket->size += 1;
printf("===> add to bucket:0x%x, num:%d, key:%d\n", index, pBucket->size, *(int*) key); // printf("===> add to bucket:0x%x, num:%d, key:%d\n", index, pBucket->size, *(int*) key);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -154,6 +154,14 @@ static void doCompressBucketPages(SLHashObj *pHashObj, SLHashBucket* pBucket) { ...@@ -154,6 +154,14 @@ static void doCompressBucketPages(SLHashObj *pHashObj, SLHashBucket* pBucket) {
int32_t* pageId = taosArrayGetLast(pBucket->pPageIdList); int32_t* pageId = taosArrayGetLast(pBucket->pPageIdList);
SFilePage* pLast = getBufPage(pHashObj->pBuf, *pageId); SFilePage* pLast = getBufPage(pHashObj->pBuf, *pageId);
if (pLast->num <= sizeof(SFilePage)) {
// this is empty
dBufSetBufPageRecycled(pHashObj->pBuf, pLast);
releaseBufPage(pHashObj->pBuf, pFirst);
taosArrayRemove(pBucket->pPageIdList, numOfPages - 1);
return;
}
char* pStart = pLast->data; char* pStart = pLast->data;
int32_t nodeSize = GET_LHASH_NODE_LEN(pStart); int32_t nodeSize = GET_LHASH_NODE_LEN(pStart);
while (1) { while (1) {
...@@ -162,21 +170,33 @@ static void doCompressBucketPages(SLHashObj *pHashObj, SLHashBucket* pBucket) { ...@@ -162,21 +170,33 @@ static void doCompressBucketPages(SLHashObj *pHashObj, SLHashBucket* pBucket) {
SLHashNode* pNode = (SLHashNode*)pStart; SLHashNode* pNode = (SLHashNode*)pStart;
doCopyObject(p, GET_LHASH_NODE_KEY(pStart), pNode->keyLen, GET_LHASH_NODE_DATA(pStart), pNode->dataLen); doCopyObject(p, GET_LHASH_NODE_KEY(pStart), pNode->keyLen, GET_LHASH_NODE_DATA(pStart), pNode->dataLen);
setBufPageDirty(pFirst, true); setBufPageDirty(pFirst, true);
setBufPageDirty(pLast, true);
ASSERT(pLast->num >= nodeSize + sizeof(SFilePage));
pFirst->num += nodeSize; pFirst->num += nodeSize;
pLast->num -= nodeSize; pLast->num -= nodeSize;
pStart += nodeSize; pStart += nodeSize;
if (pStart - pLast->data >= pLast->num) { if (pLast->num <= sizeof(SFilePage)) {
// this is empty // this is empty
dBufSetBufPageRecycled(pHashObj->pBuf, pLast); dBufSetBufPageRecycled(pHashObj->pBuf, pLast);
releaseBufPage(pHashObj->pBuf, pFirst);
taosArrayRemove(pBucket->pPageIdList, numOfPages - 1); taosArrayRemove(pBucket->pPageIdList, numOfPages - 1);
break; break;
} }
nodeSize = GET_LHASH_NODE_LEN(pStart); nodeSize = GET_LHASH_NODE_LEN(pStart);
} else { // move to the front of pLast page } else { // move to the front of pLast page
memmove(pLast->data, pStart,(((char*)pLast) + pLast->num - pStart)); if (pStart != pLast->data) {
memmove(pLast->data, pStart, (((char*)pLast) + pLast->num - pStart));
setBufPageDirty(pLast, true);
}
releaseBufPage(pHashObj->pBuf, pLast);
releaseBufPage(pHashObj->pBuf, pFirst);
break; break;
} }
} }
...@@ -216,7 +236,7 @@ static int32_t doAddNewBucket(SLHashObj* pHashObj) { ...@@ -216,7 +236,7 @@ static int32_t doAddNewBucket(SLHashObj* pHashObj) {
taosArrayPush(pBucket->pPageIdList, &pageId); taosArrayPush(pBucket->pPageIdList, &pageId);
pHashObj->numOfBuckets += 1; pHashObj->numOfBuckets += 1;
printf("---------------add new bucket, id:0x%x, total:%d\n", pHashObj->numOfBuckets - 1, pHashObj->numOfBuckets); // printf("---------------add new bucket, id:0x%x, total:%d\n", pHashObj->numOfBuckets - 1, pHashObj->numOfBuckets);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -281,7 +301,7 @@ int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data ...@@ -281,7 +301,7 @@ int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data
if (v >= pHashObj->numOfBuckets) { if (v >= pHashObj->numOfBuckets) {
int32_t newBucketId = doGetAlternativeBucketId(v, pHashObj->bits, pHashObj->numOfBuckets); int32_t newBucketId = doGetAlternativeBucketId(v, pHashObj->bits, pHashObj->numOfBuckets);
printf("bucketId: 0x%x not exists, put it into 0x%x instead\n", v, newBucketId); // printf("bucketId: 0x%x not exists, put it into 0x%x instead\n", v, newBucketId);
v = newBucketId; v = newBucketId;
} }
...@@ -305,7 +325,7 @@ int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data ...@@ -305,7 +325,7 @@ int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data
int32_t numOfBits = ceil(log(pHashObj->numOfBuckets) / log(2)); int32_t numOfBits = ceil(log(pHashObj->numOfBuckets) / log(2));
if (numOfBits > pHashObj->bits) { if (numOfBits > pHashObj->bits) {
printf("extend the bits from %d to %d, new bucket:%d\n", pHashObj->bits, numOfBits, newBucketId); // printf("extend the bits from %d to %d, new bucket:%d\n", pHashObj->bits, numOfBits, newBucketId);
ASSERT(numOfBits == pHashObj->bits + 1); ASSERT(numOfBits == pHashObj->bits + 1);
pHashObj->bits = numOfBits; pHashObj->bits = numOfBits;
} }
...@@ -314,7 +334,7 @@ int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data ...@@ -314,7 +334,7 @@ int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data
// load all data in this bucket and check if the data needs to relocated into the new bucket // load all data in this bucket and check if the data needs to relocated into the new bucket
SLHashBucket* pBucket = pHashObj->pBucket[splitBucketId]; SLHashBucket* pBucket = pHashObj->pBucket[splitBucketId];
printf("split %d items' bucket:0x%x to new bucket:0x%x\n", pBucket->size, splitBucketId, newBucketId); // printf("split %d items' bucket:0x%x to new bucket:0x%x\n", pBucket->size, splitBucketId, newBucketId);
for (int32_t i = 0; i < taosArrayGetSize(pBucket->pPageIdList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pBucket->pPageIdList); ++i) {
int32_t pageId = *(int32_t*)taosArrayGet(pBucket->pPageIdList, i); int32_t pageId = *(int32_t*)taosArrayGet(pBucket->pPageIdList, i);
...@@ -331,14 +351,14 @@ int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data ...@@ -331,14 +351,14 @@ int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data
if (v1 != splitBucketId) { // place it into the new bucket if (v1 != splitBucketId) { // place it into the new bucket
ASSERT(v1 == newBucketId); ASSERT(v1 == newBucketId);
printf("move key:%d to 0x%x bucket, remain items:%d\n", *(int32_t*)k, v1, pBucket->size - 1); // printf("move key:%d to 0x%x bucket, remain items:%d\n", *(int32_t*)k, v1, pBucket->size - 1);
SLHashBucket* pNewBucket = pHashObj->pBucket[newBucketId]; SLHashBucket* pNewBucket = pHashObj->pBucket[newBucketId];
doAddToBucket(pHashObj, pNewBucket, newBucketId, (void*)GET_LHASH_NODE_KEY(pNode), pNode->keyLen, doAddToBucket(pHashObj, pNewBucket, newBucketId, (void*)GET_LHASH_NODE_KEY(pNode), pNode->keyLen,
GET_LHASH_NODE_KEY(pNode), pNode->dataLen); GET_LHASH_NODE_KEY(pNode), pNode->dataLen);
doRemoveFromBucket(p, pNode, pBucket); doRemoveFromBucket(p, pNode, pBucket);
} else { } else {
printf("check key:%d, located into: %d, skip it\n", *(int*) k, v1); // printf("check key:%d, located into: %d, skip it\n", *(int*) k, v1);
int32_t nodeSize = GET_LHASH_NODE_LEN(pStart); int32_t nodeSize = GET_LHASH_NODE_LEN(pStart);
pStart += nodeSize; pStart += nodeSize;
...@@ -398,8 +418,8 @@ void tHashPrint(const SLHashObj* pHashObj, int32_t type) { ...@@ -398,8 +418,8 @@ void tHashPrint(const SLHashObj* pHashObj, int32_t type) {
if (type == LINEAR_HASH_DATA) { if (type == LINEAR_HASH_DATA) {
for (int32_t i = 0; i < pHashObj->numOfBuckets; ++i) { for (int32_t i = 0; i < pHashObj->numOfBuckets; ++i) {
printf("bucket: 0x%x, obj:%d, page:%d\n", i, pHashObj->pBucket[i]->size, // printf("bucket: 0x%x, obj:%d, page:%d\n", i, pHashObj->pBucket[i]->size,
(int)taosArrayGetSize(pHashObj->pBucket[i]->pPageIdList)); // (int)taosArrayGetSize(pHashObj->pBucket[i]->pPageIdList));
} }
} else { } else {
dBufPrintStatis(pHashObj->pBuf); dBufPrintStatis(pHashObj->pBuf);
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include <executorimpl.h> #include <executorimpl.h>
#include <function.h>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <tglobal.h> #include <tglobal.h>
#include <iostream> #include <iostream>
...@@ -47,6 +48,8 @@ typedef struct SDummyInputInfo { ...@@ -47,6 +48,8 @@ typedef struct SDummyInputInfo {
int32_t startVal; int32_t startVal;
int32_t type; int32_t type;
int32_t numOfRowsPerPage; int32_t numOfRowsPerPage;
int32_t numOfCols; // number of columns
int64_t tsStart;
SSDataBlock* pBlock; SSDataBlock* pBlock;
} SDummyInputInfo; } SDummyInputInfo;
...@@ -117,16 +120,96 @@ SSDataBlock* getDummyBlock(void* param, bool* newgroup) { ...@@ -117,16 +120,96 @@ SSDataBlock* getDummyBlock(void* param, bool* newgroup) {
return pBlock; return pBlock;
} }
SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_t rowsPerPage, int32_t type) { SSDataBlock* get2ColsDummyBlock(void* param, bool* newgroup) {
SOperatorInfo* pOperator = static_cast<SOperatorInfo*>(param);
SDummyInputInfo* pInfo = static_cast<SDummyInputInfo*>(pOperator->info);
if (pInfo->current >= pInfo->totalPages) {
return NULL;
}
if (pInfo->pBlock == NULL) {
pInfo->pBlock = static_cast<SSDataBlock*>(calloc(1, sizeof(SSDataBlock)));
pInfo->pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
SColumnInfoData colInfo = {0};
colInfo.info.type = TSDB_DATA_TYPE_TIMESTAMP;
colInfo.info.bytes = sizeof(int64_t);
colInfo.info.colId = 1;
colInfo.pData = static_cast<char*>(calloc(pInfo->numOfRowsPerPage, sizeof(int64_t)));
// colInfo.nullbitmap = static_cast<char*>(calloc(1, (pInfo->numOfRowsPerPage + 7) / 8));
taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo);
SColumnInfoData colInfo1 = {0};
colInfo1.info.type = TSDB_DATA_TYPE_INT;
colInfo1.info.bytes = 4;
colInfo1.info.colId = 2;
colInfo1.pData = static_cast<char*>(calloc(pInfo->numOfRowsPerPage, sizeof(int32_t)));
colInfo1.nullbitmap = static_cast<char*>(calloc(1, (pInfo->numOfRowsPerPage + 7) / 8));
taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo1);
} else {
blockDataClearup(pInfo->pBlock, false);
}
SSDataBlock* pBlock = pInfo->pBlock;
char buf[128] = {0};
char b1[128] = {0};
int64_t ts = 0;
int32_t v = 0;
for(int32_t i = 0; i < pInfo->numOfRowsPerPage; ++i) {
SColumnInfoData* pColInfo = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 0));
ts = (++pInfo->tsStart);
colDataAppend(pColInfo, i, reinterpret_cast<const char*>(&ts), false);
SColumnInfoData* pColInfo1 = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 1));
if (pInfo->type == data_desc) {
v = (--pInfo->startVal);
} else if (pInfo->type == data_asc) {
v = ++pInfo->startVal;
} else if (pInfo->type == data_rand) {
v = random();
}
colDataAppend(pColInfo1, i, reinterpret_cast<const char*>(&v), false);
// sprintf(buf, "this is %d row", i);
// STR_TO_VARSTR(b1, buf);
//
// SColumnInfoData* pColInfo2 = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 1));
// colDataAppend(pColInfo2, i, b1, false);
}
pBlock->info.rows = pInfo->numOfRowsPerPage;
pBlock->info.numOfCols = 1;
pInfo->current += 1;
blockDataUpdateTsWindow(pBlock);
return pBlock;
}
SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_t rowsPerPage, int32_t type, int32_t numOfCols) {
SOperatorInfo* pOperator = static_cast<SOperatorInfo*>(calloc(1, sizeof(SOperatorInfo))); SOperatorInfo* pOperator = static_cast<SOperatorInfo*>(calloc(1, sizeof(SOperatorInfo)));
pOperator->name = "dummyInputOpertor4Test"; pOperator->name = "dummyInputOpertor4Test";
if (numOfCols == 1) {
pOperator->exec = getDummyBlock; pOperator->exec = getDummyBlock;
} else {
pOperator->exec = get2ColsDummyBlock;
}
SDummyInputInfo *pInfo = (SDummyInputInfo*) calloc(1, sizeof(SDummyInputInfo)); SDummyInputInfo *pInfo = (SDummyInputInfo*) calloc(1, sizeof(SDummyInputInfo));
pInfo->totalPages = numOfBlocks; pInfo->totalPages = numOfBlocks;
pInfo->startVal = startVal; pInfo->startVal = startVal;
pInfo->numOfRowsPerPage = rowsPerPage; pInfo->numOfRowsPerPage = rowsPerPage;
pInfo->type = type; pInfo->type = type;
pInfo->tsStart = 1620000000000;
pOperator->info = pInfo; pOperator->info = pInfo;
return pOperator; return pOperator;
...@@ -357,8 +440,6 @@ TEST(testCase, external_sort_Test) { ...@@ -357,8 +440,6 @@ TEST(testCase, external_sort_Test) {
taosArrayDestroy(pOrderVal); taosArrayDestroy(pOrderVal);
} }
TEST(testCase, sorted_merge_Test) { TEST(testCase, sorted_merge_Test) {
srand(time(NULL)); srand(time(NULL));
...@@ -432,4 +513,79 @@ TEST(testCase, sorted_merge_Test) { ...@@ -432,4 +513,79 @@ TEST(testCase, sorted_merge_Test) {
} }
#endif #endif
TEST(testCase, time_interval_Operator_Test) {
srand(time(NULL));
SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
SOrder o = {0};
o.order = TSDB_ORDER_ASC;
o.col.info.colId = 1;
o.col.info.type = TSDB_DATA_TYPE_INT;
taosArrayPush(pOrderVal, &o);
SArray* pExprInfo = taosArrayInit(4, sizeof(SExprInfo));
SExprInfo *exp = static_cast<SExprInfo*>(calloc(1, sizeof(SExprInfo)));
exp->base.resSchema = createSchema(TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, "ts");
exp->base.pColumns = static_cast<SColumn*>(calloc(1, sizeof(SColumn)));
exp->base.pColumns->flag = TSDB_COL_NORMAL;
exp->base.pColumns->info = (SColumnInfo) {.colId = 1, .type = TSDB_DATA_TYPE_TIMESTAMP, .bytes = 8};
exp->base.numOfCols = 1;
taosArrayPush(pExprInfo, &exp);
SExprInfo *exp1 = static_cast<SExprInfo*>(calloc(1, sizeof(SExprInfo)));
exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BIGINT, 8, 2, "res1");
exp1->base.pColumns = static_cast<SColumn*>(calloc(1, sizeof(SColumn)));
exp1->base.pColumns->flag = TSDB_COL_NORMAL;
exp1->base.pColumns->info = (SColumnInfo) {.colId = 1, .type = TSDB_DATA_TYPE_INT, .bytes = 4};
exp1->base.numOfCols = 1;
taosArrayPush(pExprInfo, &exp1);
SOperatorInfo* p = createDummyOperator(1, 1, 2000, data_asc, 2);
SExecTaskInfo ti = {0};
SOperatorInfo* pOperator = createIntervalOperatorInfo(p, pExprInfo, &ti);
bool newgroup = false;
SSDataBlock* pRes = NULL;
int32_t total = 1;
int64_t s1 = taosGetTimestampUs();
int32_t t = 1;
while(1) {
int64_t s = taosGetTimestampUs();
pRes = pOperator->exec(pOperator, &newgroup);
int64_t e = taosGetTimestampUs();
if (t++ == 1) {
printf("---------------elapsed:%ld\n", e - s);
}
if (pRes == NULL) {
break;
}
SColumnInfoData* pCol1 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 0));
// SColumnInfoData* pCol2 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 1));
for (int32_t i = 0; i < pRes->info.rows; ++i) {
// char* p = colDataGetData(pCol2, i);
printf("%d: %ld\n", total++, ((int64_t*)pCol1->pData)[i]);
// printf("%d: %d, %s\n", total++, ((int32_t*)pCol1->pData)[i], (char*)varDataVal(p));
}
}
int64_t s2 = taosGetTimestampUs();
printf("total:%ld\n", s2 - s1);
pOperator->cleanupFn(pOperator->info, 2);
tfree(exp);
tfree(exp1);
taosArrayDestroy(pExprInfo);
taosArrayDestroy(pOrderVal);
}
#pragma GCC diagnostic pop #pragma GCC diagnostic pop
...@@ -28,9 +28,9 @@ TEST(testCase, linear_hash_Tests) { ...@@ -28,9 +28,9 @@ TEST(testCase, linear_hash_Tests) {
srand(time(NULL)); srand(time(NULL));
_hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT); _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
#if 1 #if 0
SLHashObj* pHashObj = tHashInit(10, 128 + 8, fn, 8); SLHashObj* pHashObj = tHashInit(256, 4096, fn, 320);
for(int32_t i = 0; i < 100; ++i) { for(int32_t i = 0; i < 5000000; ++i) {
int32_t code = tHashPut(pHashObj, &i, sizeof(i), &i, sizeof(i)); int32_t code = tHashPut(pHashObj, &i, sizeof(i), &i, sizeof(i));
assert(code == 0); assert(code == 0);
} }
...@@ -46,13 +46,13 @@ TEST(testCase, linear_hash_Tests) { ...@@ -46,13 +46,13 @@ TEST(testCase, linear_hash_Tests) {
// } // }
// } // }
tHashPrint(pHashObj, LINEAR_HASH_DATA); tHashPrint(pHashObj, LINEAR_HASH_STATIS);
tHashCleanup(pHashObj); tHashCleanup(pHashObj);
#endif #endif
#if 0 #if 0
SHashObj* pHashObj = taosHashInit(1000, fn, false, HASH_NO_LOCK); SHashObj* pHashObj = taosHashInit(1000, fn, false, HASH_NO_LOCK);
for(int32_t i = 0; i < 500000; ++i) { for(int32_t i = 0; i < 1000000; ++i) {
taosHashPut(pHashObj, &i, sizeof(i), &i, sizeof(i)); taosHashPut(pHashObj, &i, sizeof(i), &i, sizeof(i));
} }
......
...@@ -4395,7 +4395,7 @@ SFunctionFpSet fpSet[1] = { ...@@ -4395,7 +4395,7 @@ SFunctionFpSet fpSet[1] = {
.addInput = count_function, .addInput = count_function,
.finalize = doFinalizer, .finalize = doFinalizer,
.combine = count_func_merge, .combine = count_func_merge,
} },
}; };
SAggFunctionInfo aggFunc[35] = {{ SAggFunctionInfo aggFunc[35] = {{
......
...@@ -5,12 +5,6 @@ ...@@ -5,12 +5,6 @@
#include "tcompression.h" #include "tcompression.h"
#include "thash.h" #include "thash.h"
//enum {
// true = 0x1,
// BUF_PAGE_RELEASED = 0x2,
// true = 0x3,
//};
#define GET_DATA_PAYLOAD(_p) ((char *)(_p)->pData + POINTER_BYTES) #define GET_DATA_PAYLOAD(_p) ((char *)(_p)->pData + POINTER_BYTES)
#define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages) #define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages)
...@@ -20,7 +14,7 @@ typedef struct SPageDiskInfo { ...@@ -20,7 +14,7 @@ typedef struct SPageDiskInfo {
} SPageDiskInfo, SFreeListItem; } SPageDiskInfo, SFreeListItem;
struct SPageInfo { struct SPageInfo {
SListNode* pn; // point to list node SListNode* pn; // point to list node struct
void* pData; void* pData;
int64_t offset; int64_t offset;
int32_t pageId; int32_t pageId;
...@@ -38,7 +32,8 @@ struct SDiskbasedBuf { ...@@ -38,7 +32,8 @@ struct SDiskbasedBuf {
char* path; // file path char* path; // file path
int32_t pageSize; // current used page size int32_t pageSize; // current used page size
int32_t inMemPages; // numOfPages that are allocated in memory int32_t inMemPages; // numOfPages that are allocated in memory
SHashObj* groupSet; // id hash table SList* freePgList; // free page list
SHashObj* groupSet; // id hash table, todo remove it
SHashObj* all; SHashObj* all;
SList* lruList; SList* lruList;
void* emptyDummyIdList; // dummy id list void* emptyDummyIdList; // dummy id list
...@@ -110,6 +105,14 @@ static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) { ...@@ -110,6 +105,14 @@ static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) {
} }
} }
static void setPageNotInBuf(SPageInfo* pPageInfo) {
pPageInfo->pData = NULL;
}
static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) {
return pageSize + POINTER_BYTES + 2;
}
/** /**
* +--------------------------+-------------------+--------------+ * +--------------------------+-------------------+--------------+
* | PTR to SPageInfo (8bytes)| Payload (PageSize)| 2 Extra Bytes| * | PTR to SPageInfo (8bytes)| Payload (PageSize)| 2 Extra Bytes|
...@@ -189,17 +192,17 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { ...@@ -189,17 +192,17 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
} }
} else {// NOTE: the size may be -1, the this recycle page has not been flushed to disk yet. } else {// NOTE: the size may be -1, the this recycle page has not been flushed to disk yet.
size = pg->length; size = pg->length;
if (size == -1) {
printf("----\n");
}
} }
ASSERT(size > 0 || (pg->offset == -1 && pg->length == -1)); ASSERT(size > 0 || (pg->offset == -1 && pg->length == -1));
char* pDataBuf = pg->pData; char* pDataBuf = pg->pData;
memset(pDataBuf, 0, pBuf->pageSize); memset(pDataBuf, 0, getAllocPageSize(pBuf->pageSize));
pg->pData = NULL; // this means the data is not in buffer
pg->length = size;
pg->dirty = false;
pg->length = size; // on disk size
return pDataBuf; return pDataBuf;
} }
...@@ -214,7 +217,11 @@ static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { ...@@ -214,7 +217,11 @@ static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
} }
} }
return doFlushPageToDisk(pBuf, pg); char* p = doFlushPageToDisk(pBuf, pg);
setPageNotInBuf(pg);
pg->dirty = false;
return p;
} }
// load file block data in disk // load file block data in disk
...@@ -284,12 +291,23 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) { ...@@ -284,12 +291,23 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
assert(pageInfo->pageId >= 0 && pageInfo->pn == pn); assert(pageInfo->pageId >= 0 && pageInfo->pn == pn);
if (!pageInfo->used) { if (!pageInfo->used) {
// printf("%d is chosen\n", pageInfo->pageId);
break; break;
} else { } else {
printf("page %d is used, dirty:%d\n", pageInfo->pageId, pageInfo->dirty); // printf("page %d is used, dirty:%d\n", pageInfo->pageId, pageInfo->dirty);
} }
} }
// int32_t pos = listNEles(pBuf->lruList);
// SListIter iter1 = {0};
// tdListInitIter(pBuf->lruList, &iter1, TD_LIST_BACKWARD);
// SListNode* pn1 = NULL;
// while((pn1 = tdListNext(&iter1)) != NULL) {
// SPageInfo* pageInfo = *(SPageInfo**) pn1->data;
// printf("page %d is used, dirty:%d, pos:%d\n", pageInfo->pageId, pageInfo->dirty, pos - 1);
// pos -= 1;
// }
return pn; return pn;
} }
...@@ -333,10 +351,6 @@ static void lruListMoveToFront(SList *pList, SPageInfo* pi) { ...@@ -333,10 +351,6 @@ static void lruListMoveToFront(SList *pList, SPageInfo* pi) {
tdListPrependNode(pList, pi->pn); tdListPrependNode(pList, pi->pn);
} }
static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) {
return pageSize + POINTER_BYTES + 2;
}
static SPageInfo* getPageInfoFromPayload(void* page) { static SPageInfo* getPageInfoFromPayload(void* page) {
int32_t offset = offsetof(SPageInfo, pData); int32_t offset = offsetof(SPageInfo, pData);
char* p = page - offset; char* p = page - offset;
...@@ -348,41 +362,42 @@ static SPageInfo* getPageInfoFromPayload(void* page) { ...@@ -348,41 +362,42 @@ static SPageInfo* getPageInfoFromPayload(void* page) {
int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) { int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) {
*pBuf = calloc(1, sizeof(SDiskbasedBuf)); *pBuf = calloc(1, sizeof(SDiskbasedBuf));
SDiskbasedBuf* pResBuf = *pBuf; SDiskbasedBuf* pPBuf = *pBuf;
if (pResBuf == NULL) { if (pPBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pResBuf->pageSize = pagesize; pPBuf->pageSize = pagesize;
pResBuf->numOfPages = 0; // all pages are in buffer in the first place pPBuf->numOfPages = 0; // all pages are in buffer in the first place
pResBuf->totalBufSize = 0; pPBuf->totalBufSize = 0;
pResBuf->inMemPages = inMemBufSize/pagesize; // maximum allowed pages, it is a soft limit. pPBuf->inMemPages = inMemBufSize/pagesize; // maximum allowed pages, it is a soft limit.
pResBuf->allocateId = -1; pPBuf->allocateId = -1;
pResBuf->comp = true; pPBuf->comp = true;
pResBuf->file = NULL; pPBuf->file = NULL;
pResBuf->qId = qId; pPBuf->qId = qId;
pResBuf->fileSize = 0; pPBuf->fileSize = 0;
pResBuf->pFree = taosArrayInit(4, sizeof(SFreeListItem)); pPBuf->pFree = taosArrayInit(4, sizeof(SFreeListItem));
pPBuf->freePgList = tdListNew(POINTER_BYTES);
// at least more than 2 pages must be in memory // at least more than 2 pages must be in memory
assert(inMemBufSize >= pagesize * 2); assert(inMemBufSize >= pagesize * 2);
pResBuf->lruList = tdListNew(POINTER_BYTES); pPBuf->lruList = tdListNew(POINTER_BYTES);
// init id hash table // init id hash table
_hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT); _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
pResBuf->groupSet = taosHashInit(10, fn, true, false); pPBuf->groupSet = taosHashInit(10, fn, true, false);
pResBuf->assistBuf = malloc(pResBuf->pageSize + 2); // EXTRA BYTES pPBuf->assistBuf = malloc(pPBuf->pageSize + 2); // EXTRA BYTES
pResBuf->all = taosHashInit(10, fn, true, false); pPBuf->all = taosHashInit(10, fn, true, false);
char path[PATH_MAX] = {0}; char path[PATH_MAX] = {0};
taosGetTmpfilePath(dir, "paged-buf", path); taosGetTmpfilePath(dir, "paged-buf", path);
pResBuf->path = strdup(path); pPBuf->path = strdup(path);
pResBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t)); pPBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));
// qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId, pResBuf->pageSize, // qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId, pPBuf->pageSize,
// pResBuf->inMemPages, pResBuf->path); // pPBuf->inMemPages, pPBuf->path);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -401,19 +416,29 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) { ...@@ -401,19 +416,29 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) {
} }
} }
SPageInfo* pi = NULL;
if (listNEles(pBuf->freePgList) != 0) {
SListNode* pItem = tdListPopHead(pBuf->freePgList);
pi = *(SPageInfo**) pItem->data;
pi->used = true;
*pageId = pi->pageId;
tfree(pItem);
} else {// create a new pageinfo
// register new id in this group // register new id in this group
*pageId = (++pBuf->allocateId); *pageId = (++pBuf->allocateId);
// register page id info // register page id info
SPageInfo* pi = registerPage(pBuf, groupId, *pageId); pi = registerPage(pBuf, groupId, *pageId);
// add to hash map
taosHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES);
pBuf->totalBufSize += pBuf->pageSize;
}
// add to LRU list // add to LRU list
assert(listNEles(pBuf->lruList) < pBuf->inMemPages && pBuf->inMemPages > 0); assert(listNEles(pBuf->lruList) < pBuf->inMemPages && pBuf->inMemPages > 0);
lruListPushFront(pBuf->lruList, pi); lruListPushFront(pBuf->lruList, pi);
// add to hash map
taosHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES);
// allocate buf // allocate buf
if (availablePage == NULL) { if (availablePage == NULL) {
pi->pData = calloc(1, getAllocPageSize(pBuf->pageSize)); // add extract bytes in case of zipped buffer increased. pi->pData = calloc(1, getAllocPageSize(pBuf->pageSize)); // add extract bytes in case of zipped buffer increased.
...@@ -421,11 +446,7 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) { ...@@ -421,11 +446,7 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) {
pi->pData = availablePage; pi->pData = availablePage;
} }
pBuf->totalBufSize += pBuf->pageSize;
((void**)pi->pData)[0] = pi; ((void**)pi->pData)[0] = pi;
pi->used = true;
return (void *)(GET_DATA_PAYLOAD(pi)); return (void *)(GET_DATA_PAYLOAD(pi));
} }
...@@ -467,6 +488,7 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { ...@@ -467,6 +488,7 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
(*pi)->pData = availablePage; (*pi)->pData = availablePage;
} }
// set the ptr to the new SPageInfo
((void**)((*pi)->pData))[0] = (*pi); ((void**)((*pi)->pData))[0] = (*pi);
lruListPushFront(pBuf->lruList, *pi); lruListPushFront(pBuf->lruList, *pi);
...@@ -551,6 +573,8 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) { ...@@ -551,6 +573,8 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
} }
tdListFree(pBuf->lruList); tdListFree(pBuf->lruList);
tdListFree(pBuf->freePgList);
taosArrayDestroy(pBuf->emptyDummyIdList); taosArrayDestroy(pBuf->emptyDummyIdList);
taosArrayDestroy(pBuf->pFree); taosArrayDestroy(pBuf->pFree);
...@@ -599,11 +623,12 @@ void dBufSetBufPageRecycled(SDiskbasedBuf *pBuf, void* pPage) { ...@@ -599,11 +623,12 @@ void dBufSetBufPageRecycled(SDiskbasedBuf *pBuf, void* pPage) {
ppi->used = false; ppi->used = false;
ppi->dirty = false; ppi->dirty = false;
// it is a in-memory page that has not been flushed to disk yet. // add this pageinfo into the free page info list
if (ppi->length != -1 && ppi->offset != -1) { SListNode* pNode = tdListPopNode(pBuf->lruList, ppi->pn);
SFreeListItem item = {.length = ppi->length, .offset = ppi->offset}; tfree(ppi->pData);
taosArrayPush(pBuf->pFree, &item); tfree(pNode);
}
tdListAppend(pBuf->freePgList, &ppi);
} }
void dBufSetPrintInfo(SDiskbasedBuf* pBuf) { void dBufSetPrintInfo(SDiskbasedBuf* pBuf) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册