executorInt.c 43.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

H
Haojun Liao 已提交
16 17
#include "filter.h"
#include "function.h"
18 19
#include "functionMgt.h"
#include "os.h"
H
Haojun Liao 已提交
20
#include "querynodes.h"
21
#include "tfill.h"
dengyihao's avatar
dengyihao 已提交
22
#include "tname.h"
23

H
Haojun Liao 已提交
24
#include "tdatablock.h"
H
Haojun Liao 已提交
25
#include "tmsg.h"
26
#include "ttime.h"
H
Haojun Liao 已提交
27

28
#include "executorInt.h"
dengyihao's avatar
dengyihao 已提交
29
#include "index.h"
30
#include "operator.h"
31
#include "query.h"
32
#include "querytask.h"
33
#include "tcompare.h"
H
Haojun Liao 已提交
34
#include "thash.h"
35
#include "ttypes.h"
dengyihao's avatar
dengyihao 已提交
36
#include "vnode.h"
37

X
Xiaoyu Wang 已提交
38
// Assigns REVERSE_SCAN to the scanFlag member of the object that `runtime` points to.
#define SET_REVERSE_SCAN_FLAG(runtime)    ((runtime)->scanFlag = REVERSE_SCAN)
39 40 41 42
// Yields QUERY_ASC_FORWARD_STEP when `ord` equals TSDB_ORDER_ASC, otherwise QUERY_DESC_FORWARD_STEP.
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)

#if 0
// Compiled-out allocator wrappers that randomly report failure, followed by macros that
// redirect malloc/calloc/realloc to them.

// Returns NULL when the random draw is a multiple of 1000, otherwise forwards to taosMemoryMalloc.
static UNUSED_FUNC void *u_malloc (size_t __size) {
  uint32_t draw = taosRand();
  if (draw % 1000 == 0) {  // an unsigned remainder can only be "<= 0" by being 0
    return NULL;
  }

  return taosMemoryMalloc(__size);
}

// Returns NULL when the random draw is a multiple of 1000, otherwise forwards to taosMemoryCalloc.
static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
  uint32_t draw = taosRand();
  if (draw % 1000 == 0) {
    return NULL;
  }

  return taosMemoryCalloc(num, __size);
}

// Returns NULL when the random draw modulo 5 is 0 or 1, otherwise forwards to taosMemoryRealloc.
static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
  uint32_t draw = taosRand();
  if (draw % 5 <= 1) {
    return NULL;
  }

  return taosMemoryRealloc(p, __size);
}

#define calloc  u_calloc
#define malloc  u_malloc
#define realloc u_realloc
#endif

75
static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
76

X
Xiaoyu Wang 已提交
77 78
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
79

H
Haojun Liao 已提交
80 81 82 83
static void    extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
                                                   int32_t status);
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol);
H
Haojun Liao 已提交
84
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
85
                                  SGroupResInfo* pGroupResInfo, int32_t threshold);
H
Haojun Liao 已提交
86

87
/**
 * Reserve and zero `interBufSize` bytes for one result row inside a page of the result buffer.
 *
 * The `num` field of a page is used as the write offset of the next row; a fresh page starts at
 * sizeof(SFilePage). When the current page cannot hold another `interBufSize` bytes it is released
 * and a new page is requested.
 *
 * @param pResultBuf     buffer the pages are obtained from.
 * @param currentPageId  in/out: id of the page currently being filled, or -1 when no page has been
 *                       requested yet. On success it is updated to the page holding the returned row.
 * @param interBufSize   number of bytes reserved for the row.
 * @return the zeroed row with its pageId/offset filled in, or NULL when no page could be obtained.
 */
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
  SFilePage* pData = NULL;
  int32_t    pageId = -1;

  if (*currentPageId == -1) {
    // in the first scan, new space needed for results
    pData = getNewBufPage(pResultBuf, &pageId);
    if (pData == NULL) {
      // previously dereferenced unconditionally, crashing when the page could not be allocated
      qError("failed to get buffer, code:%s", tstrerror(terrno));
      return NULL;
    }

    pData->num = sizeof(SFilePage);
  } else {
    pData = getBufPage(pResultBuf, *currentPageId);
    if (pData == NULL) {
      qError("failed to get buffer, code:%s", tstrerror(terrno));
      return NULL;
    }

    pageId = *currentPageId;

    if (pData->num + interBufSize > getBufPageSize(pResultBuf)) {
      // release current page first, and prepare the next one
      releaseBufPage(pResultBuf, pData);

      pData = getNewBufPage(pResultBuf, &pageId);
      if (pData == NULL) {
        return NULL;
      }

      pData->num = sizeof(SFilePage);
    }
  }

  setBufPageDirty(pData, true);

  // the row lives at the current write offset of the page
  SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);

  memset((char*)pResultRow, 0, interBufSize);
  pResultRow->pageId = pageId;
  pResultRow->offset = (int32_t)pData->num;

  *currentPageId = pageId;
  pData->num += interBufSize;  // advance the write offset past the row just reserved
  return pResultRow;
}

133 134 135 136 137 138 139
/**
 * the struct of key in hash table
 * +----------+---------------+
 * | group id |   key data    |
 * | 8 bytes  | actual length |
 * +----------+---------------+
 */
140 141
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
                                   int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
D
dapan1121 已提交
142
                                   bool isIntervalQuery, SAggSupporter* pSup, bool keepGroup) {
143
  SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
D
dapan1121 已提交
144 145 146
  if (!keepGroup) {
    *(uint64_t*)pSup->keyBuf = calcGroupId(pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
  }
X
Xiaoyu Wang 已提交
147

dengyihao's avatar
dengyihao 已提交
148
  SResultRowPosition* p1 =
149
      (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
H
Haojun Liao 已提交
150

151 152
  SResultRow* pResult = NULL;

H
Haojun Liao 已提交
153 154
  // in case of repeat scan/reverse scan, no new time window added.
  if (isIntervalQuery) {
155
    if (p1 != NULL) {  // the *p1 may be NULL in case of sliding+offset exists.
156
      pResult = getResultRowByPos(pResultBuf, p1, true);
157 158 159 160
      if (NULL == pResult) {
        T_LONG_JMP(pTaskInfo->env, terrno);
      }

161
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
162 163
    }
  } else {
dengyihao's avatar
dengyihao 已提交
164 165
    // In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
    // pResultRowInfo object.
H
Haojun Liao 已提交
166
    if (p1 != NULL) {
167
      // todo
168
      pResult = getResultRowByPos(pResultBuf, p1, true);
169 170 171 172
      if (NULL == pResult) {
        T_LONG_JMP(pTaskInfo->env, terrno);
      }

173
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
174 175 176
    }
  }

L
Liu Jicong 已提交
177
  // 1. close current opened time window
178
  if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
179
    SResultRowPosition pos = pResultRowInfo->cur;
X
Xiaoyu Wang 已提交
180
    SFilePage*         pPage = getBufPage(pResultBuf, pos.pageId);
181 182 183 184
    if (pPage == NULL) {
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
185 186 187 188 189
    releaseBufPage(pResultBuf, pPage);
  }

  // allocate a new buffer page
  if (pResult == NULL) {
190
    pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
191 192 193
    if (pResult == NULL) {
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
194

195 196
    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
197
    tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
L
Liu Jicong 已提交
198
                   sizeof(SResultRowPosition));
H
Haojun Liao 已提交
199 200
  }

201 202 203
  // 2. set the new time window to be the new active time window
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};

H
Haojun Liao 已提交
204
  // too many time window in query
205
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
206
      tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
207
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
H
Haojun Liao 已提交
208 209
  }

H
Haojun Liao 已提交
210
  return pResult;
H
Haojun Liao 已提交
211 212
}

213
//  query_range_start, query_range_end, window_duration, window_start, window_end
214
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
215 216 217
  pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
  pColData->info.bytes = sizeof(int64_t);

H
Haojun Liao 已提交
218
  colInfoDataEnsureCapacity(pColData, 5, false);
219 220
  colDataSetInt64(pColData, 0, &pQueryWindow->skey);
  colDataSetInt64(pColData, 1, &pQueryWindow->ekey);
221 222

  int64_t interval = 0;
223 224 225
  colDataSetInt64(pColData, 2, &interval);  // this value may be variable in case of 'n' and 'y'.
  colDataSetInt64(pColData, 3, &pQueryWindow->skey);
  colDataSetInt64(pColData, 4, &pQueryWindow->ekey);
226 227
}

G
Ganlin Zhao 已提交
228
/**
 * Point every function context of pExprSup at pBlock, taking the per-column aggregates of the block
 * as input (via setBlockSMAInfo) and recording the scan order, row count and scan flag.
 */
static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
  SqlFunctionCtx* pCtxList = pExprSup->pCtx;

  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
    SqlFunctionCtx* pOne = &pCtxList[i];

    pOne->order = order;
    pOne->input.numOfRows = pBlock->info.rows;
    setBlockSMAInfo(pOne, &pExprSup->pExprInfo[i], pBlock);
    pOne->pSrcBlock = pBlock;
    pOne->scanFlag = scanFlag;
  }
}

239
/**
 * Bind pBlock as the input of every function context in pExprSup.
 *
 * Blocks without pBlockAgg go through doSetInputDataBlock(), whose return code is not inspected
 * here; blocks that carry pBlockAgg go through doSetInputDataBlockInfo().
 */
void setInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol) {
  if (pBlock->pBlockAgg == NULL) {
    doSetInputDataBlock(pExprSup, pBlock, order, scanFlag, createDummyCol);
    return;
  }

  doSetInputDataBlockInfo(pExprSup, pBlock, order, scanFlag);
}

L
Liu Jicong 已提交
247 248
/**
 * Materialize a constant function parameter as a column of numOfRows identical values.
 *
 * The column object in pInput->pData[paramIndex] is created on first use (type/bytes taken from the
 * parameter) and reused afterwards. BIGINT/UBIGINT, DOUBLE and VARCHAR constants are written; other
 * types leave the column content untouched.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when an allocation fails, or the error code
 *         reported while growing or filling the column.
 */
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
                                             int32_t numOfRows) {
  SColumnInfoData* pColInfo = pInput->pData[paramIndex];
  if (pColInfo == NULL) {
    pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pColInfo == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pColInfo->info.type = pFuncParam->param.nType;
    pColInfo->info.bytes = pFuncParam->param.nLen;

    pInput->pData[paramIndex] = pColInfo;
  }

  // Writing into a column whose buffer could not be grown would corrupt memory, so stop here.
  int32_t code = colInfoDataEnsureCapacity(pColInfo, numOfRows, false);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int8_t type = pFuncParam->param.nType;
  if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
    int64_t v = pFuncParam->param.i;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataSetInt64(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataSetDouble(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_VARCHAR) {
    char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
    if (tmp == NULL) {
      // previously passed straight to STR_WITH_SIZE_TO_VARSTR, which writes through the pointer
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
    for (int32_t i = 0; i < numOfRows; ++i) {
      code = colDataSetVal(pColInfo, i, tmp, false);
      if (code != TSDB_CODE_SUCCESS) {
        break;
      }
    }

    taosMemoryFree(tmp);
    return code;
  }

  return TSDB_CODE_SUCCESS;
}

290 291
/**
 * Bind the columns of pBlock as raw input of every function context in pExprSup.
 *
 * Column parameters are resolved to the column at their slot id in the block. A value parameter is
 * materialized as a constant column only when createDummyCol is set and it is the sole parameter of
 * the expression. For functions reported by fmIsImplicitTsFunc(), a column in the last parameter
 * position is also recorded as pPTS.
 *
 * @return TSDB_CODE_SUCCESS, or the error returned while creating a constant column.
 */
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol) {
  SqlFunctionCtx* pCtxList = pExprSup->pCtx;

  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
    SqlFunctionCtx*       pOne = &pCtxList[i];
    SInputColumnInfoData* pInput = &pOne->input;
    SExprInfo*            pOneExpr = &pExprSup->pExprInfo[i];

    pOne->order = order;
    pOne->input.numOfRows = pBlock->info.rows;
    pOne->pSrcBlock = pBlock;
    pOne->scanFlag = scanFlag;

    pInput->uid = pBlock->info.id.uid;
    pInput->colDataSMAIsSet = false;

    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];

      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
        int32_t slotId = pFuncParam->pCol->slotId;

        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
        pInput->totalRows = pBlock->info.rows;
        pInput->numOfRows = pBlock->info.rows;
        pInput->startRowIndex = 0;

        // NOTE: the last parameter is the primary timestamp column
        // todo: refactor this
        bool isLastParam = (j == pOneExpr->base.numOfParams - 1);
        if (fmIsImplicitTsFunc(pOne->functionId) && isLastParam) {
          pInput->pPTS = pInput->pData[j];  // in case of merge function, this is not always the ts column data.
        }

        ASSERT(pInput->pData[j] != NULL);
        continue;
      }

      if (pFuncParam->type != FUNC_PARAM_TYPE_VALUE) {
        continue;
      }

      // todo avoid case: top(k, 12), 12 is the value parameter.
      // sum(11), 11 is also the value parameter.
      if (!createDummyCol || pOneExpr->base.numOfParams != 1) {
        continue;
      }

      pInput->totalRows = pBlock->info.rows;
      pInput->numOfRows = pBlock->info.rows;
      pInput->startRowIndex = 0;

      int32_t code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
342
/**
 * Decide whether the function behind pCtx has to be evaluated for the current input.
 *
 * - functionId == -1: never executed.
 * - PRE_SCAN: executed only when fmIsRepeatScanFunc() says so.
 * - otherwise: executed unless its result entry is already completed.
 */
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
  if (pCtx->functionId == -1) {
    return false;
  }

  if (pCtx->scanFlag == PRE_SCAN) {
    return fmIsRepeatScanFunc(pCtx->functionId);
  }

  struct SResultRowEntryInfo* pEntry = GET_RES_INFO(pCtx);
  return !isRowEntryCompleted(pEntry);
}

H
Haojun Liao 已提交
362
/**
 * Provide pre-aggregated (min/max/sum/null-count) input for a constant parameter.
 *
 * Creates, on first use, both the column descriptor in pInput->pData[paramIndex] and the aggregate
 * record in pInput->pColumnDataAgg[paramIndex], then fills the aggregate for BIGINT, DOUBLE and
 * BOOL constants. TIMESTAMP constants are left untouched; any other type is only logged.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails.
 */
static int32_t doCreateConstantValColumnSMAInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type,
                                                int32_t paramIndex, int32_t numOfRows) {
  if (pInput->pData[paramIndex] == NULL) {
    SColumnInfoData* pCol = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    pInput->pData[paramIndex] = pCol;
    if (pCol == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pCol->info.type = type;
    pCol->info.bytes = tDataTypes[type].bytes;
  }

  SColumnDataAgg* pAgg = pInput->pColumnDataAgg[paramIndex];
  if (pAgg == NULL) {
    pAgg = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
    pInput->pColumnDataAgg[paramIndex] = pAgg;
    if (pAgg == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  switch (type) {
    case TSDB_DATA_TYPE_BIGINT: {
      int64_t v = pFuncParam->param.i;
      *pAgg = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
      break;
    }

    case TSDB_DATA_TYPE_DOUBLE: {
      double v = pFuncParam->param.d;
      *pAgg = (SColumnDataAgg){.numOfNull = 0};

      // the double values are stored in place of the integer fields
      *(double*)&pAgg->min = v;
      *(double*)&pAgg->max = v;
      *(double*)&pAgg->sum = v * numOfRows;
      break;
    }

    case TSDB_DATA_TYPE_BOOL: {  // todo validate this data type
      bool v = pFuncParam->param.i;
      *pAgg = (SColumnDataAgg){.numOfNull = 0};

      // NOTE(review): min is fixed at 0 regardless of v, and sum is stored through a bool
      // pointer -- confirm this is intended for a constant column.
      *(bool*)&pAgg->min = 0;
      *(bool*)&pAgg->max = v;
      *(bool*)&pAgg->sum = v * numOfRows;
      break;
    }

    case TSDB_DATA_TYPE_TIMESTAMP:
      // do nothing
      break;

    default:
      qError("invalid constant type for sma info");
      break;
  }

  return TSDB_CODE_SUCCESS;
}
411

412
/**
 * Bind the per-column aggregates of pBlock (pBlock->pBlockAgg) as input of one function context.
 *
 * colDataSMAIsSet ends up true only when the block carries aggregates and every column parameter of
 * the expression has one. Value parameters get a synthesized aggregate; the return code of that
 * step is not inspected here.
 */
void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
  SInputColumnInfoData* pInput = &pCtx->input;

  int32_t rows = pBlock->info.rows;
  pInput->numOfRows = rows;
  pInput->totalRows = rows;

  if (pBlock->pBlockAgg == NULL) {
    pInput->colDataSMAIsSet = false;
    return;
  }

  pInput->colDataSMAIsSet = true;

  for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
    SFunctParam* pFuncParam = &pExprInfo->base.pParam[j];

    if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
      doCreateConstantValColumnSMAInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows);
      continue;
    }

    if (pFuncParam->type != FUNC_PARAM_TYPE_COLUMN) {
      continue;
    }

    int32_t slotId = pFuncParam->pCol->slotId;

    pInput->pColumnDataAgg[j] = pBlock->pBlockAgg[slotId];
    if (pInput->pColumnDataAgg[j] == NULL) {
      pInput->colDataSMAIsSet = false;
    }

    // Here we set the column info data since the data type for each column data is required, but
    // the data in the corresponding SColumnInfoData will not be used.
    pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
  }
}

/////////////////////////////////////////////////////////////////////////////////////////////
445
/**
 * Compute the interval-aligned window that contains `key`.
 *
 * The start is `key` truncated by taosTimeTruncate(); the end is one tick before start + interval.
 * If that end falls below the start, the end is set to INT64_MAX.
 */
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) {
  STimeWindow aligned = {0};

  aligned.skey = taosTimeTruncate(key, pInterval, precision);

  /*
   * if the realSkey > INT64_MAX - pInterval->interval, the query duration between
   * realSkey and realEkey must be less than one interval. Therefore, no need to adjust the query ranges.
   */
  aligned.ekey = taosTimeAdd(aligned.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;

  bool wrapped = (aligned.ekey < aligned.skey);
  if (wrapped) {
    aligned.ekey = INT64_MAX;
  }

  return aligned;
}

461
/**
 * Attach the entries of pResult to the function contexts and initialize entries that need it.
 *
 * Every context gets its resultInfo pointer set. Initialization is skipped for pseudo functions and
 * for entries that are both completed and initialized. Once an entry is found that is already
 * initialized (and was not skipped), no later entry is initialized either.
 */
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
  bool alreadyInitialized = false;

  for (int32_t i = 0; i < numOfOutput; ++i) {
    SqlFunctionCtx*             pOne = &pCtx[i];
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pResult, i, rowEntryInfoOffset);

    pOne->resultInfo = pEntry;

    if (alreadyInitialized) {
      continue;
    }

    bool finished = isRowEntryCompleted(pEntry) && isRowEntryInitialized(pEntry);
    if (finished || pOne->isPseudoFunc) {
      continue;
    }

    if (pEntry->initialized) {
      alreadyInitialized = true;
      continue;
    }

    if (pOne->functionId == -1) {
      pEntry->initialized = true;
    } else {
      pOne->fpSet.init(pOne, pEntry);
    }
  }
}

490 491 492 493 494 495 496 497 498 499 500 501 502 503
/**
 * Reset the result entry attached to each of the first numOfOutput contexts: initialized and
 * complete become false, numOfRes and isNullRes become 0. Contexts without an entry are skipped.
 */
void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  for (int32_t i = 0; i < numOfOutput; ++i) {
    SResultRowEntryInfo* pEntry = pCtx[i].resultInfo;

    if (pEntry != NULL) {
      pEntry->initialized = false;
      pEntry->numOfRes = 0;
      pEntry->isNullRes = 0;
      pEntry->complete = false;
    }
  }
}

H
Haojun Liao 已提交
504 505
/**
 * Apply pFilterInfo to pBlock in place, keeping only the qualified rows.
 *
 * Does nothing when there is no filter or the block is empty. After filtering, if pColMatchInfo is
 * given, the block's timestamp window is refreshed from the first matched item whose colId is
 * PRIMARYKEY_TIMESTAMP_COL_ID and whose destination column has timestamp type.
 */
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) {
  if (pFilterInfo == NULL || pBlock->info.rows == 0) {
    return;
  }

  SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
  (void)filterSetDataFromSlotId(pFilterInfo, &param1);  // the result code is not acted upon

  SColumnInfoData* pIndicatorCol = NULL;
  int32_t          status = 0;

  // todo the keep seems never to be True??
  bool keep = filterExecute(pFilterInfo, pBlock, &pIndicatorCol, NULL, param1.numOfCols, &status);
  extractQualifiedTupleByFilterResult(pBlock, pIndicatorCol, keep, status);

  if (pColMatchInfo != NULL) {
    size_t numOfItems = taosArrayGetSize(pColMatchInfo->pList);

    for (int32_t i = 0; i < numOfItems; ++i) {
      SColMatchItem* pItem = taosArrayGet(pColMatchInfo->pList, i);
      if (pItem->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
        continue;
      }

      SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pItem->dstSlotId);
      if (pTsCol->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
        continue;
      }

      blockDataUpdateTsWindow(pBlock, pItem->dstSlotId);
      break;
    }
  }

  // the indicator column produced by filterExecute is released here
  colDataDestroy(pIndicatorCol);
  taosMemoryFree(pIndicatorCol);
}

537
// Moves every qualified cell of the fixed-width column pDst down to the next free row, accessing
// the cells as cellType_. Null flags are taken from the saved bitmap pBitmap and re-applied at the
// destination row. Relies on the locals pIndicator, totalRows, pBitmap, pDst and numOfRows.
#define COMPACT_QUALIFIED_CELLS(cellType_)                                       \
  do {                                                                           \
    for (int32_t src = 0; src < totalRows; ++src) {                              \
      if (pIndicator[src] == 0) {                                                \
        continue;                                                                \
      }                                                                          \
      if (colDataIsNull_f(pBitmap, src)) {                                       \
        colDataSetNull_f(pDst->nullbitmap, numOfRows);                           \
      } else {                                                                   \
        ((cellType_*)pDst->pData)[numOfRows] = ((cellType_*)pDst->pData)[src];   \
      }                                                                          \
      numOfRows += 1;                                                            \
    }                                                                            \
  } while (0)

/**
 * Compact pBlock in place so that only rows flagged in the filter result remain.
 *
 * @param pBlock  block to compact; its info.rows is updated.
 * @param p       filter result column; its pData is read as one int8_t flag per row (0 = drop).
 * @param keep    when true the block is left untouched.
 * @param status  FILTER_RESULT_ALL_QUALIFIED leaves the block as is, FILTER_RESULT_NONE_QUALIFIED
 *                empties it, any other value triggers the row-by-row compaction.
 *
 * Columns without a data buffer are skipped. Fixed-width columns whose type is not listed in the
 * switch below are not compacted.
 */
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status) {
  if (keep) {
    return;
  }

  int8_t* pIndicator = (int8_t*)p->pData;
  int32_t totalRows = pBlock->info.rows;

  if (status == FILTER_RESULT_ALL_QUALIFIED) {
    // here nothing needs to be done
    return;
  }

  if (status == FILTER_RESULT_NONE_QUALIFIED) {
    pBlock->info.rows = 0;
    return;
  }

  int32_t bmLen = BitmapLen(totalRows);
  char*   pBitmap = NULL;  // scratch copy of a column's null bitmap, allocated on first need
  int32_t maxRows = 0;

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);

    // it is a reserved column for scalar function, and no data in this column yet.
    if (pDst->pData == NULL) {
      continue;
    }

    int32_t numOfRows = 0;

    if (IS_VAR_DATA_TYPE(pDst->info.type)) {
      pDst->varmeta.length = 0;

      for (int32_t src = 0; src < totalRows; ++src) {
        if (pIndicator[src] == 0) {
          continue;
        }

        if (colDataIsNull_var(pDst, src)) {
          colDataSetNull_var(pDst, numOfRows);
        } else {
          char* pVal = colDataGetVarData(pDst, src);
          colDataSetVal(pDst, numOfRows, pVal, false);
        }

        numOfRows += 1;
      }
    } else {
      if (pBitmap == NULL) {
        // NOTE(review): the allocation result is used without a NULL check -- confirm how an
        // out-of-memory condition should be reported from this void function.
        pBitmap = taosMemoryCalloc(1, bmLen);
      }

      // keep the old null flags aside, then rebuild the column's bitmap for the compacted rows
      memcpy(pBitmap, pDst->nullbitmap, bmLen);
      memset(pDst->nullbitmap, 0, bmLen);

      switch (pDst->info.type) {
        case TSDB_DATA_TYPE_BIGINT:
        case TSDB_DATA_TYPE_UBIGINT:
        case TSDB_DATA_TYPE_DOUBLE:
        case TSDB_DATA_TYPE_TIMESTAMP:
          COMPACT_QUALIFIED_CELLS(int64_t);
          break;

        case TSDB_DATA_TYPE_FLOAT:
        case TSDB_DATA_TYPE_INT:
        case TSDB_DATA_TYPE_UINT:
          COMPACT_QUALIFIED_CELLS(int32_t);
          break;

        case TSDB_DATA_TYPE_SMALLINT:
        case TSDB_DATA_TYPE_USMALLINT:
          COMPACT_QUALIFIED_CELLS(int16_t);
          break;

        case TSDB_DATA_TYPE_BOOL:
        case TSDB_DATA_TYPE_TINYINT:
        case TSDB_DATA_TYPE_UTINYINT:
          COMPACT_QUALIFIED_CELLS(int8_t);
          break;
      }
    }

    if (maxRows < numOfRows) {
      maxRows = numOfRows;
    }
  }

  pBlock->info.rows = maxRows;

  if (pBitmap != NULL) {
    taosMemoryFree(pBitmap);
  }
}

#undef COMPACT_QUALIFIED_CELLS

H
Haojun Liao 已提交
681
/**
 * Raise pRow->numOfRows to the largest numOfRes among the initialized result entries of the row.
 *
 * If the row still has zero rows afterwards and none of the initialized entries belongs to a
 * context flagged isNotNullFunc, the row count is forced to 1.
 */
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) {
  bool hasNotNullFunc = false;

  for (int32_t j = 0; j < numOfExprs; ++j) {
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, j, rowEntryOffset);

    if (isRowEntryInitialized(pEntry)) {
      if (pRow->numOfRows < pEntry->numOfRes) {
        pRow->numOfRows = pEntry->numOfRes;
      }

      if (pCtx[j].isNotNullFunc) {
        hasNotNullFunc = true;
      }
    }
  }

  // if all expr skips all blocks, e.g. all null inputs for max function, output one row in final result.
  //  except for first/last, which require not null output, output no rows
  if (!hasNotNullFunc && pRow->numOfRows == 0) {
    pRow->numOfRows = 1;
  }
}

H
Haojun Liao 已提交
704 705
/**
 * Emit the content of one result row into pBlock, starting at row pBlock->info.rows.
 *
 * For each expression:
 *  - with a finalize callback: the callback writes the output; a failure is logged and leaves
 *    through T_LONG_JMP. For "_group_key", a non-zero numOfRes is first widened to pRow->numOfRows.
 *  - "_select_value" without finalize: nothing is written here.
 *  - otherwise: the entry's buffer is replicated into pRow->numOfRows consecutive rows.
 *
 * pBlock->info.rows itself is not modified by this function.
 */
void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
                              SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) {
  for (int32_t j = 0; j < numOfExprs; ++j) {
    SqlFunctionCtx* pOne = &pCtx[j];
    const char*     funcName = pOne->pExpr->pExpr->_function.functionName;

    pOne->resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);

    if (pOne->fpSet.finalize) {
      // for groupkey along with functions that output multiple lines(e.g. Histogram)
      // need to match groupkey result for each output row of that function.
      if (strcmp(funcName, "_group_key") == 0 && pOne->resultInfo->numOfRes != 0) {
        pOne->resultInfo->numOfRes = pRow->numOfRows;
      }

      int32_t code = pOne->fpSet.finalize(pOne, pBlock);
      if (TAOS_FAILED(code)) {
        qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
        T_LONG_JMP(pTaskInfo->env, code);
      }

      continue;
    }

    if (strcmp(funcName, "_select_value") == 0) {
      // do nothing
      continue;
    }

    // expand the result into multiple rows. E.g., _wstart, top(k, 20)
    // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
    int32_t          slotId = pExprInfo[j].base.resSchema.slotId;
    SColumnInfoData* pOutCol = taosArrayGet(pBlock->pDataBlock, slotId);
    char*            pCell = GET_ROWCELL_INTERBUF(pOne->resultInfo);

    for (int32_t k = 0; k < pRow->numOfRows; ++k) {
      colDataSetVal(pOutCol, pBlock->info.rows + k, pCell, pOne->resultInfo->isNullRes);
    }
  }
}

738 739 740
// Finalize the single result row located at resultRowPosition inside pBuf and append its output rows
// to pBlock.
//
// Returns 0 both when rows were appended and when the result row produced no rows. Failures to fetch
// the buffer page or to grow pBlock are reported through T_LONG_JMP(pTaskInfo->env, ...) and do not
// return to the caller.
//
// TODO: refactor. The original note reads "SResultRow has direct pointer in miainfo".
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
                           SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId);
  if (page == NULL) {
    qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);

  SqlFunctionCtx* pCtx = pSup->pCtx;
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  const int32_t*  rowEntryOffset = pSup->rowEntryInfoOffset;

  doUpdateNumOfRows(pCtx, pRow, pSup->numOfExprs, rowEntryOffset);
  if (pRow->numOfRows == 0) {
    releaseBufPage(pBuf, page);
    return 0;
  }

  // Grow the target capacity by 25% per step until the new rows fit. For capacities below 4 the
  // truncated product equals the current size (0 * 1.25 == 0, 3 * 1.25 == 3), which used to spin
  // forever; in that case jump straight to the required row count.
  int32_t size = pBlock->info.capacity;
  while (pBlock->info.rows + pRow->numOfRows > size) {
    int32_t grown = size * 1.25;
    if (grown <= size) {
      size = (int32_t)(pBlock->info.rows + pRow->numOfRows);
      break;
    }
    size = grown;
  }

  int32_t code = blockDataEnsureCapacity(pBlock, size);
  if (TAOS_FAILED(code)) {
    releaseBufPage(pBuf, page);
    qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  copyResultrowToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);

  // pRow lives inside the page: consume its row count before the page is handed back.
  pBlock->info.rows += pRow->numOfRows;
  releaseBufPage(pBuf, page);
  return 0;
}

778
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
779
                           SGroupResInfo* pGroupResInfo, int32_t threshold) {
780 781 782 783 784
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

785
  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
786

787
  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
L
Liu Jicong 已提交
788 789
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    SFilePage*  page = getBufPage(pBuf, pPos->pos.pageId);
790 791 792 793
    if (page == NULL) {
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
794

795
    SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);
796

H
Haojun Liao 已提交
797
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
798 799

    // no results, continue to check the next one
800 801
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
802
      releaseBufPage(pBuf, page);
803 804 805
      continue;
    }

H
Haojun Liao 已提交
806 807
    if (pBlock->info.id.groupId == 0) {
      pBlock->info.id.groupId = pPos->groupId;
808 809
    } else {
      // current value belongs to different group, it can't be packed into one datablock
H
Haojun Liao 已提交
810
      if (pBlock->info.id.groupId != pPos->groupId) {
811
        releaseBufPage(pBuf, page);
812 813 814 815
        break;
      }
    }

816
    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
817
      blockDataEnsureCapacity(pBlock, pBlock->info.rows + pRow->numOfRows);
D
dapan1121 已提交
818
      qDebug("datablock capacity not sufficient, expand to required:%" PRId64 ", current capacity:%d, %s",
X
Xiaoyu Wang 已提交
819 820
             (pRow->numOfRows + pBlock->info.rows), pBlock->info.capacity, GET_TASKID(pTaskInfo));
      // todo set the pOperator->resultInfo size
821 822 823
    }

    pGroupResInfo->index += 1;
H
Haojun Liao 已提交
824
    copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
825

826
    releaseBufPage(pBuf, page);
827
    pBlock->info.rows += pRow->numOfRows;
828 829 830
    if (pBlock->info.rows >= threshold) {
      break;
    }
831 832
  }

D
dapan1121 已提交
833
  qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
H
Haojun Liao 已提交
834
         pBlock->info.id.groupId);
H
Haojun Liao 已提交
835
  pBlock->info.dataLoad = 1;
836
  blockDataUpdateTsWindow(pBlock, 0);
837 838 839
  return 0;
}

840 841 842 843 844 845 846 847 848 849 850 851 852 853
// Build the next output block for a stream operator.
// pbInfo->pRes is stamped with the task version and cleaned. If pGroupResInfo still has results,
// they are copied into the block (up to the operator's threshold) and the partition table name of
// the block's group is looked up in the stream state and stored in pBlock->info.parTbName
// (set to an empty string when the lookup fails).
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                           SDiskbasedBuf* pBuf) {
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
  SSDataBlock*   pRes = pbInfo->pRes;

  // set output datablock version
  pRes->info.version = pTask->version;
  blockDataCleanup(pRes);

  if (hasRemainResults(pGroupResInfo)) {
    // clear the existed group id
    pRes->info.id.groupId = 0;
    ASSERT(!pbInfo->mergeResultBlock);
    doCopyToSDataBlock(pTask, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold);

    void* pParName = NULL;
    if (streamStateGetParName(pTask->streamInfo.pState, pRes->info.id.groupId, &pParName) >= 0) {
      memcpy(pRes->info.parTbName, pParName, TSDB_TABLE_NAME_LEN);
    } else {
      pRes->info.parTbName[0] = 0;
    }
    tdbFree(pParName);
  }
}

X
Xiaoyu Wang 已提交
867 868
// Build the next output block for a (non-stream) aggregate-style operator.
// pbInfo->pRes is stamped with the task version and cleaned. Without mergeResultBlock a single call to
// doCopyToSDataBlock fills the block (one group at most). With mergeResultBlock, results of different
// groups are packed into the same block until the operator threshold is reached or no results remain,
// and the group id of the block is cleared at the end.
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                            SDiskbasedBuf* pBuf) {
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
  SSDataBlock*   pRes = pbInfo->pRes;

  // set output datablock version
  pRes->info.version = pTask->version;
  blockDataCleanup(pRes);

  if (!hasRemainResults(pGroupResInfo)) {
    return;
  }

  // clear the existed group id
  pRes->info.id.groupId = 0;

  if (!pbInfo->mergeResultBlock) {
    doCopyToSDataBlock(pTask, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold);
    return;
  }

  bool reachedThreshold = false;
  while (!reachedThreshold && hasRemainResults(pGroupResInfo)) {
    doCopyToSDataBlock(pTask, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold);

    reachedThreshold = (pRes->info.rows >= pOperator->resultInfo.threshold);
    if (!reachedThreshold) {
      // clearing group id to continue to merge data that belong to different groups
      pRes->info.id.groupId = 0;
    }
  }

  // clear the group id info in SSDataBlock, since the client does not need it
  pRes->info.id.groupId = 0;
}

900
// Release the resources owned by each of the numOfExprs entries of pExpr: the column descriptor of
// column parameters, the variant of value parameters, the parameter array and the expression node.
// The pExpr array itself is not freed here.
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
  for (int32_t e = 0; e < numOfExprs; ++e) {
    SExprInfo* pOne = pExpr + e;

    for (int32_t p = 0; p < pOne->base.numOfParams; ++p) {
      switch (pOne->base.pParam[p].type) {
        case FUNC_PARAM_TYPE_COLUMN:
          taosMemoryFreeClear(pOne->base.pParam[p].pCol);
          break;
        case FUNC_PARAM_TYPE_VALUE:
          taosVariantDestroy(&pOne->base.pParam[p].param);
          break;
        default:
          break;
      }
    }

    taosMemoryFree(pOne->base.pParam);
    taosMemoryFree(pOne->pExpr);
  }
}

916 917 918 919 920 921
// Compute the page size and the total buffer size used for an operator's result buffer.
//
//   rowSize       size in bytes of one result row
//   defaultPgsz   [out] smallest power-of-two page size >= 4096 that holds at least four rows
//   defaultBufsz  [out] 10MB by default, or four pages when a single page is at least that large
//
// Returns 0 on success. Returns -1 when rowSize is negative or so large that four pages can no longer
// be expressed in a uint32_t; in that case the outputs are still set to the defaults (4096 / 10MB) so
// that they are never left uninitialized.
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
  // The default buffer for each operator in query is 10MB.
  // TODO: make this variable to be configurable.
  const uint32_t minPgsz = 4096;
  const uint32_t dfltBufsz = 4096 * 2560;
  // largest page size for which "pgsz * 4" still fits into 32 bits
  const uint32_t maxPgsz = 1u << 29;

  *defaultPgsz = minPgsz;
  *defaultBufsz = dfltBufsz;

  if (rowSize < 0) {
    return -1;
  }

  // Do the arithmetic in 64 bits: "rowSize * 4" may overflow int32_t, and comparing it against an
  // unsigned page size would silently convert a negative value into a huge one.
  const uint64_t required = (uint64_t)rowSize * 4;
  if (required > maxPgsz) {
    return -1;
  }

  uint32_t pgsz = minPgsz;
  while (pgsz < required) {
    pgsz <<= 1u;
  }
  *defaultPgsz = pgsz;

  // at least four pages need to be in buffer
  if (dfltBufsz <= pgsz) {
    *defaultBufsz = pgsz * 4;
  }

  return 0;
}

L
Liu Jicong 已提交
933
// Initialize the capacity and the output threshold of pResultInfo.
// A numOfRows of 0 selects the default of 4096 rows. The threshold is 75% of the capacity, or the
// capacity itself when that product evaluates to 0.
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
  const int32_t rows = (numOfRows != 0) ? numOfRows : 4096;

  pResultInfo->capacity = rows;
  pResultInfo->threshold = rows * 0.75;
  if (pResultInfo->threshold == 0) {
    pResultInfo->threshold = rows;
  }
}

946 947 948 949 950
// Attach pBlock as the result block of pInfo and initialize pInfo's result-row bookkeeping.
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
  initResultRowInfo(&pInfo->resultRowInfo);
  pInfo->pRes = pBlock;
}

951
// Free an array of numOfOutput function contexts together with the per-context resources released
// below (parameter variants, subsidiary contexts/buffer, input buffers and the udf name).
// Accepts NULL. Always returns NULL.
static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  if (pCtx != NULL) {
    for (int32_t c = 0; c < numOfOutput; ++c) {
      SqlFunctionCtx* pCur = &pCtx[c];

      for (int32_t p = 0; p < pCur->numOfParams; ++p) {
        taosVariantDestroy(&pCur->param[p].param);
      }

      taosMemoryFreeClear(pCur->subsidiaries.pCtx);
      taosMemoryFreeClear(pCur->subsidiaries.buf);
      taosMemoryFree(pCur->input.pData);
      taosMemoryFree(pCur->input.pColumnDataAgg);

      if (pCur->udfName != NULL) {
        taosMemoryFree(pCur->udfName);
      }
    }

    taosMemoryFreeClear(pCtx);
  }

  return NULL;
}

975
// Bind pExprInfo/numOfExpr to pSup and, when pExprInfo is not NULL, create the matching function
// contexts (which also fills pSup->rowEntryInfoOffset).
// Returns TSDB_CODE_OUT_OF_MEMORY when the contexts cannot be created, TSDB_CODE_SUCCESS otherwise.
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
  pSup->pExprInfo = pExprInfo;
  pSup->numOfExprs = numOfExpr;

  if (pSup->pExprInfo == NULL) {
    // nothing to evaluate: no function context is required
    return TSDB_CODE_SUCCESS;
  }

  pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset);
  return (pSup->pCtx != NULL) ? TSDB_CODE_SUCCESS : TSDB_CODE_OUT_OF_MEMORY;
}

988 989 990 991
// Release everything owned by pSupp: the function contexts, the expression array, the filter info and
// the row-entry offset table. Every released member is reset to NULL, so no dangling pointer is left
// behind and calling this function again on the same object does not free anything twice.
void cleanupExprSupp(SExprSupp* pSupp) {
  destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);
  pSupp->pCtx = NULL;

  if (pSupp->pExprInfo != NULL) {
    destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
    taosMemoryFreeClear(pSupp->pExprInfo);
  }

  if (pSupp->pFilterInfo != NULL) {
    filterFreeInfo(pSupp->pFilterInfo);
    pSupp->pFilterInfo = NULL;
  }

  taosMemoryFreeClear(pSupp->rowEntryInfoOffset);
}

X
Xiaoyu Wang 已提交
1003
// Destroy the result block of pInfo and store the value returned by blockDataDestroy() back into
// pInfo->pRes.
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
1004

1005
// Return true iff pGroupList consists of exactly one node, that node is a function node, and the
// function is named "tbname" (i.e. "partition by tbname" / "group by tbname").
bool groupbyTbname(SNodeList* pGroupList) {
  if (LIST_LENGTH(pGroupList) != 1) {
    return false;
  }

  SNode* pOnly = nodesListGetNode(pGroupList, 0);
  if (pOnly->type != QUERY_NODE_FUNCTION) {
    return false;
  }

  return strcmp(((struct SFunctionNode*)pOnly)->functionName, "tbname") == 0;
}

1018
// Create the parameter object required by the data sink described by pNode and store it in *pParam.
//   - QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: an SInserterParam carrying readHandle;
//   - QUERY_NODE_PHYSICAL_PLAN_DELETE: an SDeleterParam carrying the suid and the uid list of the first
//     table list of pTask;
//   - any other node type: *pParam is left untouched.
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails. On failure nothing
// allocated here is leaked and *pParam is left untouched.
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* pTask, SReadHandle* readHandle) {
  switch (pNode->type) {
    case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
      SInserterParam* pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam));
      if (NULL == pInserterParam) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      pInserterParam->readHandle = readHandle;

      *pParam = pInserterParam;
      break;
    }
    case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
      SDeleterParam* pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam));
      if (NULL == pDeleterParam) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      // only the first table list is used; the temporary array that carried it is released right away
      SArray*         pInfoList = getTableListInfo(pTask);
      STableListInfo* pTableListInfo = taosArrayGetP(pInfoList, 0);
      taosArrayDestroy(pInfoList);

      pDeleterParam->suid = tableListGetSuid(pTableListInfo);

      // TODO extract uid list
      int32_t numOfTables = tableListGetSize(pTableListInfo);
      pDeleterParam->pUidList = taosArrayInit(numOfTables, sizeof(uint64_t));
      if (NULL == pDeleterParam->pUidList) {
        taosMemoryFree(pDeleterParam);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      for (int32_t i = 0; i < numOfTables; ++i) {
        STableKeyInfo* pTable = tableListGetInfo(pTableListInfo, i);
        // a failed push would silently produce an incomplete uid list for the deleter
        if (NULL == taosArrayPush(pDeleterParam->pUidList, &pTable->uid)) {
          taosArrayDestroy(pDeleterParam->pUidList);
          taosMemoryFree(pDeleterParam);
          return TSDB_CODE_OUT_OF_MEMORY;
        }
      }

      *pParam = pDeleterParam;
      break;
    }
    default:
      break;
  }

  return TSDB_CODE_SUCCESS;
}

1065 1066
// Fetch (creating it when absent) the result row stored in the stream state under the key
// {win->skey, tableGroupId}, return it through *pResult, stamp it with the window *win and bind the
// function contexts to it.
// Returns TSDB_CODE_OUT_OF_MEMORY when the state lookup reports a negative value, TSDB_CODE_SUCCESS
// otherwise.
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
                     SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) {
  SWinKey winKey = {
      .ts = win->skey,
      .groupId = tableGroupId,
  };
  char*   pBufVal = NULL;
  int32_t bufLen = pAggSup->resultRowSize;

  if (streamStateAddIfNotExist(pState, &winKey, (void**)&pBufVal, &bufLen) < 0) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  *pResult = (SResultRow*)pBufVal;

  // set time window for current result
  (*pResult)->win = (*win);
  setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset);

  return TSDB_CODE_SUCCESS;
}

1084 1085
// Hand pResult back to the stream state via streamStateReleaseBuf().
// Always returns TSDB_CODE_SUCCESS; nothing reported by the release call is propagated.
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult) {
  (void)streamStateReleaseBuf(pState, pKey, pResult);

  return TSDB_CODE_SUCCESS;
}

1089 1090
// Store resSize bytes of pResult in the stream state under pKey via streamStatePut().
// Always returns TSDB_CODE_SUCCESS; the result of the put call is not propagated.
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize) {
  (void)streamStatePut(pState, pKey, pResult, resSize);

  return TSDB_CODE_SUCCESS;
}

1094
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
1095
                                   SGroupResInfo* pGroupResInfo) {
1096
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
1097 1098 1099 1100 1101 1102 1103 1104
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
1105
    SWinKey* pKey = taosArrayGet(pGroupResInfo->pRows, i);
X
Xiaoyu Wang 已提交
1106 1107 1108
    int32_t  size = 0;
    void*    pVal = NULL;
    int32_t  code = streamStateGet(pState, pKey, &pVal, &size);
1109 1110 1111 1112 1113 1114
    ASSERT(code == 0);
    SResultRow* pRow = (SResultRow*)pVal;
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
1115
      releaseOutputBuf(pState, pKey, pRow);
1116 1117 1118
      continue;
    }

H
Haojun Liao 已提交
1119
    if (pBlock->info.id.groupId == 0) {
1120
      pBlock->info.id.groupId = pKey->groupId;
1121
      void* tbname = NULL;
H
Haojun Liao 已提交
1122
      if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
L
Liu Jicong 已提交
1123
        pBlock->info.parTbName[0] = 0;
1124 1125
      } else {
        memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
1126
      }
1127
      tdbFree(tbname);
1128 1129
    } else {
      // current value belongs to different group, it can't be packed into one datablock
1130 1131
      if (pBlock->info.id.groupId != pKey->groupId) {
        releaseOutputBuf(pState, pKey, pRow);
1132 1133 1134 1135 1136 1137
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
1138
      releaseOutputBuf(pState, pKey, pRow);
1139 1140 1141 1142 1143 1144 1145 1146 1147 1148
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.finalize) {
1149 1150 1151 1152
        int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code1)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
          T_LONG_JMP(pTaskInfo->env, code1);
1153 1154 1155 1156 1157 1158 1159 1160 1161
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        for (int32_t k = 0; k < pRow->numOfRows; ++k) {
1162
          colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
1163 1164 1165
        }
      }
    }
5
54liuyao 已提交
1166

1167
    pBlock->info.rows += pRow->numOfRows;
1168
    releaseOutputBuf(pState, pKey, pRow);
1169
  }
1170
  pBlock->info.dataLoad = 1;
1171 1172 1173
  blockDataUpdateTsWindow(pBlock, 0);
  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
1174 1175 1176 1177 1178 1179 1180

// Store `size` bytes of `buf` in the session state under `key`, then release `buf` through
// releaseOutputBuf() (with a NULL key). Always returns TSDB_CODE_SUCCESS; the result of the put call is
// not propagated.
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size) {
  const void* pData = (const void*)buf;
  SResultRow* pRow = (SResultRow*)buf;

  (void)streamStateSessionPut(pState, key, pData, size);
  (void)releaseOutputBuf(pState, NULL, pRow);

  return TSDB_CODE_SUCCESS;
}

1181
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
5
54liuyao 已提交
1182
                                    SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
1183
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
5
54liuyao 已提交
1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SSessionKey* pKey = taosArrayGet(pGroupResInfo->pRows, i);
    int32_t      size = 0;
    void*        pVal = NULL;
    int32_t      code = streamStateSessionGet(pState, pKey, &pVal, &size);
    ASSERT(code == 0);
1197 1198
    if (code == -1) {
      // coverity scan
5
54liuyao 已提交
1199
      pGroupResInfo->index += 1;
1200 1201
      continue;
    }
5
54liuyao 已提交
1202 1203 1204 1205 1206 1207 1208 1209 1210
    SResultRow* pRow = (SResultRow*)pVal;
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseOutputBuf(pState, NULL, pRow);
      continue;
    }

H
Haojun Liao 已提交
1211 1212
    if (pBlock->info.id.groupId == 0) {
      pBlock->info.id.groupId = pKey->groupId;
1213

1214
      void* tbname = NULL;
H
Haojun Liao 已提交
1215
      if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
1216
        pBlock->info.parTbName[0] = 0;
1217
      } else {
1218
        memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
1219
      }
1220
      tdbFree(tbname);
5
54liuyao 已提交
1221 1222
    } else {
      // current value belongs to different group, it can't be packed into one datablock
H
Haojun Liao 已提交
1223
      if (pBlock->info.id.groupId != pKey->groupId) {
5
54liuyao 已提交
1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254
        releaseOutputBuf(pState, NULL, pRow);
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
      releaseOutputBuf(pState, NULL, pRow);
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.finalize) {
        int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code1)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
          T_LONG_JMP(pTaskInfo->env, code1);
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        for (int32_t k = 0; k < pRow->numOfRows; ++k) {
1255
          colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
5
54liuyao 已提交
1256 1257 1258 1259
        }
      }
    }

1260
    pBlock->info.dataLoad = 1;
5
54liuyao 已提交
1261 1262 1263 1264 1265 1266
    pBlock->info.rows += pRow->numOfRows;
    // saveSessionDiscBuf(pState, pKey, pVal, size);
    releaseOutputBuf(pState, NULL, pRow);
  }
  blockDataUpdateTsWindow(pBlock, 0);
  return TSDB_CODE_SUCCESS;
1267
}
L
Liu Jicong 已提交
1268 1269

void qStreamCloseTsdbReader(void* task) {
1270 1271 1272 1273
  if (task == NULL) {
    return;
  }

L
Liu Jicong 已提交
1274 1275
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)task;
  SOperatorInfo* pOp = pTaskInfo->pRoot;
1276

1277 1278
  qDebug("stream close tsdb reader, reset status uid:%" PRId64 " ts:%" PRId64, pTaskInfo->streamInfo.currentOffset.uid,
         pTaskInfo->streamInfo.currentOffset.ts);
1279 1280

  // todo refactor, other thread may already use this read to extract data.
1281
  pTaskInfo->streamInfo.currentOffset = (STqOffsetVal){0};
L
Liu Jicong 已提交
1282 1283 1284 1285 1286 1287
  while (pOp->numOfDownstream == 1 && pOp->pDownstream[0]) {
    SOperatorInfo* pDownstreamOp = pOp->pDownstream[0];
    if (pDownstreamOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      SStreamScanInfo* pInfo = pDownstreamOp->info;
      if (pInfo->pTableScanOp) {
        STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
1288 1289

        setOperatorCompleted(pInfo->pTableScanOp);
X
Xiaoyu Wang 已提交
1290
        while (pTaskInfo->owner != 0) {
1291 1292 1293 1294
          taosMsleep(100);
          qDebug("wait for the reader stopping");
        }

L
Liu Jicong 已提交
1295 1296
        tsdbReaderClose(pTSInfo->base.dataReader);
        pTSInfo->base.dataReader = NULL;
1297 1298 1299 1300

        // restore the status, todo refactor.
        pInfo->pTableScanOp->status = OP_OPENED;
        pTaskInfo->status = TASK_NOT_COMPLETED;
L
Liu Jicong 已提交
1301 1302 1303 1304 1305
        return;
      }
    }
  }
}