executorimpl.c 148.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

H
Haojun Liao 已提交
16 17
#include "filter.h"
#include "function.h"
18 19
#include "functionMgt.h"
#include "os.h"
H
Haojun Liao 已提交
20
#include "querynodes.h"
21
#include "tfill.h"
dengyihao's avatar
dengyihao 已提交
22
#include "tname.h"
X
Xiaoyu Wang 已提交
23
#include "tref.h"
24

H
Haojun Liao 已提交
25
#include "tdatablock.h"
26
#include "tglobal.h"
H
Haojun Liao 已提交
27
#include "tmsg.h"
H
Haojun Liao 已提交
28
#include "tsort.h"
29
#include "ttime.h"
H
Haojun Liao 已提交
30

31
#include "executorimpl.h"
dengyihao's avatar
dengyihao 已提交
32
#include "index.h"
33
#include "query.h"
34 35
#include "tcompare.h"
#include "tcompression.h"
H
Haojun Liao 已提交
36
#include "thash.h"
37
#include "ttypes.h"
dengyihao's avatar
dengyihao 已提交
38
#include "vnode.h"
39

H
Haojun Liao 已提交
40
#define IS_MAIN_SCAN(runtime)          ((runtime)->scanFlag == MAIN_SCAN)
41 42 43 44 45 46
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)

#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)

#if 0
/*
 * Disabled fault-injection allocators: each wrapper draws a random number and
 * occasionally returns NULL instead of forwarding to the real taosMemory*
 * routine. When enabled, malloc/calloc/realloc are redirected to them.
 */
static UNUSED_FUNC void* u_malloc(size_t size) {
  uint32_t roll = taosRand();
  // fail when the draw is a multiple of 1000
  return (roll % 1000 == 0) ? NULL : taosMemoryMalloc(size);
}

static UNUSED_FUNC void* u_calloc(size_t num, size_t size) {
  uint32_t roll = taosRand();
  // fail when the draw is a multiple of 1000
  return (roll % 1000 == 0) ? NULL : taosMemoryCalloc(num, size);
}

static UNUSED_FUNC void* u_realloc(void* p, size_t size) {
  uint32_t roll = taosRand();
  // fail when the draw modulo 5 is 0 or 1
  return (roll % 5 < 2) ? NULL : taosMemoryRealloc(p, size);
}

#define calloc  u_calloc
#define malloc  u_malloc
#define realloc u_realloc
#endif

X
Xiaoyu Wang 已提交
79
#define CLEAR_QUERY_STATUS(q, st)   ((q)->status &= (~(st)))
80 81
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)

L
Liu Jicong 已提交
82 83
/* Returns twice the configured tsShellActivityTimer value. */
int32_t getMaximumIdleDurationSec() {
  return 2 * tsShellActivityTimer;
}

84
static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
85

X
Xiaoyu Wang 已提交
86
static void releaseQueryBuf(size_t numOfTables);
87

88 89
static void destroyFillOperatorInfo(void* param);
static void destroyProjectOperatorInfo(void* param);
H
Haojun Liao 已提交
90
static void destroySortOperatorInfo(void* param);
91
static void destroyAggOperatorInfo(void* param);
X
Xiaoyu Wang 已提交
92

93 94
static void destroyIntervalOperatorInfo(void* param);
static void destroyExchangeOperatorInfo(void* param);
H
Haojun Liao 已提交
95

96 97
static void destroyOperatorInfo(SOperatorInfo* pOperator);

98
/*
 * Mark an operator as finished: set its status to OP_EXEC_DONE, record the
 * total cost relative to the owning task's start time, and flag the task as
 * TASK_COMPLETED. The operator must be attached to a task.
 */
void doSetOperatorCompleted(SOperatorInfo* pOperator) {
  pOperator->status = OP_EXEC_DONE;

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  ASSERT(pTaskInfo != NULL);

  // cost.start is scaled by 1000 to match the microsecond clock, then the
  // difference is divided by 1000.0 before being stored.
  pOperator->cost.totalCost = (taosGetTimestampUs() - pTaskInfo->cost.start * 1000) / 1000.0;
  setTaskStatus(pTaskInfo, TASK_COMPLETED);
}
105

H
Haojun Liao 已提交
106
/*
 * No-op open callback: records a zero open cost, marks the operator as opened
 * and always reports success.
 */
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) {
  pOperator->cost.openCost = 0;
  OPTR_SET_OPENED(pOperator);
  return TSDB_CODE_SUCCESS;
}

112
/*
 * Bundle the callbacks of an operator into an SOperatorFpSet value.
 * Members without a matching argument are zero-initialized.
 */
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
                                   __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_explain_fn_t explain) {
  SOperatorFpSet fpSet = {0};

  fpSet._openFn = openFn;
  fpSet.getNextFn = nextFn;
  fpSet.getStreamResFn = streamFn;
  fpSet.cleanupFn = cleanup;
  fpSet.closeFn = closeFn;
  fpSet.getExplainFn = explain;

  return fpSet;
}

126 127
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
                                  SGroupResInfo* pGroupResInfo);
H
Haojun Liao 已提交
128

129
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
130
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
131

132
#if 0
/*
 * Disabled legacy helper: reports whether a result row already exists for the
 * given key/uid in the runtime environment's hash tables.
 */
static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, char* pData,
                                int16_t bytes, bool masterscan, uint64_t uid) {
  SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid);

  SResultRow** ppRow =
      (SResultRow**)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
  const bool found = (ppRow != NULL);

  // non-interval queries: the hash lookup alone decides
  if (!QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQueryAttr)) {
    return found;
  }

  // in case of repeat scan/reverse scan, no new time window added.
  // the lookup may miss in case of sliding+offset exists.
  if (!masterscan) {
    return found;
  }

  if (!found) {
    return false;
  }

  // with zero rows nothing can match; the single-row comparison is disabled
  // (existed = (pResultRowInfo->pResult[0] == (*p1))), so it reports false too
  if (pResultRowInfo->size == 0 || pResultRowInfo->size == 1) {
    return false;
  }

  // check if current pResultRowInfo contains the existed pResultRow
  SET_RES_EXT_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid, pResultRowInfo);
  int64_t* index =
      taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes));
  return index != NULL;
}
#endif
170

171
/**
 * Reserve interBufSize bytes for one result row in the paged result buffer.
 *
 * If *currentPageId is -1 a new page is requested. Otherwise the current page
 * is reused when the row still fits, or released and replaced by a new page
 * when it does not. On success *currentPageId is updated to the page that
 * holds the row.
 *
 * @param pResultBuf     paged buffer that owns the result pages
 * @param currentPageId  in/out: id of the page currently being filled, -1 if none
 * @param interBufSize   number of bytes to reserve for the row
 * @return the reserved row, or NULL if no buffer page could be obtained
 *         (*currentPageId is left untouched in that case)
 */
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
  SFilePage* pData = NULL;
  int32_t    pageId = -1;
  bool       freshPage = false;

  if (*currentPageId == -1) {
    // in the first scan, new space needed for results
    pData = getNewBufPage(pResultBuf, &pageId);
    freshPage = true;
  } else {
    pageId = *currentPageId;
    pData = getBufPage(pResultBuf, pageId);
    if (pData == NULL) {
      return NULL;
    }

    if (pData->num + interBufSize > getBufPageSize(pResultBuf)) {
      // release current page first, and prepare the next one
      releaseBufPage(pResultBuf, pData);

      pData = getNewBufPage(pResultBuf, &pageId);
      freshPage = true;
    }
  }

  // the page must be validated before any member is touched
  if (pData == NULL) {
    return NULL;
  }

  if (freshPage) {
    // a new page starts right behind its SFilePage header
    pData->num = sizeof(SFilePage);
  }

  setBufPageDirty(pData, true);

  // the row lives at the current write offset of the page
  SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
  pResultRow->pageId = pageId;
  pResultRow->offset = (int32_t)pData->num;
  *currentPageId = pageId;

  pData->num += interBufSize;
  return pResultRow;
}

210 211 212 213 214 215 216
/**
 * the struct of key in hash table
 * +----------+---------------+
 * | group id |   key data    |
 * | 8 bytes  | actual length |
 * +----------+---------------+
 */
217 218 219
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
                                   int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
                                   bool isIntervalQuery, SAggSupporter* pSup) {
220
  SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
H
Haojun Liao 已提交
221

dengyihao's avatar
dengyihao 已提交
222
  SResultRowPosition* p1 =
223
      (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
H
Haojun Liao 已提交
224

225 226
  SResultRow* pResult = NULL;

H
Haojun Liao 已提交
227 228
  // in case of repeat scan/reverse scan, no new time window added.
  if (isIntervalQuery) {
229
    if (masterscan && p1 != NULL) {  // the *p1 may be NULL in case of sliding+offset exists.
230
      pResult = getResultRowByPos(pResultBuf, p1, true);
231
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
232 233
    }
  } else {
dengyihao's avatar
dengyihao 已提交
234 235
    // In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
    // pResultRowInfo object.
H
Haojun Liao 已提交
236
    if (p1 != NULL) {
237
      // todo
238
      pResult = getResultRowByPos(pResultBuf, p1, true);
239
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
240 241 242
    }
  }

L
Liu Jicong 已提交
243
  // 1. close current opened time window
244
  if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
245
    SResultRowPosition pos = pResultRowInfo->cur;
X
Xiaoyu Wang 已提交
246
    SFilePage*         pPage = getBufPage(pResultBuf, pos.pageId);
247 248 249 250 251
    releaseBufPage(pResultBuf, pPage);
  }

  // allocate a new buffer page
  if (pResult == NULL) {
H
Haojun Liao 已提交
252
    ASSERT(pSup->resultRowSize > 0);
253
    pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
254

255 256
    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
257
    tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
L
Liu Jicong 已提交
258
                   sizeof(SResultRowPosition));
H
Haojun Liao 已提交
259 260
  }

261 262 263
  // 2. set the new time window to be the new active time window
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};

H
Haojun Liao 已提交
264
  // too many time window in query
265
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
266
      tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
267
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
H
Haojun Liao 已提交
268 269
  }

H
Haojun Liao 已提交
270
  return pResult;
H
Haojun Liao 已提交
271 272
}

273
// a new buffer page for each table. Needs to opt this design
L
Liu Jicong 已提交
274
static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, int32_t tid, uint32_t size) {
275 276 277 278
  if (pWindowRes->pageId != -1) {
    return 0;
  }

L
Liu Jicong 已提交
279
  SFilePage* pData = NULL;
280 281 282

  // in the first scan, new space needed for results
  int32_t pageId = -1;
283
  SIDList list = getDataBufPagesIdList(pResultBuf);
284 285

  if (taosArrayGetSize(list) == 0) {
286
    pData = getNewBufPage(pResultBuf, &pageId);
287
    pData->num = sizeof(SFilePage);
288 289
  } else {
    SPageInfo* pi = getLastPageInfo(list);
290
    pData = getBufPage(pResultBuf, getPageId(pi));
291
    pageId = getPageId(pi);
292

293
    if (pData->num + size > getBufPageSize(pResultBuf)) {
294
      // release current page first, and prepare the next one
295
      releaseBufPageInfo(pResultBuf, pi);
296

297
      pData = getNewBufPage(pResultBuf, &pageId);
298
      if (pData != NULL) {
299
        pData->num = sizeof(SFilePage);
300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319
      }
    }
  }

  if (pData == NULL) {
    return -1;
  }

  // set the number of rows in current disk page
  if (pWindowRes->pageId == -1) {  // not allocated yet, allocate new buffer
    pWindowRes->pageId = pageId;
    pWindowRes->offset = (int32_t)pData->num;

    pData->num += size;
    assert(pWindowRes->pageId >= 0);
  }

  return 0;
}

320
//  query_range_start, query_range_end, window_duration, window_start, window_end
321
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
322 323 324
  pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
  pColData->info.bytes = sizeof(int64_t);

H
Haojun Liao 已提交
325
  colInfoDataEnsureCapacity(pColData, 5, false);
326 327 328 329 330 331 332 333 334
  colDataAppendInt64(pColData, 0, &pQueryWindow->skey);
  colDataAppendInt64(pColData, 1, &pQueryWindow->ekey);

  int64_t interval = 0;
  colDataAppendInt64(pColData, 2, &interval);  // this value may be variable in case of 'n' and 'y'.
  colDataAppendInt64(pColData, 3, &pQueryWindow->skey);
  colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
}

L
Liu Jicong 已提交
335
/* Release the column via colDataDestroy. */
void cleanupExecTimeWindowInfo(SColumnInfoData* pColData) {
  colDataDestroy(pColData);
}
H
Haojun Liao 已提交
336

337 338 339 340 341 342 343 344 345 346 347 348 349 350
// Snapshot of the SqlFunctionCtx input fields that are temporarily overridden
// while a function is applied to a sub-range of a block.
typedef struct {
  bool    hasAgg;       // copy of input.colDataAggIsSet
  int32_t numOfRows;    // copy of input.numOfRows
  int32_t startOffset;  // copy of input.startRowIndex
} SFunctionCtxStatus;

/* Capture the input range and the pre-aggregate flag of pCtx into pStatus. */
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  *pStatus = (SFunctionCtxStatus){
      .hasAgg = pCtx->input.colDataAggIsSet,
      .numOfRows = pCtx->input.numOfRows,
      .startOffset = pCtx->input.startRowIndex,
  };
}

/* Write the values captured by functionCtxSave back into the input of pCtx. */
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  pCtx->input.startRowIndex = pStatus->startOffset;
  pCtx->input.numOfRows = pStatus->numOfRows;
  pCtx->input.colDataAggIsSet = pStatus->hasAgg;
}

/*
 * Apply every function context to the row range [offset, offset + forwardStep)
 * of the current input.
 *
 * Window pseudo-column functions are evaluated through their scalar callback on
 * pTimeWindowData and write straight into the row's intermediate buffer. All
 * other functions run their aggregate callback if they still need to execute;
 * a failure is logged, stored in taskInfo->code and raised via T_LONG_JMP.
 */
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset,
                      int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
  // not a whole block involved in query processing, statistics data can not be used
  const bool partialBlock = (forwardStep < numOfTotal);

  for (int32_t k = 0; k < numOfOutput; ++k) {
    SqlFunctionCtx* pFunc = &pCtx[k];

    // keep the original input range, it is overridden for this call only
    SFunctionCtxStatus saved = {0};
    functionCtxSave(pFunc, &saved);

    pFunc->input.startRowIndex = offset;
    pFunc->input.numOfRows = forwardStep;

    // NOTE: the original value of isSet have been changed here
    if (partialBlock) {
      pFunc->input.colDataAggIsSet = false;
    }

    if (fmIsWindowPseudoColumnFunc(pFunc->functionId)) {
      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pFunc);

      // wrap the intermediate buffer of the row as a BIGINT output column
      SColumnInfoData idata = {0};
      idata.info.type = TSDB_DATA_TYPE_BIGINT;
      idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
      idata.pData = GET_ROWCELL_INTERBUF(pEntryInfo);

      SScalarParam out = {.columnData = &idata};
      SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
      pFunc->sfp.process(&tw, 1, &out);
      pEntryInfo->numOfRes = 1;

      // the saved input state is not restored on this path
      continue;
    }

    if (functionNeedToExecute(pFunc) && pFunc->fpSet.process != NULL) {
      int32_t code = pFunc->fpSet.process(pFunc);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
        taskInfo->code = code;
        T_LONG_JMP(taskInfo->env, code);
      }
    }

    // restore it
    functionCtxRestore(pFunc, &saved);
  }
}

403 404
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol);
405

406 407 408
/*
 * Bind a block to every function context of pExprSup using the block's
 * pre-aggregated (SMA) information: sets order, source block and row count,
 * then lets setBlockSMAInfo wire up the per-parameter inputs.
 */
static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order) {
  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
    SqlFunctionCtx* pOneCtx = &pExprSup->pCtx[i];

    pOneCtx->order = order;
    pOneCtx->pSrcBlock = pBlock;
    pOneCtx->input.numOfRows = pBlock->info.rows;

    setBlockSMAInfo(pOneCtx, &pExprSup->pExprInfo[i], pBlock);
  }
}

416
/*
 * Attach pBlock as the input of all function contexts in pExprSup. Blocks that
 * carry pre-aggregated data (pBlockAgg) take the SMA path; all others take the
 * raw column path. The status returned by the raw path is not propagated.
 */
void setInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol) {
  if (pBlock->pBlockAgg == NULL) {
    doSetInputDataBlock(pExprSup, pBlock, order, scanFlag, createDummyCol);
    return;
  }

  doSetInputDataBlockInfo(pExprSup, pBlock, order);
}

L
Liu Jicong 已提交
424 425
/**
 * Materialize a constant function parameter as a column of numOfRows identical
 * values in pInput->pData[paramIndex], creating the column on first use.
 *
 * Only BIGINT/UBIGINT, DOUBLE and VARCHAR constants are filled in; for any
 * other type the column is sized but no value is appended.
 *
 * @param pInput      input descriptor that receives the column
 * @param pFuncParam  constant parameter (type, length and value)
 * @param paramIndex  slot in pInput->pData to populate
 * @param numOfRows   number of rows to generate
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY if an allocation fails, or
 *         the error reported by colInfoDataEnsureCapacity
 */
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
                                             int32_t numOfRows) {
  SColumnInfoData* pColInfo = pInput->pData[paramIndex];
  if (pColInfo == NULL) {
    pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pColInfo == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pColInfo->info.type = pFuncParam->param.nType;
    pColInfo->info.bytes = pFuncParam->param.nLen;

    pInput->pData[paramIndex] = pColInfo;
  }

  // rows must not be appended to a column whose buffer could not be sized
  int32_t code = colInfoDataEnsureCapacity(pColInfo, numOfRows, false);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int8_t type = pFuncParam->param.nType;
  if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
    int64_t v = pFuncParam->param.i;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendInt64(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendDouble(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_VARCHAR) {
    // build the var-string once (header + payload) and append it to every row
    char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
    if (tmp == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppend(pColInfo, i, tmp, false);
    }
    taosMemoryFree(tmp);
  }

  return TSDB_CODE_SUCCESS;
}

467 468
/*
 * Bind the raw columns of pBlock to every function context of pExprSup.
 *
 * Column parameters point at the matching block column. A value parameter is
 * expanded into a constant column only when createDummyCol is set and it is the
 * sole parameter of the expression.
 *
 * Returns TSDB_CODE_SUCCESS, or the error of the first failed constant-column
 * creation.
 */
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol) {
  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
    SqlFunctionCtx*       pOneCtx = &pExprSup->pCtx[i];
    SExprInfo*            pOneExpr = &pExprSup->pExprInfo[i];
    SInputColumnInfoData* pInput = &pOneCtx->input;

    pOneCtx->order = order;
    pOneCtx->pSrcBlock = pBlock;
    pOneCtx->scanFlag = scanFlag;

    pInput->numOfRows = pBlock->info.rows;
    pInput->uid = pBlock->info.uid;
    pInput->colDataAggIsSet = false;

    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];

      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, pFuncParam->pCol->slotId);
        pInput->totalRows = pBlock->info.rows;
        pInput->numOfRows = pBlock->info.rows;
        pInput->startRowIndex = 0;

        // NOTE: the last parameter is the primary timestamp column
        // todo: refactor this
        const bool isLastParam = (j == pOneExpr->base.numOfParams - 1);
        if (fmIsImplicitTsFunc(pOneCtx->functionId) && isLastParam) {
          // in case of merge function, this is not always the ts column data.
          pInput->pPTS = pInput->pData[j];
        }

        ASSERT(pInput->pData[j] != NULL);
        continue;
      }

      if (pFuncParam->type != FUNC_PARAM_TYPE_VALUE) {
        continue;
      }

      // todo avoid case: top(k, 12), 12 is the value parameter.
      // sum(11), 11 is also the value parameter.
      if (!createDummyCol || pOneExpr->base.numOfParams != 1) {
        continue;
      }

      pInput->totalRows = pBlock->info.rows;
      pInput->numOfRows = pBlock->info.rows;
      pInput->startRowIndex = 0;

      int32_t code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

520
/*
 * Run the process callback of every function context that still needs to
 * execute. Stops at, logs and returns the first error; otherwise returns
 * TSDB_CODE_SUCCESS.
 */
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
  for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
    SqlFunctionCtx* pFunc = &pCtx[k];

    // todo add a dummy function to avoid process check
    if (!functionNeedToExecute(pFunc) || pFunc->fpSet.process == NULL) {
      continue;
    }

    int32_t code = pFunc->fpSet.process(pFunc);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
539
/*
 * For as many contexts as there are entries in pPseudoList, point pCtx[i].pOutput
 * at column i of the result block. A NULL list leaves everything untouched.
 */
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
  if (pPseudoList == NULL) {
    return;
  }

  size_t num = taosArrayGetSize(pPseudoList);
  for (int32_t i = 0; i < num; ++i) {
    pCtx[i].pOutput = taosArrayGet(pResult->pDataBlock, i);
  }
}

546
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
X
Xiaoyu Wang 已提交
547
                              int32_t numOfOutput, SArray* pPseudoList) {
H
Haojun Liao 已提交
548
  setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568

  if (pSrcBlock == NULL) {
    for (int32_t k = 0; k < numOfOutput; ++k) {
      int32_t outputSlotId = pExpr[k].base.resSchema.slotId;

      ASSERT(pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE);
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);

      int32_t type = pExpr[k].base.pParam[0].param.nType;
      if (TSDB_DATA_TYPE_NULL == type) {
        colDataAppendNNULL(pColInfoData, 0, 1);
      } else {
        colDataAppend(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
      }
    }

    pResult->info.rows = 1;
    return TSDB_CODE_SUCCESS;
  }

L
Liu Jicong 已提交
569 570 571 572
  if (pResult != pSrcBlock) {
    pResult->info.groupId = pSrcBlock->info.groupId;
    memcpy(pResult->info.parTbName, pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
  }
H
Haojun Liao 已提交
573

574 575
  // if the source equals to the destination, it is to create a new column as the result of scalar
  // function or some operators.
576
  bool createNewColModel = (pResult == pSrcBlock);
577 578 579
  if (createNewColModel) {
    blockDataEnsureCapacity(pResult, pResult->info.rows);
  }
580

581 582
  int32_t numOfRows = 0;

583
  for (int32_t k = 0; k < numOfOutput; ++k) {
584 585
    int32_t               outputSlotId = pExpr[k].base.resSchema.slotId;
    SqlFunctionCtx*       pfCtx = &pCtx[k];
586
    SInputColumnInfoData* pInputData = &pfCtx->input;
587

L
Liu Jicong 已提交
588
    if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) {  // it is a project query
589
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
590
      if (pResult->info.rows > 0 && !createNewColModel) {
591
        colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity, pInputData->pData[0],
592
                        pInputData->numOfRows);
593
      } else {
594
        colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info);
595
      }
596

597
      numOfRows = pInputData->numOfRows;
598
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
599
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
600

dengyihao's avatar
dengyihao 已提交
601
      int32_t offset = createNewColModel ? 0 : pResult->info.rows;
602 603 604 605 606 607 608 609

      int32_t type = pExpr[k].base.pParam[0].param.nType;
      if (TSDB_DATA_TYPE_NULL == type) {
        colDataAppendNNULL(pColInfoData, offset, pSrcBlock->info.rows);
      } else {
        for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
          colDataAppend(pColInfoData, i + offset, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
        }
610
      }
611 612

      numOfRows = pSrcBlock->info.rows;
H
Haojun Liao 已提交
613
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) {
614 615 616
      SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
      taosArrayPush(pBlockList, &pSrcBlock);

617
      SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
618
      SColumnInfoData  idata = {.info = pResColData->info, .hasNull = true};
619

620
      SScalarParam dest = {.columnData = &idata};
X
Xiaoyu Wang 已提交
621
      int32_t      code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
622 623 624 625
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pBlockList);
        return code;
      }
626

dengyihao's avatar
dengyihao 已提交
627
      int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
628
      ASSERT(pResult->info.capacity > 0);
629

630
      colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
D
dapan1121 已提交
631
      colDataDestroy(&idata);
L
Liu Jicong 已提交
632

633
      numOfRows = dest.numOfRows;
634 635
      taosArrayDestroy(pBlockList);
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
636 637
      // _rowts/_c0, not tbname column
      if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) {
H
Haojun Liao 已提交
638
        // do nothing
639
      } else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) {
640 641
        SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx);
        pfCtx->fpSet.init(pfCtx, pResInfo);
642 643 644 645 646 647 648 649 650 651

        pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
        pfCtx->offset = createNewColModel ? 0 : pResult->info.rows;  // set the start offset

        // set the timestamp(_rowts) output buffer
        if (taosArrayGetSize(pPseudoList) > 0) {
          int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
          pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
        }

652 653 654 655 656
        // link pDstBlock to set selectivity value
        if (pfCtx->subsidiaries.num > 0) {
          pfCtx->pDstBlock = pResult;
        }

657
        numOfRows = pfCtx->fpSet.process(pfCtx);
H
Haojun Liao 已提交
658
      } else if (fmIsAggFunc(pfCtx->functionId)) {
G
Ganlin Zhao 已提交
659
        // selective value output should be set during corresponding function execution
660 661 662
        if (fmIsSelectValueFunc(pfCtx->functionId)) {
          continue;
        }
663 664
        // _group_key function for "partition by tbname" + csum(col_name) query
        SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
665
        int32_t          slotId = pfCtx->param[0].pCol->slotId;
666 667 668

        // todo handle the json tag
        SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
669
        for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
670 671 672 673 674 675 676 677 678
          bool isNull = colDataIsNull_s(pInput, f);
          if (isNull) {
            colDataAppendNULL(pOutput, pResult->info.rows + f);
          } else {
            char* data = colDataGetData(pInput, f);
            colDataAppend(pOutput, pResult->info.rows + f, data, isNull);
          }
        }

H
Haojun Liao 已提交
679 680 681
      } else {
        SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
        taosArrayPush(pBlockList, &pSrcBlock);
G
Ganlin Zhao 已提交
682

683
        SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
684
        SColumnInfoData  idata = {.info = pResColData->info, .hasNull = true};
H
Haojun Liao 已提交
685

686
        SScalarParam dest = {.columnData = &idata};
X
Xiaoyu Wang 已提交
687
        int32_t      code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
688 689 690 691
        if (code != TSDB_CODE_SUCCESS) {
          taosArrayDestroy(pBlockList);
          return code;
        }
692

dengyihao's avatar
dengyihao 已提交
693
        int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
694
        ASSERT(pResult->info.capacity > 0);
695
        colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
D
dapan1121 已提交
696
        colDataDestroy(&idata);
697 698

        numOfRows = dest.numOfRows;
H
Haojun Liao 已提交
699 700
        taosArrayDestroy(pBlockList);
      }
701
    } else {
702
      return TSDB_CODE_OPS_NOT_SUPPORT;
703 704
    }
  }
705

706 707 708
  if (!createNewColModel) {
    pResult->info.rows += numOfRows;
  }
709 710

  return TSDB_CODE_SUCCESS;
711 712
}

5
54liuyao 已提交
713
/*
 * Decide whether the function bound to pCtx has to run on the current input.
 *  - function id -1: never
 *  - repeat scan: only if the function is a repeat-scan function
 *  - otherwise: as long as its result entry is not completed
 */
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
  if (pCtx->functionId == -1) {
    return false;
  }

  if (pCtx->scanFlag == REPEAT_SCAN) {
    return fmIsRepeatScanFunc(pCtx->functionId);
  }

  struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
  return !isRowEntryCompleted(pResInfo);
}

733 734 735 736 737 738 739
/*
 * Build pre-aggregated statistics (min/max/sum, no NULLs) for a constant
 * parameter so that it can be consumed like an SMA column. The column
 * descriptor and the SColumnDataAgg slot are allocated on first use.
 *
 * Variable-length types are not allowed. Returns TSDB_CODE_OUT_OF_MEMORY if an
 * allocation fails, TSDB_CODE_SUCCESS otherwise.
 */
static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type,
                                                int32_t paramIndex, int32_t numOfRows) {
  SColumnInfoData* pCol = pInput->pData[paramIndex];
  if (pCol == NULL) {
    pCol = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    pInput->pData[paramIndex] = pCol;
    if (pCol == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pCol->info.type = type;
    pCol->info.bytes = tDataTypes[type].bytes;
  }

  SColumnDataAgg* da = pInput->pColumnDataAgg[paramIndex];
  if (da == NULL) {
    da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
    pInput->pColumnDataAgg[paramIndex] = da;
    if (da == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  ASSERT(!IS_VAR_DATA_TYPE(type));

  switch (type) {
    case TSDB_DATA_TYPE_BIGINT: {
      int64_t v = pFuncParam->param.i;
      *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
      break;
    }

    case TSDB_DATA_TYPE_DOUBLE: {
      double v = pFuncParam->param.d;
      *da = (SColumnDataAgg){.numOfNull = 0};

      // the statistic slots are reinterpreted as doubles
      *(double*)&da->min = v;
      *(double*)&da->max = v;
      *(double*)&da->sum = v * numOfRows;
      break;
    }

    case TSDB_DATA_TYPE_BOOL: {  // todo validate this data type
      bool v = pFuncParam->param.i;

      *da = (SColumnDataAgg){.numOfNull = 0};
      // NOTE(review): min is stored as 0 even when the constant is true -- confirm this is intended
      *(bool*)&da->min = 0;
      *(bool*)&da->max = v;
      *(bool*)&da->sum = v * numOfRows;
      break;
    }

    case TSDB_DATA_TYPE_TIMESTAMP:
      // do nothing
      break;

    default:
      ASSERT(0);
      break;
  }

  return TSDB_CODE_SUCCESS;
}
784

785
/*
 * Point the input of a function context at the pre-aggregated statistics (pBlockAgg) of a data block.
 *
 * When the block carries no statistics, only the row counts are set and colDataAggIsSet is cleared.
 * Otherwise each function parameter is bound in turn: column parameters use the statistics of their slot
 * (a missing entry clears colDataAggIsSet), constant parameters get synthesized statistics.
 */
void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
  SInputColumnInfoData* pInput = &pCtx->input;

  int32_t rows = pBlock->info.rows;
  pInput->numOfRows = rows;
  pInput->totalRows = rows;

  if (pBlock->pBlockAgg == NULL) {
    pInput->colDataAggIsSet = false;
    return;
  }

  pInput->colDataAggIsSet = true;

  for (int32_t idx = 0; idx < pExprInfo->base.numOfParams; ++idx) {
    SFunctParam* pParam = &pExprInfo->base.pParam[idx];

    if (pParam->type == FUNC_PARAM_TYPE_VALUE) {
      // NOTE(review): the return code (possibly TSDB_CODE_OUT_OF_MEMORY) is dropped here -- confirm callers cope.
      doCreateConstantValColumnAggInfo(pInput, pParam, pParam->param.nType, idx, pBlock->info.rows);
      continue;
    }

    if (pParam->type != FUNC_PARAM_TYPE_COLUMN) {
      continue;
    }

    int32_t slot = pParam->pCol->slotId;
    pInput->pColumnDataAgg[idx] = pBlock->pBlockAgg[slot];
    if (pInput->pColumnDataAgg[idx] == NULL) {
      pInput->colDataAggIsSet = false;
    }

    // The column info data is still attached because the data type of each column is required, but
    // the data in the corresponding SColumnInfoData will not be used.
    pInput->pData[idx] = taosArrayGet(pBlock->pDataBlock, slot);
  }
}

L
Liu Jicong 已提交
817
/*
 * Report whether the task should be aborted.
 *
 * The idle-timeout abort is currently disabled: when an owned task has been running for more than ten times the
 * maximum idle duration, the function only asserts that the start time was recorded. It returns false on every path.
 */
bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
  if (pTaskInfo->owner == 0) {
    return false;
  }

  // the query has been executing for too long while the retrieve request has not arrived
  if ((taosGetTimestampSec() - pTaskInfo->cost.start / 1000) > 10 * getMaximumIdleDurationSec()) {
    assert(pTaskInfo->cost.start != 0);
  }

  return false;
}

L
Liu Jicong 已提交
832
// Mark the task as cancelled by storing TSDB_CODE_TSC_QUERY_CANCELLED in its result code.
void setTaskKilled(SExecTaskInfo* pTaskInfo) {
  pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;
}
833 834

/////////////////////////////////////////////////////////////////////////////////////////////
835
/*
 * Compute the interval-aligned time window that contains `key`.
 *
 * skey is `key` truncated to the interval boundary; ekey is one tick before the start of the next interval.
 * If adding the interval makes ekey fall below skey, the window is clamped to end at INT64_MAX.
 */
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) {
  int64_t start = taosTimeTruncate(key, pInterval, precision);
  int64_t end = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1;

  // An end key smaller than the start key means the start lies within one interval of INT64_MAX,
  // so the window simply extends to the largest representable timestamp.
  STimeWindow aligned = {0};
  aligned.skey = start;
  aligned.ekey = (end < start) ? INT64_MAX : end;
  return aligned;
}

L
Liu Jicong 已提交
851 852
/*
 * Reset the load state of a data block before it is (optionally) loaded.
 *
 * The block is reported as BLK_DATA_NOT_LOAD through `status`, and its data and statistics pointers are cleared.
 * The function always returns TSDB_CODE_SUCCESS.
 *
 * The previous on-demand loading logic was compiled out with `#if 0` and referred to variables that are not
 * declared in this function, so it has been removed. pTaskInfo and pTableScanInfo are kept only for interface
 * compatibility.
 */
int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
                              uint32_t* status) {
  (void)pTaskInfo;
  (void)pTableScanInfo;

  *status = BLK_DATA_NOT_LOAD;

  pBlock->pDataBlock = NULL;
  pBlock->pBlockAgg = NULL;

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
994
// Placeholder: nothing is updated for a reverse scan at present, for either a NULL or a valid table query info.
static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo) {
  (void)pTableQueryInfo;
}

L
Liu Jicong 已提交
1000
/*
 * Update the status flags of a task.
 *
 * TASK_NOT_COMPLETED replaces the whole status word. Any other status is OR-ed into the existing flags after the
 * TASK_NOT_COMPLETED flag has been cleared, since that flag is not compatible with any other status.
 */
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
  if (status != TASK_NOT_COMPLETED) {
    CLEAR_QUERY_STATUS(pTaskInfo, TASK_NOT_COMPLETED);
    pTaskInfo->status |= status;
    return;
  }

  pTaskInfo->status = status;
}

1010
/*
 * Bind every function context to its entry in the given result row and initialize entries that need it.
 *
 * The resultInfo pointer is always refreshed. Initialization is skipped for entries that are both completed and
 * initialized, and for window pseudo-column functions. As soon as one remaining entry turns out to be initialized
 * already, the rest of the entries only get their pointers bound.
 */
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
  bool rowAlreadyInited = false;

  for (int32_t idx = 0; idx < numOfOutput; ++idx) {
    SqlFunctionCtx* pFn = &pCtx[idx];
    pFn->resultInfo = getResultEntryInfo(pResult, idx, rowEntryInfoOffset);

    if (rowAlreadyInited) {
      continue;
    }

    struct SResultRowEntryInfo* pEntry = pFn->resultInfo;
    if (isRowEntryCompleted(pEntry) && isRowEntryInitialized(pEntry)) {
      continue;
    }

    if (fmIsWindowPseudoColumnFunc(pFn->functionId)) {
      continue;
    }

    if (pEntry->initialized) {
      rowAlreadyInited = true;
      continue;
    }

    // entries without a real function (-1) have no init callback; just flag them as initialized
    if (pFn->functionId == -1) {
      pEntry->initialized = true;
    } else {
      pFn->fpSet.init(pFn, pEntry);
    }
  }
}

1039 1040
// Forward declaration: the definition appears after doFilter(), which calls it with the output of filterExecute().
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
                                                int32_t status);
1041

H
Haojun Liao 已提交
1042 1043
/*
 * Apply a filter to a data block in place.
 *
 * Nothing happens when there is no filter or the block is empty. Otherwise the filter is executed, rows that do
 * not qualify are removed, and -- if column match info is supplied -- the block's time window is refreshed from
 * the first matched primary-timestamp column of TIMESTAMP type. The filter's result column is released before
 * returning.
 */
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) {
  if (pFilterInfo == NULL || pBlock->info.rows == 0) {
    return;
  }

  SFilterColumnParam colParam = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};

  // NOTE(review): the return code of the setup call is not inspected -- confirm a failure is impossible here.
  (void)filterSetDataFromSlotId(pFilterInfo, &colParam);

  SColumnInfoData* pFilterRes = NULL;
  int32_t          filterStatus = 0;

  // todo the keep seems never to be True??
  bool keepAll = filterExecute(pFilterInfo, pBlock, &pFilterRes, NULL, colParam.numOfCols, &filterStatus);
  extractQualifiedTupleByFilterResult(pBlock, pFilterRes, keepAll, filterStatus);

  if (pColMatchInfo != NULL) {
    size_t numOfItems = taosArrayGetSize(pColMatchInfo->pList);
    for (int32_t idx = 0; idx < numOfItems; ++idx) {
      SColMatchItem* pItem = taosArrayGet(pColMatchInfo->pList, idx);
      if (pItem->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
        continue;
      }

      SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pItem->dstSlotId);
      if (pTsCol->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
        continue;
      }

      blockDataUpdateTsWindow(pBlock, pItem->dstSlotId);
      break;
    }
  }

  colDataDestroy(pFilterRes);
  taosMemoryFree(pFilterRes);
}

1075
/*
 * Remove the rows that did not pass a filter from a data block.
 *
 * pBlock  block to compact in place
 * p       filter result column; entry j is read as an int8_t flag, 0 meaning row j is dropped
 * keep    when true the block is left untouched
 * status  FILTER_RESULT_ALL_QUALIFIED leaves the block as is, FILTER_RESULT_NONE_QUALIFIED empties it,
 *         any other value triggers a row-by-row copy of the qualified rows
 */
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status) {
  if (keep || status == FILTER_RESULT_ALL_QUALIFIED) {
    // here nothing needs to be done
    return;
  }

  if (status == FILTER_RESULT_NONE_QUALIFIED) {
    pBlock->info.rows = 0;
    return;
  }

  int32_t totalRows = pBlock->info.rows;

  // Work from a full copy so the destination columns can be rewritten from row 0.
  // NOTE(review): the copy is not checked for NULL -- confirm createOneDataBlock cannot fail here.
  SSDataBlock* pSnapshot = createOneDataBlock(pBlock, true);

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t col = 0; col < numOfCols; ++col) {
    SColumnInfoData* pSrc = taosArrayGet(pSnapshot->pDataBlock, col);
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, col);

    // it is a reserved column for scalar function, and no data in this column yet.
    if (pDst->pData == NULL || pSrc->pData == NULL) {
      continue;
    }

    colInfoDataCleanup(pDst, pBlock->info.rows);

    int32_t kept = 0;
    for (int32_t row = 0; row < totalRows; ++row) {
      if (((int8_t*)p->pData)[row] != 0) {
        if (colDataIsNull_s(pSrc, row)) {
          colDataAppendNULL(pDst, kept);
        } else {
          colDataAppend(pDst, kept, colDataGetData(pSrc, row), false);
        }
        kept += 1;
      }
    }

    // todo this value can be assigned directly
    // The first processed column fixes the new row count; later columns must agree with it.
    if (pBlock->info.rows == totalRows) {
      pBlock->info.rows = kept;
    } else {
      ASSERT(pBlock->info.rows == kept);
    }
  }

  blockDataDestroy(pSnapshot);  // release the temporary copy
}

1126
/*
 * Bind the function contexts of an aggregate operator to the result row of the given group.
 *
 * The result row is looked up (or created) with the group id as key. If the row has no buffer page yet
 * (pageId == -1), one is attached. A failure to attach the page aborts the task through T_LONG_JMP instead of
 * returning silently: a silent return would leave the contexts bound to the previous group's row.
 */
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  // for simple group by query without interval, all the tables belong to one group result.
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
  SAggOperatorInfo* pAggInfo = pOperator->info;

  SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  int32_t*        rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;

  SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId,
                                                  sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup);
  assert(pResultRow != NULL);

  /*
   * not assign result buffer yet, add new result buffer
   * all group belong to one result set, and each group result has different group id so set the id to be one
   */
  if (pResultRow->pageId == -1) {
    int32_t ret =
        addNewWindowResultBuf(pResultRow, pAggInfo->aggSup.pResultBuf, groupId, pAggInfo->binfo.pRes->info.rowSize);
    if (ret != TSDB_CODE_SUCCESS) {
      qError("%s failed to add result buffer for groupId:%" PRIu64 ", code %s", GET_TASKID(pTaskInfo), groupId,
             tstrerror(ret));
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
}

1154 1155 1156
/*
 * Switch the aggregate operator to the result row of `groupId`.
 *
 * Nothing is done when the operator is already positioned on that group and the stored id is not UINT64_MAX.
 * Otherwise the output buffer is rebound and the group id is remembered.
 */
static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  SAggOperatorInfo* pAgg = pOperator->info;

  bool alreadyActive = (pAgg->groupId != UINT64_MAX) && (pAgg->groupId == groupId);
  if (!alreadyActive) {
    doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);

    // record the current active group id
    pAgg->groupId = groupId;
  }
}

dengyihao's avatar
dengyihao 已提交
1166 1167
/*
 * Derive the number of output rows of a result row from its initialized entries.
 *
 * pRow->numOfRows is raised to the largest numOfRes among the initialized entries. If it is still zero afterwards
 * and none of those entries belongs to a function flagged by fmIsNotNullOutputFunc, one row is emitted anyway.
 */
static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs,
                              const int32_t* rowCellOffset) {
  bool needsNotNullOutput = false;

  for (int32_t idx = 0; idx < numOfExprs; ++idx) {
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, idx, rowCellOffset);
    if (isRowEntryInitialized(pEntry)) {
      if (pRow->numOfRows < pEntry->numOfRes) {
        pRow->numOfRows = pEntry->numOfRes;
      }

      if (fmIsNotNullOutputFunc(pCtx[idx].functionId)) {
        needsNotNullOutput = true;
      }
    }
  }

  // if all expr skips all blocks, e.g. all null inputs for max function, output one row in final result.
  //  except for first/last, which require not null output, output no rows
  if (!needsNotNullOutput && pRow->numOfRows == 0) {
    pRow->numOfRows = 1;
  }
}

1190 1191
/*
 * Materialize one result row into the output data block, expression by expression.
 *
 * - Functions with a finalize callback write their own output; a failure aborts the task via T_LONG_JMP.
 *   For "_group_key" a non-zero numOfRes is first widened to the row count of the whole result row.
 * - "_select_value" without a finalize callback produces nothing here.
 * - Everything else copies its intermediate buffer into each of the pRow->numOfRows output rows.
 */
static void doCopyResultToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
                                    SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) {
  for (int32_t idx = 0; idx < numOfExprs; ++idx) {
    SqlFunctionCtx* pFn = &pCtx[idx];
    pFn->resultInfo = getResultEntryInfo(pRow, idx, rowEntryOffset);

    const char* funcName = pFn->pExpr->pExpr->_function.functionName;

    if (pFn->fpSet.finalize) {
      // for groupkey along with functions that output multiple lines(e.g. Histogram)
      // need to match groupkey result for each output row of that function.
      if (strcmp(funcName, "_group_key") == 0 && pFn->resultInfo->numOfRes != 0) {
        pFn->resultInfo->numOfRes = pRow->numOfRows;
      }

      int32_t code = pFn->fpSet.finalize(pFn, pBlock);
      if (TAOS_FAILED(code)) {
        qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
        T_LONG_JMP(pTaskInfo->env, code);
      }
      continue;
    }

    if (strcmp(funcName, "_select_value") == 0) {
      continue;  // do nothing
    }

    // expand the result into multiple rows. E.g., _wstart, top(k, 20)
    // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
    int32_t          slotId = pExprInfo[idx].base.resSchema.slotId;
    SColumnInfoData* pOutCol = taosArrayGet(pBlock->pDataBlock, slotId);
    char*            pValue = GET_ROWCELL_INTERBUF(pFn->resultInfo);
    for (int32_t k = 0; k < pRow->numOfRows; ++k) {
      colDataAppend(pOutCol, pBlock->info.rows + k, pValue, pFn->resultInfo->isNullRes);
    }
  }
}

1224 1225 1226
// todo refactor. SResultRow has direct pointer in miainfo
/*
 * Finalize the result row at `resultRowPosition` and append its output to pBlock.
 *
 * The row count is derived first; a row that yields nothing is skipped. The block capacity is then grown
 * (by 25% steps) until the new rows fit, and the row is materialized into the block. A capacity failure aborts
 * the task via T_LONG_JMP. Returns 0 otherwise.
 */
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
                           SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  SFilePage*  page = getBufPage(pBuf, resultRowPosition->pageId);
  SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);

  SqlFunctionCtx* pCtx = pSup->pCtx;
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  const int32_t*  rowEntryOffset = pSup->rowEntryInfoOffset;

  doUpdateNumOfRows(pCtx, pRow, pSup->numOfExprs, rowEntryOffset);
  if (pRow->numOfRows == 0) {
    releaseBufPage(pBuf, page);
    return 0;
  }

  int64_t required = (int64_t)pBlock->info.rows + pRow->numOfRows;
  int32_t size = pBlock->info.capacity;
  while (size < required) {
    // size + size / 4 equals the truncated value of size * 1.25 for non-negative sizes. For capacities below 4
    // that step does not advance (and it could exceed the int32_t range for huge ones), so fall back to the exact
    // requirement to guarantee the loop terminates.
    int64_t grown = (int64_t)size + size / 4;
    size = (grown > size && grown <= INT32_MAX) ? (int32_t)grown : (int32_t)required;
  }

  int32_t code = blockDataEnsureCapacity(pBlock, size);
  if (TAOS_FAILED(code)) {
    releaseBufPage(pBuf, page);
    qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  doCopyResultToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);

  // pRow points into the page, so consume its row count before the page is handed back to the buffer
  pBlock->info.rows += pRow->numOfRows;
  releaseBufPage(pBuf, page);
  return 0;
}

1259 1260 1261 1262 1263 1264 1265
/*
 * Copy pending group results into pBlock, starting at pGroupResInfo->index.
 *
 * Copying stops when the results are exhausted, when the next result belongs to a different group than the one
 * already in the block, or when the next result would exceed the block capacity. pGroupResInfo->index is advanced
 * past every result that was consumed (including results that produce no rows). The block's time window is
 * refreshed from column 0 before returning 0.
 */
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
                           SGroupResInfo* pGroupResInfo) {
  SqlFunctionCtx* pCtx = pSup->pCtx;
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  int32_t         numOfExprs = pSup->numOfExprs;

  int32_t totalRes = getNumOfTotalRes(pGroupResInfo);

  for (int32_t cur = pGroupResInfo->index; cur < totalRes; ++cur) {
    SResKeyPos* pKeyPos = taosArrayGetP(pGroupResInfo->pRows, cur);
    SFilePage*  pPage = getBufPage(pBuf, pKeyPos->pos.pageId);
    SResultRow* pResRow = (SResultRow*)((char*)pPage + pKeyPos->pos.offset);

    doUpdateNumOfRows(pCtx, pResRow, numOfExprs, rowEntryOffset);

    // no results, continue to check the next one
    if (pResRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseBufPage(pBuf, pPage);
      continue;
    }

    if (pBlock->info.groupId == 0) {
      pBlock->info.groupId = pKeyPos->groupId;
    } else if (pBlock->info.groupId != pKeyPos->groupId) {
      // current value belongs to different group, it can't be packed into one datablock
      releaseBufPage(pBuf, pPage);
      break;
    }

    // the block is full: leave this result for the next call
    if (pBlock->info.rows + pResRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
      releaseBufPage(pBuf, pPage);
      break;
    }

    pGroupResInfo->index += 1;
    doCopyResultToDataBlock(pExprInfo, numOfExprs, pResRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);

    releaseBufPage(pBuf, pPage);
    pBlock->info.rows += pResRow->numOfRows;
  }

  qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
         pBlock->info.groupId);

  blockDataUpdateTsWindow(pBlock, 0);
  return 0;
}
1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352
/*
 * Build the next output block of a stream operator.
 *
 * The block is stamped with the task version and cleaned; if results remain, they are copied in (merging result
 * blocks is not allowed here). For stream state and stream session operators the partition table name is then
 * looked up by the block's group id; when no name is registered the name is set to the empty string.
 */
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                           SDiskbasedBuf* pBuf) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*   pBlock = pbInfo->pRes;

  // set output datablock version
  pBlock->info.version = pTaskInfo->version;

  blockDataCleanup(pBlock);
  if (!hasRemainResults(pGroupResInfo)) {
    return;
  }

  // clear the existed group id
  pBlock->info.groupId = 0;
  ASSERT(!pbInfo->mergeResultBlock);
  doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);

  // pick the groupId -> table name map of the operator kinds that maintain one
  SHashObj* pNameMap = NULL;
  bool      hasNameMap = false;

  if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
    SStreamStateAggOperatorInfo* pStateInfo = pOperator->info;
    pNameMap = pStateInfo->pGroupIdTbNameMap;
    hasNameMap = true;
  } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
             pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
             pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
    SStreamSessionAggOperatorInfo* pSessionInfo = pOperator->info;
    pNameMap = pSessionInfo->pGroupIdTbNameMap;
    hasNameMap = true;
  }

  if (!hasNameMap) {
    return;
  }

  char* tbname = taosHashGet(pNameMap, &pBlock->info.groupId, sizeof(int64_t));
  if (tbname == NULL) {
    pBlock->info.parTbName[0] = 0;
  } else {
    memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
  }
}

X
Xiaoyu Wang 已提交
1353 1354
/*
 * Build the next output block of an operator from its pending group results.
 *
 * Without result merging, a single copy pass is made, so the block holds results of one group only.
 * With merging, results of successive groups are packed into the same block until the operator's row threshold
 * is reached or no results remain; the group id is cleared at the end of that path.
 */
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                            SDiskbasedBuf* pBuf) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*   pBlock = pbInfo->pRes;

  // set output datablock version
  pBlock->info.version = pTaskInfo->version;

  blockDataCleanup(pBlock);
  if (!hasRemainResults(pGroupResInfo)) {
    return;
  }

  // clear the existed group id
  pBlock->info.groupId = 0;

  if (!pbInfo->mergeResultBlock) {
    doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
    return;
  }

  while (hasRemainResults(pGroupResInfo)) {
    doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
    if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }

    // clearing group id to continue to merge data that belong to different groups
    pBlock->info.groupId = 0;
  }

  // clear the group id info in SSDataBlock, since the client does not need it
  pBlock->info.groupId = 0;
}

L
Liu Jicong 已提交
1386 1387
// Log the cost summary of a task at debug level. Nothing is logged when no block-load recorder is attached.
void queryCostStatis(SExecTaskInfo* pTaskInfo) {
  STaskCostInfo*          pCost = &pTaskInfo->cost;
  SFileBlockLoadRecorder* pLoadRec = pCost->pRecoder;

  if (pLoadRec == NULL) {
    return;
  }

  qDebug(
      "%s :cost summary: elapsed time:%.2f ms, extract tableList:%.2f ms, createGroupIdMap:%.2f ms, total blocks:%d, "
      "load block SMA:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
      GET_TASKID(pTaskInfo), pCost->elapsedTime / 1000.0, pCost->extractListTime, pCost->groupIdMapTime,
      pLoadRec->totalBlocks, pLoadRec->loadBlockStatis, pLoadRec->loadBlocks, pLoadRec->totalRows,
      pLoadRec->totalCheckedRows);
}

L
Liu Jicong 已提交
1400 1401 1402
// static void updateOffsetVal(STaskRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
1403
//
L
Liu Jicong 已提交
1404
//   int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
1405
//
L
Liu Jicong 已提交
1406 1407 1408 1409
//   if (pQueryAttr->limit.offset == pBlockInfo->rows) {  // current block will ignore completed
//     pTableQueryInfo->lastKey = QUERY_IS_ASC_QUERY(pQueryAttr) ? pBlockInfo->window.ekey + step :
//     pBlockInfo->window.skey + step; pQueryAttr->limit.offset = 0; return;
//   }
1410
//
L
Liu Jicong 已提交
1411 1412 1413 1414 1415
//   if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//     pQueryAttr->pos = (int32_t)pQueryAttr->limit.offset;
//   } else {
//     pQueryAttr->pos = pBlockInfo->rows - (int32_t)pQueryAttr->limit.offset - 1;
//   }
1416
//
L
Liu Jicong 已提交
1417
//   assert(pQueryAttr->pos >= 0 && pQueryAttr->pos <= pBlockInfo->rows - 1);
1418
//
L
Liu Jicong 已提交
1419 1420
//   SArray *         pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//   SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1421
//
L
Liu Jicong 已提交
1422 1423
//   // update the pQueryAttr->limit.offset value, and pQueryAttr->pos value
//   TSKEY *keys = (TSKEY *) pColInfoData->pData;
1424
//
L
Liu Jicong 已提交
1425 1426 1427
//   // update the offset value
//   pTableQueryInfo->lastKey = keys[pQueryAttr->pos];
//   pQueryAttr->limit.offset = 0;
1428
//
L
Liu Jicong 已提交
1429
//   int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
1430
//
L
Liu Jicong 已提交
1431 1432 1433 1434
//   //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numBlocksOfStep:%d, numOfRes:%d,
//   lastKey:%"PRId64, GET_TASKID(pRuntimeEnv),
//          pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey);
// }
1435

L
Liu Jicong 已提交
1436 1437
// void skipBlocks(STaskRuntimeEnv *pRuntimeEnv) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
1438
//
L
Liu Jicong 已提交
1439 1440 1441
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0) {
//     return;
//   }
1442
//
L
Liu Jicong 已提交
1443 1444
//   pQueryAttr->pos = 0;
//   int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
1445
//
L
Liu Jicong 已提交
1446 1447
//   STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
//   TsdbQueryHandleT pTsdbReadHandle = pRuntimeEnv->pTsdbReadHandle;
1448
//
L
Liu Jicong 已提交
1449 1450 1451
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pTsdbReadHandle)) {
//     if (isTaskKilled(pRuntimeEnv->qinfo)) {
1452
//       T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
L
Liu Jicong 已提交
1453
//     }
1454
//
L
Liu Jicong 已提交
1455
//     tsdbRetrieveDataBlockInfo(pTsdbReadHandle, &blockInfo);
1456
//
L
Liu Jicong 已提交
1457 1458 1459 1460
//     if (pQueryAttr->limit.offset > blockInfo.rows) {
//       pQueryAttr->limit.offset -= blockInfo.rows;
//       pTableQueryInfo->lastKey = (QUERY_IS_ASC_QUERY(pQueryAttr)) ? blockInfo.window.ekey : blockInfo.window.skey;
//       pTableQueryInfo->lastKey += step;
1461
//
L
Liu Jicong 已提交
1462 1463 1464 1465 1466 1467 1468
//       //qDebug("QInfo:0x%"PRIx64" skip rows:%d, offset:%" PRId64, GET_TASKID(pRuntimeEnv), blockInfo.rows,
//              pQuery->limit.offset);
//     } else {  // find the appropriated start position in current block
//       updateOffsetVal(pRuntimeEnv, &blockInfo);
//       break;
//     }
//   }
1469
//
L
Liu Jicong 已提交
1470
//   if (terrno != TSDB_CODE_SUCCESS) {
1471
//     T_LONG_JMP(pRuntimeEnv->env, terrno);
L
Liu Jicong 已提交
1472 1473 1474 1475 1476 1477 1478
//   }
// }

// static TSKEY doSkipIntervalProcess(STaskRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo,
// STableQueryInfo* pTableQueryInfo) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
1479
//
L
Liu Jicong 已提交
1480 1481 1482
//   assert(pQueryAttr->limit.offset == 0);
//   STimeWindow tw = *win;
//   getNextTimeWindow(pQueryAttr, &tw);
1483
//
L
Liu Jicong 已提交
1484 1485
//   if ((tw.skey <= pBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) ||
//       (tw.ekey >= pBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQueryAttr))) {
1486
//
L
Liu Jicong 已提交
1487 1488 1489 1490
//     // load the data block and check data remaining in current data block
//     // TODO optimize performance
//     SArray *         pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//     SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1491
//
L
Liu Jicong 已提交
1492 1493 1494 1495
//     tw = *win;
//     int32_t startPos =
//         getNextQualifiedWindow(pQueryAttr, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1);
//     assert(startPos >= 0);
1496
//
L
Liu Jicong 已提交
1497 1498
//     // set the abort info
//     pQueryAttr->pos = startPos;
1499
//
L
Liu Jicong 已提交
1500 1501 1502 1503
//     // reset the query start timestamp
//     pTableQueryInfo->win.skey = ((TSKEY *)pColInfoData->pData)[startPos];
//     pQueryAttr->window.skey = pTableQueryInfo->win.skey;
//     TSKEY key = pTableQueryInfo->win.skey;
1504
//
L
Liu Jicong 已提交
1505 1506
//     pWindowResInfo->prevSKey = tw.skey;
//     int32_t index = pRuntimeEnv->resultRowInfo.curIndex;
1507
//
L
Liu Jicong 已提交
1508 1509
//     int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
//     pRuntimeEnv->resultRowInfo.curIndex = index;  // restore the window index
1510
//
L
Liu Jicong 已提交
1511 1512 1513 1514
//     //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d,
//     lastKey:%" PRId64,
//            GET_TASKID(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes,
//            pQueryAttr->current->lastKey);
1515
//
L
Liu Jicong 已提交
1516 1517 1518 1519 1520
//     return key;
//   } else {  // do nothing
//     pQueryAttr->window.skey      = tw.skey;
//     pWindowResInfo->prevSKey = tw.skey;
//     pTableQueryInfo->lastKey = tw.skey;
1521
//
L
Liu Jicong 已提交
1522 1523
//     return tw.skey;
//   }
1524
//
L
Liu Jicong 已提交
1525 1526 1527 1528 1529 1530 1531 1532 1533 1534
//   return true;
// }

// static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//     assert(*start <= pRuntimeEnv->current->lastKey);
//   } else {
//     assert(*start >= pRuntimeEnv->current->lastKey);
//   }
1535
//
L
Liu Jicong 已提交
1536 1537 1538 1539 1540
//   // if queried with value filter, do NOT forward query start position
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL ||
//   pRuntimeEnv->pFillInfo != NULL) {
//     return true;
//   }
1541
//
L
Liu Jicong 已提交
1542 1543 1544 1545 1546 1547 1548
//   /*
//    * 1. for interval without interpolation query we forward pQueryAttr->interval.interval at a time for
//    *    pQueryAttr->limit.offset times. Since hole exists, pQueryAttr->interval.interval*pQueryAttr->limit.offset
//    value is
//    *    not valid. otherwise, we only forward pQueryAttr->limit.offset number of points
//    */
//   assert(pRuntimeEnv->resultRowInfo.prevSKey == TSKEY_INITIAL_VAL);
1549
//
L
Liu Jicong 已提交
1550 1551
//   STimeWindow w = TSWINDOW_INITIALIZER;
//   bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
1552
//
L
Liu Jicong 已提交
1553 1554
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
//   STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
1555
//
L
Liu Jicong 已提交
1556 1557 1558
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pRuntimeEnv->pTsdbReadHandle)) {
//     tsdbRetrieveDataBlockInfo(pRuntimeEnv->pTsdbReadHandle, &blockInfo);
1559
//
L
Liu Jicong 已提交
1560 1561 1562 1563 1564 1565 1566 1567 1568
//     if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//       if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
//         getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.skey, blockInfo.window.skey, pQueryAttr->window.ekey,
//         &w); pWindowResInfo->prevSKey = w.skey;
//       }
//     } else {
//       getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.ekey, pQueryAttr->window.ekey, blockInfo.window.ekey, &w);
//       pWindowResInfo->prevSKey = w.skey;
//     }
1569
//
L
Liu Jicong 已提交
1570 1571
//     // the first time window
//     STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQueryAttr);
1572
//
L
Liu Jicong 已提交
1573 1574
//     while (pQueryAttr->limit.offset > 0) {
//       STimeWindow tw = win;
1575
//
L
Liu Jicong 已提交
1576 1577 1578
//       if ((win.ekey <= blockInfo.window.ekey && ascQuery) || (win.ekey >= blockInfo.window.skey && !ascQuery)) {
//         pQueryAttr->limit.offset -= 1;
//         pWindowResInfo->prevSKey = win.skey;
1579
//
L
Liu Jicong 已提交
1580 1581 1582 1583 1584 1585
//         // current time window is aligned with blockInfo.window.ekey
//         // restart it from next data block by set prevSKey to be TSKEY_INITIAL_VAL;
//         if ((win.ekey == blockInfo.window.ekey && ascQuery) || (win.ekey == blockInfo.window.skey && !ascQuery)) {
//           pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
//         }
//       }
1586
//
L
Liu Jicong 已提交
1587 1588 1589 1590
//       if (pQueryAttr->limit.offset == 0) {
//         *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//         return true;
//       }
1591
//
L
Liu Jicong 已提交
1592 1593
//       // current window does not ended in current data block, try next data block
//       getNextTimeWindow(pQueryAttr, &tw);
1594
//
L
Liu Jicong 已提交
1595 1596 1597 1598 1599 1600 1601 1602 1603
//       /*
//        * If the next time window still starts from current data block,
//        * load the primary timestamp column first, and then find the start position for the next queried time window.
//        * Note that only the primary timestamp column is required.
//        * TODO: Optimize for this cases. All data blocks are not needed to be loaded, only if the first actually
//        required
//        * time window resides in current data block.
//        */
//       if ((tw.skey <= blockInfo.window.ekey && ascQuery) || (tw.ekey >= blockInfo.window.skey && !ascQuery)) {
1604
//
L
Liu Jicong 已提交
1605 1606
//         SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//         SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1607
//
L
Liu Jicong 已提交
1608 1609 1610
//         if ((win.ekey > blockInfo.window.ekey && ascQuery) || (win.ekey < blockInfo.window.skey && !ascQuery)) {
//           pQueryAttr->limit.offset -= 1;
//         }
1611
//
L
Liu Jicong 已提交
1612 1613 1614 1615 1616 1617 1618 1619
//         if (pQueryAttr->limit.offset == 0) {
//           *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//           return true;
//         } else {
//           tw = win;
//           int32_t startPos =
//               getNextQualifiedWindow(pQueryAttr, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1);
//           assert(startPos >= 0);
1620
//
L
Liu Jicong 已提交
1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631
//           // set the abort info
//           pQueryAttr->pos = startPos;
//           pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
//           pWindowResInfo->prevSKey = tw.skey;
//           win = tw;
//         }
//       } else {
//         break;  // offset is not 0, and next time window begins or ends in the next block.
//       }
//     }
//   }
1632
//
L
Liu Jicong 已提交
1633 1634
//   // check for error
//   if (terrno != TSDB_CODE_SUCCESS) {
1635
//     T_LONG_JMP(pRuntimeEnv->env, terrno);
L
Liu Jicong 已提交
1636
//   }
1637
//
L
Liu Jicong 已提交
1638 1639
//   return true;
// }
1640

1641
// Attach `num` downstream operators to `p`.
//
// A freshly allocated pointer array is stored in p->pDownstream and filled with a copy of
// the `num` entries of `pDownstream`; p->numOfDownstream is set to `num`.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the array cannot be allocated
// (in which case p->pDownstream is left NULL).
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
  // an operator without a downstream list must not report any downstream operators
  assert(p->pDownstream != NULL || p->numOfDownstream == 0);

  p->pDownstream = taosMemoryCalloc(1, num * POINTER_BYTES);
  if (p->pDownstream != NULL) {
    memcpy(p->pDownstream, pDownstream, num * POINTER_BYTES);
    p->numOfDownstream = num;
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_OUT_OF_MEMORY;
}

1656 1657 1658 1659
// Parameter handed to loadRemoteDataCallback() for one fetch request.
typedef struct SFetchRspHandleWrapper {
  uint32_t exchangeId;   // reference id used to acquire the SExchangeInfo from exchangeObjRefPool
  int32_t  sourceIndex;  // index of the source in SExchangeInfo::pSourceDataInfo
} SFetchRspHandleWrapper;
1660

D
dapan1121 已提交
1661
// Callback invoked when the response of a fetch request is available.
//
// param: SFetchRspHandleWrapper* identifying the exchange operator and the source index.
// pMsg:  response buffer; pMsg->pData is either stored in the matching SSourceDataInfo
//        (code == TSDB_CODE_SUCCESS) or freed here (error, or the operator is already gone).
// code:  result code of the fetch request.
//
// On every path where the exchange operator could be acquired, the source is marked
// EX_SOURCE_DATA_READY and the `ready` semaphore is posted once.
// Always returns TSDB_CODE_SUCCESS.
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;

  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
  if (pExchangeInfo == NULL) {
    qWarn("failed to acquire exchange operator, since it may have been released");
    taosMemoryFree(pMsg->pData);
    return TSDB_CODE_SUCCESS;
  }

  int32_t          index = pWrapper->sourceIndex;
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);

  if (code == TSDB_CODE_SUCCESS) {
    pSourceDataInfo->pRsp = pMsg->pData;

    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
    // validate the payload before any field of it is touched
    ASSERT(pRsp != NULL);

    // byte-swap the header fields in place (presumably network order -> host order; htonl and
    // ntohl perform the same swap)
    pRsp->numOfRows = htonl(pRsp->numOfRows);
    pRsp->compLen = htonl(pRsp->compLen);
    pRsp->numOfCols = htonl(pRsp->numOfCols);
    pRsp->useconds = htobe64(pRsp->useconds);
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);

    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfBlocks,
           pRsp->numOfRows);
  } else {
    taosMemoryFree(pMsg->pData);
    pSourceDataInfo->code = code;
    qDebug("%s fetch rsp received, index:%d, error:%s", pSourceDataInfo->taskId, index, tstrerror(code));
  }

  pSourceDataInfo->status = EX_SOURCE_DATA_READY;

  tsem_post(&pExchangeInfo->ready);
  taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1701
// Dispatch an rpc response to the callback registered in its SMsgSendInfo.
//
// The message content is copied into a freshly allocated buffer that is passed to the
// callback; if the copy cannot be allocated, the callback receives a NULL payload together
// with TSDB_CODE_OUT_OF_MEMORY. Afterwards the rpc content and the send info are released.
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  assert(pMsg->info.ahandle != NULL);
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;

  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};

  if (pMsg->contLen > 0) {
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData != NULL) {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    } else {
      // report the allocation failure through the callback's code argument
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);

  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}

L
Liu Jicong 已提交
1722
// Request the next batch of data from the source at `sourceIndex`.
//
// For a source flagged `localExec` the local fetch function is invoked and
// loadRemoteDataCallback() is called directly with its result. Otherwise a SResFetchReq is
// built and handed to asyncSendMsgToServer() with loadRemoteDataCallback() as the response
// handler.
//
// The source must be in EX_SOURCE_DATA_NOT_READY state.
// Returns TSDB_CODE_SUCCESS, or an error code (also stored in pTaskInfo->code) when an
// allocation fails or the request cannot be sent.
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);

  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex);
  SSourceDataInfo*       pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);

  ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY);

  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
  if (NULL == pWrapper) {
    // the wrapper is dereferenced right below, so bail out instead of crashing
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return pTaskInfo->code;
  }

  pWrapper->exchangeId = pExchangeInfo->self;
  pWrapper->sourceIndex = sourceIndex;

  if (pSource->localExec) {
    SDataBuf pBuf = {0};
    int32_t  code =
        (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId,
                                    pSource->taskId, 0, pSource->execId, &pBuf.pData, pTaskInfo->localFetch.explainRes);
    // the fetch result (success or failure) is delivered through the regular callback
    loadRemoteDataCallback(pWrapper, &pBuf, code);
    taosMemoryFree(pWrapper);
  } else {
    SResFetchReq* pMsg = taosMemoryCalloc(1, sizeof(SResFetchReq));
    if (NULL == pMsg) {
      pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
      taosMemoryFree(pWrapper);
      return pTaskInfo->code;
    }

    qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu,
           GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId,
           pSource->execId, sourceIndex, totalSources);

    pMsg->header.vgId = htonl(pSource->addr.nodeId);
    pMsg->sId = htobe64(pSource->schedId);
    pMsg->taskId = htobe64(pSource->taskId);
    pMsg->queryId = htobe64(pTaskInfo->id.queryId);
    pMsg->execId = htonl(pSource->execId);

    // send the fetch remote task result request
    SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
    if (NULL == pMsgSendInfo) {
      taosMemoryFreeClear(pMsg);
      taosMemoryFree(pWrapper);
      qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
      pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
      return pTaskInfo->code;
    }

    // pWrapper and pMsg are attached to the send info here; paramFreeFp is registered so the
    // wrapper can be released together with the send info
    pMsgSendInfo->param = pWrapper;
    pMsgSendInfo->paramFreeFp = taosMemoryFree;
    pMsgSendInfo->msgInfo.pData = pMsg;
    pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
    pMsgSendInfo->msgType = pSource->fetchMsgType;
    pMsgSendInfo->fp = loadRemoteDataCallback;

    int64_t transporterId = 0;
    int32_t code =
        asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
    if (code != TSDB_CODE_SUCCESS) {
      // do not report success for a request that was never sent: callers wait on the `ready`
      // semaphore for its response.
      // NOTE(review): pMsgSendInfo is assumed to be owned by asyncSendMsgToServer() even on
      // failure, so it is not released here -- confirm against its implementation.
      pTaskInfo->code = code;
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}

1784 1785 1786 1787 1788 1789 1790 1791
// Accumulate the statistics of one received response.
//
// Adds `numOfRows` / `dataLen` to the load-info totals, adds the time elapsed since
// `startTs` (microsecond timestamp) to totalElapsed, and adds `numOfRows` to the operator's
// result row counter.
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs,
                          SOperatorInfo* pOperator) {
  pInfo->totalRows += numOfRows;
  pOperator->resultInfo.totalRows += numOfRows;

  pInfo->totalSize += dataLen;
  pInfo->totalElapsed += (taosGetTimestampUs() - startTs);
}

H
Haojun Liao 已提交
1792
// Decode one data block from a fetch response into `pRes`.
//
// pRes:       destination block.
// pData:      start of the encoded payload.
// pColList:   NULL -> the payload is decoded directly into pRes and *pNextStart is set to
//                     the value returned by blockDecode();
//             else -> the payload starts with a column count and a SSysTableSchema array
//                     (converted in place with htons/htonl), followed by the encoded block;
//                     the block is decoded into a temporary block and its columns are
//                     relocated into pRes according to pColList. *pNextStart is NOT written
//                     on this path.
// pNextStart: out parameter, only used when pColList is NULL.
//
// Returns TSDB_CODE_SUCCESS, or an error code when the temporary block cannot be created or
// pRes cannot be grown to hold the decoded rows.
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart) {
  if (pColList == NULL) {  // data from other sources
    blockDataCleanup(pRes);
    *pNextStart = (char*)blockDecode(pRes, pData);
  } else {  // extract data according to pColList
    char* pStart = pData;

    int32_t numOfCols = htonl(*(int32_t*)pStart);
    pStart += sizeof(int32_t);

    // todo refactor:extract method
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
    for (int32_t i = 0; i < numOfCols; ++i) {
      SSysTableSchema* p = (SSysTableSchema*)pStart;

      p->colId = htons(p->colId);
      p->bytes = htonl(p->bytes);
      pStart += sizeof(SSysTableSchema);
    }

    SSDataBlock* pBlock = createDataBlock();
    if (pBlock == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
      blockDataAppendColInfo(pBlock, &idata);
    }

    blockDecode(pBlock, pStart);

    // the columns are copied into pRes below, so it must be able to hold all decoded rows
    int32_t code = blockDataEnsureCapacity(pRes, pBlock->info.rows);
    if (code != TSDB_CODE_SUCCESS) {
      blockDataDestroy(pBlock);
      return code;
    }

    // data from mnode
    pRes->info.rows = pBlock->info.rows;
    relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
    blockDataDestroy(pBlock);
  }

  // todo move this to time window aggregator, since the primary timestamp may not be known by exchange operator.
  blockDataUpdateTsWindow(pRes, 0);
  return TSDB_CODE_SUCCESS;
}
1831

L
Liu Jicong 已提交
1832 1833
// Finish the exchange operator once every source has been drained.
//
// Adds the time elapsed since `startTs` to the load statistics, logs the totals and marks
// the operator as completed. Always returns NULL.
static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) {
  SExchangeInfo*       pExchangeInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;

  pLoadInfo->totalElapsed += taosGetTimestampUs() - startTs;

  size_t numOfSources = taosArrayGetSize(pExchangeInfo->pSources);
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), numOfSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
         pLoadInfo->totalElapsed / 1000.0);

  doSetOperatorCompleted(pOperator);
  return NULL;
}

1850 1851
// Count the sources whose status is EX_SOURCE_DATA_EXHAUSTED.
static int32_t countExhaustedSources(SExchangeInfo* pExchangeInfo, size_t totalSources) {
  int32_t completed = 0;
  for (int32_t k = 0; k < totalSources; ++k) {
    SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k);
    if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
      completed += 1;
    }
  }

  return completed;
}

// Consume one ready response when all sources are fetched concurrently.
//
// Waits on the `ready` semaphore, then scans the sources for one in EX_SOURCE_DATA_READY
// state:
//   - a response with zero rows marks the source exhausted; the function then either
//     completes the operator (all sources exhausted) or goes back to waiting;
//   - otherwise every block of the response is decoded and appended to pResultBlockList,
//     the statistics are updated, a new fetch request is sent if the source is not
//     completed, and the function returns.
// On failure the error code is stored in pTaskInfo->code.
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
                                           SExecTaskInfo* pTaskInfo) {
  int32_t code = 0;
  int64_t startTs = taosGetTimestampUs();
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);

  while (1) {
    tsem_wait(&pExchangeInfo->ready);

    for (int32_t i = 0; i < totalSources; ++i) {
      SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);

      // only sources holding an unprocessed response are of interest
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED || pDataInfo->status != EX_SOURCE_DATA_READY) {
        continue;
      }

      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
        code = pDataInfo->code;
        goto _error;
      }

      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, i);

      // todo
      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
      if (pRsp->numOfRows == 0) {
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;

        // the current source is already included in this count
        int32_t completed = countExhaustedSources(pExchangeInfo, totalSources);

        qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
               ", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu,
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows,
               pExchangeInfo->loadInfo.totalRows, completed, i + 1, totalSources);
        taosMemoryFreeClear(pDataInfo->pRsp);

        if (completed == totalSources) {
          setAllSourcesCompleted(pOperator, startTs);
          return;
        } else {
          break;
        }
      }

      SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
      int32_t            index = 0;
      char*              pStart = pRetrieveRsp->data;
      while (index++ < pRetrieveRsp->numOfBlocks) {
        SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
        if (pb == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          taosMemoryFreeClear(pDataInfo->pRsp);
          goto _error;
        }

        code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
        if (code != 0) {
          // pb has not been handed over to pResultBlockList yet, release it here
          blockDataDestroy(pb);
          taosMemoryFreeClear(pDataInfo->pRsp);
          goto _error;
        }

        taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
      }

      updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
      // keep the per-source row counter in sync, as the sequential load path does
      pDataInfo->totalRows += pRetrieveRsp->numOfRows;

      int32_t completed = 0;
      if (pRsp->completed == 1) {
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
        completed = countExhaustedSources(pExchangeInfo, totalSources);

        qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
               " execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
               ", total:%.2f Kb, completed:%d try next %d/%" PRIzu,
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks,
               pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
               completed, i + 1, totalSources);
      } else {
        qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
               " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb",
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks,
               pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
      }

      // pRsp / pRetrieveRsp must not be used past this point
      taosMemoryFreeClear(pDataInfo->pRsp);

      if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) {
        // more data is pending on this source: ask for the next batch right away
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
        if (code != TSDB_CODE_SUCCESS) {
          taosMemoryFreeClear(pDataInfo->pRsp);
          goto _error;
        }
      }

      if (completed == totalSources) {
        setAllSourcesCompleted(pOperator, startTs);
      }

      return;
    }

    // reached when no ready source was found, or a source just turned out to be exhausted
    if (countExhaustedSources(pExchangeInfo, totalSources) == totalSources) {
      setAllSourcesCompleted(pOperator, startTs);
      return;
    }
  }

_error:
  pTaskInfo->code = code;
}

L
Liu Jicong 已提交
1987 1988 1989
// Open step of the concurrent load mode: send one fetch request to every source.
//
// Stops at the first request that fails, storing and returning its error code. After all
// requests are sent the operator status becomes OP_RES_TO_RETURN, the open cost is
// recorded, and the function waits on the `ready` semaphore and immediately posts it back
// (presumably to block until the first response has arrived without consuming it -- confirm).
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  size_t  numOfSources = taosArrayGetSize(pExchangeInfo->pSources);
  int64_t begin = taosGetTimestampUs();

  // Asynchronously send all fetch requests to all sources.
  int32_t idx = 0;
  while (idx < numOfSources) {
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, idx);
    if (code != TSDB_CODE_SUCCESS) {
      pTaskInfo->code = code;
      return code;
    }

    ++idx;
  }

  int64_t end = taosGetTimestampUs();
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
         numOfSources, (end - begin) / 1000.0);

  pOperator->status = OP_RES_TO_RETURN;
  pOperator->cost.openCost = taosGetTimestampUs() - begin;

  tsem_wait(&pExchangeInfo->ready);
  tsem_post(&pExchangeInfo->ready);
  return TSDB_CODE_SUCCESS;
}

2015
// Load the next response when the sources are fetched one after another.
//
// Starting at pExchangeInfo->current, a fetch request is sent to the current source and the
// function waits on the `ready` semaphore for its response:
//   - an error response stores the code in the task and returns it;
//   - a response with zero rows marks the source exhausted and moves on to the next source;
//   - otherwise every block of the response is decoded and appended to pResultBlockList,
//     the statistics are updated and TSDB_CODE_SUCCESS is returned.
// When no source is left the operator is marked completed.
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
  int64_t startTs = taosGetTimestampUs();

  while (1) {
    if (pExchangeInfo->current >= totalSources) {
      setAllSourcesCompleted(pOperator, startTs);
      return TSDB_CODE_SUCCESS;
    }

    SSourceDataInfo*       pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);

    // a source that already delivered a (non-final) batch is still flagged READY, while
    // doSendFetchDataRequest() requires NOT_READY
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;

    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
    if (code != TSDB_CODE_SUCCESS) {
      // no request is in flight, so waiting on the semaphore would never return
      pTaskInfo->code = code;
      return code;
    }

    tsem_wait(&pExchangeInfo->ready);

    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
      qError("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d error happens, code:%s", GET_TASKID(pTaskInfo),
             pSource->addr.nodeId, pSource->taskId, pSource->execId, tstrerror(pDataInfo->code));
      pOperator->pTaskInfo->code = pDataInfo->code;
      return pOperator->pTaskInfo->code;
    }

    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
    if (pRsp->numOfRows == 0) {
      qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64
             ", totalRows:%" PRIu64 " try next",
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1,
             pDataInfo->totalRows, pLoadInfo->totalRows);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      pExchangeInfo->current += 1;
      taosMemoryFreeClear(pDataInfo->pRsp);
      continue;
    }

    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;

    // decode every block of the response into the result list, which is what
    // doLoadRemoteDataImpl() hands out to the caller
    char* pStart = pRetrieveRsp->data;
    for (int32_t index = 0; index < pRetrieveRsp->numOfBlocks; ++index) {
      SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
      if (pb == NULL) {
        taosMemoryFreeClear(pDataInfo->pRsp);
        pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
        return pTaskInfo->code;
      }

      code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
      if (code != TSDB_CODE_SUCCESS) {
        blockDataDestroy(pb);
        taosMemoryFreeClear(pDataInfo->pRsp);
        pTaskInfo->code = code;
        return code;
      }

      taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
    }

    if (pRsp->completed == 1) {
      qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
             pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
             totalSources);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      pExchangeInfo->current += 1;
    } else {
      qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
             ", totalBytes:%" PRIu64,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
             pLoadInfo->totalRows, pLoadInfo->totalSize);
    }

    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;

    taosMemoryFreeClear(pDataInfo->pRsp);
    return TSDB_CODE_SUCCESS;
  }
}

L
Liu Jicong 已提交
2084
// Open the exchange operator (no-op when it is already opened).
//
// In concurrent mode (seqLoadData not set) the fetch requests for all sources are sent
// here; in sequential mode nothing is sent at open time. On success the operator is flagged
// as opened and the open cost is recorded.
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int64_t begin = taosGetTimestampUs();

  SExchangeInfo* pExchangeInfo = pOperator->info;
  int32_t        code = TSDB_CODE_SUCCESS;
  if (!pExchangeInfo->seqLoadData) {
    code = prepareConcurrentlyLoad(pOperator);
  }

  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  OPTR_SET_OPENED(pOperator);
  pOperator->cost.openCost = (taosGetTimestampUs() - begin) / 1000.0;
  return TSDB_CODE_SUCCESS;
}

2104 2105 2106 2107 2108
// Element destructor for arrays of SSDataBlock*: pParam points at the stored pointer.
static void freeBlock(void* pParam) {
  SSDataBlock** ppBlock = (SSDataBlock**)pParam;
  blockDataDestroy(*ppBlock);
}

2109
// Return the next buffered result block, loading more data from the sources when the
// buffer has been consumed.
//
// Returns NULL when opening the operator fails (code stored in pTaskInfo->code), when the
// operator is already done, or when loading produced no block.
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    return NULL;
  }

  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);

  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
  if (pOperator->status == OP_EXEC_DONE) {
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
           GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
           pLoadInfo->totalElapsed / 1000.0);
    return NULL;
  }

  size_t numOfBuffered = taosArrayGetSize(pExchangeInfo->pResultBlockList);
  if (numOfBuffered != 0 && pExchangeInfo->rspBlockIndex < numOfBuffered) {
    // we have buffered retrieved datablock, return it directly
    return taosArrayGetP(pExchangeInfo->pResultBlockList, pExchangeInfo->rspBlockIndex++);
  }

  // every buffered block has been handed out: drop them and fetch the next batch
  pExchangeInfo->rspBlockIndex = 0;
  taosArrayClearEx(pExchangeInfo->pResultBlockList, freeBlock);

  if (!pExchangeInfo->seqLoadData) {
    concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
  } else {
    seqLoadRemoteData(pOperator);
  }

  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
    return NULL;
  }

  return taosArrayGetP(pExchangeInfo->pResultBlockList, pExchangeInfo->rspBlockIndex++);
}
2146

2147 2148 2149 2150 2151 2152 2153 2154
// getNext function of the exchange operator: pulls blocks from doLoadRemoteDataImpl and,
// when limit/offset information is configured, filters them through handleLimitOffset.
// Returns NULL once no more data is available.
static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExchangeInfo* pExchange = pOperator->info;
  SLimitInfo*    pLimit = &pExchange->limitInfo;

  for (;;) {
    SSDataBlock* pFetched = doLoadRemoteDataImpl(pOperator);
    if (pFetched == NULL) {
      return NULL;
    }

    // no limit/offset to apply: pass the block through untouched
    if (!hasLimitOffsetInfo(pLimit)) {
      return pFetched;
    }

    int32_t ret = handleLimitOffset(pOperator, pLimit, pFetched, false);
    if (ret != PROJECT_RETRIEVE_DONE) {
      // PROJECT_RETRIEVE_CONTINUE (or any other status): fetch the next block
      continue;
    }

    size_t numOfRows = pFetched->info.rows;
    pLimit->numOfOutputRows += numOfRows;

    if (numOfRows == 0) {
      doSetOperatorCompleted(pOperator);
      return NULL;
    }

    return pFetched;
  }
}

2183
// Creates pInfo->pSourceDataInfo and fills it with one SSourceDataInfo per source, each
// marked EX_SOURCE_DATA_NOT_READY and tagged with the task id and its index.
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the array cannot be
// created/extended. On failure pInfo->pSourceDataInfo is NULL, so a later teardown of
// pInfo cannot touch the already destroyed array.
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
  if (pInfo->pSourceDataInfo == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < numOfSources; ++i) {
    SSourceDataInfo dataInfo = {0};
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
    dataInfo.taskId = id;
    dataInfo.index = i;

    if (taosArrayPush(pInfo->pSourceDataInfo, &dataInfo) == NULL) {
      taosArrayDestroy(pInfo->pSourceDataInfo);
      pInfo->pSourceDataInfo = NULL;  // do not leave a dangling pointer behind for the caller's cleanup
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}

2204
// Initializes the exchange info from the physical plan node: copies every downstream
// source node into pInfo->pSources, sets up the limit info, registers pInfo in
// exchangeObjRefPool and finally creates the per-source data info.
// Returns TSDB_CODE_INVALID_PARA when the node has no source (or a source node is
// missing) and TSDB_CODE_OUT_OF_MEMORY when an array cannot be created/extended.
// Arrays allocated before a failure stay attached to pInfo and are left to the caller.
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);

  if (numOfSources == 0) {
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
    return TSDB_CODE_INVALID_PARA;
  }

  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
  if (pInfo->pSources == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < (int32_t)numOfSources; ++i) {
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
    if (pNode == NULL) {
      return TSDB_CODE_INVALID_PARA;
    }

    // a failed push would otherwise silently leave fewer sources than numOfSources
    if (taosArrayPush(pInfo->pSources, pNode) == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
  // NOTE(review): the result of taosAddRef is stored unchecked, as before - confirm its failure value.
  pInfo->self = taosAddRef(exchangeObjRefPool, pInfo);

  return initDataSource((int32_t)numOfSources, pInfo, id);
}

// Creates the exchange operator for the given physical plan node.
//   pTransporter: stored in the exchange info as the transport handle
//   pExNode:      physical plan node describing the sources, limit and output block
//   pTaskInfo:    owning task; receives the error code when creation fails
// Returns the new operator, or NULL on failure with pTaskInfo->code set.
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo) {
  // declared before the first goto so that the error path never reads an indeterminate value
  int32_t code = TSDB_CODE_SUCCESS;

  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  tsem_init(&pInfo->ready, 0, 0);
  pInfo->pDummyBlock = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
  pInfo->pResultBlockList = taosArrayInit(1, POINTER_BYTES);
  if (pInfo->pDummyBlock == NULL || pInfo->pResultBlockList == NULL) {
    // pDummyBlock is dereferenced below, so a failed allocation must not go unnoticed
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->seqLoadData = false;
  pInfo->pTransporter = pTransporter;

  pOperator->name = "ExchangeOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet =
      createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, destroyExchangeOperatorInfo, NULL);
  return pOperator;

  _error:
  if (pInfo != NULL) {
    doDestroyExchangeOperatorInfo(pInfo);
  }

  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

dengyihao's avatar
dengyihao 已提交
2269 2270
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                                const char* pKey);
2271

L
Liu Jicong 已提交
2272
// Checks whether row rowIndex of pBlock carries the same grouping values as the tuple
// saved in buf.
//   groupInfo: array of int32_t column indexes into pBlock->pDataBlock
//   buf:       one saved value per grouping column; a NULL entry stands for a NULL value
// Returns true when there are no grouping columns or when every grouping column equals
// the saved value (NULL only equals NULL); returns false on the first difference.
static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) {
  size_t size = taosArrayGetSize(groupInfo);
  if (size == 0) {
    return true;
  }

  for (size_t i = 0; i < size; ++i) {
    int32_t* index = taosArrayGet(groupInfo, i);

    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, *index);
    bool             isNull = colDataIsNull(pColInfo, rowIndex, pBlock->info.rows, NULL);

    // exactly one side is NULL: the values differ
    if (isNull != (buf[i] == NULL)) {
      return false;
    }

    // both sides are NULL: equal, and there is no payload to compare (buf[i] must not be dereferenced)
    if (isNull) {
      continue;
    }

    char* pCell = colDataGetData(pColInfo, rowIndex);
    if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
      if (varDataLen(pCell) != varDataLen(buf[i])) {
        return false;
      }

      if (memcmp(varDataVal(pCell), varDataVal(buf[i]), varDataLen(pCell)) != 0) {
        return false;
      }
    } else if (memcmp(pCell, buf[i], pColInfo->info.bytes) != 0) {
      return false;
    }
  }

  // every grouping column matched the saved tuple
  return true;
}

2307
// Copies the values of row rowIndex for every column listed in pColumnList (int32_t
// indexes into pBlock->pDataBlock) into the caller-provided buffers rowColData[i].
// Always returns true.
static bool saveCurrentTuple(char** rowColData, SArray* pColumnList, SSDataBlock* pBlock, int32_t rowIndex) {
  int32_t numOfCols = (int32_t)taosArrayGetSize(pColumnList);

  int32_t k = 0;
  while (k < numOfCols) {
    int32_t*         pSlot = taosArrayGet(pColumnList, k);
    SColumnInfoData* pColumn = taosArrayGet(pBlock->pDataBlock, *pSlot);

    char* pSrc = colDataGetData(pColumn, rowIndex);
    memcpy(rowColData[k], pSrc, colDataGetLength(pColumn, rowIndex));

    ++k;
  }

  return true;
}
2320

X
Xiaoyu Wang 已提交
2321
// Reports the scan order and scan flag that apply to pOperator by walking down the first
// downstream operator until a scan-like operator is found.
//   - exchange / system-table / stream / tag / block-dist / last-row scans always report
//     TSDB_ORDER_ASC and MAIN_SCAN
//   - table scan and table merge scan report the values stored in their own info
// Returns TSDB_CODE_INVALID_PARA when the chain ends without reaching such an operator.
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
  SOperatorInfo* pCur = pOperator;

  for (;;) {
    // todo add more information about exchange operation
    switch (pCur->operatorType) {
      case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
      case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
        *order = TSDB_ORDER_ASC;
        *scanFlag = MAIN_SCAN;
        return TSDB_CODE_SUCCESS;

      case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
        STableScanInfo* pScan = pCur->info;
        *order = pScan->cond.order;
        *scanFlag = pScan->scanFlag;
        return TSDB_CODE_SUCCESS;
      }

      case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: {
        STableMergeScanInfo* pMergeScan = pCur->info;
        *order = pMergeScan->cond.order;
        *scanFlag = pMergeScan->scanFlag;
        return TSDB_CODE_SUCCESS;
      }

      default:
        break;
    }

    // not a scan operator: continue with its first downstream operator, if any
    if (pCur->pDownstream == NULL || pCur->pDownstream[0] == NULL) {
      return TSDB_CODE_INVALID_PARA;
    }

    pCur = pCur->pDownstream[0];
  }
}
2348

2349
// this is a blocking operator
L
Liu Jicong 已提交
2350
// Open function of the aggregate operator: drains the first downstream operator and feeds
// every block into the aggregation, then prepares the grouped result info for output.
// Does nothing when the operator is already opened. Failures inside the loop leave through
// T_LONG_JMP on the task's env; otherwise TSDB_CODE_SUCCESS is returned.
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  SExecTaskInfo*    pTask = pOperator->pTaskInfo;
  SAggOperatorInfo* pAgg = pOperator->info;

  SExprSupp*     pAggSupp = &pOperator->exprSupp;
  SOperatorInfo* pChild = pOperator->pDownstream[0];

  int64_t startUs = taosGetTimestampUs();

  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;

  SSDataBlock* pInput = NULL;
  while ((pInput = pChild->fpSet.getNextFn(pChild)) != NULL) {
    int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTask->env, code);
    }

    // scalar expressions, if any, are evaluated in place before the group aggregation is applied
    if (pAgg->scalarExprSup.pExprInfo != NULL) {
      SExprSupp* pScalarSupp = &pAgg->scalarExprSup;
      code = projectApplyFunctions(pScalarSupp->pExprInfo, pInput, pInput, pScalarSupp->pCtx,
                                   pScalarSupp->numOfExprs, NULL);
      if (code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTask->env, code);
      }
    }

    // the pDataBlock are always the same one, no need to call this again
    setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pInput->info.groupId);
    setInputDataBlock(pAggSupp, pInput, order, scanFlag, true);

    code = doAggregateImpl(pOperator, pAggSupp->pCtx);
    if (code != 0) {
      T_LONG_JMP(pTask->env, code);
    }
  }

  initGroupedResultInfo(&pAgg->groupResInfo, pAgg->aggSup.pResultRowHashTable, 0);
  OPTR_SET_OPENED(pOperator);

  // elapsed time is recorded in milliseconds
  pOperator->cost.openCost = (taosGetTimestampUs() - startUs) / 1000.0;
  return TSDB_CODE_SUCCESS;
}

2402
// getNext function of the aggregate operator: opens the operator on demand, then builds
// result blocks (applying the operator's filter) until one has rows or no grouped results
// remain. Returns the result block, or NULL when the operator is done, opening failed
// (code stored in pTaskInfo->code) or the final block is empty.
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SAggOperatorInfo* pAgg = pOperator->info;
  SOptrBasicInfo*   pBasic = &pAgg->binfo;
  SExecTaskInfo*    pTask = pOperator->pTaskInfo;

  pTask->code = pOperator->fpSet._openFn(pOperator);
  if (pTask->code != TSDB_CODE_SUCCESS) {
    doSetOperatorCompleted(pOperator);
    return NULL;
  }

  blockDataEnsureCapacity(pBasic->pRes, pOperator->resultInfo.capacity);

  do {
    doBuildResultDatablock(pOperator, pBasic, &pAgg->groupResInfo, pAgg->aggSup.pResultBuf);
    doFilter(pBasic->pRes, pOperator->exprSupp.pFilterInfo, NULL);

    // nothing left to emit after this block: mark the operator as completed
    if (!hasRemainResults(&pAgg->groupResInfo)) {
      doSetOperatorCompleted(pOperator);
      break;
    }
  } while (!(pBasic->pRes->info.rows > 0));  // retry while the filter removed every row

  size_t numOfRows = blockDataGetNumOfRows(pBasic->pRes);
  pOperator->resultInfo.totalRows += numOfRows;

  if (numOfRows == 0) {
    return NULL;
  }

  return pBasic->pRes;
}

wmmhello's avatar
wmmhello 已提交
2438
// Serializes all result rows of an aggregate operator into a newly allocated buffer.
// Layout written below:
//   [int32 total length][int32 number of rows]
//   then per row: [int32 key length][key bytes][int32 row size][row bytes]
//   pOperator: operator whose info starts with SOptrBasicInfo directly followed by SAggSupporter
//   result:    receives the buffer (NULL when there is no result); the caller owns it
//   length:    receives the number of bytes written
// Returns TSDB_CODE_TSC_INVALID_INPUT for NULL output arguments, TSDB_CODE_OUT_OF_MEMORY
// when the buffer cannot be allocated/extended (then *result is NULL), TSDB_CODE_SUCCESS otherwise.
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length) {
  if (result == NULL || length == NULL) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
  SAggSupporter*  pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));
  int32_t         size = tSimpleHashGetSize(pSup->pResultRowHashTable);
  size_t          keyLen = sizeof(uint64_t) * 2;  // estimate the key length
  int32_t         totalSize =
      sizeof(int32_t) + sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize);

  // no result
  if (getTotalBufSize(pSup->pResultBuf) == 0) {
    *result = NULL;
    *length = 0;
    return TSDB_CODE_SUCCESS;
  }

  *result = (char*)taosMemoryCalloc(1, totalSize);
  if (*result == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // the first int32 slot is reserved for the total length, which is only known at the end
  int32_t offset = sizeof(int32_t);
  *(int32_t*)(*result + offset) = size;
  offset += sizeof(int32_t);

  // prepare memory: mark the page of the current result row position as dirty
  SResultRowPosition* pos = &pInfo->resultRowInfo.cur;
  void*               pPage = getBufPage(pSup->pResultBuf, pos->pageId);
  setBufPageDirty(pPage, true);
  releaseBufPage(pSup->pResultBuf, pPage);

  int32_t iter = 0;
  void*   pIter = NULL;
  while ((pIter = tSimpleHashIterate(pSup->pResultRowHashTable, pIter, &iter))) {
    void*               key = tSimpleHashGetKey(pIter, &keyLen);
    SResultRowPosition* p1 = (SResultRowPosition*)pIter;

    // recalculate the result size, the real key may be longer than the estimate
    int32_t realTotalSize = offset + sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize;
    if (realTotalSize > totalSize) {
      char* tmp = (char*)taosMemoryRealloc(*result, realTotalSize);
      if (tmp == NULL) {
        taosMemoryFree(*result);
        *result = NULL;
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      *result = tmp;
    }

    // save key
    *(int32_t*)(*result + offset) = keyLen;
    offset += sizeof(int32_t);
    memcpy(*result + offset, key, keyLen);
    offset += keyLen;

    // save value: the row is copied while its page is still held, and the page is
    // released only afterwards, so the copy never reads from a released page
    pPage = (SFilePage*)getBufPage(pSup->pResultBuf, p1->pageId);
    SResultRow* pRow = (SResultRow*)((char*)pPage + p1->offset);

    *(int32_t*)(*result + offset) = pSup->resultRowSize;
    offset += sizeof(int32_t);
    memcpy(*result + offset, pRow, pSup->resultRowSize);
    offset += pSup->resultRowSize;

    setBufPageDirty(pPage, true);
    releaseBufPage(pSup->pResultBuf, pPage);
  }

  *(int32_t*)(*result) = offset;
  *length = offset;

  return TSDB_CODE_SUCCESS;
}

2514 2515 2516 2517 2518
// Applies SLIMIT/SOFFSET (group level) and LIMIT/OFFSET (row level) to pBlock in place.
//   pOperator:     its status is set to OP_EXEC_DONE once the group limit is reached
//   pLimitInfo:    running limit/offset state, updated by this call
//   pBlock:        input block; it may be emptied, trimmed at the front or truncated
//   holdDataInBuf: when true, a block below the output threshold asks for more data
// Returns PROJECT_RETRIEVE_CONTINUE when the caller should fetch the next block and
// PROJECT_RETRIEVE_DONE when the current state of pBlock should be handled by the caller.
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
  // step 1: skip leading groups while there is group offset left
  if (pLimitInfo->remainGroupOffset > 0) {
    if (pLimitInfo->currentGroupId == 0) {  // it is the first group
      pLimitInfo->currentGroupId = pBlock->info.groupId;
      blockDataCleanup(pBlock);
      return PROJECT_RETRIEVE_CONTINUE;
    }

    if (pLimitInfo->currentGroupId != pBlock->info.groupId) {
      // now it is the data from a new group
      pLimitInfo->remainGroupOffset -= 1;

      // more groups have to be skipped: ignore the data block of this group as well
      if (pLimitInfo->remainGroupOffset > 0) {
        blockDataCleanup(pBlock);
        return PROJECT_RETRIEVE_CONTINUE;
      }
    }

    // set current group id of the project operator
    pLimitInfo->currentGroupId = pBlock->info.groupId;
  }

  // step 2: a block of a new group arrived, finish the bookkeeping of the previous group first
  if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.groupId) {
    pLimitInfo->numOfOutputGroups += 1;

    if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
      pOperator->status = OP_EXEC_DONE;
      blockDataCleanup(pBlock);

      return PROJECT_RETRIEVE_DONE;
    }

    // reset the per-group counters for the data of the new group
    pLimitInfo->numOfOutputRows = 0;
    pLimitInfo->remainOffset = pLimitInfo->limit.offset;

    // existing rows that belong to the previous group
    if (pBlock->info.rows > 0) {
      return PROJECT_RETRIEVE_DONE;
    }
  }

  // step 3: the start position is reached, apply the row offset inside the current group
  pLimitInfo->currentGroupId = pBlock->info.groupId;

  if (pLimitInfo->remainOffset >= pBlock->info.rows) {
    // the whole block is consumed by the offset
    pLimitInfo->remainOffset -= pBlock->info.rows;
    blockDataCleanup(pBlock);
    return PROJECT_RETRIEVE_CONTINUE;
  }

  if (pLimitInfo->remainOffset > 0) {
    // only the head of the block is consumed by the offset
    blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
    pLimitInfo->remainOffset = 0;
  }

  // step 4: check for the row limitation in each group
  if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) {
    int32_t numOfKeptRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
    blockDataKeepFirstNRows(pBlock, numOfKeptRows);

    if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) {
      pOperator->status = OP_EXEC_DONE;
    }

    return PROJECT_RETRIEVE_DONE;
  }

  // todo optimize performance
  // If slimit/soffset values exist, results of multiple rounds cannot be packed into one block, since
  // they may not belong to the same group; the limit/offset value is not valid in that case.
  bool keepAccumulating = holdDataInBuf && (pBlock->info.rows < pOperator->resultInfo.threshold) &&
                          (pLimitInfo->slimit.offset == -1) && (pLimitInfo->slimit.limit == -1);

  // not full enough: continue to accumulate the output data in the buffer; otherwise emit
  return keepAccumulating ? PROJECT_RETRIEVE_CONTINUE : PROJECT_RETRIEVE_DONE;
}

2591
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
L
Liu Jicong 已提交
2592 2593
// Starts filling for the cached block of a new group (pInfo->existNewGroupBlock): resets
// the fill state, runs the scalar expressions on the cached block, produces fill results
// into pInfo->pFinalRes and finally makes the new group the current one.
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
                                               SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) {
  SSDataBlock* pCached = pInfo->existNewGroupBlock;
  SSDataBlock* pOutput = pInfo->pFinalRes;

  pInfo->totalInputRows = pCached->info.rows;

  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;
  getTableScanInfo(pOperator, &order, &scanFlag);

  // when the task has completed, fill up to the end of the query window instead of the block window
  int64_t endKey;
  if (Q_STATUS_EQUAL(pTaskInfo->status, TASK_COMPLETED)) {
    endKey = pInfo->win.ekey;
  } else {
    endKey = pCached->info.window.ekey;
  }

  taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));

  blockDataCleanup(pInfo->pRes);
  doApplyScalarCalculation(pOperator, pCached, order, scanFlag);

  taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, endKey);
  taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);

  // only the free space left in the output block may be filled
  int32_t numOfFreeRows = pResultInfo->capacity - pOutput->info.rows;
  taosFillResultDataBlock(pInfo->pFillInfo, pOutput, numOfFreeRows);

  pInfo->curGroupId = pCached->info.groupId;
  pInfo->existNewGroupBlock = NULL;
}

L
Liu Jicong 已提交
2618 2619
// Continues pending fill work: if the fill info still has results, they are written into
// pInfo->pFinalRes; otherwise a cached block of a new group, if any, is processed.
static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
                                            SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) {
  if (taosFillHasMoreResults(pInfo->pFillInfo)) {
    int32_t numOfFreeRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows;
    taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pFinalRes, numOfFreeRows);
    pInfo->pRes->info.groupId = pInfo->curGroupId;
  } else if (pInfo->existNewGroupBlock) {
    // handle the cached new group data block
    doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo);
  }
}

// Evaluates the fill operator's expressions on pBlock and writes the output to pInfo->pRes:
// first the operator's own expressions, then the no-fill expressions. The group id of
// pBlock is copied to the result block.
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
  SFillOperatorInfo* pFill = pOperator->info;
  SSDataBlock*       pOut = pFill->pRes;

  SExprSupp* pFillSupp = &pOperator->exprSupp;
  setInputDataBlock(pFillSupp, pBlock, order, scanFlag, false);
  projectApplyFunctions(pFillSupp->pExprInfo, pOut, pBlock, pFillSupp->pCtx, pFillSupp->numOfExprs, NULL);

  // reset the row value before applying the no-fill functions to the input data block, which is "pBlock" in this case.
  pOut->info.rows = 0;

  SExprSupp* pPlainSupp = &pFill->noFillExprSupp;
  setInputDataBlock(pPlainSupp, pBlock, order, scanFlag, false);
  projectApplyFunctions(pPlainSupp->pExprInfo, pOut, pBlock, pPlainSupp->pCtx, pPlainSupp->numOfExprs, NULL);

  pOut->info.groupId = pBlock->info.groupId;
}

S
slzhou 已提交
2648
// Produces the next block of fill results in pInfo->pFinalRes. Pending results (remaining
// fill output or a cached block of a new group) are served first; afterwards blocks are
// pulled from the first downstream operator and filled group by group.
// Returns the result block tagged with the current group id, or NULL when no result is produced.
static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
  SFillOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  SResultInfo*       pResultInfo = &pOperator->resultInfo;
  SSDataBlock*       pOutput = pInfo->pFinalRes;

  blockDataCleanup(pOutput);

  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;
  getTableScanInfo(pOperator, &order, &scanFlag);

  // serve whatever is still pending from the previous call
  doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo);
  if (pOutput->info.rows > 0) {
    pOutput->info.groupId = pInfo->curGroupId;
    return pOutput;
  }

  SOperatorInfo* pChild = pOperator->pDownstream[0];
  for (;;) {
    SSDataBlock* pInput = pChild->fpSet.getNextFn(pChild);

    if (pInput != NULL) {
      blockDataUpdateTsWindow(pInput, pInfo->primarySrcSlotId);

      blockDataCleanup(pInfo->pRes);
      blockDataEnsureCapacity(pInfo->pRes, pInput->info.rows);
      blockDataEnsureCapacity(pInfo->pFinalRes, pInput->info.rows);
      doApplyScalarCalculation(pOperator, pInput, order, scanFlag);

      if (pInfo->curGroupId == 0 || pInfo->curGroupId == pInfo->pRes->info.groupId) {
        pInfo->curGroupId = pInfo->pRes->info.groupId;  // the first data block
        pInfo->totalInputRows += pInfo->pRes->info.rows;

        // the fill range ends at the block's end key when scan and fill order agree, else at its start key
        int64_t boundKey = (order == pInfo->pFillInfo->order) ? pInput->info.window.ekey : pInput->info.window.skey;
        taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, boundKey);
        taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
      } else if (pInfo->curGroupId != pInput->info.groupId) {  // the new group data block
        pInfo->existNewGroupBlock = pInput;

        // Fill the previous group data block, before handle the data block of new group.
        // Close the fill operation for previous group data block
        taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
      }
    } else {
      // downstream is exhausted
      if (pInfo->totalInputRows == 0) {
        doSetOperatorCompleted(pOperator);
        return NULL;
      }

      taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
    }

    int32_t numOfFreeRows = pOperator->resultInfo.capacity - pOutput->info.rows;
    taosFillResultDataBlock(pInfo->pFillInfo, pOutput, numOfFreeRows);

    if (pOutput->info.rows > 0) {
      // 1. The result in current group not reach the threshold of output result, continue
      // 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
      if (pOutput->info.rows > pResultInfo->threshold || pInput == NULL || pInfo->existNewGroupBlock != NULL) {
        pOutput->info.groupId = pInfo->curGroupId;
        return pOutput;
      }

      doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo);
      if (pOutput->info.rows >= pOperator->resultInfo.threshold || pInput == NULL) {
        pOutput->info.groupId = pInfo->curGroupId;
        return pOutput;
      }
    } else if (pInfo->existNewGroupBlock) {  // the current group produced nothing, try next group
      assert(pInput != NULL);

      blockDataCleanup(pOutput);

      doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo);
      if (pOutput->info.rows > pResultInfo->threshold) {
        pOutput->info.groupId = pInfo->curGroupId;
        return pOutput;
      }
    } else {
      return NULL;
    }
  }
}

S
slzhou 已提交
2737 2738 2739 2740 2741 2742 2743 2744
// getNext function of the fill operator: repeatedly produces fill results and applies the
// operator's filter until a non-empty block is obtained or no result is left.
// Returns the block (its rows are added to resultInfo.totalRows), or NULL when the
// operator is done.
static SSDataBlock* doFill(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SFillOperatorInfo* pFill = pOperator->info;

  for (;;) {
    SSDataBlock* pOut = doFillImpl(pOperator);
    if (pOut == NULL) {
      doSetOperatorCompleted(pOperator);
      return NULL;
    }

    doFilter(pOut, pOperator->exprSupp.pFilterInfo, &pFill->matchInfo );

    // the filter may remove every row; in that case produce the next block
    if (pOut->info.rows > 0) {
      pOperator->resultInfo.totalRows += pOut->info.rows;
      return pOut;
    }
  }
}

2766
// Releases the memory owned by each of the numOfExprs entries of pExpr: the column
// descriptors of column-type parameters, the parameter array and the expression node.
// The pExpr array itself is not freed here.
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
  for (int32_t e = 0; e < numOfExprs; ++e) {
    SExprInfo* pOne = pExpr + e;

    int32_t numOfParams = pOne->base.numOfParams;
    for (int32_t p = 0; p < numOfParams; ++p) {
      // only column parameters own a separately allocated descriptor
      if (pOne->base.pParam[p].type != FUNC_PARAM_TYPE_COLUMN) {
        continue;
      }

      taosMemoryFreeClear(pOne->base.pParam[p].pCol);
    }

    taosMemoryFree(pOne->base.pParam);
    taosMemoryFree(pOne->pExpr);
  }
}

2780 2781 2782 2783 2784
// Destroys an operator and, recursively, all of its downstream operators: calls the
// operator's close function on its info, releases the downstream array and the
// expression support, and finally frees the operator itself. NULL is accepted.
static void destroyOperatorInfo(SOperatorInfo* pOperator) {
  if (pOperator == NULL) {
    return;
  }

  if (pOperator->fpSet.closeFn != NULL) {
    pOperator->fpSet.closeFn(pOperator->info);
  }

  if (pOperator->pDownstream != NULL) {
    int32_t idx = 0;
    while (idx < pOperator->numOfDownstream) {
      destroyOperatorInfo(pOperator->pDownstream[idx]);
      ++idx;
    }

    taosMemoryFreeClear(pOperator->pDownstream);
    pOperator->numOfDownstream = 0;
  }

  cleanupExprSupp(&pOperator->exprSupp);
  taosMemoryFreeClear(pOperator);
}

2802 2803 2804 2805 2806 2807
// Chooses the page size and the in-memory buffer size for a paged result buffer.
//   rowSize:      size of one result row in bytes; values <= 0 are treated as 0
//   defaultPgsz:  receives the page size: the smallest power-of-two multiple of 4096 that
//                 is >= rowSize * 4, capped at 512 MiB so that the page size and four
//                 pages of it always fit into uint32_t
//   defaultBufsz: receives the buffer size: 10 MiB, or four pages when a single page is
//                 already at least 10 MiB
// Always returns 0.
// NOTE(review): for page sizes between 2.5 MiB and 10 MiB the 10 MiB buffer holds fewer
// than four pages; this matches the previous behavior - confirm whether it is intended.
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
  const uint32_t minPageSize = 4096;
  const uint32_t maxPageSize = UINT32_C(1) << 29;
  const uint32_t baseBufSize = 4096 * 2560;  // 10 MiB, TODO: make this variable to be configurable.

  // computed in 64 bits: rowSize * 4 may overflow int32_t, and a negative rowSize must not
  // turn into a huge unsigned value that the page size can never reach
  uint64_t required = (rowSize > 0) ? ((uint64_t)rowSize * 4) : 0;

  uint32_t pageSize = minPageSize;
  while (pageSize < required && pageSize < maxPageSize) {
    pageSize <<= 1u;
  }

  uint32_t bufSize = baseBufSize;
  if (bufSize <= pageSize) {
    bufSize = pageSize * 4;
  }

  *defaultPgsz = pageSize;
  *defaultBufsz = bufSize;
  return 0;
}

dengyihao's avatar
dengyihao 已提交
2819 2820
// Initializes the aggregate supporter: result row size, key buffer, result row hash table
// and the disk based result buffer (created under tsTempDir).
//   pKey: identifier handed to the disk based buffer and used in the error logs
// Returns TSDB_CODE_OUT_OF_MEMORY when the key buffer or hash table cannot be allocated,
// TSDB_CODE_NO_AVAIL_DISK when the temp space is not available, otherwise the result of
// createDiskbasedBuf.
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                         const char* pKey) {
  _hash_fn_t keyHashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);

  pAggSup->currentPageId = -1;
  pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
  pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
  pAggSup->pResultRowHashTable = tSimpleHashInit(10, keyHashFn);

  if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // derive the page and buffer size from the size of one result row
  uint32_t pageSize = 0;
  uint32_t bufSize = 0;
  getBufferPgSize(pAggSup->resultRowSize, &pageSize, &bufSize);

  if (!osTempSpaceAvailable()) {
    int32_t diskCode = TSDB_CODE_NO_AVAIL_DISK;
    qError("Init stream agg supporter failed since %s, %s", terrstr(diskCode), pKey);
    return diskCode;
  }

  int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, pageSize, bufSize, pKey, tsTempDir);
  if (code != TSDB_CODE_SUCCESS) {
    qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey);
  }

  return code;
}

2852
// Releases everything owned by the aggregate supporter: the key buffer, the result row
// hash table and the disk based result buffer. All three pointers are reset to NULL so
// that the supporter does not keep dangling pointers after the cleanup.
void cleanupAggSup(SAggSupporter* pAggSup) {
  taosMemoryFreeClear(pAggSup->keyBuf);

  tSimpleHashCleanup(pAggSup->pResultRowHashTable);
  pAggSup->pResultRowHashTable = NULL;

  destroyDiskbasedBuf(pAggSup->pResultBuf);
  pAggSup->pResultBuf = NULL;
}

L
Liu Jicong 已提交
2858 2859
// Initializes the expression support and the aggregate supporter of an aggregate-like
// operator, then points the save handle of every function context at the supporter's
// result buffer. Returns the first error code encountered, or TSDB_CODE_SUCCESS.
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
                    const char* pkey) {
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
  if (code == TSDB_CODE_SUCCESS) {
    code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
  }

  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t c = 0;
  while (c < numOfCols) {
    pSup->pCtx[c].saveHandle.pBuf = pAggSup->pResultBuf;
    ++c;
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2877
/*
 * Set the capacity of a result block to numOfRows and its threshold to 75%
 * of that value. If the computed threshold is zero it falls back to
 * numOfRows. numOfRows must not be zero.
 */
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
  ASSERT(numOfRows != 0);

  pResultInfo->threshold = numOfRows * 0.75;
  pResultInfo->capacity = numOfRows;

  // very small capacities would otherwise end up with a zero threshold
  if (0 == pResultInfo->threshold) {
    pResultInfo->threshold = numOfRows;
  }
}

2887 2888 2889 2890 2891
/*
 * Attach the result block to the operator's basic info and initialise its
 * result-row bookkeeping.
 */
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
  // the basic info only stores the pointer; no copy of the block is made here
  pInfo->pRes = pBlock;

  initResultRowInfo(&pInfo->resultRowInfo);
}

5
54liuyao 已提交
2892
/*
 * Destroy an array of numOfOutput function contexts together with the
 * per-context parameters, subsidiary buffers and input buffers, then free
 * the array itself.
 *
 * Always returns NULL so callers can write `p = destroySqlFunctionCtx(p, n)`.
 * A NULL pCtx is accepted and ignored.
 */
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  if (NULL == pCtx) {
    return NULL;
  }

  for (int32_t idx = 0; idx < numOfOutput; ++idx) {
    SqlFunctionCtx* pCur = &pCtx[idx];

    // release the variant held by each function parameter
    for (int32_t k = 0; k < pCur->numOfParams; ++k) {
      taosVariantDestroy(&pCur->param[k].param);
    }

    taosMemoryFreeClear(pCur->subsidiaries.pCtx);
    taosMemoryFreeClear(pCur->subsidiaries.buf);
    taosMemoryFree(pCur->input.pData);
    taosMemoryFree(pCur->input.pColumnDataAgg);
  }

  taosMemoryFreeClear(pCtx);
  return NULL;
}

2912
/*
 * Record the expression array in the supporter and, when an expression
 * array is present, create the matching function contexts.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when the contexts cannot be created,
 * TSDB_CODE_SUCCESS otherwise (including when pExprInfo is NULL).
 */
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
  pSup->numOfExprs = numOfExpr;
  pSup->pExprInfo = pExprInfo;

  // nothing to build without expressions
  if (NULL == pExprInfo) {
    return TSDB_CODE_SUCCESS;
  }

  pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset);
  return (pSup->pCtx != NULL) ? TSDB_CODE_SUCCESS : TSDB_CODE_OUT_OF_MEMORY;
}

2925 2926 2927 2928
/*
 * Release everything owned by an expression supporter: the function
 * contexts, the expression array, the filter info and the row-entry offset
 * table. The supporter structure itself is not freed.
 */
void cleanupExprSupp(SExprSupp* pSupp) {
  const int32_t numOfExprs = pSupp->numOfExprs;

  destroySqlFunctionCtx(pSupp->pCtx, numOfExprs);

  if (NULL != pSupp->pExprInfo) {
    // destroy the content of each expression first, then the array
    destroyExprInfo(pSupp->pExprInfo, numOfExprs);
    taosMemoryFreeClear(pSupp->pExprInfo);
  }

  if (NULL != pSupp->pFilterInfo) {
    filterFreeInfo(pSupp->pFilterInfo);
    pSupp->pFilterInfo = NULL;
  }

  taosMemoryFree(pSupp->rowEntryInfoOffset);
}

2940 2941
/*
 * Create the (non-grouped) aggregate operator on top of `downstream`.
 *
 * pAggNode   physical plan node that supplies the aggregate functions, the
 *            optional scalar expressions, the filter conditions and the
 *            output block descriptor.
 * pTaskInfo  owning task; on failure its `code` field receives the error.
 *
 * Returns the new operator, or NULL on failure. On failure everything that
 * was allocated here is released and pTaskInfo->code holds the reason.
 */
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode,
                                           SExecTaskInfo* pTaskInfo) {
  // declared before the first `goto _error` so the error path never reads an indeterminate value
  int32_t code = TSDB_CODE_SUCCESS;

  SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  SSDataBlock* pResBlock = createResDataBlock(pAggNode->node.pOutputDataBlockDesc);
  initBasicInfo(&pInfo->binfo, pResBlock);

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  initResultSizeInfo(&pOperator->resultInfo, 4096);

  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
  code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  // optional scalar expressions that are evaluated before the aggregation
  int32_t    numOfScalarExpr = 0;
  SExprInfo* pScalarExprInfo = NULL;
  if (pAggNode->pExprs != NULL) {
    pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
  }

  code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
  pInfo->groupId = UINT64_MAX;
  pOperator->name = "TableAggregate";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
  pOperator->blocking = true;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet =
      createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo, NULL);

  // let a directly attached table scan see the aggregate's expression/aggregate supporters
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    STableScanInfo* pTableScanInfo = downstream->info;
    pTableScanInfo->pdInfo.pExprSup = &pOperator->exprSupp;
    pTableScanInfo->pdInfo.pAggSup = &pInfo->aggSup;
  }

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  if (pInfo != NULL) {
    destroyAggOperatorInfo(pInfo);
  }

  if (pOperator != NULL) {
    cleanupExprSupp(&pOperator->exprSupp);
  }

  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

3016
/*
 * Destroy the result block referenced by the operator's basic info and
 * store whatever blockDataDestroy returns back into the field.
 * pInfo must not be NULL.
 */
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
  assert(pInfo != NULL);

  SSDataBlock* pBlock = pInfo->pRes;
  pInfo->pRes = blockDataDestroy(pBlock);
}

H
Haojun Liao 已提交
3021 3022 3023 3024 3025 3026 3027
/*
 * Element destructor for containers that store pointers: pItem is the
 * address of a stored pointer; the pointed-to memory is freed and the slot
 * is cleared.
 */
static void freeItem(void* pItem) {
  void** ppSlot = pItem;
  if (NULL == *ppSlot) {
    return;
  }

  taosMemoryFreeClear(*ppSlot);
}

3028
void destroyAggOperatorInfo(void* param) {
L
Liu Jicong 已提交
3029
  SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
L
Liu Jicong 已提交
3030 3031
  cleanupBasicInfo(&pInfo->binfo);

H
Haojun Liao 已提交
3032
  cleanupAggSup(&pInfo->aggSup);
S
shenglian zhou 已提交
3033
  cleanupExprSupp(&pInfo->scalarExprSup);
H
Haojun Liao 已提交
3034
  cleanupGroupResInfo(&pInfo->groupResInfo);
D
dapan1121 已提交
3035
  taosMemoryFreeClear(param);
3036
}
3037

3038
void destroyFillOperatorInfo(void* param) {
L
Liu Jicong 已提交
3039
  SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param;
3040
  pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
H
Haojun Liao 已提交
3041
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
H
Haojun Liao 已提交
3042 3043
  pInfo->pFinalRes = blockDataDestroy(pInfo->pFinalRes);

3044
  cleanupExprSupp(&pInfo->noFillExprSupp);
H
Haojun Liao 已提交
3045

wafwerar's avatar
wafwerar 已提交
3046
  taosMemoryFreeClear(pInfo->p);
H
Haojun Liao 已提交
3047
  taosArrayDestroy(pInfo->matchInfo.pList);
D
dapan1121 已提交
3048
  taosMemoryFreeClear(param);
3049 3050
}

3051
void destroyExchangeOperatorInfo(void* param) {
L
Liu Jicong 已提交
3052
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3053 3054 3055
  taosRemoveRef(exchangeObjRefPool, pExInfo->self);
}

L
Liu Jicong 已提交
3056
void freeSourceDataInfo(void* p) {
3057 3058 3059 3060
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
  taosMemoryFreeClear(pInfo->pRsp);
}

3061
void doDestroyExchangeOperatorInfo(void* param) {
X
Xiaoyu Wang 已提交
3062
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3063

H
Haojun Liao 已提交
3064
  taosArrayDestroy(pExInfo->pSources);
3065
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
3066 3067 3068 3069

  if (pExInfo->pResultBlockList != NULL) {
    taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
    pExInfo->pResultBlockList = NULL;
H
Haojun Liao 已提交
3070 3071
  }

3072
  blockDataDestroy(pExInfo->pDummyBlock);
L
Liu Jicong 已提交
3073

3074
  tsem_destroy(&pExInfo->ready);
D
dapan1121 已提交
3075
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
3076 3077
}

H
Haojun Liao 已提交
3078 3079 3080 3081
/*
 * Build the fill state of a fill operator.
 *
 * Creates the fill column descriptors from the fill / not-fill expressions,
 * aligns the start of the query range to the interval, creates
 * pInfo->pFillInfo, stores the (order dependent) time window in pInfo->win
 * and allocates the column pointer array pInfo->p (numOfCols entries).
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when either the fill
 * info or the pointer array could not be created. On failure both fields are
 * released and reset to NULL, so that a later destroyFillOperatorInfo() on
 * the same pInfo does not free them a second time.
 */
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SExprInfo* pNotFillExpr,
                            int32_t numOfNotFillCols, SNodeListNode* pValNode, STimeWindow win, int32_t capacity,
                            const char* id, SInterval* pInterval, int32_t fillType, int32_t order) {
  SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode);

  // the fill starts from the window boundary that matches the scan order
  int64_t     startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey;
  STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, startKey);
  w = getFirstQualifiedTimeWindow(startKey, &w, pInterval, order);

  pInfo->pFillInfo = taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo,
                                        pInfo->primaryTsCol, order, id);

  if (order == TSDB_ORDER_ASC) {
    pInfo->win.skey = win.skey;
    pInfo->win.ekey = win.ekey;
  } else {
    pInfo->win.skey = win.ekey;
    pInfo->win.ekey = win.skey;
  }

  pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);

  if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
    // use the matching destructor and clear both pointers: the owner's destructor runs afterwards
    pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
    taosMemoryFreeClear(pInfo->p);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

3108
/*
 * Tell whether the not-fill expressions of the fill operator already
 * contain a plain column expression whose single parameter is a
 * window-start column.
 */
static bool isWstartColumnExist(SFillOperatorInfo* pInfo) {
  SExprSupp* pSupp = &pInfo->noFillExprSupp;
  if (0 == pSupp->numOfExprs) {
    return false;
  }

  for (int32_t idx = 0; idx < pSupp->numOfExprs; ++idx) {
    SExprInfo* pOne = &pSupp->pExprInfo[idx];

    if (pOne->pExpr->nodeType != QUERY_NODE_COLUMN) {
      continue;
    }

    if (pOne->base.numOfParams != 1) {
      continue;
    }

    if (pOne->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_START) {
      return true;
    }
  }

  return false;
}

3123 3124
/*
 * Make sure the not-fill expression list contains the window-start
 * timestamp: when no such column exists yet, the expression array in
 * pExprSupp is grown by one entry built from pPhyFillNode->pWStartTs.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_SYS_ERROR when pWStartTs is not a
 * target node, or TSDB_CODE_OUT_OF_MEMORY when the array cannot be grown
 * (the existing array is left untouched in that case).
 */
static int32_t createPrimaryTsExprIfNeeded(SFillOperatorInfo* pInfo, SFillPhysiNode* pPhyFillNode, SExprSupp* pExprSupp,
                                           const char* idStr) {
  if (isWstartColumnExist(pInfo)) {
    return TSDB_CODE_SUCCESS;
  }

  if (pPhyFillNode->pWStartTs->type != QUERY_NODE_TARGET) {
    qError("pWStartTs of fill physical node is not a target node, %s", idStr);
    return TSDB_CODE_QRY_SYS_ERROR;
  }

  int32_t    numOfOld = pExprSupp->numOfExprs;
  SExprInfo* pGrown = taosMemoryRealloc(pExprSupp->pExprInfo, (numOfOld + 1) * sizeof(SExprInfo));
  if (NULL == pGrown) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // the new expression goes into the slot appended at the end
  createExprFromTargetNode(&pGrown[numOfOld], (STargetNode*)pPhyFillNode->pWStartTs);
  pExprSupp->numOfExprs = numOfOld + 1;
  pExprSupp->pExprInfo = pGrown;

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
3146 3147
/*
 * Create the fill operator on top of `downstream`.
 *
 * pPhyFillNode  physical plan node that supplies the fill / not-fill
 *               expressions, the fill mode and values, the time range and
 *               the output block descriptor.
 * pTaskInfo     owning task; on failure its `code` field receives the error.
 *
 * The interval definition is read from the downstream operator's info, which
 * is interpreted as a merge-aligned interval operator or as a plain interval
 * operator depending on downstream->operatorType.
 *
 * Returns the new operator, or NULL on failure. On failure everything that
 * was allocated here is released and pTaskInfo->code holds the reason.
 */
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode,
                                      SExecTaskInfo* pTaskInfo) {
  // declared before the first `goto _error` so the error path never reads an indeterminate value
  int32_t code = TSDB_CODE_SUCCESS;

  SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->pRes = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc);
  SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr);
  pOperator->exprSupp.pExprInfo = pExprInfo;
  // recorded right away so that the error path can destroy every expression
  pOperator->exprSupp.numOfExprs = pInfo->numOfExpr;

  SExprSupp* pNoFillSupp = &pInfo->noFillExprSupp;
  pNoFillSupp->pExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pNoFillSupp->numOfExprs);
  code = createPrimaryTsExprIfNeeded(pInfo, pPhyFillNode, pNoFillSupp, pTaskInfo->id.str);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = initExprSupp(pNoFillSupp, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  SInterval* pInterval =
      QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
      ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
      : &((SIntervalAggOperatorInfo*)downstream->info)->interval;

  int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
  int32_t type = convertFillType(pPhyFillNode->mode);

  SResultInfo* pResultInfo = &pOperator->resultInfo;

  initResultSizeInfo(&pOperator->resultInfo, 4096);
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
  code = initExprSupp(&pOperator->exprSupp, pExprInfo, pInfo->numOfExpr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->primaryTsCol = ((STargetNode*)pPhyFillNode->pWStartTs)->slotId;
  pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId;

  int32_t numOfOutputCols = 0;
  code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols,
                             COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
  if (code != TSDB_CODE_SUCCESS) {  // previously overwritten by the next call without being checked
    goto _error;
  }

  code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs,
                      (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity,
                      pTaskInfo->id.str, pInterval, type, order);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->pFinalRes = createOneDataBlock(pInfo->pRes, false);
  blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity);

  code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pOperator->name = "FillOperator";
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
  pOperator->exprSupp.numOfExprs = pInfo->numOfExpr;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroyFillOperatorInfo, NULL);

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {  // an operator without its downstream must not be handed out
    goto _error;
  }

  return pOperator;

_error:
  if (pInfo != NULL) {
    destroyFillOperatorInfo(pInfo);
  }

  // the fill expressions, their contexts and the filter live in the operator, not in pInfo
  if (pOperator != NULL) {
    cleanupExprSupp(&pOperator->exprSupp);
  }

  pTaskInfo->code = code;
  taosMemoryFreeClear(pOperator);
  return NULL;
}

D
dapan1121 已提交
3232
/*
 * Allocate and initialise a task info object.
 *
 * queryId / taskId  identifiers; both are rendered into the task's id string
 *                   ("TID:0x... QID:0x...").
 * model             execution model stored in the task.
 * dbFName           database name; a copy is stored in the task's schema info.
 *
 * Returns the new task info, or NULL with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY when the task or its id string cannot be allocated.
 */
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) {
  enum { TASK_ID_STR_LEN = 128 };

  SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
  if (pTaskInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // allocate the id string before anything else is attached to the task, so
  // that a failure here only has the task structure to release
  char* p = taosMemoryCalloc(1, TASK_ID_STR_LEN);
  if (p == NULL) {
    taosMemoryFree(pTaskInfo);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);

  pTaskInfo->schemaInfo.dbname = strdup(dbFName);
  pTaskInfo->cost.created = taosGetTimestampMs();
  pTaskInfo->id.queryId = queryId;
  pTaskInfo->execModel = model;
  pTaskInfo->pTableInfoList = tableListCreate();

  snprintf(p, TASK_ID_STR_LEN, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
  pTaskInfo->id.str = p;

  return pTaskInfo;
}
H
Haojun Liao 已提交
3253

H
Haojun Liao 已提交
3254 3255
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);

3256
/*
 * Fill pTaskInfo->schemaInfo for the table scanned by pScanNode: the table
 * name, a clone of the table's row schema (for a child table the schema of
 * its super table), the tag schema version where applicable, and the schema
 * of the queried columns.
 *
 * Returns TSDB_CODE_SUCCESS, or a non-zero error code when a meta lookup
 * fails or the queried-column schema cannot be built. The meta reader is
 * always cleared before returning.
 */
int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) {
  SMetaReader mr = {0};
  metaReaderInit(&mr, pHandle->meta, 0);
  int32_t code = metaGetTableEntryByUid(&mr, pScanNode->uid);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to get the table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid, pScanNode->suid,
           GET_TASKID(pTaskInfo));

    metaReaderClear(&mr);
    // never report success for a failed lookup, even if terrno was left unset
    return (terrno != TSDB_CODE_SUCCESS) ? terrno : code;
  }

  SSchemaInfo* pSchemaInfo = &pTaskInfo->schemaInfo;
  pSchemaInfo->tablename = strdup(mr.me.name);

  if (mr.me.type == TSDB_SUPER_TABLE) {
    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
  } else if (mr.me.type == TSDB_CHILD_TABLE) {
    tDecoderClear(&mr.coder);

    // a child table takes its schema from its super table
    tb_uid_t suid = mr.me.ctbEntry.suid;
    code = metaGetTableEntryByUid(&mr, suid);
    if (code != TSDB_CODE_SUCCESS) {
      // without this check the schema below would be cloned from a failed lookup
      qError("failed to get the super table meta, suid:0x%" PRIx64 ", %s", (uint64_t)suid, GET_TASKID(pTaskInfo));

      metaReaderClear(&mr);
      return (terrno != TSDB_CODE_SUCCESS) ? terrno : code;
    }

    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
  } else {
    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
  }

  metaReaderClear(&mr);

  pSchemaInfo->qsw = extractQueriedColumnSchema(pScanNode);
  if (pSchemaInfo->qsw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Build a schema wrapper that describes the columns referenced by a scan
 * node: every entry of pScanCols, followed by those entries of
 * pScanPseudoCols whose expression is a plain column (other pseudo-column
 * expressions are skipped).
 *
 * The returned wrapper and its schema array are heap allocated.
 * Returns NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY when an allocation
 * fails.
 */
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
  int32_t numOfCols = LIST_LENGTH(pScanNode->pScanCols);
  int32_t numOfTags = LIST_LENGTH(pScanNode->pScanPseudoCols);
  int32_t total = numOfCols + numOfTags;

  SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
  if (pqSw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pqSw->pSchema = taosMemoryCalloc(total, sizeof(SSchema));
  // an empty column list may legitimately yield a NULL array; only a non-empty request can fail
  if (pqSw->pSchema == NULL && total > 0) {
    taosMemoryFree(pqSw);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanCols, i);
    SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

    SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
    pSchema->colId = pColNode->colId;
    pSchema->type = pColNode->node.resType.type;
    pSchema->bytes = pColNode->node.resType.bytes;
    tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
  }

  // the pseudo column list holds tags and pseudo function columns; only the tag columns are kept
  for (int32_t i = 0; i < numOfTags; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanPseudoCols, i);

    int32_t type = nodeType(pNode->pExpr);
    if (type == QUERY_NODE_COLUMN) {
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

      SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
      pSchema->colId = pColNode->colId;
      pSchema->type = pColNode->node.resType.type;
      pSchema->bytes = pColNode->node.resType.bytes;
      tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
    }
  }

  return pqSw;
}

3328 3329
/*
 * Release what a task's schema info owns: the database and table name
 * strings and the two schema wrappers (table schema and queried-column
 * schema).
 */
static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
  // names
  taosMemoryFreeClear(pSchemaInfo->dbname);
  taosMemoryFreeClear(pSchemaInfo->tablename);

  // schema wrappers
  tDeleteSSchemaWrapper(pSchemaInfo->sw);
  tDeleteSSchemaWrapper(pSchemaInfo->qsw);
}

3335
/* Release the schema wrapper held by a stream task info. */
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) {
  tDeleteSSchemaWrapper(pStreamInfo->schema);
}
3336

3337
/*
 * Tell whether a group/partition list consists of exactly one entry and
 * that entry is the function named "tbname" (i.e. partition by tbname /
 * group by tbname).
 */
bool groupbyTbname(SNodeList* pGroupList) {
  if (LIST_LENGTH(pGroupList) != 1) {
    return false;
  }

  SNode* pOnly = nodesListGetNode(pGroupList, 0);
  if (pOnly->type != QUERY_NODE_FUNCTION) {
    return false;
  }

  return 0 == strcmp(((struct SFunctionNode*)pOnly)->functionName, "tbname");
}

3350 3351
/*
 * Recursively build the operator tree for a physical plan.
 *
 * Leaf plan nodes (no children) become source operators: table scan, table
 * merge scan, exchange, stream scan, system table scan, tag scan, block
 * distribution scan, last-row scan, or a project without input. For the
 * scan types the task's table list (and, where applicable, its schema info)
 * is populated first.
 *
 * Inner plan nodes first build one operator per child and then create the
 * operator that consumes them.
 *
 * Returns the root operator of the (sub)tree, or NULL on failure; the
 * failing step normally stores the reason in pTaskInfo->code. The result's
 * resultDataBlockId is taken from the plan node's output block descriptor.
 */
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
                                  SNode* pTagIndexCond, const char* pUser) {
  int32_t         type = nodeType(pPhyNode);
  STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
  const char*     idstr = GET_TASKID(pTaskInfo);

  if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
    SOperatorInfo* pOperator = NULL;
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;

      // NOTE: this is a patch to fix the physical plan
      // TODO remove it later
      if (pTableScanNode->scan.node.pLimit != NULL) {
        pTableScanNode->groupSort = true;
      }

      int32_t code =
          createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
                                  pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
      if (code) {
        pTaskInfo->code = code;
        qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr);
        return NULL;
      }

      code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
      if (code) {
        pTaskInfo->code = terrno;
        return NULL;
      }

      pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
      if (pOperator != NULL) {  // the creator may fail; do not touch its info in that case
        STableScanInfo* pScanInfo = pOperator->info;
        pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
      }
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
      STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;

      int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle,
                                             pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
      if (code) {
        pTaskInfo->code = code;
        qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
        return NULL;
      }

      code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
      if (code) {
        pTaskInfo->code = terrno;
        return NULL;
      }

      pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo);
      if (pOperator != NULL) {  // the creator may fail; do not touch its info in that case
        STableScanInfo* pScanInfo = pOperator->info;
        pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
      }
    } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
      pOperator = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode,
                                             pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
      if (pHandle->vnode) {
        int32_t code =
            createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
                                    pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
        if (code) {
          pTaskInfo->code = code;
          qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
          return NULL;
        }

#ifndef NDEBUG
        int32_t sz = tableListGetSize(pTableListInfo);
        qDebug("create stream task, total:%d", sz);

        for (int32_t i = 0; i < sz; i++) {
          STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
          qDebug("add table uid:%" PRIu64 ", gid:%" PRIu64, pKeyInfo->uid, pKeyInfo->groupId);
        }
#endif
      }

      pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
      pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
      SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
      pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
      STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;

      int32_t code = createScanTableListInfo(pScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
                                             pTagIndexCond, pTaskInfo);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        qError("failed to getTableList, code: %s", tstrerror(code));
        return NULL;
      }

      pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
      SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;

      if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
        SArray* pList = taosArrayInit(4, sizeof(STableKeyInfo));
        int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pList);
        if (code != TSDB_CODE_SUCCESS) {
          pTaskInfo->code = terrno;
          taosArrayDestroy(pList);  // the temporary list must not leak on failure
          return NULL;
        }

        // iterate over the fetched tables; the bound must be the size of pList,
        // not the size of the list that is being filled
        int32_t num = (int32_t)taosArrayGetSize(pList);
        for (int32_t i = 0; i < num; ++i) {
          STableKeyInfo* p = taosArrayGet(pList, i);
          tableListAddTableInfo(pTableListInfo, p->uid, 0);
        }
        taosArrayDestroy(pList);
      } else {  // Create group with only one table
        tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0);
      }

      pOperator = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
      SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;

      int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo,
                                             pTagCond, pTagIndexCond, pTaskInfo);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
      }

      code = extractTableSchemaInfo(pHandle, &pScanNode->scan, pTaskInfo);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
      }

      pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
      pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo);
    } else {
      ASSERT(0);
    }

    if (pOperator != NULL) {
      pOperator->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
    }

    return pOperator;
  }

  size_t          size = LIST_LENGTH(pPhyNode->pChildren);
  SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
  if (ops == NULL) {
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;  // callers learn the reason from the task
    return NULL;
  }

  for (int32_t i = 0; i < size; ++i) {
    SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
    ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTagCond, pTagIndexCond, pUser);
    if (ops[i] == NULL) {
      // NOTE(review): operators already created for earlier children are not released here - confirm
      // whether the task-level cleanup covers them
      taosMemoryFree(ops);
      return NULL;
    }
  }

  SOperatorInfo* pOptr = NULL;
  if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
    pOptr = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) {
    SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
    if (pAggNode->pGroupKeys != NULL) {
      pOptr = createGroupOperatorInfo(ops[0], pAggNode, pTaskInfo);
    } else {
      pOptr = createAggregateOperatorInfo(ops[0], pAggNode, pTaskInfo);
    }
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) {
    SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;

    bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
    pOptr = createIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo, isStream);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
    pOptr = createStreamIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
    SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;
    pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
    SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
    pOptr = createMergeIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
    int32_t children = 0;
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
    int32_t children = pHandle->numOfVgroups;
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
    pOptr = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) {
    pOptr = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
    SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
    pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
    SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
    pOptr = createSessionAggOperatorInfo(ops[0], pSessionNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
    pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
    int32_t children = 0;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) {
    int32_t children = pHandle->numOfVgroups;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
    pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION == type) {
    pOptr = createStreamPartitionOperatorInfo(ops[0], (SStreamPartitionPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
    SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
    pOptr = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
    pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
    pOptr = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
    pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL == type) {
    pOptr = createStreamFillOperatorInfo(ops[0], (SStreamFillPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
    pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
    pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else {
    ASSERT(0);
  }

  // the temporary child array is no longer needed once the parent operator has been created
  taosMemoryFree(ops);
  if (pOptr) {
    pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
  }

  return pOptr;
}
H
Haojun Liao 已提交
3592

L
Liu Jicong 已提交
3593 3594 3595 3596 3597 3598 3599 3600 3601 3602 3603 3604 3605
/**
 * Locate the table scan info that belongs to the stream scan operator of an operator tree.
 *
 * Starting at pOperator, the first downstream operator is followed until an operator of type
 * QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN is found. Every operator on the way must have exactly one
 * downstream operator.
 *
 * @param pOperator  operator at which the search starts
 * @param ppInfo     out: the info of the table scan operator nested in the stream scan operator
 * @return 0 on success, TSDB_CODE_QRY_APP_ERROR if a leaf is reached first or an operator has several children
 */
static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) {
  SOperatorInfo* pCur = pOperator;

  // descend through the single-child chain until the stream scan operator shows up
  while (pCur->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pCur->numOfDownstream == 0) {
      qError("failed to find stream scan operator");
      return TSDB_CODE_QRY_APP_ERROR;
    }

    if (pCur->numOfDownstream > 1) {
      qError("join not supported for stream block scan");
      return TSDB_CODE_QRY_APP_ERROR;
    }

    pCur = pCur->pDownstream[0];
  }

  SStreamScanInfo* pStreamScan = pCur->info;
  ASSERT(pStreamScan->pTableScanOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);

  *ppInfo = pStreamScan->pTableScanOp->info;
  return 0;
}

3613 3614 3615 3616 3617 3618 3619 3620 3621 3622 3623 3624 3625 3626 3627 3628 3629 3630 3631 3632 3633 3634
/**
 * Find the table scan physical node at the bottom of a single-child physical plan.
 *
 * The plan is followed through its only child until a node without children is reached; that node
 * has to be of type QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN.
 *
 * @param pNode   physical plan node at which the search starts
 * @param ppNode  out: the table scan node found
 * @return 0 on success; -1 with terrno set to TSDB_CODE_QRY_APP_ERROR if a node has more than one child or
 *         the leaf is not a table scan node (both cases also trip ASSERT(0))
 */
int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
  SPhysiNode* pCur = pNode;

  // walk down while the current node still has children
  while (pCur->pChildren != NULL && LIST_LENGTH(pCur->pChildren) != 0) {
    if (LIST_LENGTH(pCur->pChildren) != 1) {
      ASSERT(0);
      terrno = TSDB_CODE_QRY_APP_ERROR;
      return -1;
    }

    pCur = (SPhysiNode*)nodesListGetNode(pCur->pChildren, 0);
  }

  // pCur is a leaf now
  if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN != pCur->type) {
    ASSERT(0);
    terrno = TSDB_CODE_QRY_APP_ERROR;
    return -1;
  }

  *ppNode = (STableScanPhysiNode*)pCur;
  return 0;
}

3635
#if 0
L
Liu Jicong 已提交
3636 3637 3638 3639 3640
// Close the data reader of the table scan found below the stream scan operator and create a new one
// from the table scan node of the given plan. uid and ts are not used yet (see the TODO below).
int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHandle, int64_t uid, int64_t ts) {
  STableScanInfo*      pScanInfo = NULL;
  STableScanPhysiNode* pScanNode = NULL;

  int32_t ret = extractTbscanInStreamOpTree(pOperator, &pScanInfo);
  if (ret < 0) {
    return -1;
  }

  ret = extractTableScanNode(plan->pNode, &pScanNode);
  if (ret < 0) {
    ASSERT(0);
  }

  tsdbReaderClose(pScanInfo->dataReader);

  STableListInfo tableList = {0};
  pScanInfo->dataReader = doCreateDataReader(pScanNode, pHandle, &tableList, NULL);
  if (pScanInfo->dataReader != NULL) {
    // TODO: set uid and ts to data reader
    return 0;
  }

  ASSERT(0);
  qError("failed to create data reader");
  return TSDB_CODE_QRY_APP_ERROR;
}
3659
#endif
3660

C
Cary Xu 已提交
3661
/**
 * Serialize the result state of an operator tree into one contiguous heap buffer.
 *
 * The tree is visited in pre-order. Each operator that provides fpSet.encodeResultRow and produces a
 * non-empty section gets that section appended to *result. Buffer layout:
 *
 *   [int32_t total][section of 1st operator][section of 2nd operator]...
 *
 * where total is the size of the whole buffer including the 4-byte header itself.
 *
 * @param ops           root of the (sub)tree to encode
 * @param result        in/out: the buffer; a NULL *result means nothing has been encoded yet. It is (re)allocated
 *                      here and is freed and reset to NULL when a failure is detected after it was allocated
 * @param length        out: set to the total buffer size every time a section is appended
 * @param nOptrWithVal  in/out: incremented once for every operator that contributed a section
 * @return TDB_CODE_SUCCESS on success; TSDB_CODE_TSC_INVALID_INPUT if an output pointer is NULL while an
 *         encoder exists; TSDB_CODE_QRY_APP_ERROR if an encoder reports a negative length or the buffer
 *         would exceed INT32_MAX bytes; TSDB_CODE_OUT_OF_MEMORY on allocation failure; otherwise the
 *         error code of the failing encoder
 */
int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length, int32_t* nOptrWithVal) {
  int32_t code = TDB_CODE_SUCCESS;
  char*   pCurrent = NULL;
  int32_t currLength = 0;

  if (ops->fpSet.encodeResultRow) {
    if (result == NULL || length == NULL || nOptrWithVal == NULL) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    code = ops->fpSet.encodeResultRow(ops, &pCurrent, &currLength);
    if (code != TDB_CODE_SUCCESS) {
      if (*result != NULL) {
        taosMemoryFree(*result);
        *result = NULL;
      }
      return code;
    }

    if (currLength == 0) {
      // this operator has nothing to persist; only its children are visited
      ASSERT(!pCurrent);
    } else {
      // size of what has been encoded so far: just the header when the buffer does not exist yet
      int32_t sizePre = (int32_t)sizeof(int32_t);
      if (*result != NULL) {
        sizePre = *(int32_t*)(*result);
      }

      // checked at runtime (not only by assertion): a negative or overflowing size must never reach
      // the allocation and the memcpy below
      if (currLength < 0 || sizePre > INT32_MAX - currLength) {
        qError("invalid encoded result length:%d, encoded size so far:%d", currLength, sizePre);
        if (pCurrent != NULL) {
          taosMemoryFree(pCurrent);
        }
        if (*result != NULL) {
          taosMemoryFree(*result);
          *result = NULL;
        }
        return TSDB_CODE_QRY_APP_ERROR;
      }

      ++(*nOptrWithVal);

      int32_t sizeNew = sizePre + currLength;
      char*   buf = NULL;
      if (*result == NULL) {
        buf = (char*)taosMemoryCalloc(1, sizeNew);
      } else {
        buf = (char*)taosMemoryRealloc(*result, sizeNew);
      }

      if (buf == NULL) {
        taosMemoryFree(pCurrent);
        if (*result != NULL) {
          // the old buffer is still valid after a failed realloc and has to be released
          taosMemoryFree(*result);
          *result = NULL;
        }
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      // append the new section and refresh the total length stored in the header
      memcpy(buf + sizePre, pCurrent, currLength);
      *(int32_t*)buf = sizeNew;
      *result = buf;

      taosMemoryFree(pCurrent);
      *length = sizeNew;
    }
  }

  for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
    code = encodeOperator(ops->pDownstream[i], result, length, nOptrWithVal);
    if (code != TDB_CODE_SUCCESS) {
      return code;
    }
  }

  return TDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3721
/**
 * Restore the result state of an operator tree from a buffer laid out as
 *
 *   [int32_t total][section][section]...
 *
 * where total is the size of the whole buffer including the header. The tree is visited in pre-order;
 * every operator that provides fpSet.decodeResultRow consumes the first section of the buffer it is given.
 * The first int32_t of a section is taken as the size of that section (the cursor is advanced by it, so it
 * is assumed to include the length field itself).
 *
 * NOTE: although result is declared const, a new 4-byte header is written in place in front of the
 * remaining sections (over the tail of the section just decoded) before the children are visited.
 * NOTE(review): all children of one operator receive the same (result, length) pair, because the cursor is
 * passed by value; confirm this is intended for operators with more than one downstream.
 *
 * @param ops     root of the (sub)tree to restore
 * @param result  encoded buffer; may only be NULL if no operator in the tree needs to decode
 * @param length  size of the buffer in bytes; must equal the value stored in its header
 * @return TDB_CODE_SUCCESS on success, TSDB_CODE_TSC_INVALID_INPUT if the buffer is missing, exhausted or
 *         malformed, otherwise the error code of the failing decoder
 */
int32_t decodeOperator(SOperatorInfo* ops, const char* result, int32_t length) {
  int32_t code = TDB_CODE_SUCCESS;

  if (ops->fpSet.decodeResultRow) {
    if (result == NULL) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    const int32_t headerLen = (int32_t)sizeof(int32_t);

    // the buffer has to hold the header plus at least the length field of one section
    if (length < 2 * headerLen) {
      qError("encoded operator status is too short, length:%d", length);
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    // memcpy instead of a pointer cast: result is not guaranteed to be 4-byte aligned once the cursor moved
    int32_t totalLength = 0;
    memcpy(&totalLength, result, sizeof(totalLength));
    if (totalLength != length) {
      qError("encoded operator status is corrupted, length:%d, length in header:%d", length, totalLength);
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    const char* data = result + headerLen;
    code = ops->fpSet.decodeResultRow(ops, (char*)data);
    if (code != TDB_CODE_SUCCESS) {
      return code;
    }

    int32_t dataLength = 0;
    memcpy(&dataLength, data, sizeof(dataLength));

    // the section must cover its own length field and must not run past the end of the buffer
    if (dataLength < headerLen || dataLength > totalLength - headerLen) {
      qError("encoded operator status is corrupted, section length:%d, total length:%d", dataLength, totalLength);
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    if (dataLength == totalLength - headerLen) {  // the last data
      result = NULL;
      length = 0;
    } else {
      // build the header of the remaining sections in place, right in front of the next section
      int32_t remain = totalLength - dataLength;
      result += dataLength;
      memcpy((char*)result, &remain, sizeof(remain));
      length = remain;
    }
  }

  for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
    code = decodeOperator(ops->pDownstream[i], result, length);
    if (code != TDB_CODE_SUCCESS) {
      return code;
    }
  }

  return TDB_CODE_SUCCESS;
}

D
dapan1121 已提交
3758
/**
 * Allocate the parameter object needed by the data sink of the given node.
 *
 * - QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: an SInserterParam that carries readHandle.
 * - QUERY_NODE_PHYSICAL_PLAN_DELETE: an SDeleterParam holding the suid and the uid of every table in the
 *   table list of the task.
 * - any other node type: *pParam is left untouched.
 *
 * @param pNode       data sink node
 * @param pParam      out: the allocated parameter object
 * @param pTaskInfo   points to the SExecTaskInfo pointer of the task
 * @param readHandle  read handle stored in the inserter parameter
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if an allocation fails
 */
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) {
  SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo;

  if (QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT == pNode->type) {
    SInserterParam* pInserter = taosMemoryCalloc(1, sizeof(SInserterParam));
    if (pInserter == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pInserter->readHandle = readHandle;
    *pParam = pInserter;
    return TSDB_CODE_SUCCESS;
  }

  if (QUERY_NODE_PHYSICAL_PLAN_DELETE == pNode->type) {
    SDeleterParam* pDeleter = taosMemoryCalloc(1, sizeof(SDeleterParam));
    if (pDeleter == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    int32_t numOfTables = tableListGetSize(pTask->pTableInfoList);
    pDeleter->suid = tableListGetSuid(pTask->pTableInfoList);

    // TODO extract uid list
    pDeleter->pUidList = taosArrayInit(numOfTables, sizeof(uint64_t));
    if (pDeleter->pUidList == NULL) {
      taosMemoryFree(pDeleter);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // NOTE(review): neither the result of tableListGetInfo nor that of taosArrayPush is checked here
    for (int32_t idx = 0; idx < numOfTables; ++idx) {
      STableKeyInfo* pKeyInfo = tableListGetInfo(pTask->pTableInfoList, idx);
      taosArrayPush(pDeleter->pUidList, &pKeyInfo->uid);
    }

    *pParam = pDeleter;
    return TSDB_CODE_SUCCESS;
  }

  // other sink types need no parameter object
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
3802
/**
 * Create an execution task for a sub-plan and build its operator tree.
 *
 * Ownership: sql is taken over by this function. It is stored in the task on success and released on every
 * failure path. pPlan is stored in the task as well; when building the operator tree fails the task is torn
 * down with doDestroyTask(), which releases whatever the task references at that point.
 *
 * @param pPlan      sub-plan to execute
 * @param pTaskInfo  out: the new task; set to NULL when the function fails
 * @param pHandle    read handle, may be NULL; its state backend (if any) becomes the stream state of the task
 * @param taskId     id of the task
 * @param sql        heap-allocated sql text, may be NULL
 * @param model      execution model of the task
 * @return TSDB_CODE_SUCCESS on success, otherwise a non-zero error code
 */
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
                               char* sql, EOPTR_EXEC_MODEL model) {
  uint64_t queryId = pPlan->id.queryId;

  *pTaskInfo = createExecTaskInfo(queryId, taskId, model, pPlan->dbFName);
  if (*pTaskInfo == NULL) {
    // there is no task object to tear down; only the sql text handed over by the caller has to be released
    taosMemoryFree(sql);
    // never report success without a task. Assumes an allocation failure when terrno was not set -- TODO confirm
    return (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
  }

  if (pHandle) {
    /*(*pTaskInfo)->streamInfo.fillHistoryVer1 = pHandle->fillHistoryVer1;*/
    if (pHandle->pStateBackend) {
      (*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend;
    }
  }

  // from here on the task owns sql and releases it when it is destroyed
  (*pTaskInfo)->sql = sql;

  (*pTaskInfo)->pSubplan = pPlan;
  (*pTaskInfo)->pRoot =
      createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);

  if (NULL == (*pTaskInfo)->pRoot) {
    // fetch the error code before the task is freed; fall back to a generic error so that a failure is
    // never reported as success
    int32_t code = (*pTaskInfo)->code;
    if (code == TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_QRY_APP_ERROR;
    }

    doDestroyTask(*pTaskInfo);
    *pTaskInfo = NULL;  // do not leave a dangling pointer in the caller

    terrno = code;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
3838
/**
 * Release an execution task together with the resources it references: the table list, the operator tree,
 * the table schema info, the stream info, the sub-plan (unless the task is executed locally), the sql text,
 * the id string and finally the task object itself.
 *
 * @param pTaskInfo  task to destroy; NULL is accepted and ignored
 */
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
  // cleanup paths can get here without a task object, e.g. when the task could not be allocated
  if (pTaskInfo == NULL) {
    return;
  }

  qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));

  pTaskInfo->pTableInfoList = tableListDestroy(pTaskInfo->pTableInfoList);
  destroyOperatorInfo(pTaskInfo->pRoot);
  cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
  cleanupStreamInfo(&pTaskInfo->streamInfo);

  // the sub-plan is left alone for locally executed tasks
  if (!pTaskInfo->localFetch.localExec) {
    nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
  }

  taosMemoryFreeClear(pTaskInfo->sql);
  taosMemoryFreeClear(pTaskInfo->id.str);
  taosMemoryFreeClear(pTaskInfo);
}

// Estimate the query buffer needed for numOfTables tables: 1.5 times sizeof(STableQueryInfo) per table.
// The buffer consumed in tsdb (STableCheckInfo) is not part of the estimate.
static int64_t getQuerySupportBufSize(size_t numOfTables) {
  const size_t perTableSize = sizeof(STableQueryInfo);
  return (int64_t)(perTableSize * 1.5 * numOfTables);
}

/**
 * Reserve query buffer for a query over numOfTables tables from the global budget tsQueryBufferSizeBytes.
 *
 * A negative budget means no accounting is done and the request always succeeds. A budget of zero disables
 * query processing. Otherwise the estimated size is subtracted from the budget with a compare-and-swap
 * loop, which is retried whenever the budget was changed concurrently.
 *
 * @param numOfTables  number of tables involved in the query
 * @return TSDB_CODE_SUCCESS if the buffer was reserved (or no accounting is done),
 *         TSDB_CODE_QRY_NOT_ENOUGH_BUFFER otherwise
 */
int32_t checkForQueryBuf(size_t numOfTables) {
  int64_t required = getQuerySupportBufSize(numOfTables);

  if (tsQueryBufferSizeBytes < 0) {
    return TSDB_CODE_SUCCESS;
  }

  // disable query processing if the value of tsQueryBufferSize is zero.
  if (tsQueryBufferSizeBytes <= 0) {
    return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
  }

  for (;;) {
    int64_t available = tsQueryBufferSizeBytes;
    int64_t left = available - required;
    if (left < 0) {
      return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
    }

    // the reservation only takes effect if nobody changed the budget in the meantime
    if (atomic_val_compare_exchange_64(&tsQueryBufferSizeBytes, available, left) == available) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

// Give the buffer estimated for numOfTables tables back to the global budget tsQueryBufferSizeBytes.
// Nothing is done when the budget is negative, i.e. when no accounting takes place.
void releaseQueryBuf(size_t numOfTables) {
  if (tsQueryBufferSizeBytes >= 0) {
    int64_t reserved = getQuerySupportBufSize(numOfTables);

    // add the estimate back to the available buffer size
    atomic_add_fetch_64(&tsQueryBufferSizeBytes, reserved);
  }
}
D
dapan1121 已提交
3893

H
Haojun Liao 已提交
3894
/**
 * Collect the explain/execution statistics of an operator tree.
 *
 * One SExplainExecInfo entry per operator is appended to pExecInfoList in pre-order (the operator first,
 * then its downstream operators from first to last).
 *
 * @param operatorInfo   root of the (sub)tree
 * @param pExecInfoList  array of SExplainExecInfo that receives the entries
 * @return TSDB_CODE_SUCCESS on success, TSDB_CODE_QRY_OUT_OF_MEMORY if the list cannot grow, otherwise the
 *         error code reported by the failing operator
 */
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
  SExplainExecInfo  execInfo = {0};
  SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo);
  if (pExplainInfo == NULL) {
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  // pExplainInfo points into the array and is only used before further entries are appended below
  pExplainInfo->numOfRows = operatorInfo->resultInfo.totalRows;
  pExplainInfo->startupCost = operatorInfo->cost.openCost;
  pExplainInfo->totalCost = operatorInfo->cost.totalCost;
  pExplainInfo->verboseLen = 0;
  pExplainInfo->verboseInfo = NULL;

  int32_t code = TSDB_CODE_SUCCESS;

  if (operatorInfo->fpSet.getExplainFn) {
    code = operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen);
    if (code) {
      qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
      return code;
    }
  }

  for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
    code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pExecInfoList);
    if (code != TSDB_CODE_SUCCESS) {
      // hand the real failure up instead of disguising it as an out-of-memory error
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
3924

3925 3926
/**
 * Obtain the result row of a time window from the stream state and prepare the function contexts to
 * write into it.
 *
 * The row is looked up (or added, see streamStateAddIfNotExist) under the key {win->skey, tableGroupId}
 * with the size pAggSup->resultRowSize.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY if streamStateAddIfNotExist reports a failure
 */
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
                     SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) {
  SWinKey winKey = {0};
  winKey.ts = win->skey;
  winKey.groupId = tableGroupId;

  char*   pBuf = NULL;
  int32_t bufLen = pAggSup->resultRowSize;

  int32_t ret = streamStateAddIfNotExist(pState, &winKey, (void**)&pBuf, &bufLen);
  if (ret < 0) {
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  *pResult = (SResultRow*)pBuf;
  ASSERT(*pResult);

  // set time window for current result
  (*pResult)->win = *win;
  setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset);

  return TSDB_CODE_SUCCESS;
}

3945 3946
// Hand a result row buffer back to the stream state via streamStateReleaseBuf. Always returns
// TSDB_CODE_SUCCESS.
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult) {
  (void)streamStateReleaseBuf(pState, pKey, pResult);

  return TSDB_CODE_SUCCESS;
}

3950 3951
// Store resSize bytes of a result row in the stream state under pKey via streamStatePut. Always returns
// TSDB_CODE_SUCCESS.
// NOTE(review): any result reported by streamStatePut is not passed on to the caller.
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize) {
  (void)streamStatePut(pState, pKey, pResult, resSize);

  return TSDB_CODE_SUCCESS;
}

3955
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
3956
                                   SGroupResInfo* pGroupResInfo) {
3957
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
3958 3959 3960 3961 3962 3963 3964 3965 3966 3967 3968 3969
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    int32_t     size = 0;
    void*       pVal = NULL;
    SWinKey     key = {
3970 3971
        .ts = *(TSKEY*)pPos->key,
        .groupId = pPos->groupId,
3972
    };
3973
    int32_t code = streamStateGet(pState, &key, &pVal, &size);
3974 3975 3976 3977 3978 3979
    ASSERT(code == 0);
    SResultRow* pRow = (SResultRow*)pVal;
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
3980
      releaseOutputBuf(pState, &key, pRow);
3981 3982 3983 3984 3985
      continue;
    }

    if (pBlock->info.groupId == 0) {
      pBlock->info.groupId = pPos->groupId;
3986 3987 3988 3989
      SStreamIntervalOperatorInfo* pInfo = pOperator->info;
      char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
      if (tbname != NULL) {
        memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
L
Liu Jicong 已提交
3990 3991
      } else {
        pBlock->info.parTbName[0] = 0;
3992
      }
3993 3994 3995
    } else {
      // current value belongs to different group, it can't be packed into one datablock
      if (pBlock->info.groupId != pPos->groupId) {
3996
        releaseOutputBuf(pState, &key, pRow);
3997 3998 3999 4000 4001 4002
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
4003
      releaseOutputBuf(pState, &key, pRow);
4004 4005 4006 4007 4008 4009 4010 4011 4012 4013
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.finalize) {
4014 4015 4016 4017
        int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code1)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
          T_LONG_JMP(pTaskInfo->env, code1);
4018 4019 4020 4021 4022 4023 4024 4025 4026 4027 4028 4029 4030
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        for (int32_t k = 0; k < pRow->numOfRows; ++k) {
          colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
        }
      }
    }
5
54liuyao 已提交
4031

4032
    pBlock->info.rows += pRow->numOfRows;
4033
    releaseOutputBuf(pState, &key, pRow);
4034 4035 4036 4037
  }
  blockDataUpdateTsWindow(pBlock, 0);
  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
4038 4039 4040 4041 4042 4043 4044

// Store size bytes of a session result buffer in the stream state under key via streamStateSessionPut, then
// release the buffer with releaseOutputBuf. Always returns TSDB_CODE_SUCCESS.
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size) {
  const void* pData = (const void*)buf;
  (void)streamStateSessionPut(pState, key, pData, size);

  (void)releaseOutputBuf(pState, NULL, (SResultRow*)buf);
  return TSDB_CODE_SUCCESS;
}

4045
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
5
54liuyao 已提交
4046
                                    SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
4047
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
5
54liuyao 已提交
4048 4049 4050 4051 4052 4053 4054 4055 4056 4057 4058 4059 4060
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SSessionKey* pKey = taosArrayGet(pGroupResInfo->pRows, i);
    int32_t      size = 0;
    void*        pVal = NULL;
    int32_t      code = streamStateSessionGet(pState, pKey, &pVal, &size);
    ASSERT(code == 0);
4061 4062 4063 4064
    if (code == -1) {
      // coverity scan
      continue;
    }
5
54liuyao 已提交
4065 4066 4067 4068 4069 4070 4071 4072 4073 4074 4075
    SResultRow* pRow = (SResultRow*)pVal;
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseOutputBuf(pState, NULL, pRow);
      continue;
    }

    if (pBlock->info.groupId == 0) {
      pBlock->info.groupId = pKey->groupId;
4076 4077 4078 4079 4080 4081 4082 4083 4084 4085 4086 4087 4088 4089 4090 4091 4092 4093 4094 4095 4096 4097 4098 4099 4100

      if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
        SStreamStateAggOperatorInfo* pInfo = pOperator->info;

        char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
        if (tbname != NULL) {
          memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
        } else {
          pBlock->info.parTbName[0] = 0;
        }
      } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
                 pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
                 pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
        SStreamSessionAggOperatorInfo* pInfo = pOperator->info;

        char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
        if (tbname != NULL) {
          memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
        } else {
          pBlock->info.parTbName[0] = 0;
        }
      } else {
        ASSERT(0);
      }

5
54liuyao 已提交
4101 4102 4103 4104 4105 4106 4107 4108 4109 4110 4111 4112 4113 4114 4115 4116 4117 4118 4119 4120 4121 4122 4123 4124 4125 4126 4127 4128 4129 4130 4131 4132 4133 4134 4135 4136 4137 4138 4139 4140 4141 4142 4143 4144 4145
    } else {
      // current value belongs to different group, it can't be packed into one datablock
      if (pBlock->info.groupId != pKey->groupId) {
        releaseOutputBuf(pState, NULL, pRow);
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
      releaseOutputBuf(pState, NULL, pRow);
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.finalize) {
        int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code1)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
          T_LONG_JMP(pTaskInfo->env, code1);
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        for (int32_t k = 0; k < pRow->numOfRows; ++k) {
          colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
        }
      }
    }

    pBlock->info.rows += pRow->numOfRows;
    // saveSessionDiscBuf(pState, pKey, pVal, size);
    releaseOutputBuf(pState, NULL, pRow);
  }
  blockDataUpdateTsWindow(pBlock, 0);
  return TSDB_CODE_SUCCESS;
4146
}