executorInt.c 40.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

H
Haojun Liao 已提交
16 17
#include "filter.h"
#include "function.h"
18 19
#include "functionMgt.h"
#include "os.h"
H
Haojun Liao 已提交
20
#include "querynodes.h"
21
#include "tfill.h"
dengyihao's avatar
dengyihao 已提交
22
#include "tname.h"
23

H
Haojun Liao 已提交
24
#include "tdatablock.h"
H
Haojun Liao 已提交
25
#include "tmsg.h"
26
#include "ttime.h"
H
Haojun Liao 已提交
27

28
#include "executorInt.h"
dengyihao's avatar
dengyihao 已提交
29
#include "index.h"
30
#include "operator.h"
31
#include "query.h"
32
#include "querytask.h"
33
#include "tcompare.h"
H
Haojun Liao 已提交
34
#include "thash.h"
35
#include "ttypes.h"
36
#include "storageapi.h"
37

X
Xiaoyu Wang 已提交
38
#define SET_REVERSE_SCAN_FLAG(runtime)    ((runtime)->scanFlag = REVERSE_SCAN)
39 40 41 42
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)

#if 0
static UNUSED_FUNC void *u_malloc (size_t __size) {
wafwerar's avatar
wafwerar 已提交
43
  uint32_t v = taosRand();
44 45 46 47

  if (v % 1000 <= 0) {
    return NULL;
  } else {
wafwerar's avatar
wafwerar 已提交
48
    return taosMemoryMalloc(__size);
49 50 51 52
  }
}

static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
wafwerar's avatar
wafwerar 已提交
53
  uint32_t v = taosRand();
54 55 56
  if (v % 1000 <= 0) {
    return NULL;
  } else {
wafwerar's avatar
wafwerar 已提交
57
    return taosMemoryCalloc(num, __size);
58 59 60 61
  }
}

static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
wafwerar's avatar
wafwerar 已提交
62
  uint32_t v = taosRand();
63 64 65
  if (v % 5 <= 1) {
    return NULL;
  } else {
wafwerar's avatar
wafwerar 已提交
66
    return taosMemoryRealloc(p, __size);
67 68 69 70 71 72 73 74
  }
}

#define calloc  u_calloc
#define malloc  u_malloc
#define realloc u_realloc
#endif

75
static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
76

X
Xiaoyu Wang 已提交
77 78
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
79

H
Haojun Liao 已提交
80 81 82 83
static void    extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
                                                   int32_t status);
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol);
H
Haojun Liao 已提交
84
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
D
dapan1121 已提交
85
                                  SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup);
86

87
/**
 * Reserve interBufSize bytes for a new result row inside the disk-based result buffer.
 *
 * When *currentPageId is -1 a fresh page is requested. Otherwise the current page is reused if it still has room
 * for interBufSize bytes; if it does not, it is released and a fresh page is requested.
 *
 * @param pResultBuf    buffer that owns the pages
 * @param currentPageId in: page to append to (-1 when none is active); out: page that holds the returned row
 * @param interBufSize  number of bytes reserved for the row (the row memory is zeroed)
 * @return the new row, or NULL when a page could not be obtained
 */
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
  SFilePage* pData = NULL;
  int32_t    pageId = -1;

  if (*currentPageId == -1) {
    // in the first scan, new space needed for results
    pData = getNewBufPage(pResultBuf, &pageId);
    if (pData == NULL) {
      qError("failed to get new buffer page, code:%s", tstrerror(terrno));
      return NULL;
    }
    pData->num = sizeof(SFilePage);
  } else {
    pData = getBufPage(pResultBuf, *currentPageId);
    if (pData == NULL) {
      qError("failed to get buffer, code:%s", tstrerror(terrno));
      return NULL;
    }

    pageId = *currentPageId;

    if (pData->num + interBufSize > getBufPageSize(pResultBuf)) {
      // release current page first, and prepare the next one
      releaseBufPage(pResultBuf, pData);

      pData = getNewBufPage(pResultBuf, &pageId);
      if (pData == NULL) {
        qError("failed to get new buffer page, code:%s", tstrerror(terrno));
        return NULL;
      }
      pData->num = sizeof(SFilePage);
    }
  }

  setBufPageDirty(pData, true);

  // the new row starts right after the bytes already used in this page
  SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
  memset((char*)pResultRow, 0, interBufSize);
  pResultRow->pageId = pageId;
  pResultRow->offset = (int32_t)pData->num;

  *currentPageId = pageId;
  pData->num += interBufSize;
  return pResultRow;
}

133 134 135 136 137 138 139
/**
 * the struct of key in hash table
 * +----------+---------------+
 * | group id |   key data    |
 * | 8 bytes  | actual length |
 * +----------+---------------+
 */
140 141
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
                                   int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
D
dapan1121 已提交
142
                                   bool isIntervalQuery, SAggSupporter* pSup, bool keepGroup) {
143
  SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
D
dapan1121 已提交
144 145 146
  if (!keepGroup) {
    *(uint64_t*)pSup->keyBuf = calcGroupId(pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
  }
H
Haojun Liao 已提交
147

dengyihao's avatar
dengyihao 已提交
148
  SResultRowPosition* p1 =
149
      (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
H
Haojun Liao 已提交
150

151 152
  SResultRow* pResult = NULL;

H
Haojun Liao 已提交
153 154
  // in case of repeat scan/reverse scan, no new time window added.
  if (isIntervalQuery) {
155
    if (p1 != NULL) {  // the *p1 may be NULL in case of sliding+offset exists.
156
      pResult = getResultRowByPos(pResultBuf, p1, true);
157 158 159 160
      if (NULL == pResult) {
        T_LONG_JMP(pTaskInfo->env, terrno);
      }

161
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
162 163
    }
  } else {
dengyihao's avatar
dengyihao 已提交
164 165
    // In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
    // pResultRowInfo object.
H
Haojun Liao 已提交
166
    if (p1 != NULL) {
167
      // todo
168
      pResult = getResultRowByPos(pResultBuf, p1, true);
169 170 171 172
      if (NULL == pResult) {
        T_LONG_JMP(pTaskInfo->env, terrno);
      }

173
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
174 175 176
    }
  }

L
Liu Jicong 已提交
177
  // 1. close current opened time window
178
  if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
179
    SResultRowPosition pos = pResultRowInfo->cur;
X
Xiaoyu Wang 已提交
180
    SFilePage*         pPage = getBufPage(pResultBuf, pos.pageId);
181 182 183 184
    if (pPage == NULL) {
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
185 186 187 188 189
    releaseBufPage(pResultBuf, pPage);
  }

  // allocate a new buffer page
  if (pResult == NULL) {
190
    pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
191 192 193
    if (pResult == NULL) {
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
194

195 196
    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
197
    tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
L
Liu Jicong 已提交
198
                   sizeof(SResultRowPosition));
H
Haojun Liao 已提交
199 200
  }

201 202 203
  // 2. set the new time window to be the new active time window
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};

H
Haojun Liao 已提交
204
  // too many time window in query
205
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
206
      tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
207
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
H
Haojun Liao 已提交
208 209
  }

H
Haojun Liao 已提交
210
  return pResult;
H
Haojun Liao 已提交
211 212
}

213
//  query_range_start, query_range_end, window_duration, window_start, window_end
214
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
215 216 217
  pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
  pColData->info.bytes = sizeof(int64_t);

H
Haojun Liao 已提交
218
  colInfoDataEnsureCapacity(pColData, 5, false);
219 220
  colDataSetInt64(pColData, 0, &pQueryWindow->skey);
  colDataSetInt64(pColData, 1, &pQueryWindow->ekey);
221 222

  int64_t interval = 0;
223 224 225
  colDataSetInt64(pColData, 2, &interval);  // this value may be variable in case of 'n' and 'y'.
  colDataSetInt64(pColData, 3, &pQueryWindow->skey);
  colDataSetInt64(pColData, 4, &pQueryWindow->ekey);
226 227
}

G
Ganlin Zhao 已提交
228
// Bind every function context of pExprSup to pBlock when the block carries pre-computed aggregates (SMA):
// sets order, row count, SMA info, source block and scan flag on each context.
static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
  for (int32_t k = 0; k < pExprSup->numOfExprs; ++k) {
    SqlFunctionCtx* pOne = &pExprSup->pCtx[k];

    pOne->order = order;
    pOne->input.numOfRows = pBlock->info.rows;
    setBlockSMAInfo(pOne, &pExprSup->pExprInfo[k], pBlock);
    pOne->pSrcBlock = pBlock;
    pOne->scanFlag = scanFlag;
  }
}

239
// Attach pBlock as the input of every function context in pExprSup. Blocks with pre-computed aggregates only need
// the SMA metadata; all other blocks get their column data bound.
// NOTE(review): the status code returned by doSetInputDataBlock is not propagated to the caller.
void setInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol) {
  if (pBlock->pBlockAgg == NULL) {
    doSetInputDataBlock(pExprSup, pBlock, order, scanFlag, createDummyCol);
    return;
  }

  doSetInputDataBlockInfo(pExprSup, pBlock, order, scanFlag);
}

L
Liu Jicong 已提交
247 248
/**
 * Materialize a constant function parameter as a column of numOfRows identical values.
 *
 * The column object is allocated on first use and stored in pInput->pData[paramIndex] (ownership passes to
 * pInput); later calls reuse it. Values are written for BIGINT/UBIGINT, DOUBLE and VARCHAR/GEOMETRY constants;
 * for other types only the capacity is reserved.
 *
 * @return TSDB_CODE_SUCCESS, or the error code of the first failing allocation / column write.
 */
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
                                             int32_t numOfRows) {
  SColumnInfoData* pColInfo = pInput->pData[paramIndex];
  if (pColInfo == NULL) {
    pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pColInfo == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pColInfo->info.type = pFuncParam->param.nType;
    pColInfo->info.bytes = pFuncParam->param.nLen;

    pInput->pData[paramIndex] = pColInfo;
  }

  int32_t code = colInfoDataEnsureCapacity(pColInfo, numOfRows, false);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int8_t type = pFuncParam->param.nType;
  if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
    int64_t v = pFuncParam->param.i;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataSetInt64(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataSetDouble(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_VARCHAR || type == TSDB_DATA_TYPE_GEOMETRY) {
    // build the var-string (length header + payload) once and copy it into every row
    char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
    if (tmp == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
    for (int32_t i = 0; i < numOfRows; ++i) {
      code = colDataSetVal(pColInfo, i, tmp, false);
      if (code != TSDB_CODE_SUCCESS) {
        break;
      }
    }
    taosMemoryFree(tmp);
  }

  return code;
}

290 291
/**
 * Bind the column data of pBlock to every function context of pExprSup.
 *
 * Column parameters point at the matching slot of pBlock. A value parameter is turned into a constant column only
 * when createDummyCol is set and it is the function's sole parameter (e.g. sum(11)). For implicit-ts functions, the
 * last column parameter is also exposed as pPTS.
 *
 * @return TSDB_CODE_SUCCESS, or the error from building a constant column.
 */
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol) {
  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
    SqlFunctionCtx*       pCurCtx = &pExprSup->pCtx[i];
    SInputColumnInfoData* pInput = &pCurCtx->input;
    SExprInfo*            pExpr = &pExprSup->pExprInfo[i];
    int32_t               numOfParams = pExpr->base.numOfParams;

    pCurCtx->order = order;
    pCurCtx->input.numOfRows = pBlock->info.rows;
    pCurCtx->pSrcBlock = pBlock;
    pCurCtx->scanFlag = scanFlag;

    pInput->uid = pBlock->info.id.uid;
    pInput->colDataSMAIsSet = false;

    for (int32_t j = 0; j < numOfParams; ++j) {
      SFunctParam* pParam = &pExpr->base.pParam[j];

      if (pParam->type == FUNC_PARAM_TYPE_COLUMN) {
        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, pParam->pCol->slotId);
        pInput->totalRows = pBlock->info.rows;
        pInput->numOfRows = pBlock->info.rows;
        pInput->startRowIndex = 0;

        // the last parameter of an implicit-ts function is the primary timestamp column
        // (for merge functions it is not always the ts column data).
        bool isLastParam = (j == numOfParams - 1);
        if (fmIsImplicitTsFunc(pCurCtx->functionId) && isLastParam) {
          pInput->pPTS = pInput->pData[j];
        }

        ASSERT(pInput->pData[j] != NULL);
      } else if (pParam->type == FUNC_PARAM_TYPE_VALUE && createDummyCol && numOfParams == 1) {
        // todo avoid case: top(k, 12), 12 is the value parameter; sum(11), 11 is also the value parameter.
        pInput->totalRows = pBlock->info.rows;
        pInput->numOfRows = pBlock->info.rows;
        pInput->startRowIndex = 0;

        int32_t code = doCreateConstantValColumnInfo(pInput, pParam, j, pBlock->info.rows);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
342
// Decide whether the function bound to pCtx should process the current input.
// Returns false when no function is bound (functionId == -1) or the result entry is already complete; during a
// pre-scan only functions that need a repeated scan are executed.
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
  struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);

  if (pCtx->functionId == -1) {
    return false;
  }

  if (pCtx->scanFlag == PRE_SCAN) {
    return fmIsRepeatScanFunc(pCtx->functionId);
  }

  return !isRowEntryCompleted(pResInfo);
}

H
Haojun Liao 已提交
362
/**
 * Build SMA (min/max/sum) info for a constant parameter as if it were a column of numOfRows identical values.
 *
 * A placeholder column (type/bytes only) and an SColumnDataAgg are allocated on first use and stored in pInput.
 * BIGINT and BOOL store min/max/sum as int64 values. DOUBLE stores the raw double bits in the int64 aggregate
 * fields. TIMESTAMP leaves the aggregate untouched, and any other type is logged as invalid.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if an allocation fails.
 */
static int32_t doCreateConstantValColumnSMAInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type,
                                                int32_t paramIndex, int32_t numOfRows) {
  if (pInput->pData[paramIndex] == NULL) {
    pInput->pData[paramIndex] = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pInput->pData[paramIndex] == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pInput->pData[paramIndex]->info.type = type;
    pInput->pData[paramIndex]->info.bytes = tDataTypes[type].bytes;
  }

  SColumnDataAgg* da = pInput->pColumnDataAgg[paramIndex];
  if (da == NULL) {
    da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
    if (da == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    pInput->pColumnDataAgg[paramIndex] = da;
  }

  if (type == TSDB_DATA_TYPE_BIGINT) {
    int64_t v = pFuncParam->param.i;
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    double sum = v * numOfRows;
    *da = (SColumnDataAgg){.numOfNull = 0};

    // double aggregates are stored as raw bits in the integer fields; memcpy avoids strict-aliasing UB
    memcpy(&da->min, &v, sizeof(double));
    memcpy(&da->max, &v, sizeof(double));
    memcpy(&da->sum, &sum, sizeof(double));
  } else if (type == TSDB_DATA_TYPE_BOOL) {
    // A constant bool column has min == max == v and sum == v * numOfRows. Write full int64 values: storing a
    // single byte through a bool* left sum as 0/1 regardless of numOfRows and forced min to 0.
    int64_t v = (pFuncParam->param.i != 0) ? 1 : 0;
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
  } else if (type == TSDB_DATA_TYPE_TIMESTAMP) {
    // do nothing
  } else {
    qError("invalid constant type for sma info");
  }

  return TSDB_CODE_SUCCESS;
}
411

412
/**
 * Prepare pCtx->input for aggregate-only (SMA) processing of pBlock.
 *
 * Row counts are always copied from the block. Without block aggregates, colDataSMAIsSet is cleared. Otherwise
 * each column parameter takes its slot's aggregate (clearing colDataSMAIsSet if one is missing) plus the
 * SColumnInfoData, whose type is needed but whose data is not read. Each value parameter gets a synthesized
 * constant aggregate.
 */
void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
  int32_t               numOfRows = pBlock->info.rows;
  SInputColumnInfoData* pInput = &pCtx->input;

  pInput->numOfRows = numOfRows;
  pInput->totalRows = numOfRows;

  if (pBlock->pBlockAgg == NULL) {
    pInput->colDataSMAIsSet = false;
    return;
  }

  pInput->colDataSMAIsSet = true;
  for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
    SFunctParam* pParam = &pExprInfo->base.pParam[j];

    switch (pParam->type) {
      case FUNC_PARAM_TYPE_COLUMN: {
        int32_t slot = pParam->pCol->slotId;
        pInput->pColumnDataAgg[j] = pBlock->pBlockAgg[slot];
        if (pInput->pColumnDataAgg[j] == NULL) {
          pInput->colDataSMAIsSet = false;
        }
        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slot);
        break;
      }
      case FUNC_PARAM_TYPE_VALUE:
        doCreateConstantValColumnSMAInfo(pInput, pParam, pParam->param.nType, j, pBlock->info.rows);
        break;
      default:
        break;
    }
  }
}

/////////////////////////////////////////////////////////////////////////////////////////////
445
/**
 * Return the interval-aligned window that contains key.
 *
 * The start is key truncated to the interval; the end is start + interval - 1. If that end falls before the start
 * (the start lies within one interval of INT64_MAX), the end is clamped to INT64_MAX. In that case the query range
 * is shorter than one interval, so no further adjustment is needed.
 */
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) {
  int64_t start = taosTimeTruncate(key, pInterval, precision);
  int64_t end = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1;

  STimeWindow win = {.skey = start, .ekey = (end < start) ? INT64_MAX : end};
  return win;
}

461
/**
 * Point each of the numOfOutput contexts at its entry inside pResult and initialize entries that need it.
 *
 * Entries that are both completed and initialized, or belong to pseudo functions, are skipped. Uninitialized
 * entries are initialized via fpSet.init, or just flagged when no function is bound (functionId == -1). Once an
 * already-initialized entry is met, the remaining contexts are only re-pointed, not initialized.
 */
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
  bool skipInit = false;

  for (int32_t i = 0; i < numOfOutput; ++i) {
    SqlFunctionCtx* pOne = &pCtx[i];
    pOne->resultInfo = getResultEntryInfo(pResult, i, rowEntryInfoOffset);
    if (skipInit) {
      continue;
    }

    struct SResultRowEntryInfo* pResInfo = pOne->resultInfo;
    if ((isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) || pOne->isPseudoFunc) {
      continue;
    }

    if (pResInfo->initialized) {
      skipInit = true;
    } else if (pOne->functionId == -1) {
      pResInfo->initialized = true;
    } else {
      pOne->fpSet.init(pOne, pResInfo);
    }
  }
}

490 491 492 493 494 495 496 497 498 499 500 501 502 503
// Reset the state of every result entry referenced by the first numOfOutput contexts (contexts without an entry
// are ignored): clears initialized/complete flags, result count and null-result marker.
void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  int32_t idx = 0;
  while (idx < numOfOutput) {
    SResultRowEntryInfo* pEntry = pCtx[idx].resultInfo;
    idx += 1;

    if (pEntry != NULL) {
      pEntry->initialized = false;
      pEntry->numOfRes = 0;
      pEntry->isNullRes = 0;
      pEntry->complete = false;
    }
  }
}

H
Haojun Liao 已提交
504 505
/**
 * Apply pFilterInfo to pBlock in place, dropping rows that do not qualify.
 *
 * Nothing happens when there is no filter or the block is empty. After filtering, if pColMatchInfo lists the
 * primary-timestamp column and that column is of TIMESTAMP type, the block's time window is recomputed from it.
 * The filter result column is destroyed before returning.
 */
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) {
  if (pFilterInfo == NULL || pBlock->info.rows == 0) {
    return;
  }

  SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
  // NOTE(review): the status code of filterSetDataFromSlotId is ignored.
  (void)filterSetDataFromSlotId(pFilterInfo, &param1);

  SColumnInfoData* pResCol = NULL;
  int32_t          status = 0;

  // todo the keep seems never to be True??
  bool keep = filterExecute(pFilterInfo, pBlock, &pResCol, NULL, param1.numOfCols, &status);
  extractQualifiedTupleByFilterResult(pBlock, pResCol, keep, status);

  size_t numOfMatch = (pColMatchInfo == NULL) ? 0 : taosArrayGetSize(pColMatchInfo->pList);
  for (int32_t i = 0; i < numOfMatch; ++i) {
    SColMatchItem* pItem = taosArrayGet(pColMatchInfo->pList, i);
    if (pItem->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
      continue;
    }

    SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pItem->dstSlotId);
    if (pTsCol->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
      continue;
    }

    blockDataUpdateTsWindow(pBlock, pItem->dstSlotId);
    break;
  }

  colDataDestroy(pResCol);
  taosMemoryFree(pResCol);
}

537
// Compact variable-length column pDst in place, keeping row j only when pIndicator[j] != 0.
// Returns the number of rows kept, or -1 if scratch memory or column storage could not be allocated.
static int32_t compactVarColByIndicator(SColumnInfoData* pDst, const int8_t* pIndicator, int32_t totalRows) {
  int32_t numOfRows = 0;
  char*   pScratch = NULL;
  int32_t scratchCap = 0;

  pDst->varmeta.length = 0;
  for (int32_t j = 0; j < totalRows; ++j) {
    if (pIndicator[j] == 0) {
      continue;
    }

    if (colDataIsNull_var(pDst, j)) {
      colDataSetNull_var(pDst, numOfRows);
    } else {
      // fix address sanitizer error: p1 points into pDst's payload, which colDataSetVal may reallocate,
      // so copy the value into a scratch buffer (reused across rows) first.
      char*   p1 = colDataGetVarData(pDst, j);
      int32_t len = (pDst->info.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(p1) : varDataTLen(p1);
      if (len > scratchCap) {
        char* pNew = taosMemoryRealloc(pScratch, len);
        if (pNew == NULL) {
          taosMemoryFree(pScratch);
          return -1;
        }
        pScratch = pNew;
        scratchCap = len;
      }

      memcpy(pScratch, p1, len);
      if (colDataSetVal(pDst, numOfRows, pScratch, false) != TSDB_CODE_SUCCESS) {
        taosMemoryFree(pScratch);
        return -1;
      }
    }

    numOfRows += 1;
  }

  taosMemoryFree(pScratch);
  return numOfRows;
}

// Compact fixed-width column pDst in place, keeping row j only when pIndicator[j] != 0. pBitmap must hold a copy of
// the column's null bitmap taken before pDst->nullbitmap was cleared. Types not listed below keep no rows.
// Returns the number of rows kept.
static int32_t compactFixedColByIndicator(SColumnInfoData* pDst, const int8_t* pIndicator, int32_t totalRows,
                                          char* pBitmap) {
  int32_t numOfRows = 0;

  switch (pDst->info.type) {
    case TSDB_DATA_TYPE_BIGINT:
    case TSDB_DATA_TYPE_UBIGINT:
    case TSDB_DATA_TYPE_DOUBLE:
    case TSDB_DATA_TYPE_TIMESTAMP:
      for (int32_t j = 0; j < totalRows; ++j) {
        if (pIndicator[j] == 0) {
          continue;
        }
        if (colDataIsNull_f(pBitmap, j)) {
          colDataSetNull_f(pDst->nullbitmap, numOfRows);
        } else {
          ((int64_t*)pDst->pData)[numOfRows] = ((int64_t*)pDst->pData)[j];
        }
        numOfRows += 1;
      }
      break;
    case TSDB_DATA_TYPE_FLOAT:
    case TSDB_DATA_TYPE_INT:
    case TSDB_DATA_TYPE_UINT:
      for (int32_t j = 0; j < totalRows; ++j) {
        if (pIndicator[j] == 0) {
          continue;
        }
        if (colDataIsNull_f(pBitmap, j)) {
          colDataSetNull_f(pDst->nullbitmap, numOfRows);
        } else {
          ((int32_t*)pDst->pData)[numOfRows] = ((int32_t*)pDst->pData)[j];
        }
        numOfRows += 1;
      }
      break;
    case TSDB_DATA_TYPE_SMALLINT:
    case TSDB_DATA_TYPE_USMALLINT:
      for (int32_t j = 0; j < totalRows; ++j) {
        if (pIndicator[j] == 0) {
          continue;
        }
        if (colDataIsNull_f(pBitmap, j)) {
          colDataSetNull_f(pDst->nullbitmap, numOfRows);
        } else {
          ((int16_t*)pDst->pData)[numOfRows] = ((int16_t*)pDst->pData)[j];
        }
        numOfRows += 1;
      }
      break;
    case TSDB_DATA_TYPE_BOOL:
    case TSDB_DATA_TYPE_TINYINT:
    case TSDB_DATA_TYPE_UTINYINT:
      for (int32_t j = 0; j < totalRows; ++j) {
        if (pIndicator[j] == 0) {
          continue;
        }
        if (colDataIsNull_f(pBitmap, j)) {
          colDataSetNull_f(pDst->nullbitmap, numOfRows);
        } else {
          ((int8_t*)pDst->pData)[numOfRows] = ((int8_t*)pDst->pData)[j];
        }
        numOfRows += 1;
      }
      break;
    default:
      break;
  }

  return numOfRows;
}

/**
 * Remove the rows of pBlock that did not pass the filter, compacting every column in place.
 *
 * @param pBlock data block to compact; info.rows is updated to the number of rows kept
 * @param p      per-row indicator column produced by the filter (non-zero means keep). It is only read for partial
 *               results, so it may be NULL for the other statuses.
 * @param keep   when true the block is left untouched
 * @param status FILTER_RESULT_ALL_QUALIFIED leaves the block as is, FILTER_RESULT_NONE_QUALIFIED drops every row,
 *               any other value compacts the block using p
 *
 * Columns whose pData is NULL (reserved for scalar functions, no data yet) are skipped. If memory for compaction
 * cannot be obtained, the block is emptied (info.rows = 0), terrno is set to TSDB_CODE_OUT_OF_MEMORY and an error
 * is logged. Columns may already be partially compacted at that point, and exposing them would be inconsistent.
 */
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status) {
  if (keep || status == FILTER_RESULT_ALL_QUALIFIED) {
    return;
  }

  if (status == FILTER_RESULT_NONE_QUALIFIED) {
    pBlock->info.rows = 0;
    return;
  }

  const int8_t* pIndicator = (const int8_t*)p->pData;
  int32_t       totalRows = pBlock->info.rows;
  int32_t       bmLen = BitmapLen(totalRows);
  char*         pBitmap = NULL;
  int32_t       maxRows = 0;

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
    if (pDst->pData == NULL) {
      continue;
    }

    int32_t numOfRows = -1;
    if (IS_VAR_DATA_TYPE(pDst->info.type)) {
      numOfRows = compactVarColByIndicator(pDst, pIndicator, totalRows);
    } else {
      if (pBitmap == NULL) {
        pBitmap = taosMemoryCalloc(1, bmLen);
      }
      if (pBitmap != NULL) {
        // keep the original null flags in pBitmap and rebuild pDst->nullbitmap for the compacted rows
        memcpy(pBitmap, pDst->nullbitmap, bmLen);
        memset(pDst->nullbitmap, 0, bmLen);
        numOfRows = compactFixedColByIndicator(pDst, pIndicator, totalRows, pBitmap);
      }
    }

    if (numOfRows < 0) {
      qError("failed to compact filtered data block, out of memory, all %d rows discarded", totalRows);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      maxRows = 0;
      break;
    }

    if (maxRows < numOfRows) {
      maxRows = numOfRows;
    }
  }

  pBlock->info.rows = maxRows;
  taosMemoryFree(pBitmap);
}

H
Haojun Liao 已提交
691
/**
 * Set pRow->numOfRows to the largest result count among its initialized entries.
 *
 * If no entry produced a row and none of the functions is a not-null function, one row is still emitted, e.g. max
 * over all-null input. Not-null functions (such as first/last) require non-null output, so in that case no row is
 * emitted.
 */
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) {
  bool anyNotNullFunc = false;

  for (int32_t j = 0; j < numOfExprs; ++j) {
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, j, rowEntryOffset);
    if (!isRowEntryInitialized(pEntry)) {
      continue;
    }

    if (pEntry->numOfRes > pRow->numOfRows) {
      pRow->numOfRows = pEntry->numOfRes;
    }

    if (pCtx[j].isNotNullFunc) {
      anyNotNullFunc = true;
    }
  }

  if (!anyNotNullFunc && pRow->numOfRows == 0) {
    pRow->numOfRows = 1;
  }
}

H
Haojun Liao 已提交
715 716
/**
 * Write the results held in pRow into pBlock, starting at row pBlock->info.rows (the caller advances info.rows).
 *
 * Functions with a finalize callback produce their own output. For _group_key, a non-zero result count is first
 * stretched to pRow->numOfRows so it lines up with multi-row functions (e.g. histogram). _select_value contexts
 * without finalize are skipped. Every other context has its single intermediate value repeated pRow->numOfRows
 * times (e.g. _wstart alongside top(k, 20)).
 *
 * Errors from finalize or from writing a column value are logged and long-jumped to pTaskInfo->env.
 */
void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
                              SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) {
  for (int32_t j = 0; j < numOfExprs; ++j) {
    int32_t slotId = pExprInfo[j].base.resSchema.slotId;

    pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
    if (pCtx[j].fpSet.finalize) {
      if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_group_key") == 0) {
        // for groupkey along with functions that output multiple lines(e.g. Histogram)
        // need to match groupkey result for each output row of that function.
        if (pCtx[j].resultInfo->numOfRes != 0) {
          pCtx[j].resultInfo->numOfRes = pRow->numOfRows;
        }
      }

      int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
      if (TAOS_FAILED(code)) {
        qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
        T_LONG_JMP(pTaskInfo->env, code);
      }
    } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
      // nothing to copy for _select_value here
    } else {
      // expand the result into multiple rows. E.g., _wstart, top(k, 20)
      // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
      char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
      for (int32_t k = 0; k < pRow->numOfRows; ++k) {
        int32_t code = colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
        if (TAOS_FAILED(code)) {
          qError("%s copy result data to block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
          T_LONG_JMP(pTaskInfo->env, code);
        }
      }
    }
  }
}

749 750 751
// todo refactor. SResultRow has direct pointer in miainfo
/*
 * Finalize the single result row stored at resultRowPosition in pBuf and append it to pBlock.
 *
 * The row's function results are written into pBlock starting at pBlock->info.rows. The block is grown when
 * it cannot hold them, and pBlock->info.rows is then advanced by the number of rows produced. A row without
 * any output is skipped. Buffer-page and capacity failures are logged and raised via T_LONG_JMP on
 * pTaskInfo->env.
 *
 * Always returns 0.
 */
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
                           SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId);
  if (page == NULL) {
    qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);

  SqlFunctionCtx* pCtx = pSup->pCtx;
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  const int32_t*  rowEntryOffset = pSup->rowEntryInfoOffset;

  doUpdateNumOfRows(pCtx, pRow, pSup->numOfExprs, rowEntryOffset);
  if (pRow->numOfRows == 0) {
    releaseBufPage(pBuf, page);
    return 0;
  }

  // Grow geometrically (x1.25) until the new rows fit. The conversion back to int32_t truncates, so for
  // capacities below 4 the product equals the old size; always advance by at least one to guarantee that
  // the loop terminates (the original form spun forever for capacity 0..3).
  int64_t required = pBlock->info.rows + pRow->numOfRows;
  int32_t size = pBlock->info.capacity;
  while (required > size) {
    int32_t grown = (int32_t)(size * 1.25);
    size = (grown > size) ? grown : size + 1;
  }

  int32_t code = blockDataEnsureCapacity(pBlock, size);
  if (TAOS_FAILED(code)) {
    releaseBufPage(pBuf, page);
    qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  copyResultrowToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);

  // pRow points into the buffer page: read its row count before the page is handed back
  int64_t numOfNewRows = pRow->numOfRows;
  releaseBufPage(pBuf, page);
  pBlock->info.rows += numOfNewRows;
  return 0;
}

789
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
D
dapan1121 已提交
790
                           SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup) {
791 792 793 794 795
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

796
  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
797

798
  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
L
Liu Jicong 已提交
799 800
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    SFilePage*  page = getBufPage(pBuf, pPos->pos.pageId);
801 802 803 804
    if (page == NULL) {
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
805

806
    SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);
807

H
Haojun Liao 已提交
808
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
809 810

    // no results, continue to check the next one
811 812
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
813
      releaseBufPage(pBuf, page);
814 815 816
      continue;
    }

D
dapan1121 已提交
817 818 819 820 821 822 823 824 825
    if (!ignoreGroup) {
      if (pBlock->info.id.groupId == 0) {
        pBlock->info.id.groupId = pPos->groupId;
      } else {
        // current value belongs to different group, it can't be packed into one datablock
        if (pBlock->info.id.groupId != pPos->groupId) {
          releaseBufPage(pBuf, page);
          break;
        }
826 827 828
      }
    }

829
    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
830
      uint32_t newSize = pBlock->info.rows + pRow->numOfRows + ((numOfRows - i) > 1 ? 1 : 0);
D
dapan1121 已提交
831
      blockDataEnsureCapacity(pBlock, newSize);
D
dapan1121 已提交
832
      qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s",
D
dapan1121 已提交
833
             newSize, pBlock->info.capacity, GET_TASKID(pTaskInfo));
dengyihao's avatar
dengyihao 已提交
834
      // todo set the pOperator->resultInfo size
835 836 837
    }

    pGroupResInfo->index += 1;
H
Haojun Liao 已提交
838
    copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
839

840
    releaseBufPage(pBuf, page);
841
    pBlock->info.rows += pRow->numOfRows;
842 843 844
    if (pBlock->info.rows >= threshold) {
      break;
    }
845 846
  }

D
dapan1121 已提交
847
  qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
H
Haojun Liao 已提交
848
         pBlock->info.id.groupId);
H
Haojun Liao 已提交
849
  pBlock->info.dataLoad = 1;
850
  blockDataUpdateTsWindow(pBlock, 0);
851 852 853
  return 0;
}

854 855 856
/*
 * Produce the next output block of a stream operator from the buffered group results.
 *
 * Steps:
 *  - pbInfo->pRes is stamped with the task version and cleared;
 *  - if results remain, the rows of a single group are copied in (bounded by the operator's threshold);
 *  - the block's parTbName is filled from the stream state for that group. It becomes an empty string when
 *    streamStateGetParName fails.
 * Merged multi-group blocks (mergeResultBlock) are not expected on this path.
 */
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                           SDiskbasedBuf* pBuf) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*   pAPI = &pTaskInfo->storageAPI;
  SSDataBlock*   pRes = pbInfo->pRes;

  // set output datablock version
  pRes->info.version = pTaskInfo->version;
  blockDataCleanup(pRes);
  if (!hasRemainResults(pGroupResInfo)) {
    return;
  }

  // clear the existed group id
  pRes->info.id.groupId = 0;
  ASSERT(!pbInfo->mergeResultBlock);
  doCopyToSDataBlock(pTaskInfo, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold,
                     false);

  // attach the partition table name of the group that was just emitted
  void* parName = NULL;
  if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pRes->info.id.groupId,
                                             &parName) >= 0) {
    memcpy(pRes->info.parTbName, parName, TSDB_TABLE_NAME_LEN);
  } else {
    pRes->info.parTbName[0] = 0;
  }
  pAPI->stateStore.streamStateFreeVal(parName);
}

X
Xiaoyu Wang 已提交
884 885
/*
 * Produce the next output block of an operator from the buffered group results.
 *
 * pbInfo->pRes is stamped with the task version and cleared. If results remain:
 *  - without mergeResultBlock, rows of a single group are copied (bounded by the operator's threshold);
 *  - with mergeResultBlock, rows from consecutive groups are packed into the same block until the threshold
 *    is reached or the results run out. The block's groupId is then reset to 0, since the client does not
 *    need it.
 */
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                            SDiskbasedBuf* pBuf) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*   pRes = pbInfo->pRes;

  // set output datablock version
  pRes->info.version = pTaskInfo->version;
  blockDataCleanup(pRes);
  if (!hasRemainResults(pGroupResInfo)) {
    return;
  }

  // clear the existed group id
  pRes->info.id.groupId = 0;

  if (!pbInfo->mergeResultBlock) {
    doCopyToSDataBlock(pTaskInfo, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold,
                       false);
    return;
  }

  while (hasRemainResults(pGroupResInfo)) {
    doCopyToSDataBlock(pTaskInfo, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold,
                       true);
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }
    // clearing group id to continue to merge data that belong to different groups
    pRes->info.id.groupId = 0;
  }

  // clear the group id info in SSDataBlock, since the client does not need it
  pRes->info.id.groupId = 0;
}

917
/*
 * Release what each of the numOfExprs entries of pExpr owns:
 *  - column parameters (pCol, freed and cleared);
 *  - value parameters (variant destroyed);
 *  - the parameter array itself;
 *  - the pExpr node.
 * The pExpr array itself is not freed here.
 */
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
  for (int32_t i = 0; i < numOfExprs; ++i) {
    SExprInfo* pCur = &pExpr[i];

    for (int32_t p = 0; p < pCur->base.numOfParams; ++p) {
      switch (pCur->base.pParam[p].type) {
        case FUNC_PARAM_TYPE_COLUMN:
          taosMemoryFreeClear(pCur->base.pParam[p].pCol);
          break;
        case FUNC_PARAM_TYPE_VALUE:
          taosVariantDestroy(&pCur->base.pParam[p].param);
          break;
        default:
          // other parameter kinds are left as-is
          break;
      }
    }

    taosMemoryFree(pCur->base.pParam);
    taosMemoryFree(pCur->pExpr);
  }
}

933 934 935 936 937 938
/*
 * Choose the page size and total size of an operator's disk-based result buffer.
 *
 * Page size: the smallest power of two, starting at 4096 bytes, that is at least 4 * rowSize bytes, i.e. one
 * page holds at least four rows.
 * Buffer size: 10MB (4096 * 2560 bytes) by default. If that is not larger than a single page, it becomes four
 * pages instead.
 * TODO: make the default buffer size configurable.
 *
 * Always returns 0.
 */
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
  uint32_t pageSize = 4096;
  while (pageSize < (uint32_t)(rowSize * 4)) {
    pageSize <<= 1u;
  }

  uint32_t bufSize = 4096 * 2560;
  if (bufSize <= pageSize) {
    bufSize = pageSize * 4;
  }

  *defaultPgsz = pageSize;
  *defaultBufsz = bufSize;
  return 0;
}

L
Liu Jicong 已提交
950
/*
 * Set an operator's output capacity and the row threshold at which result copying stops.
 *
 * numOfRows == 0 selects the default of 4096 rows. The threshold is 75% of the capacity. If that comes out
 * as 0 (e.g. capacity 1), the full capacity is used instead.
 */
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
  const int32_t rows = (numOfRows != 0) ? numOfRows : 4096;

  pResultInfo->capacity = rows;
  pResultInfo->threshold = rows * 0.75;
  if (0 == pResultInfo->threshold) {
    pResultInfo->threshold = rows;
  }
}

963 964 965 966 967
/* Initialize pInfo->resultRowInfo and attach pBlock as the operator's result block (pInfo->pRes). */
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
  initResultRowInfo(&pInfo->resultRowInfo);
  pInfo->pRes = pBlock;
}

968
/*
 * Free an array of numOfOutput function contexts together with what each context holds:
 *  - parameter values;
 *  - subsidiary context array and buffer;
 *  - input data and aggregate arrays;
 *  - udf name.
 * A NULL pCtx is accepted. Always returns NULL, which callers can use to clear their own pointer.
 */
static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  if (pCtx != NULL) {
    for (int32_t i = 0; i < numOfOutput; ++i) {
      SqlFunctionCtx* pOne = &pCtx[i];

      for (int32_t p = 0; p < pOne->numOfParams; ++p) {
        taosVariantDestroy(&pOne->param[p].param);
      }

      taosMemoryFreeClear(pOne->subsidiaries.pCtx);
      taosMemoryFreeClear(pOne->subsidiaries.buf);
      taosMemoryFree(pOne->input.pData);
      taosMemoryFree(pOne->input.pColumnDataAgg);

      if (pOne->udfName != NULL) {
        taosMemoryFree(pOne->udfName);
      }
    }

    taosMemoryFreeClear(pCtx);
  }

  return NULL;
}

992
/*
 * Initialize an expression support structure.
 *
 * pExprInfo and numOfExpr are always recorded. When pExprInfo is not NULL, the function contexts (pCtx) and
 * row-entry offsets are created via createSqlFunctionCtx. Otherwise pCtx and rowEntryInfoOffset are left
 * untouched.
 * Returns TSDB_CODE_OUT_OF_MEMORY when the contexts cannot be created, TSDB_CODE_SUCCESS otherwise.
 */
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr, SFunctionStateStore* pStore) {
  pSup->pExprInfo = pExprInfo;
  pSup->numOfExprs = numOfExpr;

  if (pExprInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset, pStore);
  return (pSup->pCtx == NULL) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS;
}

1005 1006 1007 1008
/*
 * Release everything held by pSupp:
 *  - the function contexts;
 *  - the expression array (including its parameters);
 *  - the filter info;
 *  - the row-entry offset array.
 *
 * Every released pointer is reset to NULL. Calling this twice on the same SExprSupp therefore does not
 * free anything a second time (previously pCtx and rowEntryInfoOffset were left dangling).
 */
void cleanupExprSupp(SExprSupp* pSupp) {
  // destroySqlFunctionCtx always returns NULL; storing it keeps pCtx from dangling
  pSupp->pCtx = destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);

  if (pSupp->pExprInfo != NULL) {
    destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
    taosMemoryFreeClear(pSupp->pExprInfo);
  }

  if (pSupp->pFilterInfo != NULL) {
    filterFreeInfo(pSupp->pFilterInfo);
    pSupp->pFilterInfo = NULL;
  }

  taosMemoryFreeClear(pSupp->rowEntryInfoOffset);
}

X
Xiaoyu Wang 已提交
1020
/* Destroy the operator's result block; pInfo->pRes receives blockDataDestroy's return value. */
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
  SSDataBlock* pBlock = pInfo->pRes;
  pInfo->pRes = blockDataDestroy(pBlock);
}
1021

1022
/*
 * Return true when the grouping list has exactly one element and that element is the function "tbname",
 * i.e. the query uses "partition by tbname" or "group by tbname".
 */
bool groupbyTbname(SNodeList* pGroupList) {
  if (LIST_LENGTH(pGroupList) != 1) {
    return false;
  }

  SNode* pKey = nodesListGetNode(pGroupList, 0);
  if (pKey->type != QUERY_NODE_FUNCTION) {
    return false;
  }

  // partition by tbname/group by tbname
  return 0 == strcmp(((struct SFunctionNode*)pKey)->functionName, "tbname");
}

1035
/*
 * Build the data-sink parameter object required by pNode and return it through *pParam.
 *
 * By node type:
 *  - QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: an SInserterParam carrying readHandle.
 *  - QUERY_NODE_PHYSICAL_PLAN_DELETE: an SDeleterParam holding the suid reported by tableListGetSuid and the
 *    uid of every table, both taken from the first table list of pTask.
 *  - any other node type: *pParam is left untouched.
 *
 * The object is heap-allocated and not freed here; the caller owns it.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if an allocation fails. Nothing is leaked and
 * *pParam is not set in that case.
 */
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* pTask, SReadHandle* readHandle) {
  switch (pNode->type) {
    case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
      SInserterParam* pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam));
      if (NULL == pInserterParam) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      pInserterParam->readHandle = readHandle;

      *pParam = pInserterParam;
      break;
    }
    case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
      SDeleterParam* pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam));
      if (NULL == pDeleterParam) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      // Only the first table list is used. The list array is destroyed right away; only the element pointer
      // taken from it is used afterwards.
      // NOTE(review): assumes getTableListInfo returns at least one entry — confirm.
      SArray*         pInfoList = getTableListInfo(pTask);
      STableListInfo* pTableListInfo = taosArrayGetP(pInfoList, 0);
      taosArrayDestroy(pInfoList);

      pDeleterParam->suid = tableListGetSuid(pTableListInfo);

      // TODO extract uid list
      int32_t numOfTables = tableListGetSize(pTableListInfo);
      pDeleterParam->pUidList = taosArrayInit(numOfTables, sizeof(uint64_t));
      if (NULL == pDeleterParam->pUidList) {
        taosMemoryFree(pDeleterParam);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      for (int32_t i = 0; i < numOfTables; ++i) {
        STableKeyInfo* pTable = tableListGetInfo(pTableListInfo, i);
        if (NULL == taosArrayPush(pDeleterParam->pUidList, &pTable->uid)) {
          // never hand out a silently truncated uid list
          taosArrayDestroy(pDeleterParam->pUidList);
          taosMemoryFree(pDeleterParam);
          return TSDB_CODE_OUT_OF_MEMORY;
        }
      }

      *pParam = pDeleterParam;
      break;
    }
    default:
      break;
  }

  return TSDB_CODE_SUCCESS;
}

1082 1083
/*
 * Hand a result buffer back to the stream state store through pAPI->streamStateReleaseBuf.
 * Any value that call produces is ignored; this always returns TSDB_CODE_SUCCESS.
 */
int32_t releaseOutputBuf(void* pState, SWinKey* pKey, SResultRow* pResult, SStateStore* pAPI) {
  (void)pAPI->streamStateReleaseBuf(pState, pKey, pResult);
  return TSDB_CODE_SUCCESS;
}
1086

1087 1088 1089
/*
 * Store `size` bytes of `buf` as the value of session `key` in the stream state, then release `buf` via
 * releaseOutputBuf. The result of streamStateSessionPut is ignored; this always returns TSDB_CODE_SUCCESS.
 */
int32_t saveSessionDiscBuf(void* pState, SSessionKey* key, void* buf, int32_t size, SStateStore* pAPI) {
  SResultRow* pRow = (SResultRow*)buf;

  (void)pAPI->streamStateSessionPut(pState, key, (const void*)buf, size);
  (void)releaseOutputBuf(pState, NULL, pRow, pAPI);
  return TSDB_CODE_SUCCESS;
}

1093
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock,
5
54liuyao 已提交
1094
                                    SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
1095
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
1096 1097
  SStorageAPI* pAPI = &pTaskInfo->storageAPI;

5
54liuyao 已提交
1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SSessionKey* pKey = taosArrayGet(pGroupResInfo->pRows, i);
    int32_t      size = 0;
    void*        pVal = NULL;
1109
    int32_t      code = pAPI->stateStore.streamStateSessionGet(pState, pKey, &pVal, &size);
5
54liuyao 已提交
1110
    ASSERT(code == 0);
1111 1112
    if (code == -1) {
      // coverity scan
5
54liuyao 已提交
1113
      pGroupResInfo->index += 1;
1114 1115
      continue;
    }
5
54liuyao 已提交
1116 1117 1118 1119 1120
    SResultRow* pRow = (SResultRow*)pVal;
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
1121
      releaseOutputBuf(pState, NULL, pRow, &pAPI->stateStore);
5
54liuyao 已提交
1122 1123 1124
      continue;
    }

H
Haojun Liao 已提交
1125 1126
    if (pBlock->info.id.groupId == 0) {
      pBlock->info.id.groupId = pKey->groupId;
1127

1128
      void* tbname = NULL;
1129
      if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
1130
        pBlock->info.parTbName[0] = 0;
1131
      } else {
1132
        memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
1133
      }
1134
      pAPI->stateStore.streamStateFreeVal(tbname);
5
54liuyao 已提交
1135 1136
    } else {
      // current value belongs to different group, it can't be packed into one datablock
H
Haojun Liao 已提交
1137
      if (pBlock->info.id.groupId != pKey->groupId) {
1138
        releaseOutputBuf(pState, NULL, pRow, &pAPI->stateStore);
5
54liuyao 已提交
1139 1140 1141 1142 1143 1144
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
1145
      releaseOutputBuf(pState, NULL, pRow, &pAPI->stateStore);
5
54liuyao 已提交
1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.finalize) {
        int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code1)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
          T_LONG_JMP(pTaskInfo->env, code1);
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        for (int32_t k = 0; k < pRow->numOfRows; ++k) {
1169
          colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
5
54liuyao 已提交
1170 1171 1172 1173
        }
      }
    }

1174
    pBlock->info.dataLoad = 1;
5
54liuyao 已提交
1175
    pBlock->info.rows += pRow->numOfRows;
1176
    releaseOutputBuf(pState, NULL, pRow, &pAPI->stateStore);
5
54liuyao 已提交
1177 1178 1179
  }
  blockDataUpdateTsWindow(pBlock, 0);
  return TSDB_CODE_SUCCESS;
1180
}
L
Liu Jicong 已提交
1181 1182

void qStreamCloseTsdbReader(void* task) {
D
dapan1121 已提交
1183 1184 1185 1186
  if (task == NULL) {
    return;
  }

L
Liu Jicong 已提交
1187 1188
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)task;
  SOperatorInfo* pOp = pTaskInfo->pRoot;
D
dapan1121 已提交
1189

1190 1191
  qDebug("stream close tsdb reader, reset status uid:%" PRId64 " ts:%" PRId64, pTaskInfo->streamInfo.currentOffset.uid,
         pTaskInfo->streamInfo.currentOffset.ts);
D
dapan1121 已提交
1192 1193

  // todo refactor, other thread may already use this read to extract data.
1194
  pTaskInfo->streamInfo.currentOffset = (STqOffsetVal){0};
L
Liu Jicong 已提交
1195 1196 1197 1198 1199 1200
  while (pOp->numOfDownstream == 1 && pOp->pDownstream[0]) {
    SOperatorInfo* pDownstreamOp = pOp->pDownstream[0];
    if (pDownstreamOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      SStreamScanInfo* pInfo = pDownstreamOp->info;
      if (pInfo->pTableScanOp) {
        STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
D
dapan1121 已提交
1201 1202

        setOperatorCompleted(pInfo->pTableScanOp);
X
Xiaoyu Wang 已提交
1203
        while (pTaskInfo->owner != 0) {
D
dapan1121 已提交
1204 1205 1206 1207
          taosMsleep(100);
          qDebug("wait for the reader stopping");
        }

1208
        pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pTSInfo->base.dataReader);
L
Liu Jicong 已提交
1209
        pTSInfo->base.dataReader = NULL;
D
dapan1121 已提交
1210 1211 1212 1213

        // restore the status, todo refactor.
        pInfo->pTableScanOp->status = OP_OPENED;
        pTaskInfo->status = TASK_NOT_COMPLETED;
L
Liu Jicong 已提交
1214 1215 1216 1217 1218
        return;
      }
    }
  }
}