executorimpl.c 70.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

H
Haojun Liao 已提交
16 17
#include "filter.h"
#include "function.h"
18 19
#include "functionMgt.h"
#include "os.h"
H
Haojun Liao 已提交
20
#include "querynodes.h"
21
#include "tfill.h"
dengyihao's avatar
dengyihao 已提交
22
#include "tname.h"
23

H
Haojun Liao 已提交
24
#include "tdatablock.h"
25
#include "tglobal.h"
H
Haojun Liao 已提交
26
#include "tmsg.h"
27
#include "ttime.h"
H
Haojun Liao 已提交
28

29
#include "executorimpl.h"
dengyihao's avatar
dengyihao 已提交
30
#include "index.h"
31
#include "query.h"
32
#include "tcompare.h"
H
Haojun Liao 已提交
33
#include "thash.h"
34
#include "ttypes.h"
dengyihao's avatar
dengyihao 已提交
35
#include "vnode.h"
36 37 38 39 40 41

// Set the scanFlag field of `runtime` to REVERSE_SCAN.
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
// Yield QUERY_ASC_FORWARD_STEP when `ord` is TSDB_ORDER_ASC, otherwise QUERY_DESC_FORWARD_STEP.
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)

#if 0
// Compiled-out allocation wrappers that fail at random, so that out-of-memory
// handling can be exercised. When enabled, malloc/calloc/realloc in the rest of
// this file are redirected to them by the #defines at the end of this section.

// Fails when the random draw is a multiple of 1000, otherwise forwards to taosMemoryMalloc.
static UNUSED_FUNC void *u_malloc (size_t __size) {
  uint32_t v = taosRand();
  return (v % 1000 == 0) ? NULL : taosMemoryMalloc(__size);
}

// Fails when the random draw is a multiple of 1000, otherwise forwards to taosMemoryCalloc.
static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
  uint32_t v = taosRand();
  return (v % 1000 == 0) ? NULL : taosMemoryCalloc(num, __size);
}

// Fails when the random draw modulo 5 is 0 or 1, otherwise forwards to taosMemoryRealloc.
static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
  uint32_t v = taosRand();
  return (v % 5 < 2) ? NULL : taosMemoryRealloc(p, __size);
}

#define calloc  u_calloc
#define malloc  u_malloc
#define realloc u_realloc
#endif

X
Xiaoyu Wang 已提交
74
// Clear the bits given in `st` from the status field of `q`.
#define CLEAR_QUERY_STATUS(q, st)   ((q)->status &= (~(st)))
H
Haojun Liao 已提交
75

76
static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
77

X
Xiaoyu Wang 已提交
78
static void releaseQueryBuf(size_t numOfTables);
79

H
Haojun Liao 已提交
80 81
static void    initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void    doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
82

H
Haojun Liao 已提交
83 84 85 86
static void    extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
                                                   int32_t status);
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol);
H
Haojun Liao 已提交
87 88
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
                                  SGroupResInfo* pGroupResInfo);
89
static SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
H
Haojun Liao 已提交
90

H
Haojun Liao 已提交
91
// Flag the operator as finished, record its total cost and mark the owning task as completed.
// The total cost is the time elapsed since the task's recorded start, converted from us by / 1000.0.
void setOperatorCompleted(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  pOperator->cost.totalCost = (taosGetTimestampUs() - pTaskInfo->cost.start) / 1000.0;
  pOperator->status = OP_EXEC_DONE;

  setTaskStatus(pTaskInfo, TASK_COMPLETED);
}
96

H
Haojun Liao 已提交
97 98 99 100 101 102 103 104 105 106
// Populate the descriptive fields of an operator: its name, type, blocking flag, initial status,
// operator-specific info object and owning task. The name pointer is stored as-is, not copied.
void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status,
                     void* pInfo, SExecTaskInfo* pTaskInfo) {
  // identity
  pOperator->name = (char*)name;
  pOperator->operatorType = type;

  // ownership
  pOperator->pTaskInfo = pTaskInfo;
  pOperator->info = pInfo;

  // execution state
  pOperator->status = status;
  pOperator->blocking = blocking;
}

107
// No-op open callback: marks the operator as opened, records a zero open cost
// and always reports success.
int32_t optrDummyOpenFn(SOperatorInfo* pOperator) {
  OPTR_SET_OPENED(pOperator);

  // nothing was actually opened, so there is no cost to account for
  pOperator->cost.openCost = 0;

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
113
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
114 115
                                   __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn,
                                   __optr_explain_fn_t explain) {
116 117 118 119 120
  SOperatorFpSet fpSet = {
      ._openFn = openFn,
      .getNextFn = nextFn,
      .cleanupFn = cleanup,
      .closeFn = closeFn,
121
      .reqBufFn = reqBufFn,
122 123 124 125 126 127
      .getExplainFn = explain,
  };

  return fpSet;
}

128
// Reserve `interBufSize` bytes for a new, zero-filled result row inside the paged result buffer.
//
// pResultBuf     paged buffer the row is carved from
// currentPageId  in/out: page currently being filled, or -1 when no page has been used yet;
//                updated to the page that finally holds the row
// interBufSize   number of bytes to reserve for the row
//
// Returns the new row with its pageId/offset filled in, or NULL when a page could not be obtained.
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
  SFilePage* pData = NULL;
  int32_t    pageId = -1;

  if (*currentPageId == -1) {
    // no page in use yet: a brand-new page is needed for the first row
    pData = getNewBufPage(pResultBuf, &pageId);
    if (pData == NULL) {
      // the page must not be touched when the allocation failed
      qError("failed to get buffer, code:%s", tstrerror(terrno));
      return NULL;
    }

    pData->num = sizeof(SFilePage);
  } else {
    pData = getBufPage(pResultBuf, *currentPageId);
    if (pData == NULL) {
      qError("failed to get buffer, code:%s", tstrerror(terrno));
      return NULL;
    }

    pageId = *currentPageId;

    if (pData->num + interBufSize > getBufPageSize(pResultBuf)) {
      // the row does not fit: release the current page first, and prepare the next one
      releaseBufPage(pResultBuf, pData);

      pData = getNewBufPage(pResultBuf, &pageId);
      if (pData == NULL) {
        qError("failed to get buffer, code:%s", tstrerror(terrno));
        return NULL;
      }

      pData->num = sizeof(SFilePage);
    }
  }

  setBufPageDirty(pData, true);

  // the row starts right after the bytes already used in the page
  SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);

  memset((char*)pResultRow, 0, interBufSize);
  pResultRow->pageId = pageId;
  pResultRow->offset = (int32_t)pData->num;

  *currentPageId = pageId;
  pData->num += interBufSize;
  return pResultRow;
}

174 175 176 177 178 179 180
/**
 * the struct of key in hash table
 * +----------+---------------+
 * | group id |   key data    |
 * | 8 bytes  | actual length |
 * +----------+---------------+
 */
181 182
// Find the result row registered for (groupId, pData), creating and registering it on first use,
// and make it the current row of pResultRowInfo.
//
// pData/bytes     key data and its length; combined with groupId into pSup->keyBuf
// keepGroup       when false, the group id stored in the key is replaced by calcGroupId() of the key
// isIntervalQuery accepted for interface compatibility; both query kinds perform the same lookup
//
// Returns the result row. Every failure leaves through T_LONG_JMP on pTaskInfo->env.
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
                                   int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
                                   bool isIntervalQuery, SAggSupporter* pSup, bool keepGroup) {
  SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
  if (!keepGroup) {
    *(uint64_t*)pSup->keyBuf = calcGroupId(pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
  }

  SResultRowPosition* p1 =
      (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));

  // The interval branch and the group-by branch used to be identical copies of the lookup below.
  (void)isIntervalQuery;

  SResultRow* pResult = NULL;
  if (p1 != NULL) {  // p1 may be NULL, e.g. in case of sliding+offset, or for a group seen for the first time
    pResult = getResultRowByPos(pResultBuf, p1, true);
    if (NULL == pResult) {
      T_LONG_JMP(pTaskInfo->env, terrno);
    }

    ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
  }

  // 1. close current opened time window
  if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
    SResultRowPosition pos = pResultRowInfo->cur;
    SFilePage*         pPage = getBufPage(pResultBuf, pos.pageId);
    if (pPage == NULL) {
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
    releaseBufPage(pResultBuf, pPage);
  }

  // allocate a new result row when the key has not been seen before
  if (pResult == NULL) {
    pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
    if (pResult == NULL) {
      T_LONG_JMP(pTaskInfo->env, terrno);
    }

    // register the new row under its key; a row that is not registered could never be found again
    SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
    int32_t code = tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
                                  sizeof(SResultRowPosition));
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  // 2. set the new time window to be the new active time window
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};

  // too many time window in query
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
      tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
  }

  return pResult;
}

254
//  query_range_start, query_range_end, window_duration, window_start, window_end
255
// Turn pColData into a 5-row timestamp column describing the query window.
// Row layout: query_range_start, query_range_end, window_duration, window_start, window_end.
// The duration row is written as 0 here; it may be variable in case of 'n' and 'y'.
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
  pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
  pColData->info.bytes = sizeof(int64_t);

  int64_t  interval = 0;
  int64_t* rowValues[] = {
      &pQueryWindow->skey,  // query range start
      &pQueryWindow->ekey,  // query range end
      &interval,            // window duration
      &pQueryWindow->skey,  // window start
      &pQueryWindow->ekey,  // window end
  };

  colInfoDataEnsureCapacity(pColData, 5, false);
  for (int32_t row = 0; row < 5; ++row) {
    colDataSetInt64(pColData, row, rowValues[row]);
  }
}

G
Ganlin Zhao 已提交
269
// Bind pBlock to every function context of pExprSup using the block's SMA information:
// each context receives the scan order, the scan flag, the source block and the row count,
// and its input is wired up through setBlockSMAInfo().
static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
  int32_t numOfExprs = pExprSup->numOfExprs;

  for (int32_t idx = 0; idx < numOfExprs; ++idx) {
    SqlFunctionCtx* pOneCtx = &pExprSup->pCtx[idx];

    pOneCtx->order = order;
    pOneCtx->scanFlag = scanFlag;
    pOneCtx->pSrcBlock = pBlock;
    pOneCtx->input.numOfRows = pBlock->info.rows;

    setBlockSMAInfo(pOneCtx, &pExprSup->pExprInfo[idx], pBlock);
  }
}

280
// Attach pBlock as the input of all expressions in pExprSup.
// Blocks that carry pre-aggregated data (pBlockAgg) take the SMA path; all others get their
// column data bound directly, with createDummyCol forwarded to that path.
void setInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol) {
  if (pBlock->pBlockAgg == NULL) {
    doSetInputDataBlock(pExprSup, pBlock, order, scanFlag, createDummyCol);
    return;
  }

  doSetInputDataBlockInfo(pExprSup, pBlock, order, scanFlag);
}

L
Liu Jicong 已提交
288 289
// Materialize a constant function parameter as a column holding `numOfRows` copies of the value.
//
// pInput      input descriptor; pInput->pData[paramIndex] is created on first use and reused afterwards
// pFuncParam  the constant parameter (type, length and value are read from pFuncParam->param)
// paramIndex  slot of the parameter inside pInput->pData
// numOfRows   number of rows to fill
//
// Only BIGINT/UBIGINT, DOUBLE and VARCHAR constants are filled; other types leave the rows untouched.
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when an allocation fails, or the error
// reported by colInfoDataEnsureCapacity().
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
                                             int32_t numOfRows) {
  SColumnInfoData* pColInfo = pInput->pData[paramIndex];
  if (pColInfo == NULL) {
    pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pColInfo == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pColInfo->info.type = pFuncParam->param.nType;
    pColInfo->info.bytes = pFuncParam->param.nLen;

    pInput->pData[paramIndex] = pColInfo;
  }

  // rows must not be written when the column buffer could not be sized
  int32_t code = colInfoDataEnsureCapacity(pColInfo, numOfRows, false);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int8_t type = pFuncParam->param.nType;
  if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
    int64_t v = pFuncParam->param.i;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataSetInt64(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataSetDouble(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_VARCHAR) {
    // build the header-prefixed string once and reuse it for every row
    char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
    if (tmp == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataSetVal(pColInfo, i, tmp, false);
    }
    taosMemoryFree(tmp);
  }

  return TSDB_CODE_SUCCESS;
}

331 332
// Bind the columns of pBlock to the input of every function context in pExprSup.
//
// Column parameters point straight at the matching column of the block. A value parameter is
// expanded into a constant column only when createDummyCol is set and it is the sole parameter
// of its expression.
//
// Returns TSDB_CODE_SUCCESS, or the error reported while building a constant column.
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol) {
  SqlFunctionCtx* pCtx = pExprSup->pCtx;

  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
    SqlFunctionCtx*       pOneCtx = &pCtx[i];
    SInputColumnInfoData* pInput = &pOneCtx->input;
    SExprInfo*            pOneExpr = &pExprSup->pExprInfo[i];

    pOneCtx->order = order;
    pOneCtx->input.numOfRows = pBlock->info.rows;
    pOneCtx->pSrcBlock = pBlock;
    pOneCtx->scanFlag = scanFlag;

    pInput->uid = pBlock->info.id.uid;
    pInput->colDataSMAIsSet = false;

    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];

      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
        int32_t slotId = pFuncParam->pCol->slotId;

        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
        pInput->totalRows = pBlock->info.rows;
        pInput->numOfRows = pBlock->info.rows;
        pInput->startRowIndex = 0;

        // NOTE: the last parameter is the primary timestamp column
        // todo: refactor this
        bool isLastParam = (j == pOneExpr->base.numOfParams - 1);
        if (fmIsImplicitTsFunc(pOneCtx->functionId) && isLastParam) {
          pInput->pPTS = pInput->pData[j];  // in case of merge function, this is not always the ts column data.
        }

        ASSERT(pInput->pData[j] != NULL);
        continue;
      }

      if (pFuncParam->type != FUNC_PARAM_TYPE_VALUE) {
        continue;
      }

      // todo avoid case: top(k, 12), 12 is the value parameter.
      // sum(11), 11 is also the value parameter.
      if (!createDummyCol || pOneExpr->base.numOfParams != 1) {
        continue;
      }

      pInput->totalRows = pBlock->info.rows;
      pInput->numOfRows = pBlock->info.rows;
      pInput->startRowIndex = 0;

      int32_t code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
383
// Decide whether the function bound to pCtx still has to be run on the current input.
//   - contexts without a function (functionId == -1) never execute
//   - during PRE_SCAN only functions reported by fmIsRepeatScanFunc() execute
//   - otherwise the function executes until its result entry is completed
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
  struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);

  if (pCtx->functionId == -1) {
    return false;
  }

  if (pCtx->scanFlag == PRE_SCAN) {
    return fmIsRepeatScanFunc(pCtx->functionId);
  }

  return !isRowEntryCompleted(pResInfo);
}

H
Haojun Liao 已提交
403
// Build pre-aggregation (SMA) statistics for a constant parameter, as if the constant filled a
// column of `numOfRows` rows.
//
// The column descriptor in pInput->pData[paramIndex] and the statistics record in
// pInput->pColumnDataAgg[paramIndex] are allocated on first use and reused afterwards.
// BIGINT, DOUBLE and BOOL get min/max/sum filled in; TIMESTAMP is left untouched; any other
// type is only logged as an error.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails.
static int32_t doCreateConstantValColumnSMAInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type,
                                                int32_t paramIndex, int32_t numOfRows) {
  SColumnInfoData* pCol = pInput->pData[paramIndex];
  if (pCol == NULL) {
    pCol = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    pInput->pData[paramIndex] = pCol;
    if (pCol == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pCol->info.type = type;
    pCol->info.bytes = tDataTypes[type].bytes;
  }

  SColumnDataAgg* da = pInput->pColumnDataAgg[paramIndex];
  if (da == NULL) {
    da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
    pInput->pColumnDataAgg[paramIndex] = da;
    if (da == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  switch (type) {
    case TSDB_DATA_TYPE_BIGINT: {
      int64_t v = pFuncParam->param.i;
      *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
      break;
    }

    case TSDB_DATA_TYPE_DOUBLE: {
      double v = pFuncParam->param.d;
      *da = (SColumnDataAgg){.numOfNull = 0};

      // the statistic fields are reused as storage for double values
      *(double*)&da->min = v;
      *(double*)&da->max = v;
      *(double*)&da->sum = v * numOfRows;
      break;
    }

    case TSDB_DATA_TYPE_BOOL: {  // todo validate this data type
      bool v = pFuncParam->param.i;
      *da = (SColumnDataAgg){.numOfNull = 0};

      // NOTE(review): min is written as 0 rather than v for a boolean constant - confirm intended
      *(bool*)&da->min = 0;
      *(bool*)&da->max = v;
      *(bool*)&da->sum = v * numOfRows;
      break;
    }

    case TSDB_DATA_TYPE_TIMESTAMP:
      // do nothing
      break;

    default:
      qError("invalid constant type for sma info");
      break;
  }

  return TSDB_CODE_SUCCESS;
}
452

453
// Wire the pre-aggregated (SMA) data of pBlock into the input of pCtx.
//
// The input row counts are always refreshed. When the block has no pre-aggregated data the
// SMA flag is cleared. Otherwise every column parameter gets the statistics and the column
// descriptor of its slot, every value parameter gets synthesized statistics, and the SMA flag
// stays set only while no column parameter hits a missing statistics entry.
void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
  SInputColumnInfoData* pInput = &pCtx->input;
  int32_t               numOfRows = pBlock->info.rows;

  pInput->numOfRows = numOfRows;
  pInput->totalRows = numOfRows;

  if (pBlock->pBlockAgg == NULL) {
    pInput->colDataSMAIsSet = false;
    return;
  }

  pInput->colDataSMAIsSet = true;

  int32_t numOfParams = pExprInfo->base.numOfParams;
  for (int32_t idx = 0; idx < numOfParams; ++idx) {
    SFunctParam* pFuncParam = &pExprInfo->base.pParam[idx];

    if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
      doCreateConstantValColumnSMAInfo(pInput, pFuncParam, pFuncParam->param.nType, idx, numOfRows);
      continue;
    }

    if (pFuncParam->type != FUNC_PARAM_TYPE_COLUMN) {
      continue;
    }

    int32_t slotId = pFuncParam->pCol->slotId;

    pInput->pColumnDataAgg[idx] = pBlock->pBlockAgg[slotId];
    if (pInput->pColumnDataAgg[idx] == NULL) {
      pInput->colDataSMAIsSet = false;
    }

    // Here we set the column info data since the data type for each column data is required, but
    // the data in the corresponding SColumnInfoData will not be used.
    pInput->pData[idx] = taosArrayGet(pBlock->pDataBlock, slotId);
  }
}

485
// A task counts as killed as soon as a non-zero code has been recorded on it.
bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
  return pTaskInfo->code != 0;
}
486

D
dapan1121 已提交
487
// Record rspCode as the task's code; a non-zero value makes isTaskKilled() report true.
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) {
  pTaskInfo->code = rspCode;
}
488 489

/////////////////////////////////////////////////////////////////////////////////////////////
490
// Compute the interval-aligned time window that contains `key`.
// The start is `key` truncated by taosTimeTruncate(); the end is one interval later, minus 1.
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) {
  int64_t start = taosTimeTruncate(key, pInterval, precision);
  int64_t end = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1;

  /*
   * if the realSkey > INT64_MAX - pInterval->interval, the query duration between
   * realSkey and realEkey must be less than one interval. Therefore, no need to adjust the query ranges.
   */
  if (end < start) {
    end = INT64_MAX;
  }

  return (STimeWindow){.skey = start, .ekey = end};
}

L
Liu Jicong 已提交
506
// Update the status of a task.
// TASK_NOT_COMPLETED replaces the whole status; any other value is OR-ed into the status after
// the TASK_NOT_COMPLETED bits have been cleared, since it is not compatible with any other status.
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
  if (status == TASK_NOT_COMPLETED) {
    pTaskInfo->status = status;
    return;
  }

  CLEAR_QUERY_STATUS(pTaskInfo, TASK_NOT_COMPLETED);
  pTaskInfo->status |= status;
}

516
// Point every function context at its entry inside pResult and initialize entries that still need it.
//
// All contexts get their resultInfo bound. Initialization walks the entries in order and stops
// for good at the first entry that is already initialized and is neither skipped as
// completed+initialized nor as a pseudo function. Entries without a function
// (functionId == -1) are simply marked initialized.
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
  bool stopInit = false;

  for (int32_t idx = 0; idx < numOfOutput; ++idx) {
    SqlFunctionCtx* pOneCtx = &pCtx[idx];

    pOneCtx->resultInfo = getResultEntryInfo(pResult, idx, rowEntryInfoOffset);
    if (stopInit) {
      continue;
    }

    struct SResultRowEntryInfo* pEntry = pOneCtx->resultInfo;
    if ((isRowEntryCompleted(pEntry) && isRowEntryInitialized(pEntry)) || pOneCtx->isPseudoFunc) {
      continue;
    }

    if (pEntry->initialized) {
      // an initialized entry was found: only the binding is done for the remaining contexts
      stopInit = true;
      continue;
    }

    if (pOneCtx->functionId == -1) {
      pEntry->initialized = true;
    } else {
      pOneCtx->fpSet.init(pOneCtx, pEntry);
    }
  }
}

545 546 547 548 549 550 551 552 553 554 555 556 557 558
// Reset the bookkeeping of every bound result entry: not initialized, no results,
// not a null result, not complete. Contexts without a bound entry are skipped.
void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  for (int32_t idx = 0; idx < numOfOutput; ++idx) {
    SResultRowEntryInfo* pEntry = pCtx[idx].resultInfo;

    if (pEntry != NULL) {
      pEntry->complete = false;
      pEntry->isNullRes = 0;
      pEntry->numOfRes = 0;
      pEntry->initialized = false;
    }
  }
}

H
Haojun Liao 已提交
559 560
// Apply pFilterInfo to pBlock in place, keeping only the qualified rows.
//
// Nothing happens without a filter or for an empty block. After filtering, when pColMatchInfo
// is given, the time window of the block is refreshed from the first matched primary-key
// timestamp column whose type is TIMESTAMP. The indicator column produced by the filter is
// destroyed and freed before returning.
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) {
  if (pFilterInfo == NULL || pBlock->info.rows == 0) {
    return;
  }

  SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
  int32_t            code = filterSetDataFromSlotId(pFilterInfo, &param1);

  SColumnInfoData* pIndicatorCol = NULL;
  int32_t          filterStatus = 0;

  // todo the keep seems never to be True??
  bool keep = filterExecute(pFilterInfo, pBlock, &pIndicatorCol, NULL, param1.numOfCols, &filterStatus);
  extractQualifiedTupleByFilterResult(pBlock, pIndicatorCol, keep, filterStatus);

  if (pColMatchInfo != NULL) {
    size_t numOfMatched = taosArrayGetSize(pColMatchInfo->pList);

    for (size_t idx = 0; idx < numOfMatched; ++idx) {
      SColMatchItem* pItem = taosArrayGet(pColMatchInfo->pList, idx);
      if (pItem->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
        continue;
      }

      SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pItem->dstSlotId);
      if (pTsCol->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
        continue;
      }

      blockDataUpdateTsWindow(pBlock, pItem->dstSlotId);
      break;
    }
  }

  colDataDestroy(pIndicatorCol);
  taosMemoryFree(pIndicatorCol);
}

592
// Compact pBlock in place so that only the rows marked in the filter indicator column remain.
//
// p       indicator column: one int8_t per row, 0 means the row is dropped
// keep    when true the block is left untouched
// status  FILTER_RESULT_ALL_QUALIFIED leaves the block as is, FILTER_RESULT_NONE_QUALIFIED
//         sets the row count to 0, any other value triggers the row-by-row compaction
//
// Columns without a data buffer are skipped. The resulting row count of the block is the
// largest number of rows kept in any compacted column.
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status) {
  if (keep) {
    return;
  }

  int8_t* pIndicator = (int8_t*)p->pData;
  int32_t totalRows = pBlock->info.rows;

  if (status == FILTER_RESULT_ALL_QUALIFIED) {
    // here nothing needs to be done
    return;
  }

  if (status == FILTER_RESULT_NONE_QUALIFIED) {
    pBlock->info.rows = 0;
    return;
  }

  int32_t bmLen = BitmapLen(totalRows);
  char*   pSavedBitmap = NULL;  // scratch copy of a column's null bitmap, shared by all fixed-length columns
  int32_t maxKept = 0;

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (size_t col = 0; col < numOfCols; ++col) {
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, col);

    // it is a reserved column for scalar function, and no data in this column yet.
    if (pDst->pData == NULL) {
      continue;
    }

    int32_t kept = 0;  // rows written so far; always <= the source row being read

    if (IS_VAR_DATA_TYPE(pDst->info.type)) {
      pDst->varmeta.length = 0;

      for (int32_t src = 0; src < totalRows; ++src) {
        if (pIndicator[src] == 0) {
          continue;
        }

        if (colDataIsNull_var(pDst, src)) {
          colDataSetNull_var(pDst, kept);
        } else {
          char* pVar = colDataGetVarData(pDst, src);
          colDataSetVal(pDst, kept, pVar, false);
        }

        kept += 1;
      }
    } else {
      if (pSavedBitmap == NULL) {
        pSavedBitmap = taosMemoryCalloc(1, bmLen);
      }

      // null flags are read from the saved copy while the live bitmap is rebuilt from scratch
      memcpy(pSavedBitmap, pDst->nullbitmap, bmLen);
      memset(pDst->nullbitmap, 0, bmLen);

      switch (pDst->info.type) {
        case TSDB_DATA_TYPE_BIGINT:
        case TSDB_DATA_TYPE_UBIGINT:
        case TSDB_DATA_TYPE_DOUBLE:
        case TSDB_DATA_TYPE_TIMESTAMP:
          // 8-byte cells
          for (int32_t src = 0; src < totalRows; ++src) {
            if (pIndicator[src] == 0) {
              continue;
            }

            if (colDataIsNull_f(pSavedBitmap, src)) {
              colDataSetNull_f(pDst->nullbitmap, kept);
            } else {
              ((int64_t*)pDst->pData)[kept] = ((int64_t*)pDst->pData)[src];
            }

            kept += 1;
          }
          break;

        case TSDB_DATA_TYPE_FLOAT:
        case TSDB_DATA_TYPE_INT:
        case TSDB_DATA_TYPE_UINT:
          // 4-byte cells
          for (int32_t src = 0; src < totalRows; ++src) {
            if (pIndicator[src] == 0) {
              continue;
            }

            if (colDataIsNull_f(pSavedBitmap, src)) {
              colDataSetNull_f(pDst->nullbitmap, kept);
            } else {
              ((int32_t*)pDst->pData)[kept] = ((int32_t*)pDst->pData)[src];
            }

            kept += 1;
          }
          break;

        case TSDB_DATA_TYPE_SMALLINT:
        case TSDB_DATA_TYPE_USMALLINT:
          // 2-byte cells
          for (int32_t src = 0; src < totalRows; ++src) {
            if (pIndicator[src] == 0) {
              continue;
            }

            if (colDataIsNull_f(pSavedBitmap, src)) {
              colDataSetNull_f(pDst->nullbitmap, kept);
            } else {
              ((int16_t*)pDst->pData)[kept] = ((int16_t*)pDst->pData)[src];
            }

            kept += 1;
          }
          break;

        case TSDB_DATA_TYPE_BOOL:
        case TSDB_DATA_TYPE_TINYINT:
        case TSDB_DATA_TYPE_UTINYINT:
          // 1-byte cells
          for (int32_t src = 0; src < totalRows; ++src) {
            if (pIndicator[src] == 0) {
              continue;
            }

            if (colDataIsNull_f(pSavedBitmap, src)) {
              colDataSetNull_f(pDst->nullbitmap, kept);
            } else {
              ((int8_t*)pDst->pData)[kept] = ((int8_t*)pDst->pData)[src];
            }

            kept += 1;
          }
          break;
      }
    }

    if (maxKept < kept) {
      maxKept = kept;
    }
  }

  pBlock->info.rows = maxKept;
  if (pSavedBitmap != NULL) {
    taosMemoryFree(pSavedBitmap);
  }
}

H
Haojun Liao 已提交
736
// Raise pRow->numOfRows to the largest result count among its initialized entries.
//
// if all expr skips all blocks, e.g. all null inputs for max function, output one row in final result.
// except for first/last, which require not null output, output no rows
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) {
  bool hasNotNullFunc = false;

  for (int32_t idx = 0; idx < numOfExprs; ++idx) {
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, idx, rowEntryOffset);

    if (isRowEntryInitialized(pEntry)) {
      if (pRow->numOfRows < pEntry->numOfRes) {
        pRow->numOfRows = pEntry->numOfRes;
      }

      if (pCtx[idx].isNotNullFunc) {
        hasNotNullFunc = true;
      }
    }
  }

  if (!hasNotNullFunc && pRow->numOfRows == 0) {
    pRow->numOfRows = 1;
  }
}

H
Haojun Liao 已提交
759 760
// Emit the values of one result row into pBlock, starting at row pBlock->info.rows.
//
// Expressions with a finalize callback produce their own output through it; a failing
// finalize is logged and leaves through T_LONG_JMP. "_select_value" expressions without a
// finalize callback write nothing here. Every other expression has its single cell copied
// into pRow->numOfRows consecutive rows.
void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
                              SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) {
  for (int32_t idx = 0; idx < numOfExprs; ++idx) {
    SqlFunctionCtx* pOneCtx = &pCtx[idx];
    const char*     funcName = pOneCtx->pExpr->pExpr->_function.functionName;

    pOneCtx->resultInfo = getResultEntryInfo(pRow, idx, rowEntryOffset);

    if (pOneCtx->fpSet.finalize) {
      // for groupkey along with functions that output multiple lines(e.g. Histogram)
      // need to match groupkey result for each output row of that function.
      if (strcmp(funcName, "_group_key") == 0 && pOneCtx->resultInfo->numOfRes != 0) {
        pOneCtx->resultInfo->numOfRes = pRow->numOfRows;
      }

      int32_t code = pOneCtx->fpSet.finalize(pOneCtx, pBlock);
      if (TAOS_FAILED(code)) {
        qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
        T_LONG_JMP(pTaskInfo->env, code);
      }
      continue;
    }

    if (strcmp(funcName, "_select_value") == 0) {
      // do nothing
      continue;
    }

    // expand the result into multiple rows. E.g., _wstart, top(k, 20)
    // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
    int32_t          slotId = pExprInfo[idx].base.resSchema.slotId;
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
    char*            in = GET_ROWCELL_INTERBUF(pOneCtx->resultInfo);

    for (int32_t k = 0; k < pRow->numOfRows; ++k) {
      colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pOneCtx->resultInfo->isNullRes);
    }
  }
}

793 794 795
// todo refactor. SResultRow has direct pointer in miainfo
/*
 * Finalizes the single result row located at resultRowPosition inside pBuf and appends its output
 * rows to pBlock.
 *
 * Returns 0 both when rows were appended and when the row produced no output.
 * A missing buffer page or a failed capacity expansion is logged and raised through T_LONG_JMP on
 * the task environment, so those paths do not return.
 */
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
                           SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId);
  if (page == NULL) {
    qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);

  SqlFunctionCtx* pCtx = pSup->pCtx;
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  const int32_t*  rowEntryOffset = pSup->rowEntryInfoOffset;

  doUpdateNumOfRows(pCtx, pRow, pSup->numOfExprs, rowEntryOffset);
  if (pRow->numOfRows == 0) {
    releaseBufPage(pBuf, page);
    return 0;
  }

  // Grow the capacity by 25% per step until the new rows fit. The step is computed with integer
  // arithmetic (size + size/4 equals the truncated size * 1.25). When the step would be zero,
  // i.e. the capacity is below 4, jump straight to the required size; otherwise the loop would
  // never make progress.
  int64_t required = pBlock->info.rows + pRow->numOfRows;
  int64_t size = pBlock->info.capacity;
  while (size < required) {
    int64_t step = size / 4;
    size = (step > 0) ? (size + step) : required;
  }

  int32_t code = blockDataEnsureCapacity(pBlock, (int32_t)size);
  if (TAOS_FAILED(code)) {
    releaseBufPage(pBuf, page);
    qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  copyResultrowToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);

  releaseBufPage(pBuf, page);
  pBlock->info.rows += pRow->numOfRows;
  return 0;
}

833 834 835 836 837 838 839
/*
 * Copies pending group results, starting at pGroupResInfo->index, into pBlock.
 *
 * Rows without output are skipped. Copying stops at the first result that belongs to a different
 * group than the one already stored in pBlock->info.id.groupId (a group id of 0 means "not set
 * yet"). pGroupResInfo->index is advanced past every consumed or skipped result.
 *
 * Always returns 0. A missing buffer page or a failed capacity expansion is logged and raised
 * through T_LONG_JMP on the task environment.
 */
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
                           SGroupResInfo* pGroupResInfo) {
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    SFilePage*  page = getBufPage(pBuf, pPos->pos.pageId);
    if (page == NULL) {
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
      T_LONG_JMP(pTaskInfo->env, terrno);
    }

    SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);

    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);

    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseBufPage(pBuf, page);
      continue;
    }

    if (pBlock->info.id.groupId == 0) {
      pBlock->info.id.groupId = pPos->groupId;
    } else {
      // current value belongs to different group, it can't be packed into one datablock
      if (pBlock->info.id.groupId != pPos->groupId) {
        releaseBufPage(pBuf, page);
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      // The copy below writes up to rows + numOfRows, so a failed expansion must abort instead of
      // being ignored; the page is released first because the long jump does not return here.
      int32_t code = blockDataEnsureCapacity(pBlock, pBlock->info.rows + pRow->numOfRows);
      if (TAOS_FAILED(code)) {
        releaseBufPage(pBuf, page);
        qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
        T_LONG_JMP(pTaskInfo->env, code);
      }

      qDebug("datablock capacity not sufficient, expand to required:%" PRId64 ", current capacity:%d, %s",
             (pRow->numOfRows+pBlock->info.rows),
             pBlock->info.capacity, GET_TASKID(pTaskInfo));
      // todo set the pOperator->resultInfo size
    }

    pGroupResInfo->index += 1;
    copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);

    releaseBufPage(pBuf, page);
    pBlock->info.rows += pRow->numOfRows;
  }

  qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
         pBlock->info.id.groupId);
  pBlock->info.dataLoad = 1;
  blockDataUpdateTsWindow(pBlock, 0);
  return 0;
}

893 894 895 896 897 898 899 900 901 902 903 904 905 906
/*
 * Fills the operator's result block for a stream task from the pending group results and stores
 * the partition table name of the resulting group in pBlock->info.parTbName (empty string when
 * the name lookup reports a negative result).
 *
 * The block is stamped with the task version and cleaned up first; nothing else happens when no
 * results remain. Result-block merging must be disabled for this path (asserted).
 */
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                           SDiskbasedBuf* pBuf) {
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
  SSDataBlock*   pRes = pbInfo->pRes;

  // set output datablock version
  pRes->info.version = pTask->version;

  blockDataCleanup(pRes);
  if (hasRemainResults(pGroupResInfo)) {
    // clear the existed group id
    pRes->info.id.groupId = 0;
    ASSERT(!pbInfo->mergeResultBlock);
    doCopyToSDataBlock(pTask, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo);

    void* pName = NULL;
    if (streamStateGetParName(pTask->streamInfo.pState, pRes->info.id.groupId, &pName) >= 0) {
      memcpy(pRes->info.parTbName, pName, TSDB_TABLE_NAME_LEN);
    } else {
      pRes->info.parTbName[0] = 0;
    }
    tdbFree(pName);
  }
}

X
Xiaoyu Wang 已提交
920 921
/*
 * Fills the operator's result block from the pending group results.
 *
 * Without result-block merging, a single doCopyToSDataBlock pass is performed, so the block holds
 * rows of one group. With merging enabled, passes are repeated (resetting the group id between
 * them) until no results remain or the row count reaches pOperator->resultInfo.threshold, and the
 * group id of the block is cleared at the end.
 */
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                            SDiskbasedBuf* pBuf) {
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
  SSDataBlock*   pRes = pbInfo->pRes;

  // set output datablock version
  pRes->info.version = pTask->version;

  blockDataCleanup(pRes);
  if (!hasRemainResults(pGroupResInfo)) {
    return;
  }

  // clear the existed group id
  pRes->info.id.groupId = 0;

  if (!pbInfo->mergeResultBlock) {
    doCopyToSDataBlock(pTask, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo);
    return;
  }

  for (;;) {
    if (!hasRemainResults(pGroupResInfo)) {
      break;
    }

    doCopyToSDataBlock(pTask, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo);
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }

    // clearing group id to continue to merge data that belong to different groups
    pRes->info.id.groupId = 0;
  }

  // clear the group id info in SSDataBlock, since the client does not need it
  pRes->info.id.groupId = 0;
}

953
/*
 * Stores a private copy of the num downstream operator pointers in p.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the pointer array cannot be
 * allocated (in which case p->numOfDownstream is left untouched).
 */
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
  p->pDownstream = taosMemoryCalloc(1, num * POINTER_BYTES);
  if (p->pDownstream != NULL) {
    memcpy(p->pDownstream, pDownstream, num * POINTER_BYTES);
    p->numOfDownstream = num;
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_OUT_OF_MEMORY;
}

964
/*
 * Reports the scan order and scan flag of the scan operator reachable from pOperator by following
 * the first downstream operator.
 *
 *   - system/stream/tag/block-dist/last-row/table-count scans: ascending order, MAIN_SCAN;
 *   - exchange: MAIN_SCAN; the order is set to ascending unless inheritUsOrder is true, in which
 *     case *order is left as supplied by the caller;
 *   - table scan / table merge scan: values taken from the scan info of the operator;
 *   - anything else: recurse into pDownstream[0], or return TSDB_CODE_INVALID_PARA when there is
 *     no downstream operator.
 */
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder) {
  // todo add more information about exchange operation
  int32_t type = pOperator->operatorType;

  switch (type) {
    case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
      *order = TSDB_ORDER_ASC;
      *scanFlag = MAIN_SCAN;
      return TSDB_CODE_SUCCESS;

    case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
      if (!inheritUsOrder) {
        *order = TSDB_ORDER_ASC;
      }
      *scanFlag = MAIN_SCAN;
      return TSDB_CODE_SUCCESS;

    case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
      STableScanInfo* pScan = pOperator->info;
      *order = pScan->base.cond.order;
      *scanFlag = pScan->base.scanFlag;
      return TSDB_CODE_SUCCESS;
    }

    case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: {
      STableMergeScanInfo* pMergeScan = pOperator->info;
      *order = pMergeScan->base.cond.order;
      *scanFlag = pMergeScan->base.scanFlag;
      return TSDB_CODE_SUCCESS;
    }

    default:
      break;
  }

  if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  return getTableScanInfo(pOperator->pDownstream[0], order, scanFlag, inheritUsOrder);
}
997

998 999 1000 1001
/*
 * Walks down the first-downstream chain starting at pOperator and returns the first operator
 * whose operatorType equals type (e.g. QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN).
 *
 * Returns NULL, after logging an error tagged with id, when pOperator is NULL or the chain ends
 * before a matching operator is found.
 */
SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id) {
  SOperatorInfo* pCur = pOperator;

  while (pCur != NULL) {
    if (pCur->operatorType == type) {
      return pCur;
    }

    if (pCur->pDownstream == NULL || pCur->pDownstream[0] == NULL) {
      break;
    }

    pCur = pCur->pDownstream[0];
  }

  qError("invalid operator, failed to find tableScanOperator %s", id);
  return NULL;
}

1017
/*
 * Releases the memory owned by each of the numOfExprs expression entries: the column descriptor
 * of column parameters, the variant of value parameters, the parameter array and the expression
 * node. The pExpr array itself is not freed here.
 */
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
  for (int32_t i = 0; i < numOfExprs; ++i) {
    SExprInfo* pOne = pExpr + i;

    for (int32_t j = 0; j < pOne->base.numOfParams; ++j) {
      switch (pOne->base.pParam[j].type) {
        case FUNC_PARAM_TYPE_COLUMN:
          taosMemoryFreeClear(pOne->base.pParam[j].pCol);
          break;
        case FUNC_PARAM_TYPE_VALUE:
          taosVariantDestroy(&pOne->base.pParam[j].param);
          break;
        default:
          break;
      }
    }

    taosMemoryFree(pOne->base.pParam);
    taosMemoryFree(pOne->pExpr);
  }
}

5
54liuyao 已提交
1033
/*
 * Destroys an operator and, recursively, all of its downstream operators.
 *
 * Order: the operator's close callback (if any) runs on its info first, then every downstream
 * operator is destroyed, then the expression support and the operator itself are released.
 * A NULL operator is ignored.
 */
void destroyOperatorInfo(SOperatorInfo* pOperator) {
  if (NULL == pOperator) {
    return;
  }

  if (NULL != pOperator->fpSet.closeFn) {
    pOperator->fpSet.closeFn(pOperator->info);
  }

  if (NULL != pOperator->pDownstream) {
    int32_t idx = 0;
    while (idx < pOperator->numOfDownstream) {
      destroyOperatorInfo(pOperator->pDownstream[idx]);
      idx += 1;
    }

    taosMemoryFreeClear(pOperator->pDownstream);
    pOperator->numOfDownstream = 0;
  }

  cleanupExprSupp(&pOperator->exprSupp);
  taosMemoryFreeClear(pOperator);
}

1055 1056 1057
// each operator should be set their own function to return total cost buffer
// Default implementation: -1 for a blocking operator, 0 otherwise.
int32_t optrDefaultBufFn(SOperatorInfo* pOperator) {
  return pOperator->blocking ? -1 : 0;
}

1064 1065 1066 1067 1068 1069
/*
 * Computes the default page size and buffer size for an operator's paged buffer.
 *
 * The page size is the smallest power of two that is >= 4096 and >= 4 * rowSize, capped at 2^29
 * so that four pages still fit in a uint32_t. The buffer size defaults to 10MB and is raised to
 * four pages when one page would not fit into it.
 *
 * A non-positive rowSize yields the minimum page size. The required size is computed in 64 bits,
 * so neither a negative nor a very large rowSize can overflow or stall the doubling loop.
 *
 * Always returns 0.
 */
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
  const uint32_t minPageSize = 4096;
  const uint32_t maxPageSize = 1u << 29;
  const uint64_t required = (rowSize > 0) ? ((uint64_t)rowSize * 4) : 0;

  uint32_t pageSize = minPageSize;
  while (pageSize < required && pageSize < maxPageSize) {
    pageSize <<= 1u;
  }
  *defaultPgsz = pageSize;

  // The default buffer for each operator in query is 10MB.
  // at least four pages need to be in buffer
  // TODO: make this variable to be configurable.
  *defaultBufsz = 4096 * 2560;
  if ((*defaultBufsz) <= pageSize) {
    (*defaultBufsz) = pageSize * 4;
  }

  return 0;
}

L
Liu Jicong 已提交
1081
/*
 * Initializes the capacity and output threshold of a result info.
 *
 * A numOfRows of 0 selects the default of 4096 rows. The threshold is 75% of the capacity, or the
 * capacity itself when that computation yields 0.
 */
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
  const int32_t rows = (numOfRows == 0) ? 4096 : numOfRows;

  pResultInfo->capacity = rows;
  pResultInfo->threshold = rows * 0.75;
  if (pResultInfo->threshold == 0) {
    pResultInfo->threshold = rows;
  }
}

1094 1095 1096 1097 1098
// Attaches pBlock as the result block of pInfo and initializes pInfo's result-row info.
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
  pInfo->pRes = pBlock;
  initResultRowInfo(&pInfo->resultRowInfo);
}

1099
/*
 * Releases an array of numOfOutput function contexts together with the per-context resources
 * (parameter variants, subsidiary contexts and buffer, input column arrays, udf name).
 *
 * Always returns NULL; a NULL pCtx is accepted.
 */
static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  if (pCtx == NULL) {
    return NULL;
  }

  for (int32_t i = 0; i < numOfOutput; ++i) {
    SqlFunctionCtx* pOne = &pCtx[i];

    for (int32_t j = 0; j < pOne->numOfParams; ++j) {
      taosVariantDestroy(&pOne->param[j].param);
    }

    taosMemoryFreeClear(pOne->subsidiaries.pCtx);
    taosMemoryFreeClear(pOne->subsidiaries.buf);
    taosMemoryFree(pOne->input.pData);
    taosMemoryFree(pOne->input.pColumnDataAgg);

    if (pOne->udfName != NULL) {
      taosMemoryFree(pOne->udfName);
    }
  }

  taosMemoryFreeClear(pCtx);
  return NULL;
}

1123
/*
 * Binds an expression array to pSup and, when the array is not NULL, creates the matching
 * function contexts and row-entry offsets.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the contexts cannot be created.
 */
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
  pSup->pExprInfo = pExprInfo;
  pSup->numOfExprs = numOfExpr;

  if (pSup->pExprInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset);
  return (pSup->pCtx != NULL) ? TSDB_CODE_SUCCESS : TSDB_CODE_OUT_OF_MEMORY;
}

1136 1137 1138 1139
/*
 * Releases everything owned by an expression support: function contexts, expression array,
 * filter info and row-entry offsets.
 *
 * Every released pointer is reset to NULL and the expression count to 0, so that calling this
 * function again on the same structure does not free the same memory twice.
 */
void cleanupExprSupp(SExprSupp* pSupp) {
  destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);
  pSupp->pCtx = NULL;

  if (pSupp->pExprInfo != NULL) {
    destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
    taosMemoryFreeClear(pSupp->pExprInfo);
  }

  if (pSupp->pFilterInfo != NULL) {
    filterFreeInfo(pSupp->pFilterInfo);
    pSupp->pFilterInfo = NULL;
  }

  taosMemoryFreeClear(pSupp->rowEntryInfoOffset);
  pSupp->numOfExprs = 0;
}

X
Xiaoyu Wang 已提交
1151
// Destroys the result block of pInfo and stores the value returned by blockDataDestroy in its place.
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
1152

H
Haojun Liao 已提交
1153
char* buildTaskId(uint64_t taskId, uint64_t queryId) {
1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167
  char* p = taosMemoryMalloc(64);

  int32_t offset = 6;
  memcpy(p, "TID:0x", offset);
  offset += tintToHex(taskId, &p[offset]);

  memcpy(&p[offset], " QID:0x", 7);
  offset += 7;
  offset += tintToHex(queryId, &p[offset]);

  p[offset] = 0;
  return p;
}

1168
/*
 * Allocates and initializes an execution task: status, creation timestamp, database name copy,
 * execution model, stop-info list, result block list, lock and identifiers.
 *
 * Returns the new task, or NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY when the task or any
 * of its members cannot be allocated; in that case everything allocated so far is released.
 */
SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, char* dbFName) {
  SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
  if (pTaskInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
  pTaskInfo->cost.created = taosGetTimestampUs();

  pTaskInfo->schemaInfo.dbname = taosStrdup(dbFName);
  pTaskInfo->execModel = model;
  pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
  pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES);

  taosInitRWLatch(&pTaskInfo->lock);
  pTaskInfo->id.vgId = vgId;
  pTaskInfo->id.queryId = queryId;
  pTaskInfo->id.str = buildTaskId(taskId, queryId);

  // A partially constructed task must not be handed to the caller: later code uses these members
  // without checking them.
  bool dbnameMissing = (dbFName != NULL && pTaskInfo->schemaInfo.dbname == NULL);
  if (dbnameMissing || pTaskInfo->stopInfo.pStopInfo == NULL || pTaskInfo->pResultBlockList == NULL ||
      pTaskInfo->id.str == NULL) {
    if (pTaskInfo->stopInfo.pStopInfo != NULL) {
      taosArrayDestroy(pTaskInfo->stopInfo.pStopInfo);
    }
    if (pTaskInfo->pResultBlockList != NULL) {
      taosArrayDestroy(pTaskInfo->pResultBlockList);
    }
    taosMemoryFree(pTaskInfo->schemaInfo.dbname);
    taosMemoryFree(pTaskInfo->id.str);
    taosMemoryFree(pTaskInfo);

    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  return pTaskInfo;
}
H
Haojun Liao 已提交
1189

1190
/*
 * Loads the schema information of the scanned table into pTaskInfo->schemaInfo: table name,
 * a clone of the row schema, the tag schema version (super and child tables only) and the schema
 * of the queried columns.
 *
 * For a child table the meta entry of its super table is loaded and the super table schema is
 * used. Returns TSDB_CODE_SUCCESS, or terrno when a meta entry cannot be loaded.
 */
int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) {
  SMetaReader mr = {0};
  metaReaderInit(&mr, pHandle->meta, 0);

  int32_t code = metaGetTableEntryByUidCache(&mr, pScanNode->uid);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to get the table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid, pScanNode->suid,
           GET_TASKID(pTaskInfo));

    metaReaderClear(&mr);
    return terrno;
  }

  SSchemaInfo* pSchemaInfo = &pTaskInfo->schemaInfo;
  pSchemaInfo->tablename = taosStrdup(mr.me.name);

  // Evaluated before the reader is re-pointed at the super table of a child table.
  bool useSuperSchema = (mr.me.type == TSDB_SUPER_TABLE);

  if (mr.me.type == TSDB_CHILD_TABLE) {
    tDecoderClear(&mr.coder);

    tb_uid_t suid = mr.me.ctbEntry.suid;
    code = metaGetTableEntryByUidCache(&mr, suid);
    if (code != TSDB_CODE_SUCCESS) {
      metaReaderClear(&mr);
      return terrno;
    }

    useSuperSchema = true;
  }

  if (useSuperSchema) {
    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
  } else {
    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
  }

  metaReaderClear(&mr);

  pSchemaInfo->qsw = extractQueriedColumnSchema(pScanNode);
  return TSDB_CODE_SUCCESS;
}

SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
1231 1232 1233
  int32_t numOfCols = LIST_LENGTH(pScanNode->pScanCols);
  int32_t numOfTags = LIST_LENGTH(pScanNode->pScanPseudoCols);

1234
  SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
1235
  pqSw->pSchema = taosMemoryCalloc(numOfCols + numOfTags, sizeof(SSchema));
1236

L
Liu Jicong 已提交
1237
  for (int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
1238
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanCols, i);
1239 1240
    SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

H
Haojun Liao 已提交
1241 1242 1243
    SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
    pSchema->colId = pColNode->colId;
    pSchema->type = pColNode->node.resType.type;
H
Haojun Liao 已提交
1244 1245
    pSchema->bytes = pColNode->node.resType.bytes;
    tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
1246 1247
  }

1248
  // this the tags and pseudo function columns, we only keep the tag columns
1249
  for (int32_t i = 0; i < numOfTags; ++i) {
1250 1251 1252 1253 1254 1255 1256 1257 1258
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanPseudoCols, i);

    int32_t type = nodeType(pNode->pExpr);
    if (type == QUERY_NODE_COLUMN) {
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

      SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
      pSchema->colId = pColNode->colId;
      pSchema->type = pColNode->node.resType.type;
H
Haojun Liao 已提交
1259
      pSchema->bytes = pColNode->node.resType.bytes;
H
Haojun Liao 已提交
1260
      tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
1261 1262 1263
    }
  }

H
Haojun Liao 已提交
1264
  return pqSw;
1265 1266
}

1267 1268
// Releases the database name, table name and both schema wrappers held by a schema info.
static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
  // owned strings
  taosMemoryFreeClear(pSchemaInfo->dbname);
  taosMemoryFreeClear(pSchemaInfo->tablename);

  // table schema, then queried-column schema
  tDeleteSSchemaWrapper(pSchemaInfo->sw);
  tDeleteSSchemaWrapper(pSchemaInfo->qsw);
}

1274
// Releases the schema wrapper held by the stream task info.
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) {
  tDeleteSSchemaWrapper(pStreamInfo->schema);
}
1275

1276
/*
 * Returns true when the group list consists of exactly one entry and that entry is a function
 * node named "tbname" (partition by tbname / group by tbname); false otherwise.
 */
bool groupbyTbname(SNodeList* pGroupList) {
  if (LIST_LENGTH(pGroupList) != 1) {
    return false;
  }

  SNode* pFirst = nodesListGetNode(pGroupList, 0);
  if (pFirst->type != QUERY_NODE_FUNCTION) {
    return false;
  }

  return strcmp(((struct SFunctionNode*)pFirst)->functionName, "tbname") == 0;
}

1289 1290
// Destroys the first num operators of ops and clears the task's read-recorder pointer, which may
// refer to the info of one of the destroyed scan operators.
static void destroyChildOperators(SOperatorInfo** ops, int32_t num, SExecTaskInfo* pTaskInfo) {
  for (int32_t i = 0; i < num; ++i) {
    destroyOperatorInfo(ops[i]);
    ops[i] = NULL;
  }
  pTaskInfo->cost.pRecoder = NULL;
}

/*
 * Creates the operator for a physical plan node without children (scans, exchange, constant
 * projection). Returns NULL on failure; most failure paths record the error in pTaskInfo->code,
 * an unknown node type sets terrno to TSDB_CODE_INVALID_PARA.
 *
 * A table list that has been created but not yet handed to an operator is destroyed on every
 * failure path of this function.
 */
static SOperatorInfo* createLeafOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
                                         SNode* pTagCond, SNode* pTagIndexCond, const char* pUser) {
  int32_t        type = nodeType(pPhyNode);
  const char*    idstr = GET_TASKID(pTaskInfo);
  SOperatorInfo* pOperator = NULL;

  if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
    STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;

    // NOTE: this is an patch to fix the physical plan
    // TODO remove it later
    if (pTableScanNode->scan.node.pLimit != NULL) {
      pTableScanNode->groupSort = true;
    }

    STableListInfo* pTableListInfo = tableListCreate();
    int32_t         code =
        createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
                                pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
    if (code) {
      pTaskInfo->code = code;
      tableListDestroy(pTableListInfo);
      qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr);
      return NULL;
    }

    code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
    if (code) {
      pTaskInfo->code = terrno;
      tableListDestroy(pTableListInfo);
      return NULL;
    }

    pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo);
    if (NULL == pOperator) {
      // NOTE(review): unlike the merge-scan branch, the table list is not destroyed here; which
      // of the two matches the ownership rules of the create functions should be confirmed.
      pTaskInfo->code = terrno;
      return NULL;
    }

    STableScanInfo* pScanInfo = pOperator->info;
    pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
  } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
    STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
    STableListInfo*           pTableListInfo = tableListCreate();

    int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle,
                                           pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
    if (code) {
      pTaskInfo->code = code;
      tableListDestroy(pTableListInfo);
      qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
      return NULL;
    }

    code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
    if (code) {
      pTaskInfo->code = terrno;
      tableListDestroy(pTableListInfo);
      return NULL;
    }

    pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo);
    if (NULL == pOperator) {
      pTaskInfo->code = terrno;
      tableListDestroy(pTableListInfo);
      return NULL;
    }

    // The info of a merge-scan operator is a STableMergeScanInfo (see getTableScanInfo); reading
    // it through STableScanInfo would take the recorder address from the wrong structure layout.
    STableMergeScanInfo* pScanInfo = pOperator->info;
    pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
  } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
    pOperator = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode,
                                           pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
    STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
    STableListInfo*      pTableListInfo = tableListCreate();

    if (pHandle->vnode) {
      int32_t code =
          createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
                                  pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
      if (code) {
        pTaskInfo->code = code;
        tableListDestroy(pTableListInfo);
        qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
        return NULL;
      }
    }

    pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
    pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTableListInfo, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
    SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
    pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN == type) {
    STableCountScanPhysiNode* pTblCountScanNode = (STableCountScanPhysiNode*)pPhyNode;
    pOperator = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
    STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
    STableListInfo*    pTableListInfo = tableListCreate();
    int32_t            code = createScanTableListInfo(pScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
                                                      pTagIndexCond, pTaskInfo);
    if (code != TSDB_CODE_SUCCESS) {
      pTaskInfo->code = code;
      tableListDestroy(pTableListInfo);
      qError("failed to getTableList, code: %s", tstrerror(code));
      return NULL;
    }

    pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
    SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
    STableListInfo*          pTableListInfo = tableListCreate();

    if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
      SArray* pList = taosArrayInit(4, sizeof(STableKeyInfo));
      int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pList);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = terrno;
        taosArrayDestroy(pList);
        tableListDestroy(pTableListInfo);
        return NULL;
      }

      size_t num = taosArrayGetSize(pList);
      for (int32_t i = 0; i < num; ++i) {
        STableKeyInfo* p = taosArrayGet(pList, i);
        tableListAddTableInfo(pTableListInfo, p->uid, 0);
      }

      taosArrayDestroy(pList);
    } else {  // Create group with only one table
      tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0);
    }

    pOperator = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTableListInfo, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
    SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
    STableListInfo*        pTableListInfo = tableListCreate();

    int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo,
                                           pTagCond, pTagIndexCond, pTaskInfo);
    if (code != TSDB_CODE_SUCCESS) {
      pTaskInfo->code = code;
      tableListDestroy(pTableListInfo);
      return NULL;
    }

    code = extractTableSchemaInfo(pHandle, &pScanNode->scan, pTaskInfo);
    if (code != TSDB_CODE_SUCCESS) {
      pTaskInfo->code = code;
      tableListDestroy(pTableListInfo);
      return NULL;
    }

    pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTableListInfo, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
    pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo);
  } else {
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  return pOperator;
}

/*
 * Recursively builds the operator tree for a physical plan (sub)tree.
 *
 * Leaf nodes are turned into scan/exchange/projection operators; for inner nodes the operators of
 * all children are built first and then passed to the create function matching the node type.
 * The id of the node's output data block descriptor is stored in the created operator.
 *
 * Returns the root operator of the subtree, or NULL on failure. When building a child fails or
 * the node type is unknown, the child operators that were already built are destroyed.
 */
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
                                  SNode* pTagIndexCond, const char* pUser) {
  int32_t type = nodeType(pPhyNode);

  if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
    SOperatorInfo* pOperator = createLeafOperator(pPhyNode, pTaskInfo, pHandle, pTagCond, pTagIndexCond, pUser);
    if (pOperator != NULL) {  // todo moved away
      pOperator->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
    }

    return pOperator;
  }

  size_t          size = LIST_LENGTH(pPhyNode->pChildren);
  int32_t         numOfChildren = (int32_t)size;
  SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
  if (ops == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < numOfChildren; ++i) {
    SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
    ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTagCond, pTagIndexCond, pUser);
    if (ops[i] == NULL) {
      // the children built so far are owned by nobody else yet
      destroyChildOperators(ops, i, pTaskInfo);
      taosMemoryFree(ops);
      return NULL;
    }
  }

  SOperatorInfo* pOptr = NULL;
  if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
    pOptr = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) {
    SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
    if (pAggNode->pGroupKeys != NULL) {
      pOptr = createGroupOperatorInfo(ops[0], pAggNode, pTaskInfo);
    } else {
      pOptr = createAggregateOperatorInfo(ops[0], pAggNode, pTaskInfo);
    }
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) {
    SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
    pOptr = createIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
    pOptr = createStreamIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
    SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;
    pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
    SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
    pOptr = createMergeIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
    int32_t children = 0;
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
    int32_t children = pHandle->numOfVgroups;
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
    pOptr = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) {
    pOptr = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
    SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
    pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
    SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
    pOptr = createSessionAggOperatorInfo(ops[0], pSessionNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
    pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
    int32_t children = 0;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) {
    int32_t children = pHandle->numOfVgroups;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
    pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION == type) {
    pOptr = createStreamPartitionOperatorInfo(ops[0], (SStreamPartitionPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
    SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
    pOptr = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
    pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
    pOptr = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
    pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL == type) {
    pOptr = createStreamFillOperatorInfo(ops[0], (SStreamFillPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
    pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
    pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT == type) {
    pOptr = createEventwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else {
    // no operator took ownership of the children
    terrno = TSDB_CODE_INVALID_PARA;
    destroyChildOperators(ops, numOfChildren, pTaskInfo);
    taosMemoryFree(ops);
    return NULL;
  }

  taosMemoryFree(ops);
  if (pOptr) {
    pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
  }

  return pOptr;
}
H
Haojun Liao 已提交
1549

L
Liu Jicong 已提交
1550 1551 1552 1553
// Walk down from pOperator until a stream-scan operator is reached and return, through ppInfo, the `info` of the
// table-scan operator nested inside it (pTableScanOp->info).
//
// Only single-downstream chains are followed:
//   - an operator without any downstream that is not a stream scan -> TSDB_CODE_APP_ERROR
//   - an operator with more than one downstream                    -> TSDB_CODE_APP_ERROR
// Returns 0 when the stream scan was found and *ppInfo has been set.
static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) {
  SOperatorInfo* pCur = pOperator;

  while (pCur->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pCur->numOfDownstream == 0) {
      qError("failed to find stream scan operator");
      return TSDB_CODE_APP_ERROR;
    }

    if (pCur->numOfDownstream > 1) {
      qError("join not supported for stream block scan");
      return TSDB_CODE_APP_ERROR;
    }

    // exactly one downstream: keep descending
    pCur = pCur->pDownstream[0];
  }

  SStreamScanInfo* pStreamScan = pCur->info;
  *ppInfo = pStreamScan->pTableScanOp->info;
  return 0;
}

1569 1570 1571 1572 1573 1574
// Follow the single-child chain that starts at pNode down to its leaf and return that leaf through ppNode,
// provided the leaf is a table-scan physical node.
//
// Returns 0 on success. Returns -1 with terrno = TSDB_CODE_APP_ERROR when a node on the way has more than one
// child, or when the leaf is not QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN.
int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
  SPhysiNode* pCur = pNode;

  // descend while the current node still has children
  while (!(pCur->pChildren == NULL || LIST_LENGTH(pCur->pChildren) == 0)) {
    if (LIST_LENGTH(pCur->pChildren) != 1) {
      terrno = TSDB_CODE_APP_ERROR;
      return -1;
    }

    pCur = (SPhysiNode*)nodesListGetNode(pCur->pChildren, 0);
  }

  // pCur is a leaf now
  if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN != pCur->type) {
    terrno = TSDB_CODE_APP_ERROR;
    return -1;
  }

  *ppNode = (STableScanPhysiNode*)pCur;
  return 0;
}

1589
// Build the parameter object required by the data sink described by pNode and return it through *pParam.
//
//   QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: allocates an SInserterParam that carries readHandle.
//   QUERY_NODE_PHYSICAL_PLAN_DELETE:       allocates an SDeleterParam holding the suid and the uid of every table
//                                          in pTableListInfo.
//   any other sink type:                   *pParam is left untouched.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails. On failure everything allocated
// here is released and *pParam is not modified.
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, STableListInfo* pTableListInfo, SReadHandle* readHandle) {
  switch (pNode->type) {
    case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
      SInserterParam* pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam));
      if (NULL == pInserterParam) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      pInserterParam->readHandle = readHandle;

      *pParam = pInserterParam;
      break;
    }
    case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
      SDeleterParam* pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam));
      if (NULL == pDeleterParam) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      int32_t tbNum = tableListGetSize(pTableListInfo);
      pDeleterParam->suid = tableListGetSuid(pTableListInfo);

      // TODO extract uid list
      pDeleterParam->pUidList = taosArrayInit(tbNum, sizeof(uint64_t));
      if (NULL == pDeleterParam->pUidList) {
        taosMemoryFree(pDeleterParam);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      for (int32_t i = 0; i < tbNum; ++i) {
        STableKeyInfo* pTable = tableListGetInfo(pTableListInfo, i);

        // a failed push must not be ignored: a truncated uid list would make the delete skip tables silently
        if (NULL == taosArrayPush(pDeleterParam->pUidList, &pTable->uid)) {
          taosArrayDestroy(pDeleterParam->pUidList);
          taosMemoryFree(pDeleterParam);
          return TSDB_CODE_OUT_OF_MEMORY;
        }
      }

      *pParam = pDeleterParam;
      break;
    }
    default:
      break;
  }

  return TSDB_CODE_SUCCESS;
}

1633 1634 1635
// Create the execution task for pPlan and build its operator tree.
//
// pPlan      sub plan to execute; it is attached to the task, so doDestroyTask() decides whether it is destroyed
// pTaskInfo  [out] receives the new task on success and is set to NULL on failure
// pHandle    optional read handle; when it carries a state backend, that backend becomes the stream state of the task
// taskId     task identifier
// vgId       vnode group id
// sql        heap allocated sql text; ownership is taken by this function: it is stored in the task on success and
//            released on every failure path
// model      execution model
//
// Returns TSDB_CODE_SUCCESS, or a non-zero error code (also stored in terrno) on failure.
int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
                           int32_t vgId, char* sql, EOPTR_EXEC_MODEL model) {
  *pTaskInfo = doCreateExecTaskInfo(pPlan->id.queryId, taskId, vgId, model, pPlan->dbFName);
  if (*pTaskInfo == NULL) {
    // there is no task to tear down; only the sql text has to be released
    taosMemoryFree(sql);
    if (terrno == TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
    }
    return terrno;
  }

  if (pHandle) {
    if (pHandle->pStateBackend) {
      (*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend;
    }
  }

  // from here on the task owns the sql text and the sub plan
  (*pTaskInfo)->sql = sql;
  sql = NULL;

  (*pTaskInfo)->pSubplan = pPlan;
  (*pTaskInfo)->pRoot =
      createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);

  if (NULL == (*pTaskInfo)->pRoot) {
    // Prefer the code recorded in the task. If none was recorded, keep whatever has been reported through terrno,
    // and never return "success" for a task that is about to be destroyed.
    int32_t code = (*pTaskInfo)->code;
    if (code == TSDB_CODE_SUCCESS) {
      code = terrno;
    }
    if (code == TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_APP_ERROR;
    }

    doDestroyTask(*pTaskInfo);
    *pTaskInfo = NULL;  // do not hand a dangling pointer back to the caller

    terrno = code;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1666 1667 1668 1669 1670
// Element destructor for an array of SSDataBlock pointers: pParam is the address of one array slot.
static void freeBlock(void* pParam) {
  SSDataBlock** ppBlock = (SSDataBlock**)pParam;
  blockDataDestroy(*ppBlock);
}

L
Liu Jicong 已提交
1671
// Release an execution task together with everything it owns: the operator tree, the schema and stream info, the
// buffered result blocks, the stop-info array, the sql text, the id string and the task object itself.
// The sub plan is destroyed as well unless the task runs in local-exec mode.
//
// Passing NULL is allowed and does nothing, so error paths may call this unconditionally.
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
  if (pTaskInfo == NULL) {
    return;
  }

  qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));
  destroyOperatorInfo(pTaskInfo->pRoot);
  cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
  cleanupStreamInfo(&pTaskInfo->streamInfo);

  if (!pTaskInfo->localFetch.localExec) {
    nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
  }

  taosArrayDestroyEx(pTaskInfo->pResultBlockList, freeBlock);
  taosArrayDestroy(pTaskInfo->stopInfo.pStopInfo);
  taosMemoryFreeClear(pTaskInfo->sql);
  taosMemoryFreeClear(pTaskInfo->id.str);
  taosMemoryFreeClear(pTaskInfo);
}

// Estimate the number of bytes of query buffer needed for numOfTables tables: 1.5 * sizeof(STableQueryInfo) each.
// The buffer consumed inside tsdb (STableCheckInfo) is not included.
static int64_t getQuerySupportBufSize(size_t numOfTables) {
  const size_t infoSize = sizeof(STableQueryInfo);
  const double perTable = infoSize * 1.5;

  return (int64_t)(perTable * numOfTables);
}

// Reserve query buffer for numOfTables tables out of the global budget tsQueryBufferSizeBytes.
//
//   budget < 0  : no limit is enforced, always TSDB_CODE_SUCCESS
//   budget == 0 : query processing is disabled, always TSDB_CODE_QRY_NOT_ENOUGH_BUFFER
//   budget > 0  : the required size is subtracted with a compare-and-swap retry loop;
//                 TSDB_CODE_QRY_NOT_ENOUGH_BUFFER is returned when the remaining budget is too small
int32_t checkForQueryBuf(size_t numOfTables) {
  int64_t required = getQuerySupportBufSize(numOfTables);

  if (tsQueryBufferSizeBytes < 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (tsQueryBufferSizeBytes > 0) {
    for (;;) {
      int64_t snapshot = tsQueryBufferSizeBytes;
      int64_t left = snapshot - required;
      if (left < 0) {
        return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
      }

      // retry when another thread changed the budget between the read and the swap
      if (atomic_val_compare_exchange_64(&tsQueryBufferSizeBytes, snapshot, left) == snapshot) {
        return TSDB_CODE_SUCCESS;
      }
    }
  }

  // disable query processing if the value of tsQueryBufferSize is zero.
  return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
}

// Give back to the global budget the query buffer that was reserved for numOfTables tables.
// Nothing is done while the budget is negative, i.e. when no limit is enforced.
void releaseQueryBuf(size_t numOfTables) {
  if (tsQueryBufferSizeBytes >= 0) {
    int64_t reserved = getQuerySupportBufSize(numOfTables);

    // add the reserved size back to the available budget
    atomic_add_fetch_64(&tsQueryBufferSizeBytes, reserved);
  }
}
D
dapan1121 已提交
1726

H
Haojun Liao 已提交
1727
// Append the explain/exec statistics of operatorInfo, followed by those of all of its downstream operators
// (pre-order), to pExecInfoList.
//
// Each entry holds the total number of rows, the startup cost, the total cost and, when the operator provides a
// getExplainFn callback, the verbose info produced by that callback.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the list cannot grow, or the error code reported by a
// getExplainFn callback anywhere in the tree.
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
  SExplainExecInfo  execInfo = {0};
  SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo);
  if (pExplainInfo == NULL) {
    // the entry could not be appended, there is nothing to fill in
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pExplainInfo->numOfRows = operatorInfo->resultInfo.totalRows;
  pExplainInfo->startupCost = operatorInfo->cost.openCost;
  pExplainInfo->totalCost = operatorInfo->cost.totalCost;
  pExplainInfo->verboseLen = 0;
  pExplainInfo->verboseInfo = NULL;

  if (operatorInfo->fpSet.getExplainFn) {
    int32_t code =
        operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen);
    if (code) {
      qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
      return code;
    }
  }

  // pExplainInfo must not be used below: the recursive calls append to the same list
  for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
    int32_t code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pExecInfoList);
    if (code != TSDB_CODE_SUCCESS) {
      // report the real reason instead of masking it as an out-of-memory error
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
1757

1758 1759
// Fetch from the stream state, creating it when absent, the result row keyed by (win->skey, tableGroupId),
// stamp it with the time window and bind it to the function contexts.
//
// pResult receives the row buffer; the requested buffer size is pAggSup->resultRowSize.
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the state lookup/creation reports a failure.
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
                     SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) {
  SWinKey winKey = {
      .ts = win->skey,
      .groupId = tableGroupId,
  };

  int32_t rowSize = pAggSup->resultRowSize;
  char*   pBuf = NULL;

  int32_t rc = streamStateAddIfNotExist(pState, &winKey, (void**)&pBuf, &rowSize);
  if (rc < 0) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  *pResult = (SResultRow*)pBuf;

  // set time window for current result
  (*pResult)->win = (*win);
  setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset);

  return TSDB_CODE_SUCCESS;
}

1777 1778
// Hand the result row buffer pResult, obtained for pKey, back to the stream state.
// Always returns TSDB_CODE_SUCCESS.
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult) {
  int32_t code = TSDB_CODE_SUCCESS;

  streamStateReleaseBuf(pState, pKey, pResult);

  return code;
}

1782 1783
// Store the resSize bytes of the result row pResult in the stream state under pKey.
// The outcome of the store is not inspected; TSDB_CODE_SUCCESS is always returned.
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize) {
  int32_t code = TSDB_CODE_SUCCESS;

  streamStatePut(pState, pKey, pResult, resSize);

  return code;
}

1787
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
1788
                                   SGroupResInfo* pGroupResInfo) {
1789
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
1790 1791 1792 1793 1794 1795 1796 1797
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
1798
    SWinKey* pKey = taosArrayGet(pGroupResInfo->pRows, i);
X
Xiaoyu Wang 已提交
1799 1800 1801
    int32_t  size = 0;
    void*    pVal = NULL;
    int32_t  code = streamStateGet(pState, pKey, &pVal, &size);
1802 1803 1804 1805 1806 1807
    ASSERT(code == 0);
    SResultRow* pRow = (SResultRow*)pVal;
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
1808
      releaseOutputBuf(pState, pKey, pRow);
1809 1810 1811
      continue;
    }

H
Haojun Liao 已提交
1812
    if (pBlock->info.id.groupId == 0) {
1813
      pBlock->info.id.groupId = pKey->groupId;
1814
      void* tbname = NULL;
H
Haojun Liao 已提交
1815
      if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
L
Liu Jicong 已提交
1816
        pBlock->info.parTbName[0] = 0;
1817 1818
      } else {
        memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
1819
      }
1820
      tdbFree(tbname);
1821 1822
    } else {
      // current value belongs to different group, it can't be packed into one datablock
1823 1824
      if (pBlock->info.id.groupId != pKey->groupId) {
        releaseOutputBuf(pState, pKey, pRow);
1825 1826 1827 1828 1829 1830
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
1831
      releaseOutputBuf(pState, pKey, pRow);
1832 1833 1834 1835 1836 1837 1838 1839 1840 1841
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.finalize) {
1842 1843 1844 1845
        int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code1)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
          T_LONG_JMP(pTaskInfo->env, code1);
1846 1847 1848 1849 1850 1851 1852 1853 1854
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        for (int32_t k = 0; k < pRow->numOfRows; ++k) {
1855
          colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
1856 1857 1858
        }
      }
    }
5
54liuyao 已提交
1859

1860
    pBlock->info.rows += pRow->numOfRows;
1861
    releaseOutputBuf(pState, pKey, pRow);
1862
  }
1863
  pBlock->info.dataLoad = 1;
1864 1865 1866
  blockDataUpdateTsWindow(pBlock, 0);
  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
1867 1868 1869 1870 1871 1872 1873

// Store the session buffer `buf` (size bytes) in the stream state under `key`, then release the buffer.
// Always returns TSDB_CODE_SUCCESS.
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size) {
  const void* pData = (const void*)buf;
  streamStateSessionPut(pState, key, pData, size);

  SResultRow* pRow = (SResultRow*)buf;
  releaseOutputBuf(pState, NULL, pRow);

  return TSDB_CODE_SUCCESS;
}

1874
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
5
54liuyao 已提交
1875
                                    SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
1876
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
5
54liuyao 已提交
1877 1878 1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SSessionKey* pKey = taosArrayGet(pGroupResInfo->pRows, i);
    int32_t      size = 0;
    void*        pVal = NULL;
    int32_t      code = streamStateSessionGet(pState, pKey, &pVal, &size);
    ASSERT(code == 0);
1890 1891
    if (code == -1) {
      // coverity scan
5
54liuyao 已提交
1892
      pGroupResInfo->index += 1;
1893 1894
      continue;
    }
5
54liuyao 已提交
1895 1896 1897 1898 1899 1900 1901 1902 1903
    SResultRow* pRow = (SResultRow*)pVal;
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseOutputBuf(pState, NULL, pRow);
      continue;
    }

H
Haojun Liao 已提交
1904 1905
    if (pBlock->info.id.groupId == 0) {
      pBlock->info.id.groupId = pKey->groupId;
1906

1907
      void* tbname = NULL;
H
Haojun Liao 已提交
1908
      if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
1909
        pBlock->info.parTbName[0] = 0;
1910
      } else {
1911
        memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
1912
      }
1913
      tdbFree(tbname);
5
54liuyao 已提交
1914 1915
    } else {
      // current value belongs to different group, it can't be packed into one datablock
H
Haojun Liao 已提交
1916
      if (pBlock->info.id.groupId != pKey->groupId) {
5
54liuyao 已提交
1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947
        releaseOutputBuf(pState, NULL, pRow);
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
      releaseOutputBuf(pState, NULL, pRow);
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.finalize) {
        int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code1)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
          T_LONG_JMP(pTaskInfo->env, code1);
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        for (int32_t k = 0; k < pRow->numOfRows; ++k) {
1948
          colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
5
54liuyao 已提交
1949 1950 1951 1952
        }
      }
    }

1953
    pBlock->info.dataLoad = 1;
5
54liuyao 已提交
1954 1955 1956 1957 1958 1959
    pBlock->info.rows += pRow->numOfRows;
    // saveSessionDiscBuf(pState, pKey, pVal, size);
    releaseOutputBuf(pState, NULL, pRow);
  }
  blockDataUpdateTsWindow(pBlock, 0);
  return TSDB_CODE_SUCCESS;
1960
}
L
Liu Jicong 已提交
1961 1962

void qStreamCloseTsdbReader(void* task) {
1963 1964 1965 1966
  if (task == NULL) {
    return;
  }

L
Liu Jicong 已提交
1967 1968
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)task;
  SOperatorInfo* pOp = pTaskInfo->pRoot;
1969 1970 1971 1972 1973 1974

  qDebug("stream close tsdb reader, reset status uid:%" PRId64 " ts:%" PRId64, pTaskInfo->streamInfo.currentOffset.uid,
         pTaskInfo->streamInfo.currentOffset.ts);

  // todo refactor, other thread may already use this read to extract data.
  pTaskInfo->streamInfo.currentOffset = (STqOffsetVal){0};
L
Liu Jicong 已提交
1975 1976 1977 1978 1979 1980
  while (pOp->numOfDownstream == 1 && pOp->pDownstream[0]) {
    SOperatorInfo* pDownstreamOp = pOp->pDownstream[0];
    if (pDownstreamOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      SStreamScanInfo* pInfo = pDownstreamOp->info;
      if (pInfo->pTableScanOp) {
        STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
1981 1982 1983 1984 1985 1986 1987

        setOperatorCompleted(pInfo->pTableScanOp);
        while(pTaskInfo->owner != 0) {
          taosMsleep(100);
          qDebug("wait for the reader stopping");
        }

L
Liu Jicong 已提交
1988 1989
        tsdbReaderClose(pTSInfo->base.dataReader);
        pTSInfo->base.dataReader = NULL;
1990 1991 1992 1993

        // restore the status, todo refactor.
        pInfo->pTableScanOp->status = OP_OPENED;
        pTaskInfo->status = TASK_NOT_COMPLETED;
L
Liu Jicong 已提交
1994 1995 1996 1997 1998
        return;
      }
    }
  }
}
1999

2000
// Follow the first downstream of every operator, starting at pOperator, until a table-scan operator is reached,
// then append the pTableListInfo pointer of that scan to pList.
// Nothing is appended when the chain ends before a table scan is found.
static void extractTableList(SArray* pList, const SOperatorInfo* pOperator) {
  const SOperatorInfo* pCur = pOperator;

  while (pCur->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    if (pCur->pDownstream == NULL || pCur->pDownstream[0] == NULL) {
      return;  // chain exhausted without a table scan
    }

    pCur = pCur->pDownstream[0];
  }

  STableScanInfo* pScanInfo = pCur->info;
  taosArrayPush(pList, &pScanInfo->base.pTableListInfo);
}

// Collect the table list info referenced by the table-scan operator reachable from the root operator of the task.
//
// Returns a newly created array of pointers (element size POINTER_BYTES) handed to the caller; it is empty when
// the task has no root operator or no table scan is found. Returns NULL when the array cannot be created.
SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo) {
  SArray* pArray = taosArrayInit(0, POINTER_BYTES);
  if (pArray == NULL) {
    return NULL;
  }

  SOperatorInfo* pOperator = pTaskInfo->pRoot;
  if (pOperator != NULL) {
    extractTableList(pArray, pOperator);
  }

  return pArray;
}