executorimpl.c 158.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

H
Haojun Liao 已提交
16 17
#include "filter.h"
#include "function.h"
18 19
#include "functionMgt.h"
#include "os.h"
H
Haojun Liao 已提交
20
#include "querynodes.h"
21
#include "tfill.h"
dengyihao's avatar
dengyihao 已提交
22
#include "tname.h"
X
Xiaoyu Wang 已提交
23
#include "tref.h"
24

H
Haojun Liao 已提交
25
#include "tdatablock.h"
26
#include "tglobal.h"
H
Haojun Liao 已提交
27
#include "tmsg.h"
H
Haojun Liao 已提交
28
#include "tsort.h"
29
#include "ttime.h"
H
Haojun Liao 已提交
30

31
#include "executorimpl.h"
dengyihao's avatar
dengyihao 已提交
32
#include "index.h"
33
#include "query.h"
34 35
#include "tcompare.h"
#include "tcompression.h"
H
Haojun Liao 已提交
36
#include "thash.h"
37
#include "ttypes.h"
dengyihao's avatar
dengyihao 已提交
38
#include "vnode.h"
39

H
Haojun Liao 已提交
40
/* Non-zero when the runtime is in its main scan (scanFlag equals MAIN_SCAN). */
#define IS_MAIN_SCAN(runtime) (MAIN_SCAN == (runtime)->scanFlag)

/* Switch the runtime into reverse-scan mode by overwriting its scanFlag. */
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)

/* Row step for the given order: QUERY_ASC_FORWARD_STEP for TSDB_ORDER_ASC, QUERY_DESC_FORWARD_STEP otherwise. */
#define GET_FORWARD_DIRECTION_FACTOR(ord) \
  (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)

#if 0
/*
 * Fault-injection allocators (compiled out). When enabled they shadow malloc/calloc/realloc:
 * malloc/calloc fail when taosRand() % 1000 == 0, realloc fails when taosRand() % 5 is 0 or 1.
 */
static UNUSED_FUNC void *u_malloc(size_t size) {
  uint32_t r = taosRand();
  return (r % 1000 == 0) ? NULL : taosMemoryMalloc(size);
}

static UNUSED_FUNC void *u_calloc(size_t num, size_t size) {
  uint32_t r = taosRand();
  return (r % 1000 == 0) ? NULL : taosMemoryCalloc(num, size);
}

static UNUSED_FUNC void *u_realloc(void *p, size_t size) {
  uint32_t r = taosRand();
  return (r % 5 <= 1) ? NULL : taosMemoryRealloc(p, size);
}

#define calloc  u_calloc
#define malloc  u_malloc
#define realloc u_realloc
#endif

X
Xiaoyu Wang 已提交
79
/* Clear the bit(s) `st` in the status word of query object `q`. */
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= ~(st))

/* Non-zero when query `qry` has a positive interval, i.e. it is an interval (time-window) query. */
#define QUERY_IS_INTERVAL_QUERY(qry) (0 < (qry)->interval.interval)

L
Liu Jicong 已提交
82 83
/*
 * Maximum tolerated idle duration: twice tsShellActivityTimer (same unit as that setting; presumably seconds,
 * as the name suggests).
 * Declared with (void) so the definition is a real prototype rather than an old-style unspecified-parameter one.
 */
int32_t getMaximumIdleDurationSec(void) { return tsShellActivityTimer * 2; }

84
static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
85

X
Xiaoyu Wang 已提交
86
static void releaseQueryBuf(size_t numOfTables);
87

88 89 90 91
static void destroyFillOperatorInfo(void* param);
static void destroyProjectOperatorInfo(void* param);
static void destroyOrderOperatorInfo(void* param);
static void destroyAggOperatorInfo(void* param);
X
Xiaoyu Wang 已提交
92

93 94
static void destroyIntervalOperatorInfo(void* param);
static void destroyExchangeOperatorInfo(void* param);
H
Haojun Liao 已提交
95

96 97
static void destroyOperatorInfo(SOperatorInfo* pOperator);

98
/*
 * Finish an operator: flag it OP_EXEC_DONE, record its total cost and mark the owning task TASK_COMPLETED.
 * The task info must be attached (asserted).
 * NOTE(review): cost.start is scaled by 1000 before subtracting a microsecond timestamp, so it looks like it is kept
 * in milliseconds and totalCost ends up in milliseconds - confirm.
 */
void doSetOperatorCompleted(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTask = pOperator->pTaskInfo;

  pOperator->status = OP_EXEC_DONE;
  ASSERT(pTask != NULL);

  int64_t elapsedUs = taosGetTimestampUs() - pTask->cost.start * 1000;
  pOperator->cost.totalCost = elapsedUs / 1000.0;

  setTaskStatus(pTask, TASK_COMPLETED);
}
105

H
Haojun Liao 已提交
106
/* No-op open callback: marks the operator as opened, reports an open cost of zero and always succeeds. */
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) {
  OPTR_SET_OPENED(pOperator);
  pOperator->cost.openCost = 0;

  return TSDB_CODE_SUCCESS;
}

112
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
L
Liu Jicong 已提交
113
                                   __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
114
                                   __optr_decode_fn_t decode, __optr_explain_fn_t explain) {
115 116 117 118 119 120 121 122 123 124 125 126 127 128
  SOperatorFpSet fpSet = {
      ._openFn = openFn,
      .getNextFn = nextFn,
      .getStreamResFn = streamFn,
      .cleanupFn = cleanup,
      .closeFn = closeFn,
      .encodeResultRow = encode,
      .decodeResultRow = decode,
      .getExplainFn = explain,
  };

  return fpSet;
}

129 130
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
                                  SGroupResInfo* pGroupResInfo);
H
Haojun Liao 已提交
131

132
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
133
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
134

135
#if 0
/*
 * Legacy (compiled-out) check for whether the result row keyed by (pData, bytes, uid) already exists.
 */
static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, char* pData,
                                int16_t bytes, bool masterscan, uint64_t uid) {
  SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid);

  SResultRow** p1 =
      (SResultRow**)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));

  // Outside interval queries, during repeat/reverse scans, or when no row was found, existence is just a hash hit.
  // (the *p1 may be NULL in case of sliding+offset exists.)
  if (!QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQueryAttr) || !masterscan || p1 == NULL) {
    return p1 != NULL;
  }

  // size 0: nothing recorded yet; size 1: the comparison against pResultRowInfo->pResult[0] is disabled.
  if (pResultRowInfo->size == 0 || pResultRowInfo->size == 1) {
    return false;
  }

  // check if current pResultRowInfo contains the existed pResultRow
  SET_RES_EXT_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid, pResultRowInfo);
  int64_t* index =
      taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes));
  return index != NULL;
}
#endif
173

174
/*
 * Carve a new SResultRow of interBufSize bytes out of pResultBuf.
 *
 * The row goes onto page *currentPageId (-1 means "no page yet") while it still fits. Otherwise that page is released
 * and a fresh page is started. On success the page is marked dirty, the row's pageId/offset are filled in and
 * *currentPageId is updated to the page holding the row.
 * Returns NULL, leaving *currentPageId unchanged, when a page cannot be obtained. Every page acquisition is now
 * checked: previously a failed getNewBufPage on the first page, or a failed getBufPage, was dereferenced.
 */
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
  SFilePage* pData = NULL;
  int32_t    pageId = -1;

  if (*currentPageId == -1) {
    // in the first scan, new space needed for results
    pData = getNewBufPage(pResultBuf, &pageId);
    if (pData == NULL) {
      return NULL;
    }
    pData->num = sizeof(SFilePage);
  } else {
    pageId = *currentPageId;
    pData = getBufPage(pResultBuf, pageId);
    if (pData == NULL) {
      return NULL;
    }

    if (pData->num + interBufSize > getBufPageSize(pResultBuf)) {
      // release current page first, and prepare the next one
      releaseBufPage(pResultBuf, pData);

      pData = getNewBufPage(pResultBuf, &pageId);
      if (pData == NULL) {
        return NULL;
      }
      pData->num = sizeof(SFilePage);
    }
  }

  setBufPageDirty(pData, true);

  // the new row starts right after the bytes already used on the page
  SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
  pResultRow->pageId = pageId;
  pResultRow->offset = (int32_t)pData->num;
  *currentPageId = pageId;

  pData->num += interBufSize;
  return pResultRow;
}

213 214 215 216 217 218 219
/**
 * the struct of key in hash table
 * +----------+---------------+
 * | group id |   key data    |
 * | 8 bytes  | actual length |
 * +----------+---------------+
 */
220 221 222
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
                                   int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
                                   bool isIntervalQuery, SAggSupporter* pSup) {
223
  SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
H
Haojun Liao 已提交
224

dengyihao's avatar
dengyihao 已提交
225
  SResultRowPosition* p1 =
226
      (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
H
Haojun Liao 已提交
227

228 229
  SResultRow* pResult = NULL;

H
Haojun Liao 已提交
230 231
  // in case of repeat scan/reverse scan, no new time window added.
  if (isIntervalQuery) {
232
    if (masterscan && p1 != NULL) {  // the *p1 may be NULL in case of sliding+offset exists.
233
      pResult = getResultRowByPos(pResultBuf, p1, true);
234
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
235 236
    }
  } else {
dengyihao's avatar
dengyihao 已提交
237 238
    // In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
    // pResultRowInfo object.
H
Haojun Liao 已提交
239
    if (p1 != NULL) {
240
      // todo
241
      pResult = getResultRowByPos(pResultBuf, p1, true);
242
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
243 244 245
    }
  }

L
Liu Jicong 已提交
246
  // 1. close current opened time window
247
  if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
248
    SResultRowPosition pos = pResultRowInfo->cur;
X
Xiaoyu Wang 已提交
249
    SFilePage*         pPage = getBufPage(pResultBuf, pos.pageId);
250 251 252 253 254
    releaseBufPage(pResultBuf, pPage);
  }

  // allocate a new buffer page
  if (pResult == NULL) {
H
Haojun Liao 已提交
255
    ASSERT(pSup->resultRowSize > 0);
256
    pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
257

258 259
    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
260
    tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
L
Liu Jicong 已提交
261
                   sizeof(SResultRowPosition));
H
Haojun Liao 已提交
262 263
  }

264 265 266
  // 2. set the new time window to be the new active time window
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};

H
Haojun Liao 已提交
267
  // too many time window in query
268
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
269
      tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
270
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
H
Haojun Liao 已提交
271 272
  }

H
Haojun Liao 已提交
273
  return pResult;
H
Haojun Liao 已提交
274 275
}

276
// a new buffer page for each table. Needs to opt this design
L
Liu Jicong 已提交
277
static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, int32_t tid, uint32_t size) {
278 279 280 281
  if (pWindowRes->pageId != -1) {
    return 0;
  }

L
Liu Jicong 已提交
282
  SFilePage* pData = NULL;
283 284 285

  // in the first scan, new space needed for results
  int32_t pageId = -1;
286
  SIDList list = getDataBufPagesIdList(pResultBuf);
287 288

  if (taosArrayGetSize(list) == 0) {
289
    pData = getNewBufPage(pResultBuf, &pageId);
290
    pData->num = sizeof(SFilePage);
291 292
  } else {
    SPageInfo* pi = getLastPageInfo(list);
293
    pData = getBufPage(pResultBuf, getPageId(pi));
294
    pageId = getPageId(pi);
295

296
    if (pData->num + size > getBufPageSize(pResultBuf)) {
297
      // release current page first, and prepare the next one
298
      releaseBufPageInfo(pResultBuf, pi);
299

300
      pData = getNewBufPage(pResultBuf, &pageId);
301
      if (pData != NULL) {
302
        pData->num = sizeof(SFilePage);
303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322
      }
    }
  }

  if (pData == NULL) {
    return -1;
  }

  // set the number of rows in current disk page
  if (pWindowRes->pageId == -1) {  // not allocated yet, allocate new buffer
    pWindowRes->pageId = pageId;
    pWindowRes->offset = (int32_t)pData->num;

    pData->num += size;
    assert(pWindowRes->pageId >= 0);
  }

  return 0;
}

323
//  query_range_start, query_range_end, window_duration, window_start, window_end
324
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
325 326 327
  pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
  pColData->info.bytes = sizeof(int64_t);

328
  colInfoDataEnsureCapacity(pColData, 5);
329 330 331 332 333 334 335 336 337
  colDataAppendInt64(pColData, 0, &pQueryWindow->skey);
  colDataAppendInt64(pColData, 1, &pQueryWindow->ekey);

  int64_t interval = 0;
  colDataAppendInt64(pColData, 2, &interval);  // this value may be variable in case of 'n' and 'y'.
  colDataAppendInt64(pColData, 3, &pQueryWindow->skey);
  colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
}

L
Liu Jicong 已提交
338
/* Release the data held by a time-window info column (e.g. one filled by initExecTimeWindowInfo) via colDataDestroy. */
void cleanupExecTimeWindowInfo(SColumnInfoData* pColData) {
  colDataDestroy(pColData);
}
H
Haojun Liao 已提交
339

340 341 342 343 344 345 346 347 348 349 350 351 352 353
/* Snapshot of the function-context input fields that are temporarily overridden while applying functions. */
typedef struct {
  bool    hasAgg;       // copy of input.colDataAggIsSet
  int32_t numOfRows;    // copy of input.numOfRows
  int32_t startOffset;  // copy of input.startRowIndex
} SFunctionCtxStatus;

/* Capture pCtx's input SMA flag, row count and start row into *pStatus. */
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  const SInputColumnInfoData* pIn = &pCtx->input;

  *pStatus = (SFunctionCtxStatus){
      .hasAgg = pIn->colDataAggIsSet,
      .numOfRows = pIn->numOfRows,
      .startOffset = pIn->startRowIndex,
  };
}

/* Write the fields captured by functionCtxSave back into pCtx's input. */
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  SInputColumnInfoData* pIn = &pCtx->input;

  pIn->colDataAggIsSet = pStatus->hasAgg;
  pIn->numOfRows = pStatus->numOfRows;
  pIn->startRowIndex = pStatus->startOffset;
}

/*
 * Apply each of the numOfOutput function contexts in pCtx to rows [offset, offset + forwardStep) of the current input.
 *
 * - Window pseudo-column functions are evaluated with their scalar callback from pTimeWindowData, the 5-row
 *   time-window column, producing one result.
 * - Every other function runs its process callback when functionNeedToExecute() allows it and a callback exists.
 * - numOfTotal is the row count of the whole block. When only part of the block is processed, block statistics (SMA)
 *   are disabled for the call.
 * - Each context's input range and SMA flag are saved before the call and restored afterwards. This now happens for
 *   every context; previously window pseudo-column contexts kept the overridden values.
 * - A failing process callback stores its code in taskInfo->code and raises it with T_LONG_JMP on taskInfo->env.
 */
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset,
                      int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
  for (int32_t k = 0; k < numOfOutput; ++k) {
    SqlFunctionCtx* pFnCtx = &pCtx[k];

    // keep the caller's input settings so they can be restored once this range is processed
    SFunctionCtxStatus status = {0};
    functionCtxSave(pFnCtx, &status);

    pFnCtx->input.startRowIndex = offset;
    pFnCtx->input.numOfRows = forwardStep;

    // not a whole block involved in query processing, statistics data can not be used
    if (pFnCtx->input.colDataAggIsSet && forwardStep < numOfTotal) {
      pFnCtx->input.colDataAggIsSet = false;
    }

    if (fmIsWindowPseudoColumnFunc(pFnCtx->functionId)) {
      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pFnCtx);
      char*                p = GET_ROWCELL_INTERBUF(pEntryInfo);

      // the result is written straight into the row cell's intermediate buffer as a BIGINT
      SColumnInfoData idata = {0};
      idata.info.type = TSDB_DATA_TYPE_BIGINT;
      idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
      idata.pData = p;

      SScalarParam out = {.columnData = &idata};
      SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
      pFnCtx->sfp.process(&tw, 1, &out);
      pEntryInfo->numOfRes = 1;
    } else if (functionNeedToExecute(pFnCtx) && pFnCtx->fpSet.process != NULL) {
      int32_t code = pFnCtx->fpSet.process(pFnCtx);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
        taskInfo->code = code;
        T_LONG_JMP(taskInfo->env, code);
      }
    }

    // restore it, for every kind of function
    functionCtxRestore(pFnCtx, &status);
  }
}

dengyihao's avatar
dengyihao 已提交
406
/* Attach pBlock's column data to the function contexts of pOperator (defined further below in this file). */
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
                                   int32_t scanFlag, bool createDummyCol);
408

dengyihao's avatar
dengyihao 已提交
409 410
/*
 * Bind pBlock to every expression context of pOperator without attaching column data. For each context this sets
 * the scan order, the row count, the block SMA info (via setBlockSMAInfo) and the source block pointer.
 */
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock,
                                    int32_t order) {
  SExprSupp* pSup = &pOperator->exprSupp;

  for (int32_t idx = 0; idx < pSup->numOfExprs; ++idx) {
    SqlFunctionCtx* pOne = pCtx + idx;

    pOne->order = order;
    pOne->input.numOfRows = pBlock->info.rows;
    setBlockSMAInfo(pOne, &pSup->pExprInfo[idx], pBlock);
    pOne->pSrcBlock = pBlock;
  }
}

X
Xiaoyu Wang 已提交
419 420
/*
 * Bind pBlock as the current input of every expression context of pOperator.
 *
 * When the block carries pre-computed aggregates (pBlockAgg != NULL), only block info and SMA are attached.
 * Otherwise the column data itself is attached.
 * A failure while attaching column data (e.g. out of memory while materialising a constant column) used to be
 * silently discarded. It is now logged, stored in the task's code and raised with T_LONG_JMP on pTaskInfo->env, the
 * same mechanism this file uses for function-execution errors.
 */
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
                       int32_t scanFlag, bool createDummyCol) {
  if (pBlock->pBlockAgg != NULL) {
    doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
    return;
  }

  int32_t code = doSetInputDataBlock(pOperator, pCtx, pBlock, order, scanFlag, createDummyCol);
  if (code != TSDB_CODE_SUCCESS) {
    // NOTE(review): assumes every caller runs under the task's setjmp(pTaskInfo->env) - confirm
    SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
    qError("%s failed to set input data block, code: %s", GET_TASKID(pTaskInfo), tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
}

L
Liu Jicong 已提交
428 429
/*
 * Materialise a constant function parameter as a column of numOfRows identical values in pInput->pData[paramIndex].
 *
 * - The column is allocated on first use, typed from the parameter, and reused afterwards.
 * - Supported constant types: BIGINT/UBIGINT, DOUBLE and VARCHAR. Other types only get the column allocated.
 * - The temporary VARCHAR value buffer is now checked for allocation failure and freed after use; it used to leak on
 *   every call. colDataAppend copies each value into the column, so freeing it afterwards is safe.
 * - Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error from colInfoDataEnsureCapacity.
 */
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
                                             int32_t numOfRows) {
  SColumnInfoData* pColInfo = pInput->pData[paramIndex];
  if (pColInfo == NULL) {
    pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pColInfo == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pColInfo->info.type = pFuncParam->param.nType;
    pColInfo->info.bytes = pFuncParam->param.nLen;

    pInput->pData[paramIndex] = pColInfo;
  }

  int32_t code = colInfoDataEnsureCapacity(pColInfo, numOfRows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int8_t type = pFuncParam->param.nType;
  if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
    int64_t v = pFuncParam->param.i;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendInt64(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendDouble(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_VARCHAR) {
    char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
    if (tmp == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppend(pColInfo, i, tmp, false);
    }
    taosMemoryFree(tmp);
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
470
/*
 * Attach the raw columns of pBlock to every function context of pOperator.
 *
 * Each context receives the scan order, the row count, the source block, the scan flag and the block uid, and its
 * SMA flag is cleared. Then, for each parameter:
 *  - a column parameter points pInput->pData[j] at the block column with the same slot id and resets the input range
 *    to the whole block. For implicit-timestamp functions, the last parameter is also used as pPTS.
 *  - a value parameter is materialised as a constant column only when createDummyCol is set and it is the
 *    expression's single parameter.
 * Returns TSDB_CODE_SUCCESS, or the error from doCreateConstantValColumnInfo (processing stops at that point).
 */
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
                                   int32_t scanFlag, bool createDummyCol) {
  SExprSupp* pSup = &pOperator->exprSupp;

  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
    SqlFunctionCtx*       pOne = &pCtx[i];
    SInputColumnInfoData* pInput = &pOne->input;

    pOne->order = order;
    pInput->numOfRows = pBlock->info.rows;
    pOne->pSrcBlock = pBlock;
    pOne->scanFlag = scanFlag;
    pInput->uid = pBlock->info.uid;
    pInput->colDataAggIsSet = false;

    SExprInfo* pExprInfo = &pSup->pExprInfo[i];
    for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
      SFunctParam* pParam = &pExprInfo->base.pParam[j];

      if (pParam->type == FUNC_PARAM_TYPE_COLUMN) {
        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, pParam->pCol->slotId);
        pInput->totalRows = pBlock->info.rows;
        pInput->numOfRows = pBlock->info.rows;
        pInput->startRowIndex = 0;

        // NOTE: the last parameter is the primary timestamp column.
        // In case of merge function, this is not always the ts column data. todo: refactor this
        bool isLastParam = (j == pExprInfo->base.numOfParams - 1);
        if (fmIsImplicitTsFunc(pOne->functionId) && isLastParam) {
          pInput->pPTS = pInput->pData[j];
        }

        ASSERT(pInput->pData[j] != NULL);
      } else if (pParam->type == FUNC_PARAM_TYPE_VALUE && createDummyCol && pExprInfo->base.numOfParams == 1) {
        // todo avoid case: top(k, 12), 12 is the value parameter.
        // sum(11), 11 is also the value parameter.
        pInput->totalRows = pBlock->info.rows;
        pInput->numOfRows = pBlock->info.rows;
        pInput->startRowIndex = 0;

        int32_t code = doCreateConstantValColumnInfo(pInput, pParam, j, pBlock->info.rows);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

522
/*
 * Run the process callback of every expression context of pOperator that needs to execute and has a callback.
 * Stops at the first failure: the error is logged and returned. Returns TSDB_CODE_SUCCESS otherwise.
 */
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
  for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
    SqlFunctionCtx* pOne = &pCtx[k];

    // todo add a dummy function to avoid the process check
    if (!functionNeedToExecute(pOne) || pOne->fpSet.process == NULL) {
      continue;
    }

    int32_t code = pOne->fpSet.process(pOne);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
541
/*
 * Point the output of the first N contexts at the first N columns of pResult, where N is the number of entries in
 * pPseudoList (0 when the list is NULL). Only the list's size is used; its entries are not read here.
 */
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
  if (pPseudoList == NULL) {
    return;
  }

  size_t count = taosArrayGetSize(pPseudoList);
  for (size_t idx = 0; idx < count; ++idx) {
    pCtx[idx].pOutput = taosArrayGet(pResult->pDataBlock, idx);
  }
}

548
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
X
Xiaoyu Wang 已提交
549
                              int32_t numOfOutput, SArray* pPseudoList) {
H
Haojun Liao 已提交
550
  setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570

  if (pSrcBlock == NULL) {
    for (int32_t k = 0; k < numOfOutput; ++k) {
      int32_t outputSlotId = pExpr[k].base.resSchema.slotId;

      ASSERT(pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE);
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);

      int32_t type = pExpr[k].base.pParam[0].param.nType;
      if (TSDB_DATA_TYPE_NULL == type) {
        colDataAppendNNULL(pColInfoData, 0, 1);
      } else {
        colDataAppend(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
      }
    }

    pResult->info.rows = 1;
    return TSDB_CODE_SUCCESS;
  }

H
Haojun Liao 已提交
571
  pResult->info.groupId = pSrcBlock->info.groupId;
H
Haojun Liao 已提交
572

573 574
  // if the source equals to the destination, it is to create a new column as the result of scalar
  // function or some operators.
575 576
  bool createNewColModel = (pResult == pSrcBlock);

577 578
  int32_t numOfRows = 0;

579
  for (int32_t k = 0; k < numOfOutput; ++k) {
580 581
    int32_t               outputSlotId = pExpr[k].base.resSchema.slotId;
    SqlFunctionCtx*       pfCtx = &pCtx[k];
582
    SInputColumnInfoData* pInputData = &pfCtx->input;
583

L
Liu Jicong 已提交
584
    if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) {  // it is a project query
585
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
586
      if (pResult->info.rows > 0 && !createNewColModel) {
587
        colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity, pInputData->pData[0],
588
                        pInputData->numOfRows);
589
      } else {
590
        colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info);
591
      }
592

593
      numOfRows = pInputData->numOfRows;
594
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
595
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
596

dengyihao's avatar
dengyihao 已提交
597
      int32_t offset = createNewColModel ? 0 : pResult->info.rows;
598 599 600 601 602 603 604 605

      int32_t type = pExpr[k].base.pParam[0].param.nType;
      if (TSDB_DATA_TYPE_NULL == type) {
        colDataAppendNNULL(pColInfoData, offset, pSrcBlock->info.rows);
      } else {
        for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
          colDataAppend(pColInfoData, i + offset, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
        }
606
      }
607 608

      numOfRows = pSrcBlock->info.rows;
H
Haojun Liao 已提交
609
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) {
610 611 612
      SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
      taosArrayPush(pBlockList, &pSrcBlock);

613
      SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
614
      SColumnInfoData  idata = {.info = pResColData->info, .hasNull = true};
615

616
      SScalarParam dest = {.columnData = &idata};
X
Xiaoyu Wang 已提交
617
      int32_t      code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
618 619 620 621
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pBlockList);
        return code;
      }
622

dengyihao's avatar
dengyihao 已提交
623
      int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
624
      ASSERT(pResult->info.capacity > 0);
625
      colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
D
dapan1121 已提交
626
      colDataDestroy(&idata);
L
Liu Jicong 已提交
627

628
      numOfRows = dest.numOfRows;
629 630
      taosArrayDestroy(pBlockList);
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
631 632
      // _rowts/_c0, not tbname column
      if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) {
H
Haojun Liao 已提交
633
        // do nothing
634
      } else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) {
635 636
        SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx);
        pfCtx->fpSet.init(pfCtx, pResInfo);
637 638 639 640 641 642 643 644 645 646

        pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
        pfCtx->offset = createNewColModel ? 0 : pResult->info.rows;  // set the start offset

        // set the timestamp(_rowts) output buffer
        if (taosArrayGetSize(pPseudoList) > 0) {
          int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
          pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
        }

647 648 649 650 651
        // link pDstBlock to set selectivity value
        if (pfCtx->subsidiaries.num > 0) {
          pfCtx->pDstBlock = pResult;
        }

652
        numOfRows = pfCtx->fpSet.process(pfCtx);
H
Haojun Liao 已提交
653
      } else if (fmIsAggFunc(pfCtx->functionId)) {
G
Ganlin Zhao 已提交
654
        // selective value output should be set during corresponding function execution
655 656 657
        if (fmIsSelectValueFunc(pfCtx->functionId)) {
          continue;
        }
658 659
        // _group_key function for "partition by tbname" + csum(col_name) query
        SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
660
        int32_t          slotId = pfCtx->param[0].pCol->slotId;
661 662 663

        // todo handle the json tag
        SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
664
        for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
665 666 667 668 669 670 671 672 673
          bool isNull = colDataIsNull_s(pInput, f);
          if (isNull) {
            colDataAppendNULL(pOutput, pResult->info.rows + f);
          } else {
            char* data = colDataGetData(pInput, f);
            colDataAppend(pOutput, pResult->info.rows + f, data, isNull);
          }
        }

H
Haojun Liao 已提交
674 675 676
      } else {
        SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
        taosArrayPush(pBlockList, &pSrcBlock);
G
Ganlin Zhao 已提交
677

678
        SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
679
        SColumnInfoData  idata = {.info = pResColData->info, .hasNull = true};
H
Haojun Liao 已提交
680

681
        SScalarParam dest = {.columnData = &idata};
X
Xiaoyu Wang 已提交
682
        int32_t      code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
683 684 685 686
        if (code != TSDB_CODE_SUCCESS) {
          taosArrayDestroy(pBlockList);
          return code;
        }
687

dengyihao's avatar
dengyihao 已提交
688
        int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
689
        ASSERT(pResult->info.capacity > 0);
690
        colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
D
dapan1121 已提交
691
        colDataDestroy(&idata);
692 693

        numOfRows = dest.numOfRows;
H
Haojun Liao 已提交
694 695
        taosArrayDestroy(pBlockList);
      }
696
    } else {
697
      return TSDB_CODE_OPS_NOT_SUPPORT;
698 699
    }
  }
700

701 702 703
  if (!createNewColModel) {
    pResult->info.rows += numOfRows;
  }
704 705

  return TSDB_CODE_SUCCESS;
706 707
}

5
54liuyao 已提交
708
/*
 * Decide whether the function bound to pCtx must run for the current input.
 * Returns false when no function is bound (functionId == -1). During a REPEAT_SCAN it returns whether the function
 * takes part in repeat scans. Otherwise it returns true unless the function's result entry is already completed.
 */
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
  if (pCtx->functionId == -1) {
    return false;
  }

  if (pCtx->scanFlag == REPEAT_SCAN) {
    return fmIsRepeatScanFunc(pCtx->functionId);
  }

  struct SResultRowEntryInfo* pEntry = GET_RES_INFO(pCtx);
  return !isRowEntryCompleted(pEntry);
}

728 729 730 731 732 733 734
/*
 * Build a synthetic SMA (pre-aggregated min/max/sum) entry for a constant-value function parameter, so that
 * functions that consume SMA can treat the constant as a column of numOfRows identical values.
 *
 * pInput     - input descriptor of the function; pData[paramIndex] and pColumnDataAgg[paramIndex] are
 *              allocated on first use and reused afterwards.
 * pFuncParam - the constant parameter; param.i is read for BIGINT/BOOL, param.d for DOUBLE.
 * type       - data type of the constant; must not be a var-length type.
 * paramIndex - index of the parameter inside pInput.
 * numOfRows  - number of rows the constant stands for (used for sum).
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if an allocation fails.
 */
static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type,
                                                int32_t paramIndex, int32_t numOfRows) {
  if (pInput->pData[paramIndex] == NULL) {
    pInput->pData[paramIndex] = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pInput->pData[paramIndex] == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes); the column data itself is never read.
    pInput->pData[paramIndex]->info.type = type;
    pInput->pData[paramIndex]->info.bytes = tDataTypes[type].bytes;
  }

  SColumnDataAgg* da = pInput->pColumnDataAgg[paramIndex];
  if (da == NULL) {
    da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
    pInput->pColumnDataAgg[paramIndex] = da;
    if (da == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  ASSERT(!IS_VAR_DATA_TYPE(type));

  if (type == TSDB_DATA_TYPE_BIGINT) {
    int64_t v = pFuncParam->param.i;
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    double sum = v * numOfRows;
    *da = (SColumnDataAgg){.numOfNull = 0};

    // min/max/sum are integer fields that carry the raw bits of the double here; copy the bytes with memcpy
    // instead of storing through a type-punned double* (strict-aliasing violation).
    memcpy(&da->min, &v, sizeof(v));
    memcpy(&da->max, &v, sizeof(v));
    memcpy(&da->sum, &sum, sizeof(sum));
  } else if (type == TSDB_DATA_TYPE_BOOL) {
    // Same layout as BIGINT: a constant bool has min == max == v and sum == v * numOfRows. The previous code
    // hard-coded min to 0 and stored sum through a bool*, which clamped it to 0/1.
    int64_t v = (pFuncParam->param.i != 0);
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
  } else if (type == TSDB_DATA_TYPE_TIMESTAMP) {
    // do nothing
  } else {
    ASSERT(0);
  }

  return TSDB_CODE_SUCCESS;
}
779

780
/*
 * Attach the block-level SMA (pre-aggregated statistics) of pBlock to the input of pCtx.
 *
 * Column parameters take the SMA of their slot from pBlock->pBlockAgg; constant parameters get a synthetic SMA.
 * If any parameter ends up without an SMA entry, colDataAggIsSet is cleared so that the function falls back to
 * reading row data instead of the statistics.
 */
void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
  int32_t numOfRows = pBlock->info.rows;

  SInputColumnInfoData* pInput = &pCtx->input;
  pInput->numOfRows = numOfRows;
  pInput->totalRows = numOfRows;

  if (pBlock->pBlockAgg == NULL) {
    pInput->colDataAggIsSet = false;
    return;
  }

  pInput->colDataAggIsSet = true;

  for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
    SFunctParam* pFuncParam = &pExprInfo->base.pParam[j];

    if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
      int32_t slotId = pFuncParam->pCol->slotId;
      pInput->pColumnDataAgg[j] = pBlock->pBlockAgg[slotId];
      if (pInput->pColumnDataAgg[j] == NULL) {
        pInput->colDataAggIsSet = false;
      }

      // Here we set the column info data since the data type for each column data is required, but
      // the data in the corresponding SColumnInfoData will not be used.
      pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
    } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
      int32_t code = doCreateConstantValColumnAggInfo(pInput, pFuncParam, pFuncParam->param.nType, j, numOfRows);
      if (code != TSDB_CODE_SUCCESS) {
        // The synthetic SMA could not be built (allocation failure), so it must not be used; this mirrors the
        // column case above, where a missing SMA also disables the pre-aggregated path.
        pInput->colDataAggIsSet = false;
      }
    }
  }
}

L
Liu Jicong 已提交
812
/*
 * Report whether the task should be treated as killed.
 *
 * The idle-timeout check (task owned and started more than 10x the maximum idle duration ago) is still evaluated,
 * and start time sanity is asserted, but the kill decision itself is currently disabled: this always returns false.
 */
bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
  bool idleTooLong =
      (pTaskInfo->owner != 0) &&
      ((taosGetTimestampSec() - pTaskInfo->cost.start / 1000) > 10 * getMaximumIdleDurationSec());

  if (idleTooLong) {
    // the retrieve has not arrived for too long; aborting is not enabled yet
    assert(pTaskInfo->cost.start != 0);
  }

  return false;
}

L
Liu Jicong 已提交
827
/* Mark the task as cancelled by recording the cancellation error code on it. */
void setTaskKilled(SExecTaskInfo* pTaskInfo) {
  pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;
}
828 829

/////////////////////////////////////////////////////////////////////////////////////////////
830
/*
 * Return the interval-aligned time window that contains `key`.
 *
 * The window starts at `key` truncated to the interval and ends one tick before the next interval start.
 * If computing the end overflows (it ends up before the start), the end is clamped to INT64_MAX; in that case
 * the remaining query range is shorter than one interval, so no further adjustment is needed.
 */
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) {
  int64_t start = taosTimeTruncate(key, pInterval, precision);
  int64_t end = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1;

  if (end < start) {
    end = INT64_MAX;
  }

  STimeWindow win = {.skey = start, .ekey = end};
  return win;
}

846
#if 0
/*
 * Disabled legacy helper: checks whether the block's time range spans more than one query time window.
 * The calls that would align and advance `w` are commented out, so this code is kept for reference only.
 */
static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockInfo) {
  STimeWindow w = {0};

  TSKEY sk = TMIN(pQueryAttr->window.skey, pQueryAttr->window.ekey);
  TSKEY ek = TMAX(pQueryAttr->window.skey, pQueryAttr->window.ekey);

  if (true) {
    //    getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.skey, sk, ek, &w);
    assert(w.ekey >= pBlockInfo->window.skey);

    if (w.ekey < pBlockInfo->window.ekey) {
      return true;
    }

    for (;;) {
      //      getNextTimeWindow(pQueryAttr, &w);
      if (w.skey > pBlockInfo->window.ekey) {
        break;
      }

      assert(w.ekey > pBlockInfo->window.ekey);
      if (w.skey <= pBlockInfo->window.ekey && w.skey > pBlockInfo->window.skey) {
        return true;
      }
    }
  } else {
    //    getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.ekey, sk, ek, &w);
    assert(w.skey <= pBlockInfo->window.ekey);

    if (w.skey > pBlockInfo->window.skey) {
      return true;
    }

    for (;;) {
      //      getNextTimeWindow(pQueryAttr, &w);
      if (w.ekey < pBlockInfo->window.skey) {
        break;
      }

      assert(w.skey < pBlockInfo->window.skey);
      if (w.ekey < pBlockInfo->window.ekey && w.ekey >= pBlockInfo->window.skey) {
        return true;
      }
    }
  }

  return false;
}
#endif
896

L
Liu Jicong 已提交
897 898
/*
 * Decide how much of a data block has to be loaded.
 *
 * Active behaviour: *status is set to BLK_DATA_NOT_LOAD and the block's pDataBlock / pBlockAgg pointers are reset
 * to NULL; TSDB_CODE_SUCCESS is always returned. The original load-decision logic is kept below under #if 0.
 * NOTE(review): pDataBlock is overwritten without being released here; presumably the caller owns and frees the
 * previous array — confirm against the callers.
 */
int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
                              uint32_t* status) {
  *status = BLK_DATA_NOT_LOAD;

  pBlock->pDataBlock = NULL;
  pBlock->pBlockAgg = NULL;

  // only referenced by the disabled logic below
  (void)pTaskInfo;
  (void)pTableScanInfo;

#if 0
  //  int64_t groupId = pRuntimeEnv->current->groupIndex;
  //  bool    ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);

  // declared inside the disabled region: the active code does not use it, and declaring it outside produced an
  // unused-variable warning
  STaskCostInfo* pCost = &pTaskInfo->cost;

  //  pCost->totalBlocks += 1;
  //  pCost->totalRows += pBlock->info.rows;

  // Calculate all time windows that are overlapping or contain current data block.
  // If current data block is contained by all possible time window, do not load current data block.
  if (/*pQueryAttr->pFilters || */pQueryAttr->groupbyColumn || pQueryAttr->sw.gap > 0 ||
      (QUERY_IS_INTERVAL_QUERY(pQueryAttr) && overlapWithTimeWindow(pTaskInfo, &pBlock->info))) {
    (*status) = BLK_DATA_DATA_LOAD;
  }

  // check if this data block is required to load
  if ((*status) != BLK_DATA_DATA_LOAD) {
    bool needFilter = true;

    // the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet,
    // the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer
    if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
      SResultRow* pResult = NULL;

      bool  masterScan = IS_MAIN_SCAN(pRuntimeEnv);
      TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;

      STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
      if (pQueryAttr->pointInterpQuery) {
        needFilter = chkWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId,
                                    pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
                                    pTableScanInfo->rowEntryInfoOffset);
      } else {
        if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId,
                                    pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
                                    pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) {
          T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }
    } else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery)) { // stable aggregate, not interval aggregate or normal column aggregate
      doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx,
                               pTableScanInfo->rowEntryInfoOffset, pTableScanInfo->numOfOutput,
                               pRuntimeEnv->current->groupIndex);
    }

    if (needFilter) {
      (*status) = doFilterByBlockTimeWindow(pTableScanInfo, pBlock);
    } else {
      (*status) = BLK_DATA_DATA_LOAD;
    }
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;
  //  *status = updateBlockLoadStatus(pRuntimeEnv->pQueryAttr, *status);

  if ((*status) == BLK_DATA_NOT_LOAD || (*status) == BLK_DATA_FILTEROUT) {
    //qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey,
    //           pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->skipBlocks += 1;
  } else if ((*status) == BLK_DATA_SMA_LOAD) {
    // this function never returns error?
    pCost->loadBlockStatis += 1;
    //    tsdbRetrieveDatablockSMA(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);

    if (pBlock->pBlockAgg == NULL) {  // data block statistics does not exist, load data block
      //      pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
      pCost->totalCheckedRows += pBlock->info.rows;
    }
  } else {
    assert((*status) == BLK_DATA_DATA_LOAD);

    // load the data block statistics to perform further filter
    pCost->loadBlockStatis += 1;
    //    tsdbRetrieveDatablockSMA(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);

    if (pQueryAttr->topBotQuery && pBlock->pBlockAgg != NULL) {
      { // set previous window
        if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
          SResultRow* pResult = NULL;

          bool  masterScan = IS_MAIN_SCAN(pRuntimeEnv);
          TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;

          STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
          if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId,
                                      pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
                                      pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) {
            T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
          }
        }
      }
      bool load = false;
      for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
        int32_t functionId = pTableScanInfo->pCtx[i].functionId;
        if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM) {
          //          load = topbot_datablock_filter(&pTableScanInfo->pCtx[i], (char*)&(pBlock->pBlockAgg[i].min),
          //                                         (char*)&(pBlock->pBlockAgg[i].max));
          if (!load) { // current block has been discard due to filter applied
            pCost->skipBlocks += 1;
            //qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId,
            //                   pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
            (*status) = BLK_DATA_FILTEROUT;
            return TSDB_CODE_SUCCESS;
          }
        }
      }
    }

    // current block has been discard due to filter applied
    //    if (!doFilterByBlockSMA(pRuntimeEnv, pBlock->pBlockAgg, pTableScanInfo->pCtx, pBlockInfo->rows)) {
    //      pCost->skipBlocks += 1;
    //      qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey,
    //             pBlockInfo->window.ekey, pBlockInfo->rows);
    //      (*status) = BLK_DATA_FILTEROUT;
    //      return TSDB_CODE_SUCCESS;
    //    }

    pCost->totalCheckedRows += pBlockInfo->rows;
    pCost->loadBlocks += 1;
    //    pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
    //    if (pBlock->pDataBlock == NULL) {
    //      return terrno;
    //    }

    //    if (pQueryAttr->pFilters != NULL) {
    //      filterSetColFieldData(pQueryAttr->pFilters, taosArrayGetSize(pBlock->pDataBlock), pBlock->pDataBlock);
    //    }

    //    if (pQueryAttr->pFilters != NULL || pRuntimeEnv->pTsBuf != NULL) {
    //      filterColRowsInDataBlock(pRuntimeEnv, pBlock, ascQuery);
    //    }
  }
#endif

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1040
/*
 * Placeholder for resetting per-table query state before a reverse scan.
 * It currently performs no work for any input, including NULL.
 */
static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo) {
  (void)pTableQueryInfo;
}

L
Liu Jicong 已提交
1046
/*
 * Update the task status.
 *
 * TASK_NOT_COMPLETED replaces the whole status value. Any other status is OR-ed into the current bits, after the
 * TASK_NOT_COMPLETED bit is cleared, because "not completed" cannot coexist with any other status.
 */
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
  if (status != TASK_NOT_COMPLETED) {
    CLEAR_QUERY_STATUS(pTaskInfo, TASK_NOT_COMPLETED);
    pTaskInfo->status |= status;
    return;
  }

  pTaskInfo->status = status;
}

1056
/*
 * Bind every function context to its result entry inside pResult, and initialize entries that need it.
 *
 * Each pCtx[i].resultInfo is always pointed at entry i of pResult. Initialization is skipped for entries that are
 * already completed and initialized, and for window pseudo-column functions. An uninitialized entry is initialized
 * through the function's init callback, or simply flagged as initialized when no function is bound
 * (functionId == -1). As soon as an already-initialized entry is met, the remaining contexts are only re-bound.
 */
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
  bool stopInit = false;

  for (int32_t idx = 0; idx < numOfOutput; ++idx) {
    SqlFunctionCtx* pc = &pCtx[idx];
    pc->resultInfo = getResultEntryInfo(pResult, idx, rowEntryInfoOffset);
    if (stopInit) {
      continue;
    }

    struct SResultRowEntryInfo* pEntry = pc->resultInfo;
    bool skip = (isRowEntryCompleted(pEntry) && isRowEntryInitialized(pEntry)) ||
                fmIsWindowPseudoColumnFunc(pc->functionId);
    if (skip) {
      continue;
    }

    if (pEntry->initialized) {
      stopInit = true;
    } else if (pc->functionId != -1) {
      pc->fpSet.init(pc, pEntry);
    } else {
      pEntry->initialized = true;
    }
  }
}

1085 1086
// Forward declaration: compacts pBlock according to the filter result `p`/`keep`/`status`; used by doFilter,
// which is defined before the function body.
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
                                                int32_t status);
1087

H
Haojun Liao 已提交
1088
/*
 * Apply a filter condition to a data block in place, keeping only the qualified rows.
 *
 * pFilterNode   - filter expression; nothing is done when it is NULL.
 * pBlock        - block to filter; nothing is done when it has no rows. Rows are compacted in place.
 * pColMatchInfo - optional array of SColMatchInfo; when it maps the primary timestamp column to a TIMESTAMP slot,
 *                 the block's time window is recomputed from that slot after filtering.
 * pFilterInfo   - optional pre-built filter; when NULL, a temporary filter is built from pFilterNode and freed
 *                 before returning.
 *
 * If the temporary filter cannot be built, or its input columns cannot be bound, an error is logged and the block
 * is left unchanged.
 */
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo, SFilterInfo* pFilterInfo) {
  if (pFilterNode == NULL || pBlock->info.rows == 0) {
    return;
  }

  SFilterInfo* filter = pFilterInfo;
  bool         needFree = false;

  // todo move to the initialization function
  if (filter == NULL) {
    needFree = true;
    int32_t initCode = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
    if (initCode != TSDB_CODE_SUCCESS || filter == NULL) {
      // previously this result was overwritten unchecked and the filter was used anyway
      qError("failed to init filter from node, code %s", tstrerror(initCode));
      return;
    }
  }

  SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
  int32_t            code = filterSetDataFromSlotId(filter, &param1);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to bind filter input columns, code %s", tstrerror(code));
    if (needFree) {
      filterFreeInfo(filter);
    }
    return;
  }

  SColumnInfoData* p = NULL;
  int32_t          status = 0;

  // todo the keep seems never to be True??
  bool keep = filterExecute(filter, pBlock, &p, NULL, param1.numOfCols, &status);

  if (needFree) {
    filterFreeInfo(filter);
  }

  extractQualifiedTupleByFilterResult(pBlock, p, keep, status);

  if (pColMatchInfo != NULL) {
    for (int32_t i = 0; i < taosArrayGetSize(pColMatchInfo); ++i) {
      SColMatchInfo* pInfo = taosArrayGet(pColMatchInfo, i);
      if (pInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pInfo->targetSlotId);
        if (pColData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
          blockDataUpdateTsWindow(pBlock, pInfo->targetSlotId);
          break;
        }
      }
    }
  }

  colDataDestroy(p);
  taosMemoryFree(p);
}

1138
/*
 * Compact pBlock in place so that only the rows selected by a filter result remain.
 *
 * pBlock - block to compact; its row count becomes the number of qualified rows.
 * p      - per-row filter indicator column (int8_t flags, non-zero = keep); only read for a partial result.
 * keep   - when true the whole block is kept untouched.
 * status - FILTER_RESULT_ALL_QUALIFIED (nothing to do), FILTER_RESULT_NONE_QUALIFIED (all rows dropped), or a
 *          partial result that is resolved row by row through `p`.
 *
 * Columns whose data buffer is NULL (reserved output columns for scalar functions) are skipped.
 * If the temporary copy of the block cannot be allocated, an error is logged and the block is left unchanged.
 */
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
                                                int32_t status) {
  if (keep || status == FILTER_RESULT_ALL_QUALIFIED) {
    return;
  }

  if (status == FILTER_RESULT_NONE_QUALIFIED) {
    pBlock->info.rows = 0;
    return;
  }

  if (p == NULL || p->pData == NULL) {
    // no per-row indicator to apply; previously this dereferenced NULL
    qError("filter result indicator is missing, status:%d", status);
    return;
  }

  int32_t       totalRows = pBlock->info.rows;
  const int8_t* pIndicator = (const int8_t*)p->pData;

  // the qualified row count is identical for every column, so compute it once
  int32_t numOfQualified = 0;
  for (int32_t j = 0; j < totalRows; ++j) {
    numOfQualified += (pIndicator[j] != 0);
  }

  // the columns are rewritten in place, so a snapshot of the original data is needed as the copy source
  SSDataBlock* px = createOneDataBlock(pBlock, true);
  if (px == NULL) {
    qError("failed to copy data block for filtering, rows:%d", totalRows);
    return;
  }

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pSrc = taosArrayGet(px->pDataBlock, i);
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
    // it is a reserved column for scalar function, and no data in this column yet.
    if (pDst->pData == NULL || pSrc->pData == NULL) {
      continue;
    }

    colInfoDataCleanup(pDst, totalRows);

    int32_t numOfRows = 0;
    for (int32_t j = 0; j < totalRows; ++j) {
      if (pIndicator[j] == 0) {
        continue;
      }

      if (colDataIsNull_s(pSrc, j)) {
        colDataAppendNULL(pDst, numOfRows);
      } else {
        colDataAppend(pDst, numOfRows, colDataGetData(pSrc, j), false);
      }
      numOfRows += 1;
    }
  }

  pBlock->info.rows = numOfQualified;
  blockDataDestroy(px);
}

1189
/*
 * Make the aggregate operator's function contexts write into the result row of `groupId`.
 *
 * The result row is looked up (or created) by group id; when it has no result page yet, a new page is allocated.
 * The function contexts are then bound to, and if needed initialized on, that row.
 * On page-allocation failure the error is raised through the task's long jump, as done elsewhere in this file, so
 * the contexts are never left pointing at another group's row.
 */
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  // for simple group by query without interval, all the tables belong to one group result.
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
  SAggOperatorInfo* pAggInfo = pOperator->info;

  SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  int32_t*        rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;

  SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId,
                                                  sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup);
  assert(pResultRow != NULL);

  /*
   * not assign result buffer yet, add new result buffer
   * all group belong to one result set, and each group result has different group id so set the id to be one
   */
  if (pResultRow->pageId == -1) {
    int32_t ret =
        addNewWindowResultBuf(pResultRow, pAggInfo->aggSup.pResultBuf, groupId, pAggInfo->binfo.pRes->info.rowSize);
    if (ret != TSDB_CODE_SUCCESS) {
      // previously the error was swallowed here, leaving pCtx bound to the previous group's result row
      qError("%s failed to allocate result buffer for group:%" PRIu64 ", code %s", GET_TASKID(pTaskInfo), groupId,
             tstrerror(ret));
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
}

1217 1218 1219
/*
 * Switch the aggregate operator's output to the result row of `groupId`, unless that group is already active.
 * UINT64_MAX in pAggInfo->groupId means "no active group yet".
 */
static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  SAggOperatorInfo* pAggInfo = pOperator->info;

  bool alreadyActive = (pAggInfo->groupId != UINT64_MAX) && (pAggInfo->groupId == groupId);
  if (!alreadyActive) {
    doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);
    pAggInfo->groupId = groupId;  // remember the group whose output buffer is now bound
  }
}

dengyihao's avatar
dengyihao 已提交
1229 1230
/*
 * Set pRow->numOfRows to the largest result count among its initialized result entries.
 *
 * If no entry produced any row and none of the initialized entries belongs to a function that requires non-null
 * output, one row is still emitted (e.g. max() over all-null input yields a single NULL row).
 */
static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs,
                              const int32_t* rowCellOffset) {
  bool hasNotNullOutputFunc = false;

  for (int32_t k = 0; k < numOfExprs; ++k) {
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, k, rowCellOffset);
    if (!isRowEntryInitialized(pEntry)) {
      continue;
    }

    if (pEntry->numOfRes > pRow->numOfRows) {
      pRow->numOfRows = pEntry->numOfRes;
    }

    hasNotNullOutputFunc = hasNotNullOutputFunc || fmIsNotNullOutputFunc(pCtx[k].functionId);
  }

  if (!hasNotNullOutputFunc && pRow->numOfRows == 0) {
    pRow->numOfRows = 1;
  }
}

1253 1254
/*
 * Write the results held in pRow into pBlock, starting at row pBlock->info.rows.
 *
 * - Expressions with a finalize callback finalize straight into the block. For _group_key, the result count is
 *   first stretched to pRow->numOfRows so the key lines up with multi-row functions (e.g. histogram).
 *   A finalize failure is logged and raised through the task's long jump.
 * - _select_value expressions are skipped.
 * - Any other expression (e.g. _wstart next to top(k, 20)) has its single value repeated pRow->numOfRows times.
 */
static void doCopyResultToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
                                    SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) {
  for (int32_t e = 0; e < numOfExprs; ++e) {
    SqlFunctionCtx* pc = &pCtx[e];
    pc->resultInfo = getResultEntryInfo(pRow, e, rowEntryOffset);

    const char* funcName = pc->pExpr->pExpr->_function.functionName;

    if (pc->fpSet.finalize == NULL) {
      if (strcmp(funcName, "_select_value") == 0) {
        continue;
      }

      int32_t          slot = pExprInfo[e].base.resSchema.slotId;
      SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, slot);
      char*            pVal = GET_ROWCELL_INTERBUF(pc->resultInfo);
      for (int32_t r = 0; r < pRow->numOfRows; ++r) {
        colDataAppend(pDstCol, pBlock->info.rows + r, pVal, pc->resultInfo->isNullRes);
      }
      continue;
    }

    bool isGroupKey = (strcmp(funcName, "_group_key") == 0);
    if (isGroupKey && pc->resultInfo->numOfRes != 0) {
      pc->resultInfo->numOfRes = pRow->numOfRows;
    }

    int32_t code = pc->fpSet.finalize(pc, pBlock);
    if (TAOS_FAILED(code)) {
      qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }
}

1287 1288 1289
/*
 * Finalize the single result row stored at resultRowPosition in pBuf and append it to pBlock.
 *
 * The block capacity is grown (by ~25% per step) until the row fits; a failure to grow is logged and raised through
 * the task's long jump. Rows with no results are skipped. Always returns 0.
 *
 * todo refactor: SResultRow has a direct pointer in the main info.
 */
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
                           SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  SFilePage*  page = getBufPage(pBuf, resultRowPosition->pageId);
  SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);

  SqlFunctionCtx* pCtx = pSup->pCtx;
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  const int32_t*  rowEntryOffset = pSup->rowEntryInfoOffset;

  doUpdateNumOfRows(pCtx, pRow, pSup->numOfExprs, rowEntryOffset);
  if (pRow->numOfRows == 0) {
    releaseBufPage(pBuf, page);
    return 0;
  }

  int32_t size = pBlock->info.capacity;
  while (pBlock->info.rows + pRow->numOfRows > size) {
    // Same growth as the former `size * 1.25` (floor(1.25 * s) == s + s / 4), but always by at least one row:
    // the float version never grew capacities 0..3 and looped forever.
    int32_t grown = size + size / 4;
    size = (grown > size) ? grown : size + 1;
  }

  int32_t code = blockDataEnsureCapacity(pBlock, size);
  if (TAOS_FAILED(code)) {
    releaseBufPage(pBuf, page);
    qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  doCopyResultToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);

  // pRow lives inside `page`: consume it before the page is released
  pBlock->info.rows += pRow->numOfRows;
  releaseBufPage(pBuf, page);
  return 0;
}

1322 1323 1324 1325 1326 1327 1328
/*
 * Copy finalized group results from pGroupResInfo (starting at its current index) into pBlock.
 *
 * Copying stops when a row of a different group id is reached (one block holds one group) or when the next row
 * would exceed the block capacity. pGroupResInfo->index is advanced past every consumed or empty row.
 * The block's time window is refreshed from column 0 at the end. Always returns 0.
 */
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
                           SGroupResInfo* pGroupResInfo) {
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    SFilePage*  page = getBufPage(pBuf, pPos->pos.pageId);

    SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);

    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);

    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseBufPage(pBuf, page);
      continue;
    }

    if (pBlock->info.groupId == 0) {
      pBlock->info.groupId = pPos->groupId;
    } else {
      // current value belongs to different group, it can't be packed into one datablock
      if (pBlock->info.groupId != pPos->groupId) {
        releaseBufPage(pBuf, page);
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
      releaseBufPage(pBuf, page);
      break;
    }

    pGroupResInfo->index += 1;
    doCopyResultToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);

    // pRow lives inside `page`: read its row count before the page is released
    pBlock->info.rows += pRow->numOfRows;
    releaseBufPage(pBuf, page);
  }

  qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
         pBlock->info.groupId);

  blockDataUpdateTsWindow(pBlock, 0);
  return 0;
}

1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415
/*
 * Fill the operator's result block with the next batch of stream aggregation results.
 *
 * The block is stamped with the task version and cleared; if results remain, one group's worth of rows is copied.
 * For stream state/session operators, the block's parTbName is then set from the operator's group-id -> table-name
 * map, or cleared when the group has no mapped name. Other operator types leave parTbName untouched.
 */
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                           SDiskbasedBuf* pBuf) {
  SSDataBlock*   pBlock = pbInfo->pRes;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  pBlock->info.version = pTaskInfo->version;
  blockDataCleanup(pBlock);

  if (!hasRemainResults(pGroupResInfo)) {
    return;
  }

  // start from a clean group id; stream results are never merged across groups
  pBlock->info.groupId = 0;
  ASSERT(!pbInfo->mergeResultBlock);
  doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);

  char* pTbName = NULL;
  switch (pOperator->operatorType) {
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: {
      SStreamStateAggOperatorInfo* pStateInfo = pOperator->info;
      pTbName = taosHashGet(pStateInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
      break;
    }
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: {
      SStreamSessionAggOperatorInfo* pSessionInfo = pOperator->info;
      pTbName = taosHashGet(pSessionInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
      break;
    }
    default:
      return;
  }

  if (pTbName == NULL) {
    pBlock->info.parTbName[0] = 0;
  } else {
    memcpy(pBlock->info.parTbName, pTbName, TSDB_TABLE_NAME_LEN);
  }
}

X
Xiaoyu Wang 已提交
1416 1417
/*
 * Fill the operator's result block with the next batch of group results.
 *
 * The block is stamped with the task version and cleared. Without mergeResultBlock, a single group is copied.
 * With mergeResultBlock, groups are packed into the same block until it reaches the operator's result threshold
 * or results run out; the group id is cleared afterwards since the client does not need it.
 */
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                            SDiskbasedBuf* pBuf) {
  SSDataBlock*   pBlock = pbInfo->pRes;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*     pSup = &pOperator->exprSupp;

  pBlock->info.version = pTaskInfo->version;
  blockDataCleanup(pBlock);

  if (!hasRemainResults(pGroupResInfo)) {
    return;
  }

  pBlock->info.groupId = 0;

  if (!pbInfo->mergeResultBlock) {
    doCopyToSDataBlock(pTaskInfo, pBlock, pSup, pBuf, pGroupResInfo);
    return;
  }

  while (hasRemainResults(pGroupResInfo)) {
    doCopyToSDataBlock(pTaskInfo, pBlock, pSup, pBuf, pGroupResInfo);
    if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }

    // reset so the next call may append rows of a different group into the same block
    pBlock->info.groupId = 0;
  }

  pBlock->info.groupId = 0;
}

L
Liu Jicong 已提交
1449
/*
 * Compress the first numOfRows fixed-size values of pColRes into `data`, using the compression function registered
 * for the column's data type. The output buffer is assumed to hold the raw size plus COMP_OVERFLOW_BYTES.
 * Returns whatever the type's compression function returns.
 */
static int32_t compressQueryColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data, int8_t compressed) {
  int32_t rawSize = pColRes->info.bytes * numOfRows;
  int32_t outCapacity = rawSize + COMP_OVERFLOW_BYTES;

  return tDataTypes[pColRes->info.type].compFunc(pColRes->pData, rawSize, numOfRows, data, outCapacity, compressed,
                                                 NULL, 0);
}

L
Liu Jicong 已提交
1455 1456
/*
 * Log the task's cost summary at debug level.
 *
 * The summary is taken from the file-block load recorder attached to pTaskInfo->cost. If no recorder is attached,
 * nothing is logged. The elapsed time is divided by 1000 for the "ms" field.
 */
void queryCostStatis(SExecTaskInfo* pTaskInfo) {
  STaskCostInfo*          pSummary = &pTaskInfo->cost;
  SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder;
  if (pRecorder == NULL) {
    return;
  }

  qDebug(
      "%s :cost summary: elapsed time:%.2f ms, total blocks:%d, load block SMA:%d, load data block:%d, total "
      "rows:%" PRId64 ", check rows:%" PRId64,
      GET_TASKID(pTaskInfo), pSummary->elapsedTime / 1000.0, pRecorder->totalBlocks, pRecorder->loadBlockStatis,
      pRecorder->loadBlocks, pRecorder->totalRows, pRecorder->totalCheckedRows);
}

L
Liu Jicong 已提交
1485 1486 1487
// static void updateOffsetVal(STaskRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
1488
//
L
Liu Jicong 已提交
1489
//   int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
1490
//
L
Liu Jicong 已提交
1491 1492 1493 1494
//   if (pQueryAttr->limit.offset == pBlockInfo->rows) {  // current block will ignore completed
//     pTableQueryInfo->lastKey = QUERY_IS_ASC_QUERY(pQueryAttr) ? pBlockInfo->window.ekey + step :
//     pBlockInfo->window.skey + step; pQueryAttr->limit.offset = 0; return;
//   }
1495
//
L
Liu Jicong 已提交
1496 1497 1498 1499 1500
//   if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//     pQueryAttr->pos = (int32_t)pQueryAttr->limit.offset;
//   } else {
//     pQueryAttr->pos = pBlockInfo->rows - (int32_t)pQueryAttr->limit.offset - 1;
//   }
1501
//
L
Liu Jicong 已提交
1502
//   assert(pQueryAttr->pos >= 0 && pQueryAttr->pos <= pBlockInfo->rows - 1);
1503
//
L
Liu Jicong 已提交
1504 1505
//   SArray *         pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//   SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1506
//
L
Liu Jicong 已提交
1507 1508
//   // update the pQueryAttr->limit.offset value, and pQueryAttr->pos value
//   TSKEY *keys = (TSKEY *) pColInfoData->pData;
1509
//
L
Liu Jicong 已提交
1510 1511 1512
//   // update the offset value
//   pTableQueryInfo->lastKey = keys[pQueryAttr->pos];
//   pQueryAttr->limit.offset = 0;
1513
//
L
Liu Jicong 已提交
1514
//   int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
1515
//
L
Liu Jicong 已提交
1516 1517 1518 1519
//   //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numBlocksOfStep:%d, numOfRes:%d,
//   lastKey:%"PRId64, GET_TASKID(pRuntimeEnv),
//          pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey);
// }
1520

L
Liu Jicong 已提交
1521 1522
// void skipBlocks(STaskRuntimeEnv *pRuntimeEnv) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
1523
//
L
Liu Jicong 已提交
1524 1525 1526
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0) {
//     return;
//   }
1527
//
L
Liu Jicong 已提交
1528 1529
//   pQueryAttr->pos = 0;
//   int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
1530
//
L
Liu Jicong 已提交
1531 1532
//   STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
//   TsdbQueryHandleT pTsdbReadHandle = pRuntimeEnv->pTsdbReadHandle;
1533
//
L
Liu Jicong 已提交
1534 1535 1536
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pTsdbReadHandle)) {
//     if (isTaskKilled(pRuntimeEnv->qinfo)) {
1537
//       T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
L
Liu Jicong 已提交
1538
//     }
1539
//
L
Liu Jicong 已提交
1540
//     tsdbRetrieveDataBlockInfo(pTsdbReadHandle, &blockInfo);
1541
//
L
Liu Jicong 已提交
1542 1543 1544 1545
//     if (pQueryAttr->limit.offset > blockInfo.rows) {
//       pQueryAttr->limit.offset -= blockInfo.rows;
//       pTableQueryInfo->lastKey = (QUERY_IS_ASC_QUERY(pQueryAttr)) ? blockInfo.window.ekey : blockInfo.window.skey;
//       pTableQueryInfo->lastKey += step;
1546
//
L
Liu Jicong 已提交
1547 1548 1549 1550 1551 1552 1553
//       //qDebug("QInfo:0x%"PRIx64" skip rows:%d, offset:%" PRId64, GET_TASKID(pRuntimeEnv), blockInfo.rows,
//              pQuery->limit.offset);
//     } else {  // find the appropriated start position in current block
//       updateOffsetVal(pRuntimeEnv, &blockInfo);
//       break;
//     }
//   }
1554
//
L
Liu Jicong 已提交
1555
//   if (terrno != TSDB_CODE_SUCCESS) {
1556
//     T_LONG_JMP(pRuntimeEnv->env, terrno);
L
Liu Jicong 已提交
1557 1558 1559 1560 1561 1562 1563
//   }
// }

// static TSKEY doSkipIntervalProcess(STaskRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo,
// STableQueryInfo* pTableQueryInfo) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
1564
//
L
Liu Jicong 已提交
1565 1566 1567
//   assert(pQueryAttr->limit.offset == 0);
//   STimeWindow tw = *win;
//   getNextTimeWindow(pQueryAttr, &tw);
1568
//
L
Liu Jicong 已提交
1569 1570
//   if ((tw.skey <= pBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) ||
//       (tw.ekey >= pBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQueryAttr))) {
1571
//
L
Liu Jicong 已提交
1572 1573 1574 1575
//     // load the data block and check data remaining in current data block
//     // TODO optimize performance
//     SArray *         pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//     SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1576
//
L
Liu Jicong 已提交
1577 1578 1579 1580
//     tw = *win;
//     int32_t startPos =
//         getNextQualifiedWindow(pQueryAttr, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1);
//     assert(startPos >= 0);
1581
//
L
Liu Jicong 已提交
1582 1583
//     // set the abort info
//     pQueryAttr->pos = startPos;
1584
//
L
Liu Jicong 已提交
1585 1586 1587 1588
//     // reset the query start timestamp
//     pTableQueryInfo->win.skey = ((TSKEY *)pColInfoData->pData)[startPos];
//     pQueryAttr->window.skey = pTableQueryInfo->win.skey;
//     TSKEY key = pTableQueryInfo->win.skey;
1589
//
L
Liu Jicong 已提交
1590 1591
//     pWindowResInfo->prevSKey = tw.skey;
//     int32_t index = pRuntimeEnv->resultRowInfo.curIndex;
1592
//
L
Liu Jicong 已提交
1593 1594
//     int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
//     pRuntimeEnv->resultRowInfo.curIndex = index;  // restore the window index
1595
//
L
Liu Jicong 已提交
1596 1597 1598 1599
//     //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d,
//     lastKey:%" PRId64,
//            GET_TASKID(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes,
//            pQueryAttr->current->lastKey);
1600
//
L
Liu Jicong 已提交
1601 1602 1603 1604 1605
//     return key;
//   } else {  // do nothing
//     pQueryAttr->window.skey      = tw.skey;
//     pWindowResInfo->prevSKey = tw.skey;
//     pTableQueryInfo->lastKey = tw.skey;
1606
//
L
Liu Jicong 已提交
1607 1608
//     return tw.skey;
//   }
1609
//
L
Liu Jicong 已提交
1610 1611 1612 1613 1614 1615 1616 1617 1618 1619
//   return true;
// }

// static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//     assert(*start <= pRuntimeEnv->current->lastKey);
//   } else {
//     assert(*start >= pRuntimeEnv->current->lastKey);
//   }
1620
//
L
Liu Jicong 已提交
1621 1622 1623 1624 1625
//   // if queried with value filter, do NOT forward query start position
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL ||
//   pRuntimeEnv->pFillInfo != NULL) {
//     return true;
//   }
1626
//
L
Liu Jicong 已提交
1627 1628 1629 1630 1631 1632 1633
//   /*
//    * 1. for interval without interpolation query we forward pQueryAttr->interval.interval at a time for
//    *    pQueryAttr->limit.offset times. Since hole exists, pQueryAttr->interval.interval*pQueryAttr->limit.offset
//    value is
//    *    not valid. otherwise, we only forward pQueryAttr->limit.offset number of points
//    */
//   assert(pRuntimeEnv->resultRowInfo.prevSKey == TSKEY_INITIAL_VAL);
1634
//
L
Liu Jicong 已提交
1635 1636
//   STimeWindow w = TSWINDOW_INITIALIZER;
//   bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
1637
//
L
Liu Jicong 已提交
1638 1639
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
//   STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
1640
//
L
Liu Jicong 已提交
1641 1642 1643
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pRuntimeEnv->pTsdbReadHandle)) {
//     tsdbRetrieveDataBlockInfo(pRuntimeEnv->pTsdbReadHandle, &blockInfo);
1644
//
L
Liu Jicong 已提交
1645 1646 1647 1648 1649 1650 1651 1652 1653
//     if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//       if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
//         getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.skey, blockInfo.window.skey, pQueryAttr->window.ekey,
//         &w); pWindowResInfo->prevSKey = w.skey;
//       }
//     } else {
//       getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.ekey, pQueryAttr->window.ekey, blockInfo.window.ekey, &w);
//       pWindowResInfo->prevSKey = w.skey;
//     }
1654
//
L
Liu Jicong 已提交
1655 1656
//     // the first time window
//     STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQueryAttr);
1657
//
L
Liu Jicong 已提交
1658 1659
//     while (pQueryAttr->limit.offset > 0) {
//       STimeWindow tw = win;
1660
//
L
Liu Jicong 已提交
1661 1662 1663
//       if ((win.ekey <= blockInfo.window.ekey && ascQuery) || (win.ekey >= blockInfo.window.skey && !ascQuery)) {
//         pQueryAttr->limit.offset -= 1;
//         pWindowResInfo->prevSKey = win.skey;
1664
//
L
Liu Jicong 已提交
1665 1666 1667 1668 1669 1670
//         // current time window is aligned with blockInfo.window.ekey
//         // restart it from next data block by set prevSKey to be TSKEY_INITIAL_VAL;
//         if ((win.ekey == blockInfo.window.ekey && ascQuery) || (win.ekey == blockInfo.window.skey && !ascQuery)) {
//           pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
//         }
//       }
1671
//
L
Liu Jicong 已提交
1672 1673 1674 1675
//       if (pQueryAttr->limit.offset == 0) {
//         *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//         return true;
//       }
1676
//
L
Liu Jicong 已提交
1677 1678
//       // current window does not ended in current data block, try next data block
//       getNextTimeWindow(pQueryAttr, &tw);
1679
//
L
Liu Jicong 已提交
1680 1681 1682 1683 1684 1685 1686 1687 1688
//       /*
//        * If the next time window still starts from current data block,
//        * load the primary timestamp column first, and then find the start position for the next queried time window.
//        * Note that only the primary timestamp column is required.
//        * TODO: Optimize for this cases. All data blocks are not needed to be loaded, only if the first actually
//        required
//        * time window resides in current data block.
//        */
//       if ((tw.skey <= blockInfo.window.ekey && ascQuery) || (tw.ekey >= blockInfo.window.skey && !ascQuery)) {
1689
//
L
Liu Jicong 已提交
1690 1691
//         SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//         SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1692
//
L
Liu Jicong 已提交
1693 1694 1695
//         if ((win.ekey > blockInfo.window.ekey && ascQuery) || (win.ekey < blockInfo.window.skey && !ascQuery)) {
//           pQueryAttr->limit.offset -= 1;
//         }
1696
//
L
Liu Jicong 已提交
1697 1698 1699 1700 1701 1702 1703 1704
//         if (pQueryAttr->limit.offset == 0) {
//           *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//           return true;
//         } else {
//           tw = win;
//           int32_t startPos =
//               getNextQualifiedWindow(pQueryAttr, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1);
//           assert(startPos >= 0);
1705
//
L
Liu Jicong 已提交
1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716
//           // set the abort info
//           pQueryAttr->pos = startPos;
//           pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
//           pWindowResInfo->prevSKey = tw.skey;
//           win = tw;
//         }
//       } else {
//         break;  // offset is not 0, and next time window begins or ends in the next block.
//       }
//     }
//   }
1717
//
L
Liu Jicong 已提交
1718 1719
//   // check for error
//   if (terrno != TSDB_CODE_SUCCESS) {
1720
//     T_LONG_JMP(pRuntimeEnv->env, terrno);
L
Liu Jicong 已提交
1721
//   }
1722
//
L
Liu Jicong 已提交
1723 1724
//   return true;
// }
1725

1726
/*
 * Install the `num` operators in pDownstream as the downstream operators of p.
 *
 * A private copy of the pointer array is stored in p; the operators themselves are not copied.
 * Despite the name, an existing downstream array is replaced, not extended. The previous array is released so
 * that installing twice does not leak it.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the array cannot be allocated. On failure p is left
 * unchanged.
 */
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
  if (p->pDownstream == NULL) {
    assert(p->numOfDownstream == 0);
  }

  // allocate into a temporary so a failed allocation does not clobber p->pDownstream
  SOperatorInfo** pList = taosMemoryCalloc(num, POINTER_BYTES);
  if (pList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  memcpy(pList, pDownstream, num * POINTER_BYTES);

  // release any previously installed array (the pointed-to operators are not freed here)
  taosMemoryFree(p->pDownstream);
  p->pDownstream = pList;
  p->numOfDownstream = num;
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1741
// Forward declaration so code placed before the definition of doDestroyTableList can call it.
static void doDestroyTableList(STableListInfo* pTableqinfoList);
1742

1743 1744 1745 1746
/*
 * Callback context for one fetch request to one downstream source.
 * It is allocated in doSendFetchDataRequest and passed to loadRemoteDataCallback as its `param`.
 */
typedef struct SFetchRspHandleWrapper {
  // Ref id used with taosAcquireRef(exchangeObjRefPool, ...) to re-acquire the SExchangeInfo.
  // NOTE(review): filled from pExchangeInfo->self; if that ref id is 64-bit, storing it in uint32_t truncates - confirm.
  uint32_t exchangeId;
  int32_t  sourceIndex;  // index of the source in pExchangeInfo->pSources / pSourceDataInfo
} SFetchRspHandleWrapper;
1747

D
dapan1121 已提交
1748
/*
 * Completion callback for a fetch request sent to one downstream source.
 * It is registered as the transport callback in doSendFetchDataRequest, and that function also calls it
 * directly for local execution.
 *
 * param - an SFetchRspHandleWrapper naming the exchange operator (by ref id) and the source index.
 * pMsg  - the response buffer.
 *         On success, pMsg->pData becomes the source's pRsp and its header fields are converted to host byte
 *         order in place.
 *         On failure, or if the exchange operator can no longer be acquired, pMsg->pData is freed here.
 * code  - result code of the fetch; a non-success value is recorded in the source's `code` field.
 *
 * Whenever the operator is acquired, the source is marked EX_SOURCE_DATA_READY and pExchangeInfo->ready is posted
 * to wake the thread waiting on it. Always returns TSDB_CODE_SUCCESS.
 */
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;

  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
  if (pExchangeInfo == NULL) {
    qWarn("failed to acquire exchange operator, since it may have been released");
    taosMemoryFree(pMsg->pData);
    return TSDB_CODE_SUCCESS;
  }

  int32_t          index = pWrapper->sourceIndex;
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);

  if (code == TSDB_CODE_SUCCESS) {
    pSourceDataInfo->pRsp = pMsg->pData;

    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
    // must be checked before the in-place byte-order conversion below dereferences the response
    ASSERT(pRsp != NULL);

    pRsp->numOfRows = htonl(pRsp->numOfRows);
    pRsp->compLen = htonl(pRsp->compLen);
    pRsp->numOfCols = htonl(pRsp->numOfCols);
    pRsp->useconds = htobe64(pRsp->useconds);
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);

    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfBlocks,
           pRsp->numOfRows);
  } else {
    taosMemoryFree(pMsg->pData);
    pSourceDataInfo->code = code;
    qDebug("%s fetch rsp received, index:%d, code:%s", pSourceDataInfo->taskId, index, tstrerror(code));
  }

  pSourceDataInfo->status = EX_SOURCE_DATA_READY;

  tsem_post(&pExchangeInfo->ready);
  taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1788
/*
 * Transport-level response handler.
 *
 * Copies the RPC payload into a freshly allocated buffer and invokes the callback stored in the SMsgSendInfo
 * (pMsg->info.ahandle) with that buffer and pMsg->code. Afterwards it frees the RPC payload and destroys the send
 * info.
 * If the copy cannot be allocated, the callback receives a NULL buffer (buf.len still equals contLen) and
 * TSDB_CODE_OUT_OF_MEMORY, and terrno is set to the same code.
 */
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
  assert(pMsg->info.ahandle != NULL);

  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};

  if (pMsg->contLen > 0) {
    void* pCopy = taosMemoryCalloc(1, pMsg->contLen);
    if (pCopy != NULL) {
      memcpy(pCopy, pMsg->pCont, pMsg->contLen);
      buf.pData = pCopy;
    } else {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);

  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}

L
Liu Jicong 已提交
1809
/*
 * Issue one fetch request to source `sourceIndex` of the exchange operator.
 *
 * Local sources are fetched synchronously through pTaskInfo->localFetch, and the result is handed directly to
 * loadRemoteDataCallback. Remote sources get an SResFetchReq (fields in network byte order) sent asynchronously;
 * loadRemoteDataCallback is registered to handle the reply.
 *
 * Returns TSDB_CODE_SUCCESS once the request has been issued. Errors of the fetch itself are recorded in the
 * source's `code` field by the callback. If allocation or sending fails, an error code is returned and also stored
 * in pTaskInfo->code.
 */
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);

  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex);
  SSourceDataInfo*       pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);

  ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY);

  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
  if (pWrapper == NULL) {
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return pTaskInfo->code;
  }

  pWrapper->exchangeId = pExchangeInfo->self;
  pWrapper->sourceIndex = sourceIndex;

  if (pSource->localExec) {
    SDataBuf pBuf = {0};
    int32_t  code =
        (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId,
                                    pSource->taskId, 0, pSource->execId, &pBuf.pData, pTaskInfo->localFetch.explainRes);
    // the callback records `code` in the source's state; the wrapper is only needed for the duration of the call
    loadRemoteDataCallback(pWrapper, &pBuf, code);
    taosMemoryFree(pWrapper);
    return TSDB_CODE_SUCCESS;
  }

  SResFetchReq* pMsg = taosMemoryCalloc(1, sizeof(SResFetchReq));
  if (NULL == pMsg) {
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    taosMemoryFree(pWrapper);
    return pTaskInfo->code;
  }

  qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu,
         GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId,
         pSource->execId, sourceIndex, totalSources);

  pMsg->header.vgId = htonl(pSource->addr.nodeId);
  pMsg->sId = htobe64(pSource->schedId);
  pMsg->taskId = htobe64(pSource->taskId);
  pMsg->queryId = htobe64(pTaskInfo->id.queryId);
  pMsg->execId = htonl(pSource->execId);

  // send the fetch remote task result request
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    taosMemoryFreeClear(pMsg);
    taosMemoryFree(pWrapper);
    qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return pTaskInfo->code;
  }

  // from here on pMsg and pWrapper are owned by pMsgSendInfo (pWrapper is released through paramFreeFp)
  pMsgSendInfo->param = pWrapper;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->msgInfo.pData = pMsg;
  pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
  pMsgSendInfo->msgType = pSource->fetchMsgType;
  pMsgSendInfo->fp = loadRemoteDataCallback;

  int64_t transporterId = 0;
  int32_t code =
      asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
  if (code != TSDB_CODE_SUCCESS) {
    // NOTE(review): pMsgSendInfo is not released here (nor was it before); this assumes asyncSendMsgToServer
    // disposes of it on failure - confirm.
    qError("%s failed to send fetch request to vgId:%d, code:%s", GET_TASKID(pTaskInfo), pSource->addr.nodeId,
           tstrerror(code));
    pTaskInfo->code = code;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}

1871 1872 1873 1874 1875 1876 1877 1878
/*
 * Accumulate statistics for one received fetch response.
 * numOfRows and dataLen are added to pInfo's totals. The time since startTs (taken from taosGetTimestampUs) is
 * added to pInfo->totalElapsed. numOfRows is also added to the operator's result row counter.
 */
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs,
                          SOperatorInfo* pOperator) {
  pInfo->totalRows += numOfRows;
  pInfo->totalSize += dataLen;

  int64_t elapsedUs = taosGetTimestampUs() - startTs;
  pInfo->totalElapsed += elapsedUs;

  pOperator->resultInfo.totalRows += numOfRows;
}

H
Haojun Liao 已提交
1879
/*
 * Decode one block of a fetch response payload into pRes.
 *
 * pColList == NULL:
 *   pRes is cleared and pData is decoded straight into it. *pNextStart receives blockDecode's return value, which
 *   callers use as the start of the next encoded block.
 *
 * pColList != NULL:
 *   pData starts with a network-order column count and an array of SSysTableSchema entries. The schema entries are
 *   converted to host order in place, and the encoded block that follows is decoded into a temporary block. Its
 *   columns are then relocated into pRes according to pColList. If pNextStart is non-NULL, it receives the
 *   position after the decoded block.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY if the temporary block cannot be created, or the error
 * reported while growing pRes.
 */
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart) {
  if (pColList == NULL) {  // data from other sources
    blockDataCleanup(pRes);
    *pNextStart = (char*)blockDecode(pRes, pData);
  } else {  // extract data according to pColList
    char* pStart = pData;

    int32_t numOfCols = htonl(*(int32_t*)pStart);
    pStart += sizeof(int32_t);

    // convert the schema array to host byte order in place
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
    for (int32_t i = 0; i < numOfCols; ++i) {
      SSysTableSchema* p = (SSysTableSchema*)pStart;

      p->colId = htons(p->colId);
      p->bytes = htonl(p->bytes);
      pStart += sizeof(SSysTableSchema);
    }

    SSDataBlock* pBlock = createDataBlock();
    if (pBlock == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
      blockDataAppendColInfo(pBlock, &idata);
    }

    char* pEnd = (char*)blockDecode(pBlock, pStart);
    if (pNextStart != NULL) {
      *pNextStart = pEnd;
    }

    int32_t code = blockDataEnsureCapacity(pRes, pBlock->info.rows);
    if (code != TSDB_CODE_SUCCESS) {
      blockDataDestroy(pBlock);
      return code;
    }

    // data from mnode
    pRes->info.rows = pBlock->info.rows;
    relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
    blockDataDestroy(pBlock);
  }

  // todo move this to time window aggregator, since the primary timestamp may not be known by exchange operator.
  blockDataUpdateTsWindow(pRes, 0);
  return TSDB_CODE_SUCCESS;
}
1918

L
Liu Jicong 已提交
1919 1920
/*
 * Finish an exchange operator whose sources are all exhausted.
 * The time since startTs is added to the load statistics, a debug summary is logged, and the operator is marked
 * completed. Always returns NULL.
 */
static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) {
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  SExchangeInfo*       pExchange = pOperator->info;
  SLoadRemoteDataInfo* pLoad = &pExchange->loadInfo;

  int64_t elapsedUs = taosGetTimestampUs() - startTs;
  pLoad->totalElapsed += elapsedUs;

  size_t numOfSources = taosArrayGetSize(pExchange->pSources);
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), numOfSources, pLoad->totalRows, pLoad->totalSize, pLoad->totalElapsed / 1000.0);

  doSetOperatorCompleted(pOperator);
  return NULL;
}

1937 1938
/*
 * Consume the response of one ready source in concurrent-load mode.
 *
 * The sources are polled, yielding the CPU between passes, until one is EX_SOURCE_DATA_READY.
 * - If its fetch failed, the error is stored in pTaskInfo->code.
 * - If it returned zero rows, it is marked exhausted.
 * - Otherwise every block in the response is decoded into pExchangeInfo->pResultBlockList and the statistics are
 *   updated. If the source has more data, a new fetch request is sent to it.
 * At most one ready response is consumed per call. When all sources are exhausted, the operator is marked completed.
 */
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
                                           SExecTaskInfo* pTaskInfo) {
  int32_t code = 0;
  int64_t startTs = taosGetTimestampUs();
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);

  while (1) {
    int32_t completed = 0;
    for (int32_t i = 0; i < totalSources; ++i) {
      SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
        completed += 1;
        continue;
      }

      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
        continue;
      }

      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
        code = pDataInfo->code;
        goto _error;
      }

      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, i);

      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
      if (pRsp->numOfRows == 0) {
        qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
               ", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu,
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows,
               pExchangeInfo->loadInfo.totalRows, completed + 1, i + 1, totalSources);
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
        completed += 1;
        taosMemoryFreeClear(pDataInfo->pRsp);
        continue;
      }

      // decode every block carried by this response into the result list
      char* pStart = pRsp->data;
      for (int32_t b = 0; b < pRsp->numOfBlocks; ++b) {
        SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
        if (pb == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          taosMemoryFreeClear(pDataInfo->pRsp);
          goto _error;
        }

        code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
        if (code != 0) {
          blockDataDestroy(pb);  // not yet owned by pResultBlockList
          taosMemoryFreeClear(pDataInfo->pRsp);
          goto _error;
        }

        taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
      }

      updateLoadRemoteInfo(pLoadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator);
      pDataInfo->totalRows += pRsp->numOfRows;  // per-source row count reported as rowsOfSource

      if (pRsp->completed == 1) {
        qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
               " execId:%d"
               " index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
               ", total:%.2f Kb,"
               " completed:%d try next %d/%" PRIzu,
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks,
               pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
               completed + 1, i + 1, totalSources);
        completed += 1;
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      } else {
        qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
               " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb",
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks,
               pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
      }

      taosMemoryFreeClear(pDataInfo->pRsp);

      if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) {
        // the source has more data: request the next batch
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
        if (code != TSDB_CODE_SUCCESS) {
          goto _error;
        }
      }

      // one ready response consumed; return so the caller can hand out the decoded blocks
      return;
    }

    if (completed == totalSources) {
      setAllSourcesCompleted(pOperator, startTs);
      return;
    }

    sched_yield();
  }

_error:
  pTaskInfo->code = code;
}

L
Liu Jicong 已提交
2036 2037 2038
/*
 * Start concurrent loading: send a fetch request to every source without waiting for replies, set the operator
 * status to OP_RES_TO_RETURN, then block on pExchangeInfo->ready until a response callback posts it.
 * If any request fails to be issued, the error is stored in pTaskInfo->code and returned immediately.
 * NOTE(review): openCost is set here in microseconds, but prepareLoadRemoteData overwrites it afterwards in ms.
 */
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExchangeInfo* pExchangeInfo = pOperator->info;

  size_t  numOfSources = taosArrayGetSize(pExchangeInfo->pSources);
  int64_t startTs = taosGetTimestampUs();

  // Asynchronously send all fetch requests to all sources.
  for (int32_t idx = 0; idx < numOfSources; ++idx) {
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, idx);
    if (code == TSDB_CODE_SUCCESS) {
      continue;
    }

    pTaskInfo->code = code;
    return code;
  }

  int64_t sentTs = taosGetTimestampUs();
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
         numOfSources, (sentTs - startTs) / 1000.0);

  pOperator->status = OP_RES_TO_RETURN;
  pOperator->cost.openCost = taosGetTimestampUs() - startTs;

  tsem_wait(&pExchangeInfo->ready);
  return TSDB_CODE_SUCCESS;
}

2063
/*
 * Sequential-load mode of the exchange operator: the sources are fetched one at a time.
 *
 * Sends a fetch request to source pExchangeInfo->current and waits for the reply.
 * - Sources that answer with zero rows are marked exhausted and skipped.
 * - For a non-empty reply, every block it carries is decoded into pExchangeInfo->pResultBlockList, the same way
 *   the concurrent path does, and the function returns.
 * - A reply flagged `completed` exhausts the source and advances `current`. Otherwise the source is reset to
 *   EX_SOURCE_DATA_NOT_READY so the next call can fetch from it again.
 * Once every source is exhausted, the operator is marked completed.
 *
 * Returns TSDB_CODE_SUCCESS, or an error code that is also stored in pTaskInfo->code.
 */
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
  int64_t startTs = taosGetTimestampUs();

  while (1) {
    if (pExchangeInfo->current >= totalSources) {
      setAllSourcesCompleted(pOperator, startTs);
      return TSDB_CODE_SUCCESS;
    }

    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
    if (code != TSDB_CODE_SUCCESS) {
      // no request is in flight, so waiting on the semaphore would block forever
      pTaskInfo->code = code;
      return code;
    }
    tsem_wait(&pExchangeInfo->ready);

    SSourceDataInfo*       pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);

    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
      qError("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d error happens, code:%s", GET_TASKID(pTaskInfo),
             pSource->addr.nodeId, pSource->taskId, pSource->execId, tstrerror(pDataInfo->code));
      pOperator->pTaskInfo->code = pDataInfo->code;
      return pOperator->pTaskInfo->code;
    }

    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
    if (pRsp->numOfRows == 0) {
      qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64
             ", totalRows:%" PRIu64 " try next",
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1,
             pDataInfo->totalRows, pLoadInfo->totalRows);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      pExchangeInfo->current += 1;
      taosMemoryFreeClear(pDataInfo->pRsp);
      continue;
    }

    // decode every block carried by this response into the result list
    char* pStart = pRsp->data;
    for (int32_t b = 0; b < pRsp->numOfBlocks; ++b) {
      SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
      if (pb == NULL) {
        taosMemoryFreeClear(pDataInfo->pRsp);
        pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
        return pTaskInfo->code;
      }

      code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
      if (code != TSDB_CODE_SUCCESS) {
        blockDataDestroy(pb);  // not yet owned by pResultBlockList
        taosMemoryFreeClear(pDataInfo->pRsp);
        pTaskInfo->code = code;
        return code;
      }

      taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
    }

    if (pRsp->completed == 1) {
      qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfRows,
             pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
             totalSources);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      pExchangeInfo->current += 1;
    } else {
      qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
             ", totalBytes:%" PRIu64,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfRows,
             pLoadInfo->totalRows, pLoadInfo->totalSize);

      // doSendFetchDataRequest requires NOT_READY; the callback left this source READY
      pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
    }

    updateLoadRemoteInfo(pLoadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator);
    pDataInfo->totalRows += pRsp->numOfRows;

    taosMemoryFreeClear(pDataInfo->pRsp);
    return TSDB_CODE_SUCCESS;
  }
}

L
Liu Jicong 已提交
2132
/*
 * Open phase of the exchange operator; executed at most once per operator.
 * In concurrent mode (seqLoadData == false) prepareConcurrentlyLoad is invoked here; sequential
 * mode needs no preparation. The elapsed time is recorded in cost.openCost (milliseconds).
 */
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int64_t        startUs = taosGetTimestampUs();
  SExchangeInfo* pExchange = pOperator->info;

  int32_t code = pExchange->seqLoadData ? TSDB_CODE_SUCCESS : prepareConcurrentlyLoad(pOperator);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  OPTR_SET_OPENED(pOperator);
  pOperator->cost.openCost = (taosGetTimestampUs() - startUs) / 1000.0;
  return TSDB_CODE_SUCCESS;
}

2152 2153 2154 2155 2156
/*
 * Element destructor for arrays of SSDataBlock pointers (used with taosArrayClearEx):
 * pParam points at the array slot, which holds the block pointer to destroy.
 */
static void freeBlock(void* pParam) {
  SSDataBlock** ppSlot = (SSDataBlock**)pParam;
  blockDataDestroy(*ppSlot);
}

2157
/*
 * Return the next block fetched by the exchange operator, or NULL when nothing is available
 * (or the open phase failed; pTaskInfo->code then holds the error).
 *
 * Fetched blocks are buffered in pResultBlockList and handed out one by one via rspBlockIndex;
 * once the buffer is drained it is cleared and refilled through the sequential or concurrent loader.
 */
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
  SExchangeInfo*       pExchange = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  SLoadRemoteDataInfo* pLoadInfo = &pExchange->loadInfo;

  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    return NULL;
  }

  size_t totalSources = taosArrayGetSize(pExchange->pSources);
  if (pOperator->status == OP_EXEC_DONE) {
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
           GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
           pLoadInfo->totalElapsed / 1000.0);
    return NULL;
  }

  size_t numOfBuffered = taosArrayGetSize(pExchange->pResultBlockList);
  bool   hasBuffered = (numOfBuffered > 0) && (pExchange->rspBlockIndex < numOfBuffered);

  if (!hasBuffered) {
    // Every buffered block was handed out: drop them and fetch a new batch.
    pExchange->rspBlockIndex = 0;
    taosArrayClearEx(pExchange->pResultBlockList, freeBlock);

    if (pExchange->seqLoadData) {
      seqLoadRemoteData(pOperator);
    } else {
      concurrentlyLoadRemoteDataImpl(pOperator, pExchange, pTaskInfo);
    }

    if (taosArrayGetSize(pExchange->pResultBlockList) == 0) {
      return NULL;
    }
  }

  // we have buffered retrieved datablock, return it directly
  return taosArrayGetP(pExchange->pResultBlockList, pExchange->rspBlockIndex++);
}
2194

2195 2196 2197 2198 2199 2200 2201 2202
/*
 * getNextFn of the exchange operator: fetch remote blocks and apply the operator's
 * limit/offset settings (if any) before returning them.
 * Returns NULL when the data is exhausted or the limit leaves nothing to output.
 */
static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchange = pOperator->info;

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SLimitInfo* pLimitInfo = &pExchange->limitInfo;
  for (;;) {
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
    if (pBlock == NULL || !hasLimitOffsetInfo(pLimitInfo)) {
      return pBlock;
    }

    // Any status other than DONE means "fetch more input".
    int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
    if (status != PROJECT_RETRIEVE_DONE) {
      continue;
    }

    size_t rows = pBlock->info.rows;
    pLimitInfo->numOfOutputRows += rows;

    if (rows == 0) {
      doSetOperatorCompleted(pOperator);
      return NULL;
    }
    return pBlock;
  }
}

2231
/*
 * Allocate pInfo->pSourceDataInfo with one SSourceDataInfo per source, each marked
 * EX_SOURCE_DATA_NOT_READY and tagged with the task id and its source index.
 *
 * @param numOfSources  number of downstream sources
 * @param pInfo         exchange operator state to initialize
 * @param id            task id string stored (not copied) in every entry
 * @return TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY; on failure pSourceDataInfo is NULL
 */
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
  if (pInfo->pSourceDataInfo == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < numOfSources; ++i) {
    SSourceDataInfo dataInfo = {0};
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
    dataInfo.taskId = id;
    dataInfo.index = i;

    if (taosArrayPush(pInfo->pSourceDataInfo, &dataInfo) == NULL) {
      taosArrayDestroy(pInfo->pSourceDataInfo);
      // Do not leave a dangling pointer behind: the caller's error path destroys pInfo again.
      pInfo->pSourceDataInfo = NULL;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}

2252
/*
 * Populate the exchange operator state from its physical plan node: copy the source end points,
 * set up limit/offset handling, register the object in exchangeObjRefPool and create the
 * per-source bookkeeping.
 *
 * @param pExNode  exchange physical plan node
 * @param pInfo    exchange operator state to initialize
 * @param id       task id used for logging and stored in the source entries
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA (no sources) or TSDB_CODE_OUT_OF_MEMORY
 */
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);

  if (numOfSources == 0) {
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
    return TSDB_CODE_INVALID_PARA;
  }

  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
  if (pInfo->pSources == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < numOfSources; ++i) {
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
    // A failed push would silently drop a source; report it instead.
    if (taosArrayPush(pInfo->pSources, pNode) == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
  pInfo->self = taosAddRef(exchangeObjRefPool, pInfo);

  return initDataSource(numOfSources, pInfo, id);
}

/*
 * Create the exchange operator for pExNode.
 *
 * @param pTransporter  transport handle, stored in the operator state (pInfo->pTransporter)
 * @param pExNode       exchange physical plan node (sources, output block descriptor, limits)
 * @param pTaskInfo     owning task; on failure pTaskInfo->code receives the error code
 * @return the new operator, or NULL on failure
 */
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo) {
  // Must be initialized here: the first goto below is taken before any call assigns it,
  // and _error stores it into pTaskInfo->code.
  int32_t        code = TSDB_CODE_OUT_OF_MEMORY;
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  tsem_init(&pInfo->ready, 0, 0);
  pInfo->pDummyBlock = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
  pInfo->pResultBlockList = taosArrayInit(1, POINTER_BYTES);
  if (pInfo->pDummyBlock == NULL || pInfo->pResultBlockList == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->seqLoadData = false;
  pInfo->pTransporter = pTransporter;

  pOperator->name = "ExchangeOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
                                         destroyExchangeOperatorInfo, NULL, NULL, NULL);
  return pOperator;

_error:
  if (pInfo != NULL) {
    doDestroyExchangeOperatorInfo(pInfo);
  }

  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

dengyihao's avatar
dengyihao 已提交
2317 2318
// Forward declaration of a file-local (static) helper; its definition is elsewhere in this file.
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                                const char* pKey);
2319

L
Liu Jicong 已提交
2320
/*
 * Decide whether row `rowIndex` of pBlock has the same group-by key as the saved tuple in buf.
 *
 * @param pBlock     block holding the candidate row
 * @param groupInfo  array of int32_t slot ids (into pBlock->pDataBlock) of the group-by columns
 * @param buf        saved value of each group column, in groupInfo order; NULL means a NULL value
 * @param rowIndex   row of pBlock to compare
 * @return true when every group column matches (trivially true without group columns),
 *         false as soon as one column differs
 */
static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) {
  size_t size = taosArrayGetSize(groupInfo);
  if (size == 0) {
    return true;
  }

  for (size_t i = 0; i < size; ++i) {
    int32_t*         index = taosArrayGet(groupInfo, i);
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, *index);
    bool             isNull = colDataIsNull(pColInfo, rowIndex, pBlock->info.rows, NULL);

    // A NULL cell only matches a NULL saved value, and vice versa.
    if (isNull != (buf[i] == NULL)) {
      return false;
    }

    // Both NULL: equal for grouping purposes; buf[i] must not be dereferenced.
    if (isNull) {
      continue;
    }

    char* pCell = colDataGetData(pColInfo, rowIndex);
    if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
      if (varDataLen(pCell) != varDataLen(buf[i]) ||
          memcmp(varDataVal(pCell), varDataVal(buf[i]), varDataLen(pCell)) != 0) {
        return false;
      }
    } else if (memcmp(pCell, buf[i], pColInfo->info.bytes) != 0) {
      return false;
    }
  }

  // Every group column matched.
  return true;
}

2355
/*
 * Copy the cells of row `rowIndex` for every column listed in pColumnList (int32_t slot ids into
 * pBlock->pDataBlock) into the matching rowColData[i] buffer.
 * The destination buffers are assumed to be large enough; this is not checked here.
 * Always returns true.
 */
static bool saveCurrentTuple(char** rowColData, SArray* pColumnList, SSDataBlock* pBlock, int32_t rowIndex) {
  int32_t numOfCols = (int32_t)taosArrayGetSize(pColumnList);

  for (int32_t col = 0; col < numOfCols; ++col) {
    int32_t          slotId = *(int32_t*)taosArrayGet(pColumnList, col);
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, slotId);

    char*   pSrc = colDataGetData(pColInfo, rowIndex);
    int32_t cellLen = colDataGetLength(pColInfo, rowIndex);
    memcpy(rowColData[col], pSrc, cellLen);
  }

  return true;
}
2368

X
Xiaoyu Wang 已提交
2369
/*
 * Report the scan order and scan flag that apply to pOperator.
 * Exchange, system-table, stream, tag, block-distribution and last-row scans always report
 * ascending main scan; table (merge) scans report their own settings; any other operator
 * delegates to its first downstream.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_PARA when no scan operator can be reached
 */
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
  // todo add more information about exchange operation
  switch (pOperator->operatorType) {
    case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
    case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
      *order = TSDB_ORDER_ASC;
      *scanFlag = MAIN_SCAN;
      return TSDB_CODE_SUCCESS;

    case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
      STableScanInfo* pScanInfo = pOperator->info;
      *order = pScanInfo->cond.order;
      *scanFlag = pScanInfo->scanFlag;
      return TSDB_CODE_SUCCESS;
    }

    case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: {
      STableMergeScanInfo* pMergeScanInfo = pOperator->info;
      *order = pMergeScanInfo->cond.order;
      *scanFlag = pMergeScanInfo->scanFlag;
      return TSDB_CODE_SUCCESS;
    }

    default:
      break;
  }

  // Not a scan operator: ask the first downstream.
  bool hasDownstream = (pOperator->pDownstream != NULL) && (pOperator->pDownstream[0] != NULL);
  if (!hasDownstream) {
    return TSDB_CODE_INVALID_PARA;
  }
  return getTableScanInfo(pOperator->pDownstream[0], order, scanFlag);
}
2396

2397
// this is a blocking operator
L
Liu Jicong 已提交
2398
/*
 * Open phase of the aggregate operator: consume every block of the downstream operator,
 * apply the optional scalar pre-projection, feed the block into the aggregation and finally
 * prepare groupResInfo for result iteration. Errors abort via T_LONG_JMP on pTaskInfo->env.
 * The elapsed time is stored in cost.openCost (milliseconds).
 */
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
  SAggOperatorInfo* pAggInfo = pOperator->info;
  SExprSupp*        pSup = &pOperator->exprSupp;
  SExprSupp*        pScalarSup = &pAggInfo->scalarExprSup;
  SOperatorInfo*    downstream = pOperator->pDownstream[0];

  int64_t startUs = taosGetTimestampUs();
  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;

  for (SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); pBlock != NULL;
       pBlock = downstream->fpSet.getNextFn(downstream)) {
    // order/scanFlag are queried for every block rather than once up front.
    int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // Scalar expressions are evaluated in place before the group aggregation.
    if (pScalarSup->pExprInfo != NULL) {
      code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
                                   NULL);
      if (code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, code);
      }
    }

    setExecutionContext(pOperator, pSup->numOfExprs, pBlock->info.groupId);
    setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, true);

    code = doAggregateImpl(pOperator, pSup->pCtx);
    if (code != 0) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
  OPTR_SET_OPENED(pOperator);

  pOperator->cost.openCost = (taosGetTimestampUs() - startUs) / 1000.0;
  return TSDB_CODE_SUCCESS;
}

2450
/*
 * getNextFn of the aggregate operator: run the open phase, then build result blocks from the
 * grouped results, filtering each with pCondition, until one is non-empty or all groups are emitted.
 * Returns NULL when nothing is left or the open phase failed (pTaskInfo->code holds the error).
 */
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
  SAggOperatorInfo* pAggInfo = pOperator->info;
  SOptrBasicInfo*   pInfo = &pAggInfo->binfo;

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    doSetOperatorCompleted(pOperator);
    return NULL;
  }

  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

  bool keepBuilding = true;
  while (keepBuilding) {
    doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
    doFilter(pAggInfo->pCondition, pInfo->pRes, NULL, NULL);

    if (!hasRemainResults(&pAggInfo->groupResInfo)) {
      doSetOperatorCompleted(pOperator);
      keepBuilding = false;
    } else if (pInfo->pRes->info.rows > 0) {
      keepBuilding = false;
    }
  }

  size_t rows = blockDataGetNumOfRows(pInfo->pRes);
  pOperator->resultInfo.totalRows += rows;

  if (rows == 0) {
    return NULL;
  }
  return pInfo->pRes;
}

wmmhello's avatar
wmmhello 已提交
2486
/*
 * Serialize all result rows of an aggregate operator into a newly allocated buffer.
 *
 * Layout (every length field is an int32_t in host byte order, possibly unaligned):
 *   [total length][number of entries] { [keyLen][key bytes][valueLen][SResultRow bytes] } ...
 *
 * pOperator->info is interpreted as an SOptrBasicInfo immediately followed by an SAggSupporter.
 *
 * @param pOperator  aggregate operator to serialize
 * @param result     [out] buffer allocated here and owned by the caller; NULL when there is no result
 * @param length     [out] number of bytes written to *result
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_INPUT (NULL out-params) or TSDB_CODE_OUT_OF_MEMORY
 */
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length) {
  if (result == NULL || length == NULL) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }
  SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
  SAggSupporter*  pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));
  int32_t         size = tSimpleHashGetSize(pSup->pResultRowHashTable);
  size_t          keyLen = sizeof(uint64_t) * 2;  // estimate the key length
  int32_t         totalSize =
      sizeof(int32_t) + sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize);

  // no result
  if (getTotalBufSize(pSup->pResultBuf) == 0) {
    *result = NULL;
    *length = 0;
    return TSDB_CODE_SUCCESS;
  }

  *result = (char*)taosMemoryCalloc(1, totalSize);
  if (*result == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // header: [total length (patched at the end)][number of entries]
  // memcpy is used for every int32 field because offsets are not necessarily 4-byte aligned.
  int32_t offset = sizeof(int32_t);
  memcpy(*result + offset, &size, sizeof(int32_t));
  offset += sizeof(int32_t);

  // mark the page of the current result row dirty
  SResultRowPosition* pos = &pInfo->resultRowInfo.cur;
  void*               pPage = getBufPage(pSup->pResultBuf, pos->pageId);
  setBufPageDirty(pPage, true);
  releaseBufPage(pSup->pResultBuf, pPage);

  int32_t iter = 0;
  void*   pIter = NULL;
  while ((pIter = tSimpleHashIterate(pSup->pResultRowHashTable, pIter, &iter))) {
    void*               key = tSimpleHashGetKey(pIter, &keyLen);
    SResultRowPosition* p1 = (SResultRowPosition*)pIter;

    // Space needed once this entry is appended; computed in 64 bits so it cannot wrap.
    int64_t required = (int64_t)offset + (int64_t)sizeof(int32_t) + (int64_t)keyLen + (int64_t)sizeof(int32_t) +
                       (int64_t)pSup->resultRowSize;
    if (required > INT32_MAX) {
      taosMemoryFree(*result);
      *result = NULL;
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    if (required > totalSize) {
      // Grow geometrically and remember the new capacity, so an underestimated key length
      // does not cause one realloc (and full copy) per remaining entry.
      int64_t newSize = (int64_t)totalSize * 2;
      if (newSize < required) {
        newSize = required;
      }
      if (newSize > INT32_MAX) {
        newSize = INT32_MAX;
      }

      char* tmp = (char*)taosMemoryRealloc(*result, newSize);
      if (tmp == NULL) {
        taosMemoryFree(*result);
        *result = NULL;
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      *result = tmp;
      totalSize = (int32_t)newSize;
    }

    pPage = getBufPage(pSup->pResultBuf, p1->pageId);
    SResultRow* pRow = (SResultRow*)((char*)pPage + p1->offset);
    setBufPageDirty(pPage, true);

    // save key
    int32_t keyLen32 = (int32_t)keyLen;
    memcpy(*result + offset, &keyLen32, sizeof(int32_t));
    offset += sizeof(int32_t);
    memcpy(*result + offset, key, keyLen);
    offset += keyLen32;

    // save value, while the page holding the row is still acquired
    int32_t valueLen = pSup->resultRowSize;
    memcpy(*result + offset, &valueLen, sizeof(int32_t));
    offset += sizeof(int32_t);
    memcpy(*result + offset, pRow, valueLen);
    offset += valueLen;

    releaseBufPage(pSup->pResultBuf, pPage);
  }

  memcpy(*result, &offset, sizeof(int32_t));
  *length = offset;

  return TSDB_CODE_SUCCESS;
}

2562
/*
 * Restore the result rows of an aggregate operator from a buffer produced by aggEncodeResultRow.
 *
 * Layout (int32_t fields in host byte order, possibly unaligned):
 *   [total length][number of entries] { [keyLen][key bytes][valueLen][SResultRow bytes] } ...
 *
 * Every field is bounds-checked against the embedded total length before it is read, and an entry is
 * fully validated before a result row is allocated for it, so malformed input is rejected without
 * reading past the buffer or inserting a half-initialized row.
 *
 * @param pOperator  aggregate operator (info = SOptrBasicInfo followed by SAggSupporter)
 * @param result     serialized buffer; must hold at least the number of bytes stored in its header
 * @return TSDB_CODE_SUCCESS or TSDB_CODE_TSC_INVALID_INPUT
 */
int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
  if (result == NULL) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }
  SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
  SAggSupporter*  pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));

  int32_t length = 0;
  memcpy(&length, result, sizeof(int32_t));
  if (length < (int32_t)(2 * sizeof(int32_t))) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  int32_t offset = sizeof(int32_t);
  int32_t count = 0;
  memcpy(&count, result + offset, sizeof(int32_t));
  offset += sizeof(int32_t);

  while (count-- > 0 && length > offset) {
    // [keyLen]
    if ((int64_t)offset + (int64_t)sizeof(int32_t) > length) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }
    int32_t keyLen = 0;
    memcpy(&keyLen, result + offset, sizeof(int32_t));
    offset += sizeof(int32_t);

    // [key][valueLen]
    char*   key = result + offset;
    int64_t valueLenPos = (int64_t)offset + keyLen;
    if (keyLen < 0 || valueLenPos + (int64_t)sizeof(int32_t) > length) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }
    int32_t valueLen = 0;
    memcpy(&valueLen, result + valueLenPos, sizeof(int32_t));

    // [value]: exactly one serialized result row, entirely inside the buffer
    int64_t valuePos = valueLenPos + (int64_t)sizeof(int32_t);
    if (valueLen != pSup->resultRowSize || valuePos + valueLen > length) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    SResultRow* resultRow = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
    if (!resultRow) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = resultRow->pageId, .offset = resultRow->offset};
    tSimpleHashPut(pSup->pResultRowHashTable, key, keyLen, &pos, sizeof(SResultRowPosition));

    // Overwrite the freshly allocated row with the serialized one, but keep its own location.
    int32_t pageId = resultRow->pageId;
    int32_t pOffset = resultRow->offset;
    memcpy(resultRow, result + valuePos, valueLen);
    resultRow->pageId = pageId;
    resultRow->offset = pOffset;
    offset = (int32_t)(valuePos + valueLen);

    pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
    // NOTE(review): the page handed out by getNewResultRow is never released in this loop;
    // confirm whether the buffer pool expects a releaseBufPage here.
  }

  if (offset != length) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }
  return TSDB_CODE_SUCCESS;
}

2613 2614 2615 2616 2617
/*
 * Apply the group offset/limit (soffset/slimit) and the per-group row offset/limit to pBlock in place.
 *
 * @param pOperator      operator owning the limit state; its status becomes OP_EXEC_DONE once the
 *                       group limit is reached
 * @param pLimitInfo     limit/offset bookkeeping (remaining offsets, current group, output groups)
 * @param pBlock         block to trim; may be emptied with blockDataCleanup
 * @param holdDataInBuf  when true and no slimit/soffset is set, a block below the output threshold
 *                       is kept accumulating (PROJECT_RETRIEVE_CONTINUE) instead of being emitted
 * @return PROJECT_RETRIEVE_CONTINUE when more input should be fetched,
 *         PROJECT_RETRIEVE_DONE when pBlock (possibly empty) should be emitted
 *
 * numOfOutputRows is not advanced here; the caller (e.g. doLoadRemoteData) adds the emitted rows.
 */
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
  if (pLimitInfo->remainGroupOffset > 0) {
    if (pLimitInfo->currentGroupId == 0) {  // it is the first group
      pLimitInfo->currentGroupId = pBlock->info.groupId;
      blockDataCleanup(pBlock);
      return PROJECT_RETRIEVE_CONTINUE;
    } else if (pLimitInfo->currentGroupId != pBlock->info.groupId) {
      // now it is the data from a new group: the previous group has been skipped entirely
      pLimitInfo->remainGroupOffset -= 1;
      // remember the new group, so its following blocks are not counted as yet another group
      pLimitInfo->currentGroupId = pBlock->info.groupId;

      // ignore data block in current group
      if (pLimitInfo->remainGroupOffset > 0) {
        blockDataCleanup(pBlock);
        return PROJECT_RETRIEVE_CONTINUE;
      }
    } else {
      // another block of a group that is still being skipped by the group offset
      blockDataCleanup(pBlock);
      return PROJECT_RETRIEVE_CONTINUE;
    }
  }

  // here check for a new group data, we need to handle the data of the previous group.
  if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.groupId) {
    pLimitInfo->numOfOutputGroups += 1;
    if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
      pOperator->status = OP_EXEC_DONE;
      blockDataCleanup(pBlock);

      return PROJECT_RETRIEVE_DONE;
    }

    // reset the value for a new group data
    pLimitInfo->numOfOutputRows = 0;
    pLimitInfo->remainOffset = pLimitInfo->limit.offset;

    // existing rows that belongs to previous group.
    if (pBlock->info.rows > 0) {
      return PROJECT_RETRIEVE_DONE;
    }
  }

  // here we reach the start position, according to the limit/offset requirements.

  // set current group id
  pLimitInfo->currentGroupId = pBlock->info.groupId;

  // consume the row offset of the current group
  if (pLimitInfo->remainOffset >= pBlock->info.rows) {
    pLimitInfo->remainOffset -= pBlock->info.rows;
    blockDataCleanup(pBlock);
    return PROJECT_RETRIEVE_CONTINUE;
  } else if (pLimitInfo->remainOffset < pBlock->info.rows && pLimitInfo->remainOffset > 0) {
    blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
    pLimitInfo->remainOffset = 0;
  }

  // check for the limitation in each group
  if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) {
    int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
    blockDataKeepFirstNRows(pBlock, keepRows);
    if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) {
      pOperator->status = OP_EXEC_DONE;
    }

    return PROJECT_RETRIEVE_DONE;
  }

  // todo optimize performance
  // If slimit/soffset values exist, multi-round results can not be packed into one block, since
  // they may not belong to the same group and the limit/offset value is not valid in that case.
  if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || pLimitInfo->slimit.offset != -1 ||
      pLimitInfo->slimit.limit != -1) {
    return PROJECT_RETRIEVE_DONE;
  } else {  // not full enough, continue to accumulate the output data in the buffer.
    return PROJECT_RETRIEVE_CONTINUE;
  }
}

2690
// Forward declaration: doHandleRemainBlockForNewGroupImpl below calls this before its definition.
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
L
Liu Jicong 已提交
2691 2692
/*
 * Start filling the block of the next group that doFillImpl cached in pInfo->existNewGroupBlock:
 * reset the fill engine, project the cached block into pInfo->pRes, feed it to the fill engine and
 * append filled rows to pInfo->pFinalRes. Afterwards the cached group becomes the current group and
 * the cache is cleared.
 */
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
                                               SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) {
  SSDataBlock* pNewGroupBlock = pInfo->existNewGroupBlock;
  SSDataBlock* pResBlock = pInfo->pFinalRes;

  pInfo->totalInputRows = pNewGroupBlock->info.rows;

  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;
  getTableScanInfo(pOperator, &order, &scanFlag);

  // Fill up to the end of the query window once the task has completed, otherwise only up to
  // the end of the cached block.
  int64_t ekey = pNewGroupBlock->info.window.ekey;
  if (Q_STATUS_EQUAL(pTaskInfo->status, TASK_COMPLETED)) {
    ekey = pInfo->win.ekey;
  }

  taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));

  blockDataCleanup(pInfo->pRes);
  doApplyScalarCalculation(pOperator, pNewGroupBlock, order, scanFlag);

  taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, ekey);
  taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);

  int32_t capacityLeft = pResultInfo->capacity - pResBlock->info.rows;
  taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, capacityLeft);

  pInfo->curGroupId = pNewGroupBlock->info.groupId;
  pInfo->existNewGroupBlock = NULL;
}

L
Liu Jicong 已提交
2717 2718
/*
 * Produce pending fill output: rows the fill engine still holds for the current group take
 * priority; only when there are none is the cached next-group block (if any) started.
 */
static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
                                            SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) {
  if (taosFillHasMoreResults(pInfo->pFillInfo)) {
    SSDataBlock* pFinal = pInfo->pFinalRes;
    int32_t      capacityLeft = pResultInfo->capacity - pFinal->info.rows;
    taosFillResultDataBlock(pInfo->pFillInfo, pFinal, capacityLeft);
    // NOTE(review): this tags pRes rather than pFinalRes; callers set pFinalRes->info.groupId themselves.
    pInfo->pRes->info.groupId = pInfo->curGroupId;
  } else if (pInfo->existNewGroupBlock != NULL) {
    doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo);
  }
}

/*
 * Project pBlock into pInfo->pRes using the fill operator's expressions, then copy the primary
 * timestamp column and every not-filled column verbatim from pBlock into pInfo->pRes.
 * The group id of pBlock is propagated to pInfo->pRes.
 */
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
  SFillOperatorInfo* pInfo = pOperator->info;
  SExprSupp*         pSup = &pOperator->exprSupp;
  SSDataBlock*       pDstBlock = pInfo->pRes;
  SSDataBlock*       pFinalBlock = pInfo->pFinalRes;

  setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
  projectApplyFunctions(pSup->pExprInfo, pDstBlock, pBlock, pSup->pCtx, pSup->numOfExprs, NULL);
  pDstBlock->info.groupId = pBlock->info.groupId;

  // primary timestamp column
  SColumnInfoData* pTsDst = taosArrayGet(pDstBlock->pDataBlock, pInfo->primaryTsCol);
  SColumnInfoData* pTsSrc = taosArrayGet(pBlock->pDataBlock, pInfo->primarySrcSlotId);
  colDataAssign(pTsDst, pTsSrc, pDstBlock->info.rows, &pFinalBlock->info);

  // not-filled columns: their descriptors follow the numOfExpr filled ones in pFillCol
  for (int32_t k = 0; k < pInfo->numOfNotFillExpr; ++k) {
    SFillColInfo* pCol = &pInfo->pFillInfo->pFillCol[pInfo->numOfExpr + k];
    ASSERT(pCol->notFillCol);

    SExprInfo*       pExpr = pCol->pExpr;
    SColumnInfoData* pColDst = taosArrayGet(pDstBlock->pDataBlock, (int32_t)pExpr->base.resSchema.slotId);
    SColumnInfoData* pColSrc = taosArrayGet(pBlock->pDataBlock, (int32_t)pExpr->base.pParam[0].pCol->slotId);
    colDataAssign(pColDst, pColSrc, pDstBlock->info.rows, &pFinalBlock->info);
  }
}

S
slzhou 已提交
2759
/*
 * Produce the next block of filled results (in pInfo->pFinalRes), tagged with the current group id.
 *
 * Pending output of the previous call is flushed first. Then downstream blocks are pulled: blocks of
 * the current group are fed to the fill engine; a block of a different group is cached in
 * existNewGroupBlock and the current group's fill range is closed first.
 * Returns NULL when no more results can be produced.
 */
static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
  SFillOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  SResultInfo*       pResultInfo = &pOperator->resultInfo;
  SSDataBlock*       pOut = pInfo->pFinalRes;

  blockDataCleanup(pOut);

  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;
  getTableScanInfo(pOperator, &order, &scanFlag);

  doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo);
  if (pOut->info.rows > 0) {
    pOut->info.groupId = pInfo->curGroupId;
    return pOut;
  }

  SOperatorInfo* pDownstream = pOperator->pDownstream[0];
  for (;;) {
    SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream);

    if (pBlock == NULL) {
      if (pInfo->totalInputRows == 0) {
        doSetOperatorCompleted(pOperator);
        return NULL;
      }
      // no more input: close the fill range at the end of the query window
      taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
    } else {
      blockDataUpdateTsWindow(pBlock, pInfo->primarySrcSlotId);

      blockDataCleanup(pInfo->pRes);
      blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
      blockDataEnsureCapacity(pInfo->pFinalRes, pBlock->info.rows);
      doApplyScalarCalculation(pOperator, pBlock, order, scanFlag);

      bool sameGroup = (pInfo->curGroupId == 0) || (pInfo->curGroupId == pInfo->pRes->info.groupId);
      if (sameGroup) {
        pInfo->curGroupId = pInfo->pRes->info.groupId;  // the first data block
        pInfo->totalInputRows += pInfo->pRes->info.rows;

        int64_t endKey = (order == pInfo->pFillInfo->order) ? pBlock->info.window.ekey : pBlock->info.window.skey;
        taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, endKey);
        taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
      } else if (pInfo->curGroupId != pBlock->info.groupId) {  // the new group data block
        pInfo->existNewGroupBlock = pBlock;

        // Fill the previous group data block, before handle the data block of new group.
        // Close the fill operation for previous group data block
        taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
      }
    }

    int32_t capacityLeft = pOperator->resultInfo.capacity - pOut->info.rows;
    taosFillResultDataBlock(pInfo->pFillInfo, pOut, capacityLeft);

    if (pOut->info.rows > 0) {
      // 1. The result in current group not reach the threshold of output result, continue
      // 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
      if (pOut->info.rows > pResultInfo->threshold || pBlock == NULL || pInfo->existNewGroupBlock != NULL) {
        pOut->info.groupId = pInfo->curGroupId;
        return pOut;
      }

      doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo);
      if (pOut->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) {
        pOut->info.groupId = pInfo->curGroupId;
        return pOut;
      }
    } else if (pInfo->existNewGroupBlock) {  // try next group
      assert(pBlock != NULL);

      blockDataCleanup(pOut);

      doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo);
      if (pOut->info.rows > pResultInfo->threshold) {
        pOut->info.groupId = pInfo->curGroupId;
        return pOut;
      }
    } else {
      // current group has no more result to return
      return NULL;
    }
  }
}

S
slzhou 已提交
2848 2849 2850 2851 2852 2853 2854 2855
/**
 * Return the next filled result block that still has rows after the operator's filter condition.
 *
 * doFillImpl() is called repeatedly; blocks that become empty after doFilter() are skipped.
 * When doFillImpl() yields NULL the operator is marked completed and NULL is returned.
 * The rows of every returned block are added to pOperator->resultInfo.totalRows.
 */
static SSDataBlock* doFill(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SFillOperatorInfo* pFillInfo = pOperator->info;

  for (;;) {
    SSDataBlock* pBlock = doFillImpl(pOperator);
    if (pBlock == NULL) {
      doSetOperatorCompleted(pOperator);
      return NULL;
    }

    doFilter(pFillInfo->pCondition, pBlock, pFillInfo->pColMatchColInfo, NULL);
    if (pBlock->info.rows <= 0) {
      continue;  // everything was filtered out, ask for the next block
    }

    pOperator->resultInfo.totalRows += pBlock->info.rows;
    return pBlock;
  }
}

2877
/**
 * Release the members owned by each entry of an SExprInfo array (not the array itself).
 *
 * For every expression the column descriptors of column-type parameters, the parameter array and
 * the expression node are freed.
 *
 * @param pExpr       array of expressions
 * @param numOfExprs  number of entries in pExpr
 */
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
  for (int32_t k = 0; k < numOfExprs; ++k) {
    SExprInfo* pItem = &pExpr[k];

    int32_t numOfParams = pItem->base.numOfParams;
    for (int32_t p = 0; p < numOfParams; ++p) {
      if (pItem->base.pParam[p].type != FUNC_PARAM_TYPE_COLUMN) {
        continue;
      }
      taosMemoryFreeClear(pItem->base.pParam[p].pCol);
    }

    taosMemoryFree(pItem->base.pParam);
    taosMemoryFree(pItem->pExpr);
  }
}

2891 2892 2893 2894 2895
/**
 * Recursively destroy an operator tree rooted at pOperator, including the root object.
 *
 * Order of release: the operator-specific state via fpSet.closeFn (when registered), then every
 * downstream operator and the downstream array, then the expression support, then the operator.
 * A NULL operator is ignored.
 */
static void destroyOperatorInfo(SOperatorInfo* pOperator) {
  if (pOperator == NULL) {
    return;
  }

  if (pOperator->fpSet.closeFn) {
    pOperator->fpSet.closeFn(pOperator->info);
  }

  SOperatorInfo** pChildren = pOperator->pDownstream;
  if (pChildren) {
    int32_t numOfChildren = pOperator->numOfDownstream;
    for (int32_t k = 0; k < numOfChildren; ++k) {
      destroyOperatorInfo(pChildren[k]);
    }

    taosMemoryFreeClear(pOperator->pDownstream);
    pOperator->numOfDownstream = 0;
  }

  cleanupExprSupp(&pOperator->exprSupp);
  taosMemoryFreeClear(pOperator);
}

2913 2914 2915 2916 2917 2918 2919 2920 2921 2922 2923 2924 2925 2926 2927
/**
 * Choose the page size and total buffer size of a disk-based result buffer.
 *
 * The page size starts at 4096 bytes and is doubled until a page holds at least four rows.
 * The buffer is 1 MB (4096 * 256) by default, enlarged so that it holds at least four pages.
 *
 * @param rowSize       size in bytes of one result row (non-positive values yield the minimum page)
 * @param defaultPgsz   [out] chosen page size
 * @param defaultBufsz  [out] chosen buffer size (clamped to UINT32_MAX)
 * @return always 0
 */
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
  // Compare in signed 64-bit arithmetic: the old `uint32_t < int32_t` comparison converted a
  // negative (or overflowed) rowSize * 4 into a huge unsigned value, and the doubling loop then
  // wrapped the page size to 0 and never terminated.
  const int64_t minPageSize = (int64_t)rowSize * 4;

  uint32_t pageSize = 4096;
  while ((int64_t)pageSize < minPageSize && pageSize <= UINT32_MAX / 2) {
    pageSize <<= 1u;
  }

  // at least four pages need to be in buffer (previously only enforced when the default buffer was
  // not larger than a single page, so e.g. a 512KB page got a 2-page buffer)
  uint64_t bufSize = 4096 * 256;
  if (bufSize < (uint64_t)pageSize * 4) {
    bufSize = (uint64_t)pageSize * 4;
  }

  *defaultPgsz = pageSize;
  *defaultBufsz = (bufSize > UINT32_MAX) ? UINT32_MAX : (uint32_t)bufSize;
  return 0;
}

dengyihao's avatar
dengyihao 已提交
2928 2929
/**
 * Initialize an aggregation supporter: group-key buffer, result-row hash table and the
 * disk-based result buffer.
 *
 * @param pAggSup      supporter to initialize
 * @param pCtx         function contexts used to compute the result row size
 * @param numOfOutput  number of entries in pCtx
 * @param keyBufSize   size of the group key; room for one pointer and one int64 is added to it
 * @param pKey         task identifier passed to createDiskbasedBuf() and included in error logs
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, TSDB_CODE_NO_AVAIL_DISK, or the error
 *         returned by createDiskbasedBuf(). Members allocated before a failure are left in
 *         pAggSup and are not freed here.
 */
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                         const char* pKey) {
  int32_t    code = TSDB_CODE_SUCCESS;
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);

  pAggSup->currentPageId = -1;
  pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
  pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
  pAggSup->pResultRowHashTable = tSimpleHashInit(10, hashFn);

  if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  uint32_t defaultPgsz = 0;
  uint32_t defaultBufsz = 0;
  getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);

  if (!osTempSpaceAvailable()) {
    code = TSDB_CODE_NO_AVAIL_DISK;
    // terrstr() ignores its argument and describes terrno, which does not hold this error;
    // format the local code explicitly so the log names the real failure.
    qError("Init stream agg supporter failed since %s, %s", tstrerror(code), pKey);
    return code;
  }

  code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
  if (code != TSDB_CODE_SUCCESS) {
    qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey);
    return code;
  }

  return TSDB_CODE_SUCCESS;
}

2961
/**
 * Release the resources held by an aggregation supporter: the group-key buffer, the
 * result-row hash table and the disk-based result buffer. The supporter itself is not freed.
 */
void cleanupAggSup(SAggSupporter* pAggSup) {
  SAggSupporter* pSup = pAggSup;

  taosMemoryFreeClear(pSup->keyBuf);
  tSimpleHashCleanup(pSup->pResultRowHashTable);
  destroyDiskbasedBuf(pSup->pResultBuf);
}

L
Liu Jicong 已提交
2967 2968
/**
 * Initialize the expression support and the aggregation supporter of an aggregate operator, then
 * point every function context at the supporter's disk-based result buffer.
 *
 * @return TSDB_CODE_SUCCESS or the first error from initExprSupp() / doInitAggInfoSup()
 */
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
                    const char* pkey) {
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
  if (code == TSDB_CODE_SUCCESS) {
    code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
  }

  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SqlFunctionCtx* pCtxList = pSup->pCtx;
  for (int32_t k = 0; k < numOfCols; ++k) {
    pCtxList[k].saveHandle.pBuf = pAggSup->pResultBuf;
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2986
/**
 * Set the result capacity to numOfRows and the output threshold to 75% of it (truncated).
 * When the truncated threshold is 0 the full capacity is used as the threshold instead.
 */
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
  ASSERT(numOfRows != 0);

  pResultInfo->capacity = numOfRows;

  pResultInfo->threshold = numOfRows * 0.75;
  if (!pResultInfo->threshold) {
    pResultInfo->threshold = numOfRows;  // tiny capacities: flush only when full
  }
}

2996 2997 2998 2999 3000
/** Initialize the basic operator info: reset the result-row info and attach pBlock as the result block. */
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
  initResultRowInfo(&pInfo->resultRowInfo);
  pInfo->pRes = pBlock;
}

5
54liuyao 已提交
3001
/**
 * Release the per-context resources of a function-context array, then the array itself.
 *
 * For each context: its parameter variants, subsidiary context array and buffer, and its input
 * data / column-aggregate arrays are freed.
 *
 * @param pCtx         context array (NULL is accepted)
 * @param numOfOutput  number of contexts in pCtx
 * @return always NULL
 */
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  if (pCtx != NULL) {
    for (int32_t k = 0; k < numOfOutput; ++k) {
      SqlFunctionCtx* pOne = &pCtx[k];

      for (int32_t iParam = 0; iParam < pOne->numOfParams; ++iParam) {
        taosVariantDestroy(&pOne->param[iParam].param);
      }

      taosMemoryFreeClear(pOne->subsidiaries.pCtx);
      taosMemoryFreeClear(pOne->subsidiaries.buf);
      taosMemoryFree(pOne->input.pData);
      taosMemoryFree(pOne->input.pColumnDataAgg);
    }

    taosMemoryFreeClear(pCtx);
  }

  return NULL;
}

3021
/**
 * Attach an expression array to pSup and, when it is non-NULL, create the matching function contexts.
 *
 * @return TSDB_CODE_OUT_OF_MEMORY when the contexts cannot be created, TSDB_CODE_SUCCESS otherwise
 */
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
  pSup->pExprInfo = pExprInfo;
  pSup->numOfExprs = numOfExpr;

  if (pExprInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset);
  return (pSup->pCtx == NULL) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS;
}

3034 3035 3036 3037
/**
 * Release everything owned by an expression support: function contexts, expression array,
 * filter info and row-entry offsets. The SExprSupp struct itself is not freed.
 *
 * All released pointers are reset to NULL so a repeated cleanup (or a later read) does not
 * touch freed memory.
 */
void cleanupExprSupp(SExprSupp* pSupp) {
  // destroySqlFunctionCtx() always returns NULL; store it so pCtx does not dangle.
  pSupp->pCtx = destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);

  if (pSupp->pExprInfo != NULL) {
    destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
    taosMemoryFreeClear(pSupp->pExprInfo);
  }

  if (pSupp->pFilterInfo != NULL) {
    filterFreeInfo(pSupp->pFilterInfo);
    pSupp->pFilterInfo = NULL;
  }

  taosMemoryFreeClear(pSupp->rowEntryInfoOffset);
}

L
Liu Jicong 已提交
3049
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
S
slzhou 已提交
3050
                                           SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo,
3051
                                           int32_t numOfScalarExpr, bool mergeResult, SExecTaskInfo* pTaskInfo) {
wafwerar's avatar
wafwerar 已提交
3052
  SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
L
Liu Jicong 已提交
3053
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
H
Haojun Liao 已提交
3054 3055 3056
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
H
Haojun Liao 已提交
3057

dengyihao's avatar
dengyihao 已提交
3058
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
3059

3060
  initResultSizeInfo(&pOperator->resultInfo, 4096);
3061
  int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
L
Liu Jicong 已提交
3062
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3063 3064
    goto _error;
  }
H
Haojun Liao 已提交
3065

3066
  initBasicInfo(&pInfo->binfo, pResultBlock);
3067 3068 3069 3070
  code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3071

3072
  pInfo->binfo.mergeResultBlock = mergeResult;
3073
  pInfo->groupId = UINT64_MAX;
S
slzhou 已提交
3074
  pInfo->pCondition = pCondition;
dengyihao's avatar
dengyihao 已提交
3075
  pOperator->name = "TableAggregate";
3076
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
3077
  pOperator->blocking = true;
dengyihao's avatar
dengyihao 已提交
3078 3079 3080
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;
H
Haojun Liao 已提交
3081

3082 3083
  pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo,
                                         aggEncodeResultRow, aggDecodeResultRow, NULL);
H
Haojun Liao 已提交
3084

3085 3086 3087 3088 3089 3090
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    STableScanInfo* pTableScanInfo = downstream->info;
    pTableScanInfo->pdInfo.pExprSup = &pOperator->exprSupp;
    pTableScanInfo->pdInfo.pAggSup = &pInfo->aggSup;
  }

H
Haojun Liao 已提交
3091 3092 3093 3094
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3095 3096

  return pOperator;
H
Haojun Liao 已提交
3097

L
Liu Jicong 已提交
3098
_error:
H
Haojun Liao 已提交
3099 3100 3101 3102
  if (pInfo != NULL) {
    destroyAggOperatorInfo(pInfo);
  }

wafwerar's avatar
wafwerar 已提交
3103
  taosMemoryFreeClear(pOperator);
H
Haojun Liao 已提交
3104 3105
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
3106 3107
}

3108
/** Destroy the result block of the basic operator info; pInfo->pRes receives blockDataDestroy()'s return value. */
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
  assert(pInfo != NULL);

  SSDataBlock* pBlock = pInfo->pRes;
  pInfo->pRes = blockDataDestroy(pBlock);
}

H
Haojun Liao 已提交
3113 3114 3115 3116 3117 3118 3119
/** Array destructor callback: pItem points at a heap pointer, which is freed and reset to NULL. */
static void freeItem(void* pItem) {
  void** ppSlot = (void**)pItem;
  taosMemoryFreeClear(*ppSlot);  // a NULL slot is left untouched by the macro
}

3120
void destroyAggOperatorInfo(void* param) {
L
Liu Jicong 已提交
3121
  SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
L
Liu Jicong 已提交
3122 3123
  cleanupBasicInfo(&pInfo->binfo);

H
Haojun Liao 已提交
3124
  cleanupAggSup(&pInfo->aggSup);
S
shenglian zhou 已提交
3125
  cleanupExprSupp(&pInfo->scalarExprSup);
H
Haojun Liao 已提交
3126
  cleanupGroupResInfo(&pInfo->groupResInfo);
D
dapan1121 已提交
3127
  taosMemoryFreeClear(param);
3128
}
3129

3130
void destroyFillOperatorInfo(void* param) {
L
Liu Jicong 已提交
3131
  SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param;
3132
  pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
H
Haojun Liao 已提交
3133
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
H
Haojun Liao 已提交
3134 3135 3136 3137 3138 3139 3140
  pInfo->pFinalRes = blockDataDestroy(pInfo->pFinalRes);

  if (pInfo->pNotFillExprInfo != NULL) {
    destroyExprInfo(pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr);
    taosMemoryFree(pInfo->pNotFillExprInfo);
  }

wafwerar's avatar
wafwerar 已提交
3141
  taosMemoryFreeClear(pInfo->p);
3142
  taosArrayDestroy(pInfo->pColMatchColInfo);
D
dapan1121 已提交
3143
  taosMemoryFreeClear(param);
3144 3145
}

3146
void destroyExchangeOperatorInfo(void* param) {
L
Liu Jicong 已提交
3147
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3148 3149 3150
  taosRemoveRef(exchangeObjRefPool, pExInfo->self);
}

L
Liu Jicong 已提交
3151
void freeSourceDataInfo(void* p) {
3152 3153 3154 3155
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
  taosMemoryFreeClear(pInfo->pRsp);
}

3156
void doDestroyExchangeOperatorInfo(void* param) {
X
Xiaoyu Wang 已提交
3157
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3158

H
Haojun Liao 已提交
3159
  taosArrayDestroy(pExInfo->pSources);
3160
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
3161 3162 3163 3164

  if (pExInfo->pResultBlockList != NULL) {
    taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
    pExInfo->pResultBlockList = NULL;
H
Haojun Liao 已提交
3165 3166
  }

3167
  blockDataDestroy(pExInfo->pDummyBlock);
L
Liu Jicong 已提交
3168

3169
  tsem_destroy(&pExInfo->ready);
D
dapan1121 已提交
3170
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
3171 3172
}

H
Haojun Liao 已提交
3173 3174 3175 3176
/**
 * Create the SFillInfo of a fill operator and set its time range.
 *
 * The fill starts from the first time window (aligned with pInterval) that qualifies for the start
 * key: win.skey for ascending order, win.ekey for descending. pInfo->win is stored with skey/ekey
 * swapped for descending order. A scratch array of numOfCols pointers is allocated in pInfo->p.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the fill info or the scratch array
 *         cannot be created. On failure pInfo->pFillInfo and pInfo->p are released and reset to
 *         NULL, so destroying pInfo afterwards is safe.
 */
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SExprInfo* pNotFillExpr,
                            int32_t numOfNotFillCols, SNodeListNode* pValNode, STimeWindow win, int32_t capacity,
                            const char* id, SInterval* pInterval, int32_t fillType, int32_t order) {
  // NOTE(review): pColInfo is handed to taosCreateFillInfo(); if that call fails it is unclear
  // whether pColInfo is released — confirm in tfill.c.
  SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode);

  int64_t     startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey;
  STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, startKey);
  w = getFirstQualifiedTimeWindow(startKey, &w, pInterval, order);

  pInfo->pFillInfo = taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo,
                                        pInfo->primaryTsCol, order, id);

  if (order == TSDB_ORDER_ASC) {
    pInfo->win.skey = win.skey;
    pInfo->win.ekey = win.ekey;
  } else {
    pInfo->win.skey = win.ekey;
    pInfo->win.ekey = win.skey;
  }

  pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);

  if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
    // Release with the proper destructor and reset the fields: previously both were freed with a
    // plain free and left dangling, and destroyFillOperatorInfo() then freed them a second time.
    if (pInfo->pFillInfo != NULL) {
      pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
    }
    taosMemoryFreeClear(pInfo->p);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

3203 3204 3205 3206 3207 3208
/**
 * Tell whether one of the not-fill expressions is a plain column whose single parameter is a
 * window-start column (COLUMN_TYPE_WINDOW_START).
 */
static bool isWstartColumnExist(SFillOperatorInfo* pInfo) {
  for (int32_t idx = 0; idx < pInfo->numOfNotFillExpr; ++idx) {
    SExprInfo* pNotFill = &pInfo->pNotFillExprInfo[idx];

    if (pNotFill->pExpr->nodeType != QUERY_NODE_COLUMN || pNotFill->base.numOfParams != 1) {
      continue;
    }

    if (pNotFill->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_START) {
      return true;
    }
  }

  return false;
}

/**
 * Make sure the window start timestamp is among the not-fill expressions.
 *
 * When no not-fill expression is a window-start column, pPhyFillNode->pWStartTs (which must be a
 * target node) is appended as a new not-fill expression.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_SYS_ERROR for a non-target pWStartTs, or
 *         TSDB_CODE_OUT_OF_MEMORY when the expression array cannot be grown
 */
static int32_t createWStartTsAsNotFillExpr(SFillOperatorInfo* pInfo, SFillPhysiNode* pPhyFillNode) {
  if (isWstartColumnExist(pInfo)) {
    return TSDB_CODE_SUCCESS;
  }

  if (pPhyFillNode->pWStartTs->type != QUERY_NODE_TARGET) {
    qError("pWStartTs of fill physical node is not a target node");
    return TSDB_CODE_QRY_SYS_ERROR;
  }

  int32_t    oldCount = pInfo->numOfNotFillExpr;
  SExprInfo* pGrown = taosMemoryRealloc(pInfo->pNotFillExprInfo, (oldCount + 1) * sizeof(SExprInfo));
  if (pGrown == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;  // the old array is still owned by pInfo
  }

  createExprFromTargetNode(&pGrown[oldCount], (STargetNode*)pPhyFillNode->pWStartTs);

  pInfo->numOfNotFillExpr = oldCount + 1;
  pInfo->pNotFillExprInfo = pGrown;
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
3241 3242
/**
 * Create the fill operator on top of an interval-style downstream operator.
 *
 * Builds the fill and not-fill expressions (adding the window start timestamp as a not-fill
 * expression when absent), the result blocks, the column-match info used by the filter, and the
 * SFillInfo that generates the filled rows.
 *
 * @param downstream    child operator; its info is read as SMergeAlignedIntervalAggOperatorInfo for
 *                      QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL and as SIntervalAggOperatorInfo otherwise
 * @param pPhyFillNode  physical plan node of the fill
 * @param pTaskInfo     owning task; its code is set on failure
 * @return the new operator, or NULL on failure (the partially built operator state is released)
 */
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode,
                                      SExecTaskInfo* pTaskInfo) {
  // Declared before the first `goto _error` so the error path never reads an indeterminate value.
  int32_t            code = TSDB_CODE_OUT_OF_MEMORY;
  SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  pInfo->pRes = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc);
  SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr);
  // The operator's expression support owns pExprInfo; record the count as well so cleanupExprSupp()
  // on the error path releases every expression (initExprSupp() below sets the same values).
  pOperator->exprSupp.pExprInfo = pExprInfo;
  pOperator->exprSupp.numOfExprs = pInfo->numOfExpr;

  pInfo->pNotFillExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pInfo->numOfNotFillExpr);
  code = createWStartTsAsNotFillExpr(pInfo, pPhyFillNode);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  SInterval* pInterval =
      QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
          ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
          : &((SIntervalAggOperatorInfo*)downstream->info)->interval;

  int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
  int32_t type = convertFillType(pPhyFillNode->mode);

  SResultInfo* pResultInfo = &pOperator->resultInfo;

  initResultSizeInfo(&pOperator->resultInfo, 4096);
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
  code = initExprSupp(&pOperator->exprSupp, pExprInfo, pInfo->numOfExpr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->primaryTsCol = ((STargetNode*)pPhyFillNode->pWStartTs)->slotId;
  pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId;

  int32_t numOfOutputCols = 0;
  pInfo->pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc,
                                                &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);

  code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr,
                      (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity,
                      pTaskInfo->id.str, pInterval, type, order);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->pFinalRes = createOneDataBlock(pInfo->pRes, false);
  blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity);

  pInfo->pCondition = pPhyFillNode->node.pConditions;
  pOperator->name = "FillOperator";
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
  pOperator->exprSupp.numOfExprs = pInfo->numOfExpr;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroyFillOperatorInfo, NULL, NULL, NULL);

  // previously the result was ignored and a half-wired operator was returned
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  if (pInfo != NULL) {
    destroyFillOperatorInfo(pInfo);
  }

  if (pOperator != NULL) {
    cleanupExprSupp(&pOperator->exprSupp);  // releases pExprInfo and any function contexts
    taosMemoryFreeClear(pOperator);
  }

  pTaskInfo->code = code;
  return NULL;
}

D
dapan1121 已提交
3317
/**
 * Allocate and initialize a task info object in TASK_NOT_COMPLETED state.
 *
 * Stores a copy of dbFName, the creation time, query id and execution model, and builds the
 * textual task id "TID:0x<taskId> QID:0x<queryId>" in pTaskInfo->id.str.
 *
 * @return the new task info, or NULL (with terrno = TSDB_CODE_OUT_OF_MEMORY) when an allocation
 *         fails; callers must handle NULL.
 */
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) {
  SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
  if (pTaskInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);

  pTaskInfo->schemaInfo.dbname = strdup(dbFName);
  pTaskInfo->cost.created = taosGetTimestampMs();
  pTaskInfo->id.queryId = queryId;
  pTaskInfo->execModel = model;

  char* p = taosMemoryCalloc(1, 128);
  if (p == NULL) {
    // previously snprintf() wrote through a NULL pointer here
    taosMemoryFreeClear(pTaskInfo->schemaInfo.dbname);
    taosMemoryFree(pTaskInfo);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
  pTaskInfo->id.str = p;

  return pTaskInfo;
}
H
Haojun Liao 已提交
3332

H
Haojun Liao 已提交
3333
static SArray* extractColumnInfo(SNodeList* pNodeList);
3334

H
Haojun Liao 已提交
3335 3336
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);

3337
/**
 * Fill pTaskInfo->schemaInfo for the table scanned by pScanNode.
 *
 * Records the table name and a copy of its row schema:
 * - super table: its own row schema and tag-schema version;
 * - child table: the row schema and tag-schema version of its super table, loaded by suid;
 * - other tables: the normal-table row schema.
 * Finally the schema of the queried columns is extracted into schemaInfo.qsw.
 *
 * @return TSDB_CODE_SUCCESS, or terrno when a meta lookup fails
 */
int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) {
  SMetaReader mr = {0};
  metaReaderInit(&mr, pHandle->meta, 0);
  int32_t code = metaGetTableEntryByUid(&mr, pScanNode->uid);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to get the table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid, pScanNode->suid,
           GET_TASKID(pTaskInfo));

    metaReaderClear(&mr);
    return terrno;
  }

  SSchemaInfo* pSchemaInfo = &pTaskInfo->schemaInfo;
  pSchemaInfo->tablename = strdup(mr.me.name);

  if (mr.me.type == TSDB_SUPER_TABLE) {
    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
  } else if (mr.me.type == TSDB_CHILD_TABLE) {
    tDecoderClear(&mr.coder);

    tb_uid_t suid = mr.me.ctbEntry.suid;
    code = metaGetTableEntryByUid(&mr, suid);
    if (code != TSDB_CODE_SUCCESS) {
      // previously ignored: the schema was then copied out of an entry that failed to load
      qError("failed to get the super table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid,
             (uint64_t)suid, GET_TASKID(pTaskInfo));

      metaReaderClear(&mr);
      return terrno;
    }

    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
  } else {
    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
  }

  metaReaderClear(&mr);

  pSchemaInfo->qsw = extractQueriedColumnSchema(pScanNode);
  return TSDB_CODE_SUCCESS;
}

/**
 * Build a schema wrapper describing the columns a scan actually reads.
 *
 * Contains every scan column, followed by those pseudo columns that are plain column nodes (tag
 * columns); other pseudo columns (e.g. functions) are skipped.
 *
 * @return a newly allocated wrapper owned by the caller, or NULL (terrno set to
 *         TSDB_CODE_OUT_OF_MEMORY) when an allocation fails
 */
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
  int32_t numOfCols = LIST_LENGTH(pScanNode->pScanCols);
  int32_t numOfTags = LIST_LENGTH(pScanNode->pScanPseudoCols);

  SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
  if (pqSw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pqSw->pSchema = taosMemoryCalloc(numOfCols + numOfTags, sizeof(SSchema));
  // calloc(0, ...) may legitimately return NULL, so only treat NULL as a failure when columns exist
  if (pqSw->pSchema == NULL && (numOfCols + numOfTags) > 0) {
    taosMemoryFree(pqSw);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanCols, i);
    SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

    SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
    pSchema->colId = pColNode->colId;
    pSchema->type = pColNode->node.resType.type;
    pSchema->bytes = pColNode->node.resType.bytes;
    tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
  }

  // this the tags and pseudo function columns, we only keep the tag columns
  for (int32_t i = 0; i < numOfTags; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanPseudoCols, i);

    int32_t type = nodeType(pNode->pExpr);
    if (type == QUERY_NODE_COLUMN) {
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

      SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
      pSchema->colId = pColNode->colId;
      pSchema->type = pColNode->node.resType.type;
      pSchema->bytes = pColNode->node.resType.bytes;
      tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
    }
  }

  return pqSw;
}

3409 3410
/** Release the database/table names and both schema wrappers (table schema and queried-column schema). */
static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
  SSchemaInfo* pInfo = pSchemaInfo;

  // names
  taosMemoryFreeClear(pInfo->dbname);
  taosMemoryFreeClear(pInfo->tablename);

  // schemas
  tDeleteSSchemaWrapper(pInfo->sw);
  tDeleteSSchemaWrapper(pInfo->qsw);
}

3416
/** Release the schema wrapper held by the stream task info. */
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) {
  SStreamTaskInfo* pInfo = pStreamInfo;
  tDeleteSSchemaWrapper(pInfo->schema);
}
3417

H
Haojun Liao 已提交
3418
/**
 * Rebuild pTableListInfo->pGroupList from pTableList and the uid -> group id map.
 *
 * Each element of pGroupList is an SArray of STableKeyInfo holding the tables of one group.
 * Groups are kept ordered by group id: sortSupport mirrors pGroupList position-by-position and
 * holds the group ids, kept sorted with compareUint64Val. Within a group, tables keep their
 * pTableList order.
 *
 * @return TDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or TSDB_CODE_QRY_APP_ERROR (including the case
 *         of a table without a group id in the map)
 */
static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
  taosArrayClear(pTableListInfo->pGroupList);

  SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t));
  if (sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY;

  int32_t code = TDB_CODE_SUCCESS;
  size_t  numOfTables = taosArrayGetSize(pTableListInfo->pTableList);

  for (size_t i = 0; i < numOfTables; i++) {
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
    uint64_t*      groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t));
    if (groupId == NULL) {
      // the search comparators dereference the key, so a missing entry must not reach them
      qError("failed to find the group id of table uid:%" PRId64, (int64_t)info->uid);
      code = TSDB_CODE_QRY_APP_ERROR;
      break;
    }

    int32_t index = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ);
    if (index != -1) {
      // existing group: append the table to it
      SArray* tGroup = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, index);
      if (taosArrayPush(tGroup, info) == NULL) {
        qError("taos push uid array error");
        code = TSDB_CODE_QRY_APP_ERROR;
        break;
      }
      continue;
    }

    // new group: insert it before the first group with a larger id (or append at the end).
    // tGroup is destroyed on every failure below because pGroupList does not own it yet.
    void*   p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT);
    SArray* tGroup = taosArrayInit(8, sizeof(STableKeyInfo));
    if (tGroup == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      break;
    }

    if (taosArrayPush(tGroup, info) == NULL) {
      qError("taos push info array error");
      taosArrayDestroy(tGroup);
      code = TSDB_CODE_QRY_APP_ERROR;
      break;
    }

    if (p == NULL) {
      if (taosArrayPush(sortSupport, groupId) == NULL) {
        qError("taos push support array error");
        taosArrayDestroy(tGroup);
        code = TSDB_CODE_QRY_APP_ERROR;
        break;
      }
      if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) == NULL) {
        qError("taos push group array error");
        taosArrayDestroy(tGroup);
        code = TSDB_CODE_QRY_APP_ERROR;
        break;
      }
    } else {
      int32_t pos = TARRAY_ELEM_IDX(sortSupport, p);
      if (taosArrayInsert(sortSupport, pos, groupId) == NULL) {
        qError("taos insert support array error");
        taosArrayDestroy(tGroup);
        code = TSDB_CODE_QRY_APP_ERROR;
        break;
      }
      if (taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL) {
        qError("taos insert group array error");
        taosArrayDestroy(tGroup);
        code = TSDB_CODE_QRY_APP_ERROR;
        break;
      }
    }
  }

  taosArrayDestroy(sortSupport);
  return code;
}

3476
/**
 * Tell whether a group-by / partition-by list groups by table name, i.e. its first element is the
 * function `tbname`.
 */
bool groupbyTbname(SNodeList* pGroupList) {
  if (LIST_LENGTH(pGroupList) <= 0) {
    return false;
  }

  SNode* pFirst = nodesListGetNode(pGroupList, 0);
  if (pFirst->type != QUERY_NODE_FUNCTION) {
    return false;
  }

  // partition by tbname/group by tbname
  return strcmp(((struct SFunctionNode*)pFirst)->functionName, "tbname") == 0;
}

wmmhello's avatar
wmmhello 已提交
3489 3490
/**
 * Build pTableListInfo->map (table uid -> group id) for the given group-by list.
 *
 * When grouping by tbname every table is its own group, with group id = uid (stored in the table
 * entry as well). Otherwise the ids come from getColInfoResultForGroupby(). If
 * needSortTableByGroupId is set, the grouped table lists are then built via sortTableGroup().
 *
 * @return TDB_CODE_SUCCESS when group is NULL or on success, otherwise the failing step's code
 */
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group) {
  if (group == NULL) {
    return TDB_CODE_SUCCESS;
  }

  pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (pTableListInfo->map == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (groupbyTbname(group)) {
    size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
    for (size_t idx = 0; idx < numOfTables; ++idx) {
      STableKeyInfo* pKey = taosArrayGet(pTableListInfo->pTableList, idx);
      pKey->groupId = pKey->uid;
      taosHashPut(pTableListInfo->map, &(pKey->uid), sizeof(uint64_t), &pKey->groupId, sizeof(uint64_t));
    }
  } else {
    int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  return pTableListInfo->needSortTableByGroupId ? sortTableGroup(pTableListInfo) : TDB_CODE_SUCCESS;
}

3523 3524 3525 3526 3527 3528 3529 3530 3531 3532 3533 3534 3535 3536 3537 3538 3539
/**
 * Prepare the read condition used for the block-distribution query of table `uid`.
 *
 * The condition reads a single timestamp column (column id 1) in ascending order over the full
 * time range, with TIMEWINDOW_RANGE_CONTAINED and start/end versions of -1.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY (also stored in terrno)
 */
static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) {
  memset(pCond, 0, sizeof(SQueryTableDataCond));

  pCond->order = TSDB_ORDER_ASC;
  pCond->numOfCols = 1;

  SColumnInfo* pTsCol = taosMemoryCalloc(1, sizeof(SColumnInfo));
  pCond->colList = pTsCol;
  if (pTsCol == NULL) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return terrno;
  }

  pTsCol->colId = 1;
  pTsCol->type = TSDB_DATA_TYPE_TIMESTAMP;
  pTsCol->bytes = sizeof(TSKEY);

  pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
  pCond->suid = uid;
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
  pCond->startVersion = -1;
  pCond->endVersion = -1;

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3547
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
L
Liu Jicong 已提交
3548 3549
                                  STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
                                  const char* pUser) {
3550 3551
  int32_t type = nodeType(pPhyNode);

X
Xiaoyu Wang 已提交
3552
  if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
3553
    SOperatorInfo* pOperator = NULL;
H
Haojun Liao 已提交
3554
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
dengyihao's avatar
dengyihao 已提交
3555
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
H
Haojun Liao 已提交
3556

L
Liu Jicong 已提交
3557 3558 3559
      int32_t code =
          createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
                                  pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
3560
      if (code) {
wmmhello's avatar
wmmhello 已提交
3561
        pTaskInfo->code = code;
3562
        qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), GET_TASKID(pTaskInfo));
D
dapan1121 已提交
3563 3564
        return NULL;
      }
wmmhello's avatar
wmmhello 已提交
3565

3566
      code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
S
slzhou 已提交
3567
      if (code) {
3568
        pTaskInfo->code = terrno;
wmmhello's avatar
wmmhello 已提交
3569 3570 3571
        return NULL;
      }

3572
      pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
3573 3574
      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
S
slzhou 已提交
3575 3576
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
      STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
L
Liu Jicong 已提交
3577 3578 3579
      int32_t                   code =
          createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
                                  pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
L
Liu Jicong 已提交
3580
      if (code) {
wmmhello's avatar
wmmhello 已提交
3581
        pTaskInfo->code = code;
H
Haojun Liao 已提交
3582
        qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
wmmhello's avatar
wmmhello 已提交
3583 3584
        return NULL;
      }
3585

3586
      code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
wmmhello's avatar
wmmhello 已提交
3587 3588 3589 3590
      if (code) {
        pTaskInfo->code = terrno;
        return NULL;
      }
wmmhello's avatar
wmmhello 已提交
3591

3592
      pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo);
wmmhello's avatar
wmmhello 已提交
3593

3594 3595
      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
H
Haojun Liao 已提交
3596
    } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
3597 3598
      pOperator = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode,
                                             pTaskInfo);
H
Haojun Liao 已提交
3599
    } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
5
54liuyao 已提交
3600
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
5
54liuyao 已提交
3601
      if (pHandle->vnode) {
L
Liu Jicong 已提交
3602 3603 3604
        int32_t code =
            createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
                                    pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
L
Liu Jicong 已提交
3605
        if (code) {
wmmhello's avatar
wmmhello 已提交
3606
          pTaskInfo->code = code;
H
Haojun Liao 已提交
3607
          qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
wmmhello's avatar
wmmhello 已提交
3608 3609
          return NULL;
        }
L
Liu Jicong 已提交
3610 3611 3612 3613 3614

#ifndef NDEBUG
        int32_t sz = taosArrayGetSize(pTableListInfo->pTableList);
        for (int32_t i = 0; i < sz; i++) {
          STableKeyInfo* pKeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
S
Shengliang Guan 已提交
3615
          qDebug("creating stream task: add table %" PRId64, pKeyInfo->uid);
L
Liu Jicong 已提交
3616 3617
        }
#endif
3618
      }
3619

H
Haojun Liao 已提交
3620
      pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
3621
      pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo);
H
Haojun Liao 已提交
3622
    } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
L
Liu Jicong 已提交
3623
      SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
3624
      pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
3625
    } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
X
Xiaoyu Wang 已提交
3626
      STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
3627
      int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanPhyNode, pTagCond, pTagIndexCond, pTableListInfo);
3628
      if (code != TSDB_CODE_SUCCESS) {
3629
        pTaskInfo->code = code;
H
Haojun Liao 已提交
3630
        qError("failed to getTableList, code: %s", tstrerror(code));
3631 3632 3633
        return NULL;
      }

3634
      pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
3635
    } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
3636
      SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
3637 3638 3639
      pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));

      if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
3640
        int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pTableListInfo->pTableList);
3641 3642 3643 3644 3645
        if (code != TSDB_CODE_SUCCESS) {
          pTaskInfo->code = terrno;
          return NULL;
        }
      } else {  // Create one table group.
3646
        STableKeyInfo info = {.uid = pBlockNode->uid, .groupId = 0};
3647 3648 3649 3650
        taosArrayPush(pTableListInfo->pTableList, &info);
      }

      SQueryTableDataCond cond = {0};
L
Liu Jicong 已提交
3651
      int32_t             code = initTableblockDistQueryCond(pBlockNode->suid, &cond);
3652 3653
      if (code != TSDB_CODE_SUCCESS) {
        return NULL;
3654
      }
H
Haojun Liao 已提交
3655 3656 3657

      STsdbReader* pReader = NULL;
      tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, "");
3658 3659
      cleanupQueryTableDataCond(&cond);

3660
      pOperator = createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo);
H
Haojun Liao 已提交
3661 3662 3663
    } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
      SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;

L
Liu Jicong 已提交
3664 3665
      int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo,
                                             pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
3666 3667 3668 3669
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
      }
3670

3671
      code = extractTableSchemaInfo(pHandle, &pScanNode->scan, pTaskInfo);
3672 3673 3674
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
H
Haojun Liao 已提交
3675 3676
      }

3677
      pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTaskInfo);
3678
    } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
3679
      pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo);
H
Haojun Liao 已提交
3680 3681
    } else {
      ASSERT(0);
H
Haojun Liao 已提交
3682
    }
3683 3684 3685 3686 3687

    if (pOperator != NULL) {
      pOperator->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
    }

3688
    return pOperator;
H
Haojun Liao 已提交
3689 3690
  }

3691 3692
  int32_t num = 0;
  size_t  size = LIST_LENGTH(pPhyNode->pChildren);
H
Haojun Liao 已提交
3693

3694
  SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
dengyihao's avatar
dengyihao 已提交
3695
  for (int32_t i = 0; i < size; ++i) {
3696
    SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
3697
    ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pUser);
3698
    if (ops[i] == NULL) {
H
Haojun Liao 已提交
3699
      taosMemoryFree(ops);
3700 3701
      return NULL;
    }
3702 3703

    ops[i]->resultDataBlockId = pChildNode->pOutputDataBlockDesc->dataBlockId;
3704
  }
H
Haojun Liao 已提交
3705

3706
  SOperatorInfo* pOptr = NULL;
H
Haojun Liao 已提交
3707
  if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
3708
    pOptr = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo);
3709
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) {
H
Haojun Liao 已提交
3710 3711
    SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
    SExprInfo*     pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
3712
    SSDataBlock*   pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
H
Haojun Liao 已提交
3713

dengyihao's avatar
dengyihao 已提交
3714
    int32_t    numOfScalarExpr = 0;
3715 3716 3717 3718 3719
    SExprInfo* pScalarExprInfo = NULL;
    if (pAggNode->pExprs != NULL) {
      pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
    }

H
Haojun Liao 已提交
3720 3721
    if (pAggNode->pGroupKeys != NULL) {
      SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys);
dengyihao's avatar
dengyihao 已提交
3722
      pOptr = createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions,
wmmhello's avatar
wmmhello 已提交
3723
                                      pScalarExprInfo, numOfScalarExpr, pTaskInfo);
H
Haojun Liao 已提交
3724
    } else {
L
Liu Jicong 已提交
3725
      pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pAggNode->node.pConditions,
3726
                                          pScalarExprInfo, numOfScalarExpr, pAggNode->mergeDataBlock, pTaskInfo);
H
Haojun Liao 已提交
3727
    }
3728
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) {
H
Haojun Liao 已提交
3729
    SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
H
Haojun Liao 已提交
3730

H
Haojun Liao 已提交
3731
    SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
3732
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
H
Haojun Liao 已提交
3733

dengyihao's avatar
dengyihao 已提交
3734 3735 3736 3737 3738 3739
    SInterval interval = {.interval = pIntervalPhyNode->interval,
                          .sliding = pIntervalPhyNode->sliding,
                          .intervalUnit = pIntervalPhyNode->intervalUnit,
                          .slidingUnit = pIntervalPhyNode->slidingUnit,
                          .offset = pIntervalPhyNode->offset,
                          .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
H
Haojun Liao 已提交
3740

X
Xiaoyu Wang 已提交
3741 3742 3743 3744 3745
    STimeWindowAggSupp as = {
        .waterMark = pIntervalPhyNode->window.watermark,
        .calTrigger = pIntervalPhyNode->window.triggerType,
        .maxTs = INT64_MIN,
    };
3746
    ASSERT(as.calTrigger != STREAM_TRIGGER_MAX_DELAY);
3747

3748
    int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
L
Liu Jicong 已提交
3749
    bool    isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
3750 3751
    pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pIntervalPhyNode,
                                       pTaskInfo, isStream);
3752

3753
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
3754
    pOptr = createStreamIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo);
3755 3756
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
    SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;
3757
    pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
S
shenglian zhou 已提交
3758
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
X
Xiaoyu Wang 已提交
3759
    SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
3760
    pOptr = createMergeIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
5
54liuyao 已提交
3761
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
3762
    int32_t children = 0;
5
54liuyao 已提交
3763 3764
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
5
54liuyao 已提交
3765
    int32_t children = pHandle->numOfVgroups;
5
54liuyao 已提交
3766
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
H
Haojun Liao 已提交
3767
  } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
3768
    pOptr = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo);
S
shenglian zhou 已提交
3769 3770
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) {
    pOptr = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo);
X
Xiaoyu Wang 已提交
3771
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
3772
    SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
3773
    pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo);
3774
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
H
Haojun Liao 已提交
3775
    SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
H
Haojun Liao 已提交
3776
    pOptr = createSessionAggOperatorInfo(ops[0], pSessionNode, pTaskInfo);
3777
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
3778 3779 3780 3781 3782
    pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
    int32_t children = 0;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) {
3783
    int32_t children = pHandle->numOfVgroups;
3784
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
H
Haojun Liao 已提交
3785
  } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
3786
    pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
3787
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION == type) {
3788
    pOptr = createStreamPartitionOperatorInfo(ops[0], (SStreamPartitionPhysiNode*)pPhyNode, pTaskInfo);
3789
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
dengyihao's avatar
dengyihao 已提交
3790
    SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
3791
    pOptr = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo);
3792
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
5
54liuyao 已提交
3793
    pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
3794
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
3795
    pOptr = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo);
3796
  } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
H
Haojun Liao 已提交
3797
    pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo);
5
54liuyao 已提交
3798 3799
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL == type) {
    pOptr = createStreamFillOperatorInfo(ops[0], (SStreamFillPhysiNode*)pPhyNode, pTaskInfo);
H
Haojun Liao 已提交
3800 3801
  } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
    pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
3802 3803
  } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
    pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
H
Haojun Liao 已提交
3804 3805
  } else {
    ASSERT(0);
H
Haojun Liao 已提交
3806
  }
3807

3808
  taosMemoryFree(ops);
3809 3810 3811 3812
  if (pOptr) {
    pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
  }

3813
  return pOptr;
3814
}
H
Haojun Liao 已提交
3815

H
Haojun Liao 已提交
3816
/*
 * Convert a list of STargetNode into an array of SColumn descriptors.
 *
 * - Column expressions are converted with extractColumnFromColumnNode().
 * - Constant (value) expressions become synthetic columns whose slotId and
 *   colId are both the target's slot id; data type, length, scale and
 *   precision are all taken from the value's result type.
 * - Targets of any other expression kind are skipped, so the result may hold
 *   fewer entries than the input list.
 *
 * Returns a newly allocated array, or NULL with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY when the array cannot be allocated.
 */
SArray* extractColumnInfo(SNodeList* pNodeList) {
  size_t  numOfCols = LIST_LENGTH(pNodeList);
  SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
  if (pList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);

    if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

      SColumn c = extractColumnFromColumnNode(pColNode);
      taosArrayPush(pList, &c);
    } else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) {
      SValueNode* pValNode = (SValueNode*)pNode->pExpr;
      SColumn     c = {0};
      c.slotId = pNode->slotId;
      c.colId = pNode->slotId;
      // node.type is the node kind (QUERY_NODE_VALUE), not a data type; the
      // value's data type lives in resType together with bytes/scale/precision.
      c.type = pValNode->node.resType.type;
      c.bytes = pValNode->node.resType.bytes;
      c.scale = pValNode->node.resType.scale;
      c.precision = pValNode->node.resType.precision;

      taosArrayPush(pList, &c);
    }
  }

  return pList;
}

L
Liu Jicong 已提交
3849 3850 3851 3852 3853 3854 3855 3856 3857 3858 3859 3860 3861
/*
 * Walk down a single-child operator chain starting at pOperator until the
 * stream scan operator is found, then hand back the info of the table scan
 * operator it wraps through *ppInfo.
 *
 * Fails with TSDB_CODE_QRY_APP_ERROR if the chain ends without a stream scan,
 * or if any operator on the way has more than one downstream (join).
 */
static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) {
  SOperatorInfo* pCur = pOperator;

  while (pCur->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pCur->numOfDownstream == 0) {
      qError("failed to find stream scan operator");
      return TSDB_CODE_QRY_APP_ERROR;
    }
    if (pCur->numOfDownstream > 1) {
      qError("join not supported for stream block scan");
      return TSDB_CODE_QRY_APP_ERROR;
    }
    pCur = pCur->pDownstream[0];
  }

  SStreamScanInfo* pStreamScan = pCur->info;
  ASSERT(pStreamScan->pTableScanOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
  *ppInfo = pStreamScan->pTableScanOp->info;
  return 0;
}

3869 3870 3871 3872 3873 3874 3875 3876 3877 3878 3879 3880 3881 3882 3883 3884 3885 3886 3887 3888 3889 3890
/*
 * Follow the single-child chain of physical plan nodes below pNode down to
 * its leaf; the leaf must be a table scan node, which is returned via *ppNode.
 *
 * Returns 0 on success. Returns -1 with terrno = TSDB_CODE_QRY_APP_ERROR if a
 * node on the way has more than one child, or if the leaf is not a table scan.
 */
int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
  SPhysiNode* pCur = pNode;

  while (pCur->pChildren != NULL && LIST_LENGTH(pCur->pChildren) != 0) {
    if (LIST_LENGTH(pCur->pChildren) != 1) {
      ASSERT(0);
      terrno = TSDB_CODE_QRY_APP_ERROR;
      return -1;
    }
    pCur = (SPhysiNode*)nodesListGetNode(pCur->pChildren, 0);
  }

  if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN != pCur->type) {
    ASSERT(0);
    terrno = TSDB_CODE_QRY_APP_ERROR;
    return -1;
  }

  *ppNode = (STableScanPhysiNode*)pCur;
  return 0;
}

3891
// Disabled (compiled out by #if 0). rebuildReader would locate the table scan
// beneath a stream scan operator, close its data reader and open a new one
// from the subplan's table scan node. The uid and ts arguments are not applied
// yet (see the TODO below).
#if 0
L
Liu Jicong 已提交
3892 3893 3894 3895 3896
int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHandle, int64_t uid, int64_t ts) {
  STableScanInfo* pTableScanInfo = NULL;
  if (extractTbscanInStreamOpTree(pOperator, &pTableScanInfo) < 0) {
    return -1;
  }
3897

L
Liu Jicong 已提交
3898 3899 3900 3901
  STableScanPhysiNode* pNode = NULL;
  if (extractTableScanNode(plan->pNode, &pNode) < 0) {
    ASSERT(0);
  }
3902

H
Haojun Liao 已提交
3903
  tsdbReaderClose(pTableScanInfo->dataReader);
3904

L
Liu Jicong 已提交
3905
  STableListInfo info = {0};
H
Haojun Liao 已提交
3906
  pTableScanInfo->dataReader = doCreateDataReader(pNode, pHandle, &info, NULL);
L
Liu Jicong 已提交
3907 3908 3909 3910
  if (pTableScanInfo->dataReader == NULL) {
    ASSERT(0);
    qError("failed to create data reader");
    return TSDB_CODE_QRY_APP_ERROR;
3911
  }
L
Liu Jicong 已提交
3912
  // TODO: set uid and ts to data reader
3913 3914
  return 0;
}
3915
#endif
3916

C
Cary Xu 已提交
3917
/*
 * Serialize the result-row state of ops and, depth first, of its downstream
 * operators into one growing buffer.
 *
 * Buffer layout: a leading int32 holding the total buffer length (header
 * included), followed by the bytes produced by each contributing operator in
 * visit order (parent before children).
 *
 * @param ops           root of the (sub)tree to encode
 * @param result        in/out buffer; allocated on the first non-empty append
 *                      and grown with realloc afterwards
 * @param length        set to the total buffer length after every append
 * @param nOptrWithVal  incremented once per operator that produced bytes
 * @return TDB_CODE_SUCCESS; TSDB_CODE_TSC_INVALID_INPUT when an operator with
 *         an encoder is reached with a NULL out-parameter;
 *         TSDB_CODE_OUT_OF_MEMORY; or the operator's own encode error. On an
 *         encode or allocation failure the accumulated buffer is released and
 *         *result is left NULL.
 */
int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length, int32_t* nOptrWithVal) {
  if (ops->fpSet.encodeResultRow) {
    if (result == NULL || length == NULL || nOptrWithVal == NULL) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    char*   pEncoded = NULL;
    int32_t encodedLen = 0;
    int32_t ret = ops->fpSet.encodeResultRow(ops, &pEncoded, &encodedLen);
    if (ret != TDB_CODE_SUCCESS) {
      // Discard everything accumulated so far; the caller gets no partial buffer.
      if (*result != NULL) {
        taosMemoryFree(*result);
        *result = NULL;
      }
      return ret;
    }

    if (encodedLen == 0) {
      // This operator has no state to persist.
      ASSERT(!pEncoded);
    } else {
      ++(*nOptrWithVal);
      ASSERT(encodedLen >= 0);

      if (*result == NULL) {
        // First contribution: allocate header + payload.
        char* pFresh = (char*)taosMemoryCalloc(1, encodedLen + sizeof(int32_t));
        if (pFresh == NULL) {
          taosMemoryFree(pEncoded);
          return TSDB_CODE_OUT_OF_MEMORY;
        }
        memcpy(pFresh + sizeof(int32_t), pEncoded, encodedLen);
        *(int32_t*)pFresh = encodedLen + sizeof(int32_t);
        *result = pFresh;
      } else {
        // Later contribution: grow the buffer and append after the existing bytes.
        int32_t usedLen = *(int32_t*)(*result);
        char*   pGrown = (char*)taosMemoryRealloc(*result, usedLen + encodedLen);
        if (pGrown == NULL) {
          taosMemoryFree(pEncoded);
          taosMemoryFree(*result);
          *result = NULL;
          return TSDB_CODE_OUT_OF_MEMORY;
        }
        *result = pGrown;
        memcpy(pGrown + usedLen, pEncoded, encodedLen);
        *(int32_t*)pGrown += encodedLen;
      }

      taosMemoryFree(pEncoded);
      *length = *(int32_t*)(*result);
    }
  }

  for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
    int32_t ret = encodeOperator(ops->pDownstream[i], result, length, nOptrWithVal);
    if (ret != TDB_CODE_SUCCESS) {
      return ret;
    }
  }

  return TDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3977
/*
 * Recursive worker for decodeOperator().
 *
 * *pCursor points at the next unread operator blob, or is NULL once every blob
 * has been consumed. *pRemain is the number of unread blob bytes. Both are
 * shared by the whole tree walk, so sibling subtrees consume consecutive blobs.
 * Operators are visited in the same order encodeOperator() writes them
 * (parent first, then each downstream in turn).
 */
static int32_t doDecodeOperatorState(SOperatorInfo* ops, const char** pCursor, int32_t* pRemain) {
  if (ops->fpSet.decodeResultRow) {
    if (*pCursor == NULL || *pRemain < (int32_t)sizeof(int32_t)) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    const char* data = *pCursor;

    // Each blob starts with an int32 holding the blob's own length (that int32
    // included). memcpy avoids a misaligned/aliasing read from the byte buffer.
    int32_t dataLength = 0;
    memcpy(&dataLength, data, sizeof(dataLength));
    if (dataLength < (int32_t)sizeof(int32_t) || dataLength > *pRemain) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    int32_t code = ops->fpSet.decodeResultRow(ops, (char*)data);
    if (code != TDB_CODE_SUCCESS) {
      return code;
    }

    *pRemain -= dataLength;
    *pCursor = (*pRemain == 0) ? NULL : data + dataLength;
  }

  for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
    int32_t code = doDecodeOperatorState(ops->pDownstream[i], pCursor, pRemain);
    if (code != TDB_CODE_SUCCESS) {
      return code;
    }
  }

  return TDB_CODE_SUCCESS;
}

/*
 * Restore operator state produced by encodeOperator().
 *
 * result/length describe the buffer: a leading int32 with the total length
 * (header included), followed by one blob per operator that has a decoder.
 * The buffer is only read. The previous implementation rewrote length headers
 * inside the caller's const buffer, and it handed the same position to every
 * downstream operator.
 *
 * Returns TDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_INPUT when an operator with a
 * decoder finds no (or a malformed) blob, or the operator's own decode error.
 */
int32_t decodeOperator(SOperatorInfo* ops, const char* result, int32_t length) {
  const char* cursor = NULL;
  int32_t     remain = 0;

  if (result != NULL && length >= (int32_t)sizeof(int32_t)) {
    int32_t totalLength = 0;
    memcpy(&totalLength, result, sizeof(totalLength));
    ASSERT(length == totalLength);

    // Bound the walk by the caller-supplied length rather than the embedded header.
    remain = length - (int32_t)sizeof(int32_t);
    cursor = (remain > 0) ? result + sizeof(int32_t) : NULL;
  }

  return doDecodeOperatorState(ops, &cursor, &remain);
}

D
dapan1121 已提交
4014
/*
 * Build the sink-specific parameter object for a data sink node.
 *
 * - Insert sinks get an SInserterParam carrying readHandle.
 * - Delete sinks get an SDeleterParam with the task's super-table uid and the
 *   uid of every table in the task's table list.
 * - Any other sink type needs no parameter; *pParam is left untouched.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if an allocation fails.
 */
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) {
  SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo;

  if (pNode->type == QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT) {
    SInserterParam* pInserter = taosMemoryCalloc(1, sizeof(SInserterParam));
    if (pInserter == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pInserter->readHandle = readHandle;
    *pParam = pInserter;
  } else if (pNode->type == QUERY_NODE_PHYSICAL_PLAN_DELETE) {
    SDeleterParam* pDeleter = taosMemoryCalloc(1, sizeof(SDeleterParam));
    if (pDeleter == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    STableListInfo* pTables = &pTask->tableqinfoList;
    int32_t         numOfTables = taosArrayGetSize(pTables->pTableList);

    pDeleter->suid = pTables->suid;
    pDeleter->pUidList = taosArrayInit(numOfTables, sizeof(uint64_t));
    if (pDeleter->pUidList == NULL) {
      taosMemoryFree(pDeleter);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    for (int32_t idx = 0; idx < numOfTables; ++idx) {
      STableKeyInfo* pKey = taosArrayGet(pTables->pTableList, idx);
      taosArrayPush(pDeleter->pUidList, &pKey->uid);
    }

    *pParam = pDeleter;
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
4055
/*
 * Create an execution task for pPlan and build its operator tree.
 *
 * On success *pTaskInfo holds the new task, which takes ownership of sql and
 * stores pPlan as its subplan.
 *
 * On failure:
 * - If the task was never created, sql is freed here.
 * - Otherwise the partially built task is destroyed.
 * In both cases *pTaskInfo is set to NULL so the caller is never left with a
 * dangling pointer. The error is returned and also stored in terrno. A failure
 * is never reported as TSDB_CODE_SUCCESS, even when the operator-tree builder
 * did not record an error code.
 */
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
                               char* sql, EOPTR_EXEC_MODEL model) {
  uint64_t queryId = pPlan->id.queryId;

  int32_t code = TSDB_CODE_SUCCESS;
  *pTaskInfo = createExecTaskInfo(queryId, taskId, model, pPlan->dbFName);
  if (*pTaskInfo == NULL) {
    code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    goto _complete;
  }

  if (pHandle && pHandle->pStateBackend) {
    (*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend;
  }

  // The task now owns sql; clear the local so the error path does not free it a second time.
  (*pTaskInfo)->sql = sql;
  sql = NULL;
  (*pTaskInfo)->pSubplan = pPlan;
  (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList,
                                           pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);

  if (NULL == (*pTaskInfo)->pRoot) {
    code = (*pTaskInfo)->code;
    if (code == TSDB_CODE_SUCCESS) {
      // Some createOperatorTree failure paths return NULL without setting the task code;
      // never report success for a task that is about to be destroyed.
      code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_QRY_APP_ERROR;
    }
    goto _complete;
  }

  return code;

_complete:
  taosMemoryFree(sql);
  if (*pTaskInfo != NULL) {
    doDestroyTask(*pTaskInfo);
    *pTaskInfo = NULL;
  }
  terrno = code;
  return code;
}

4090
/*
 * Release every container held by a table list: the per-group arrays (only
 * when needSortTableByGroupId is set), the group list itself, the table list
 * and the uid map. All released pointers are reset to NULL, so calling this
 * twice on the same object is harmless.
 *
 * A group entry may alias pTableList; that entry is skipped so the table list
 * is destroyed exactly once. Groups are released before pTableList, because
 * comparing against a pointer that has already been freed is undefined
 * behaviour in C.
 */
void doDestroyTableList(STableListInfo* pTableqinfoList) {
  if (pTableqinfoList->needSortTableByGroupId) {
    size_t numOfGroups = taosArrayGetSize(pTableqinfoList->pGroupList);
    for (size_t i = 0; i < numOfGroups; ++i) {
      SArray* pGroup = taosArrayGetP(pTableqinfoList->pGroupList, i);
      if (pGroup != pTableqinfoList->pTableList) {
        taosArrayDestroy(pGroup);
      }
    }
  }
  taosArrayDestroy(pTableqinfoList->pGroupList);

  taosArrayDestroy(pTableqinfoList->pTableList);
  taosHashCleanup(pTableqinfoList->map);

  pTableqinfoList->pGroupList = NULL;
  pTableqinfoList->pTableList = NULL;
  pTableqinfoList->map = NULL;
}

L
Liu Jicong 已提交
4108
/*
 * Release a task and everything it holds: its table list, operator tree,
 * table schema info, stream info, SQL text, task id string and the task
 * itself. The subplan is destroyed too, unless localFetch.localExec is set.
 *
 * Passing NULL is a no-op, so failed-creation paths can call this
 * unconditionally.
 */
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
  if (pTaskInfo == NULL) {
    return;
  }

  qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));

  doDestroyTableList(&pTaskInfo->tableqinfoList);
  destroyOperatorInfo(pTaskInfo->pRoot);
  cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
  cleanupStreamInfo(&pTaskInfo->streamInfo);

  // NOTE(review): with localExec set the subplan is presumably owned by the local caller; confirm.
  if (!pTaskInfo->localFetch.localExec) {
    nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
  }

  taosMemoryFreeClear(pTaskInfo->sql);
  taosMemoryFreeClear(pTaskInfo->id.str);
  taosMemoryFreeClear(pTaskInfo);
}

/*
 * Estimate, in bytes, the query buffer needed for numOfTables tables:
 * 1.5 x sizeof(STableQueryInfo) per table. Buffers consumed inside tsdb
 * (the former STableCheckInfo term) are not included.
 */
static int64_t getQuerySupportBufSize(size_t numOfTables) {
  const size_t perTableBytes = sizeof(STableQueryInfo);
  double       estimate = perTableBytes * 1.5 * numOfTables;
  return (int64_t)estimate;
}

/*
 * Reserve query-buffer budget for numOfTables tables from the global
 * tsQueryBufferSizeBytes pool.
 *
 *   budget <  0 : unlimited, nothing is reserved      -> TSDB_CODE_SUCCESS
 *   budget == 0 : query processing is disabled        -> TSDB_CODE_QRY_NOT_ENOUGH_BUFFER
 *   budget >  0 : subtract the estimate with a CAS loop; if the pool would go
 *                 negative                            -> TSDB_CODE_QRY_NOT_ENOUGH_BUFFER
 */
int32_t checkForQueryBuf(size_t numOfTables) {
  int64_t required = getQuerySupportBufSize(numOfTables);

  if (tsQueryBufferSizeBytes < 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (tsQueryBufferSizeBytes > 0) {
    // Retry until our compare-and-swap wins against concurrent reservations.
    for (;;) {
      int64_t available = tsQueryBufferSizeBytes;
      int64_t leftover = available - required;
      if (leftover < 0) {
        return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
      }

      if (atomic_val_compare_exchange_64(&tsQueryBufferSizeBytes, available, leftover) == available) {
        return TSDB_CODE_SUCCESS;
      }
    }
  }

  // A zero budget disables query processing.
  return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
}

/*
 * Return the query buffer reserved by a successful checkForQueryBuf(numOfTables) to the global
 * budget tsQueryBufferSizeBytes. This is a no-op when the budget is negative (unlimited).
 */
void releaseQueryBuf(size_t numOfTables) {
  // Read atomically: the budget is concurrently updated by checkForQueryBuf / releaseQueryBuf.
  if (atomic_load_64(&tsQueryBufferSizeBytes) < 0) {
    return;
  }

  int64_t t = getQuerySupportBufSize(numOfTables);

  // give the reserved bytes back to the shared budget
  atomic_add_fetch_64(&tsQueryBufferSizeBytes, t);
}
D
dapan1121 已提交
4163

H
Haojun Liao 已提交
4164
/*
 * Append the execution statistics of operatorInfo to pExecInfoList, then recurse into its
 * downstream operators in order (depth-first, pre-order).
 *
 * For each operator the row count, open cost and total cost are recorded. When the operator
 * provides getExplainFn, its verbose info is attached as well.
 *
 * Returns TSDB_CODE_SUCCESS on success, TSDB_CODE_QRY_OUT_OF_MEMORY if the list cannot be
 * extended, or the first error code reported by an operator's getExplainFn (including downstream
 * ones).
 */
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
  SExplainExecInfo  execInfo = {0};
  SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo);
  if (pExplainInfo == NULL) {
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  pExplainInfo->numOfRows = operatorInfo->resultInfo.totalRows;
  pExplainInfo->startupCost = operatorInfo->cost.openCost;
  pExplainInfo->totalCost = operatorInfo->cost.totalCost;
  pExplainInfo->verboseLen = 0;
  pExplainInfo->verboseInfo = NULL;

  if (operatorInfo->fpSet.getExplainFn) {
    int32_t code =
        operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen);
    if (code) {
      qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
      return code;
    }
  }

  // NOTE: pExplainInfo points into pExecInfoList. The recursive pushes below may reallocate the
  // array, so pExplainInfo must not be used after this point.
  for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
    int32_t code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pExecInfoList);
    if (code != TSDB_CODE_SUCCESS) {
      // propagate the real cause instead of masking every failure as out-of-memory
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
4194

4195 4196
/*
 * Bind the stream-state result row of window win / group tableGroupId as the current output row.
 *
 * The row is fetched from pState keyed by (win->skey, tableGroupId), or created with
 * pAggSup->resultRowSize bytes if it does not exist yet. It is stamped with win and the numOfOutput
 * function contexts in pCtx are initialized over it.
 *
 * On success *pResult receives the row and TSDB_CODE_SUCCESS is returned.
 * TSDB_CODE_QRY_OUT_OF_MEMORY is returned when streamStateAddIfNotExist reports failure.
 */
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
                     SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) {
  SWinKey winKey = {.ts = win->skey, .groupId = tableGroupId};
  int32_t rowSize = pAggSup->resultRowSize;
  char*   pRowBuf = NULL;

  if (streamStateAddIfNotExist(pState, &winKey, (void**)&pRowBuf, &rowSize) < 0) {
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  SResultRow* pRow = (SResultRow*)pRowBuf;
  *pResult = pRow;
  ASSERT(*pResult);

  pRow->win = *win;  // the row now describes this time window
  setResultRowInitCtx(pRow, pCtx, numOfOutput, rowEntryInfoOffset);
  return TSDB_CODE_SUCCESS;
}

4215 4216
/*
 * Hand a result-row buffer obtained from the stream state back via streamStateReleaseBuf.
 * pKey may be NULL (the session paths pass NULL). The release result is not inspected; this
 * function always reports TSDB_CODE_SUCCESS.
 */
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult) {
  (void)streamStateReleaseBuf(pState, pKey, pResult);
  return TSDB_CODE_SUCCESS;
}

4220 4221
/*
 * Write resSize bytes of the result row pResult into the stream state under pKey.
 * The outcome of streamStatePut is not propagated: callers always receive TSDB_CODE_SUCCESS.
 */
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize) {
  (void)streamStatePut(pState, pKey, pResult, resSize);
  return TSDB_CODE_SUCCESS;
}

4225
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
4226
                                   SGroupResInfo* pGroupResInfo) {
4227
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
4228 4229 4230 4231 4232 4233 4234 4235 4236 4237 4238 4239 4240 4241 4242
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    int32_t     size = 0;
    void*       pVal = NULL;
    SWinKey     key = {
            .ts = *(TSKEY*)pPos->key,
            .groupId = pPos->groupId,
    };
4243
    int32_t code = streamStateGet(pState, &key, &pVal, &size);
4244 4245 4246 4247 4248 4249
    ASSERT(code == 0);
    SResultRow* pRow = (SResultRow*)pVal;
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
4250
      releaseOutputBuf(pState, &key, pRow);
4251 4252 4253 4254 4255
      continue;
    }

    if (pBlock->info.groupId == 0) {
      pBlock->info.groupId = pPos->groupId;
4256 4257 4258 4259
      SStreamIntervalOperatorInfo* pInfo = pOperator->info;
      char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
      if (tbname != NULL) {
        memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
L
Liu Jicong 已提交
4260 4261
      } else {
        pBlock->info.parTbName[0] = 0;
4262
      }
4263 4264 4265
    } else {
      // current value belongs to different group, it can't be packed into one datablock
      if (pBlock->info.groupId != pPos->groupId) {
4266
        releaseOutputBuf(pState, &key, pRow);
4267 4268 4269 4270 4271 4272
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
4273
      releaseOutputBuf(pState, &key, pRow);
4274 4275 4276 4277 4278 4279 4280 4281 4282 4283
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.finalize) {
4284 4285 4286 4287
        int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code1)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
          T_LONG_JMP(pTaskInfo->env, code1);
4288 4289 4290 4291 4292 4293 4294 4295 4296 4297 4298 4299 4300
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        for (int32_t k = 0; k < pRow->numOfRows; ++k) {
          colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
        }
      }
    }
5
54liuyao 已提交
4301

4302
    pBlock->info.rows += pRow->numOfRows;
4303
    releaseOutputBuf(pState, &key, pRow);
4304 4305 4306 4307
  }
  blockDataUpdateTsWindow(pBlock, 0);
  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
4308 4309 4310 4311 4312 4313 4314

/*
 * Persist size bytes of the session result buffer buf under key via streamStateSessionPut, then
 * release buf with releaseOutputBuf (no window key). Always returns TSDB_CODE_SUCCESS; the put
 * result is not inspected.
 */
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size) {
  const void* pValue = buf;
  (void)streamStateSessionPut(pState, key, pValue, size);

  SResultRow* pRow = (SResultRow*)buf;
  (void)releaseOutputBuf(pState, NULL, pRow);
  return TSDB_CODE_SUCCESS;
}

4315
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
5
54liuyao 已提交
4316
                                    SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
4317
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
5
54liuyao 已提交
4318 4319 4320 4321 4322 4323 4324 4325 4326 4327 4328 4329 4330 4331 4332 4333 4334 4335 4336 4337 4338 4339 4340 4341
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SSessionKey* pKey = taosArrayGet(pGroupResInfo->pRows, i);
    int32_t      size = 0;
    void*        pVal = NULL;
    int32_t      code = streamStateSessionGet(pState, pKey, &pVal, &size);
    ASSERT(code == 0);
    SResultRow* pRow = (SResultRow*)pVal;
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseOutputBuf(pState, NULL, pRow);
      continue;
    }

    if (pBlock->info.groupId == 0) {
      pBlock->info.groupId = pKey->groupId;
4342 4343 4344 4345 4346 4347 4348 4349 4350 4351 4352 4353 4354 4355 4356 4357 4358 4359 4360 4361 4362 4363 4364 4365 4366

      if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
        SStreamStateAggOperatorInfo* pInfo = pOperator->info;

        char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
        if (tbname != NULL) {
          memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
        } else {
          pBlock->info.parTbName[0] = 0;
        }
      } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
                 pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
                 pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
        SStreamSessionAggOperatorInfo* pInfo = pOperator->info;

        char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
        if (tbname != NULL) {
          memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
        } else {
          pBlock->info.parTbName[0] = 0;
        }
      } else {
        ASSERT(0);
      }

5
54liuyao 已提交
4367 4368 4369 4370 4371 4372 4373 4374 4375 4376 4377 4378 4379 4380 4381 4382 4383 4384 4385 4386 4387 4388 4389 4390 4391 4392 4393 4394 4395 4396 4397 4398 4399 4400 4401 4402 4403 4404 4405 4406 4407 4408 4409 4410 4411
    } else {
      // current value belongs to different group, it can't be packed into one datablock
      if (pBlock->info.groupId != pKey->groupId) {
        releaseOutputBuf(pState, NULL, pRow);
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
      releaseOutputBuf(pState, NULL, pRow);
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.finalize) {
        int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code1)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
          T_LONG_JMP(pTaskInfo->env, code1);
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        for (int32_t k = 0; k < pRow->numOfRows; ++k) {
          colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
        }
      }
    }

    pBlock->info.rows += pRow->numOfRows;
    // saveSessionDiscBuf(pState, pKey, pVal, size);
    releaseOutputBuf(pState, NULL, pRow);
  }
  blockDataUpdateTsWindow(pBlock, 0);
  return TSDB_CODE_SUCCESS;
4412
}