groupoperator.c 47.4 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "filter.h"
H
Haojun Liao 已提交
17
#include "function.h"
18
#include "os.h"
H
Haojun Liao 已提交
19 20 21 22 23
#include "tname.h"

#include "tdatablock.h"
#include "tmsg.h"

24
#include "executorInt.h"
H
Haojun Liao 已提交
25 26 27 28
#include "executorimpl.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
29 30
#include "operator.h"
#include "querytask.h"
H
Haojun Liao 已提交
31

H
Haojun Liao 已提交
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
// Runtime state of the hash group-by aggregate operator.
// Consecutive input rows with equal group-by key values are aggregated together into the result row that
// belongs to that key; the key is serialized into keyBuf before the result row is looked up.
typedef struct SGroupbyOperatorInfo {
  SOptrBasicInfo binfo;           // result block and result-row info
  SAggSupporter  aggSup;          // aggregate support: result buffer (pResultBuf) and result-row hash table
  SArray*        pGroupCols;      // group by columns, SArray<SColumn>
  SArray*        pGroupColVals;   // key values of the current group, SArray<SGroupKeys>
  bool           isInit;          // true once pGroupColVals holds the keys of a row
  char*          keyBuf;          // scratch buffer for the serialized group key (null flags + values)
  int32_t        groupKeyLen;     // size of keyBuf: sum of group column widths + one null-flag byte per column
  SGroupResInfo  groupResInfo;    // iteration state over the result rows while results are returned
  SExprSupp      scalarSup;       // scalar expressions evaluated on each input block before grouping
} SGroupbyOperatorInfo;

// Runtime state of the hash partition operator: every input row is appended, by its partition key, to pages
// of a disk-based buffer; the pages are later returned one at a time, partition by partition.
// NOTE: sorting the rows inside a partition may be needed later.
typedef struct SPartitionOperatorInfo {
  SOptrBasicInfo binfo;
  SArray*        pGroupCols;     // partition by columns, SArray<SColumn>
  SArray*        pGroupColVals;  // key values of the current row, SArray<SGroupKeys>
  char*          keyBuf;         // scratch buffer for the serialized partition key
  int32_t        groupKeyLen;    // total partition key width (column widths + null flags)
  SHashObj*      pGroupSet;      // serialized partition key -> SDataGroupInfo of that partition

  SDiskbasedBuf* pBuf;              // paged buffer (possibly spilled to a disk file) holding the partitioned rows
  int32_t        rowCapacity;       // maximum number of rows for each buffer page
  int32_t*       columnOffset;      // byte offset of each column's region within a page
  SArray*        sortedGroupArray;  // SDataGroupInfo sorted by group id
  int32_t        groupIndex;        // index into sortedGroupArray of the partition being returned (-1: not started)
  int32_t        pageIndex;         // index into that partition's page list of the next page to return
  SExprSupp      scalarSup;         // scalar expressions evaluated on each input block before partitioning
} SPartitionOperatorInfo;

62
static void*    getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
H
Haojun Liao 已提交
63
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
64 65
static int32_t  setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData,
                                        int16_t bytes, uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup);
H
Haojun Liao 已提交
66
static SArray*  extractColumnInfo(SNodeList* pNodeList);
H
Haojun Liao 已提交
67

H
Haojun Liao 已提交
68
static void freeGroupKey(void* param) {
69
  SGroupKeys* pKey = (SGroupKeys*)param;
H
Haojun Liao 已提交
70 71 72
  taosMemoryFree(pKey->pData);
}

73
static void destroyGroupOperatorInfo(void* param) {
H
Haojun Liao 已提交
74
  SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
75 76 77 78
  if (pInfo == NULL) {
    return;
  }

79
  cleanupBasicInfo(&pInfo->binfo);
H
Haojun Liao 已提交
80 81
  taosMemoryFreeClear(pInfo->keyBuf);
  taosArrayDestroy(pInfo->pGroupCols);
H
Haojun Liao 已提交
82
  taosArrayDestroyEx(pInfo->pGroupColVals, freeGroupKey);
83
  cleanupExprSupp(&pInfo->scalarSup);
H
Haojun Liao 已提交
84 85 86

  cleanupGroupResInfo(&pInfo->groupResInfo);
  cleanupAggSup(&pInfo->aggSup);
D
dapan1121 已提交
87
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
88 89
}

wmmhello's avatar
wmmhello 已提交
90
/*
 * Prepare the per-column key cache and the serialized-key buffer of a group-by / partition-by operator.
 *
 * pGroupColVals  [out]    new SArray<SGroupKeys>, one entry per column of pGroupColList, each with a zeroed
 *                         value buffer of the column's width. Entries pushed before a failure stay owned by the
 *                         array (the group-by operator releases them with taosArrayDestroyEx + freeGroupKey).
 * keyLen         [in,out] increased by the sum of the column widths plus one null-flag byte per column.
 * keyBuf         [out]    zeroed buffer of *keyLen bytes used to serialize a group key.
 * pGroupColList           SArray<SColumn> describing the group-by columns.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when any allocation fails.
 */
static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
  *pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys));
  if ((*pGroupColVals) == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SColumn* pCol = (SColumn*)taosArrayGet(pGroupColList, i);
    (*keyLen) += pCol->bytes;  // value width only; the null flags are added after the loop

    SGroupKeys key = {0};
    key.bytes = pCol->bytes;
    key.type = pCol->type;
    key.isNull = false;
    key.pData = taosMemoryCalloc(1, pCol->bytes);
    if (key.pData == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Until the push succeeds the array does not own key.pData, so it must be released here on failure.
    if (taosArrayPush((*pGroupColVals), &key) == NULL) {
      taosMemoryFree(key.pData);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  // one null-flag byte per group column precedes the packed values in the serialized key
  int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
  (*keyLen) += nullFlagSize;

  (*keyBuf) = taosMemoryCalloc(1, (*keyLen));
  if ((*keyBuf) == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

124 125
/*
 * Tell whether row rowIndex of pBlock has the same group key as the cached pGroupColVals.
 *
 * Two NULLs are considered equal and NULL never equals a non-NULL value. JSON values are compared over the
 * encoded length of the row's value, var-length values by length and payload, fixed-width values bytewise
 * over the key width.
 */
static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex,
                            int32_t numOfGroupCols) {
  SColumnDataAgg* pColAgg = NULL;

  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SColumn*         pCol = taosArrayGet(pGroupCols, i);
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
    if (pBlock->pBlockAgg != NULL) {
      pColAgg = pBlock->pBlockAgg[pCol->slotId];  // TODO is agg data matched?
    }

    bool        rowIsNull = colDataIsNull(pColData, pBlock->info.rows, rowIndex, pColAgg);
    SGroupKeys* pKey = taosArrayGet(pGroupColVals, i);

    if (rowIsNull || pKey->isNull) {
      if (rowIsNull && pKey->isNull) {
        continue;  // NULL matches NULL when grouping
      }
      return false;
    }

    char* val = colDataGetData(pColData, rowIndex);
    bool  same = false;
    if (pKey->type == TSDB_DATA_TYPE_JSON) {
      int32_t jsonLen = getJsonValueLen(val);
      same = (memcmp(pKey->pData, val, jsonLen) == 0);
    } else if (IS_VAR_DATA_TYPE(pKey->type)) {
      int32_t len = varDataLen(val);
      same = (len == varDataLen(pKey->pData)) && (memcmp(varDataVal(pKey->pData), varDataVal(val), len) == 0);
    } else {
      same = (memcmp(pKey->pData, val, pKey->bytes) == 0);
    }

    if (!same) {
      return false;
    }
  }

  return true;
}

wmmhello's avatar
wmmhello 已提交
172
/*
 * Cache the group-key values of row rowIndex of pBlock in pGroupColVals (SArray<SGroupKeys>, one entry per
 * column of pGroupCols). A NULL cell only sets isNull; otherwise the value is copied into the entry's pData.
 *
 * Columns whose slotId is outside pBlock's column array are skipped. When a JSON key value makes tTagIsJson()
 * true, terrno is set to TSDB_CODE_QRY_JSON_IN_GROUP_ERROR and the function returns early, leaving the
 * remaining entries unchanged; callers must check terrno afterwards.
 */
static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex) {
  SColumnDataAgg* pColAgg = NULL;

  size_t numOfGroupCols = taosArrayGetSize(pGroupCols);

  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SColumn* pCol = (SColumn*)taosArrayGet(pGroupCols, i);

    // Valid range check, done before the column is fetched. slotId == size is already past the end, hence `>=`.
    // todo: return error code.
    if (pCol->slotId >= taosArrayGetSize(pBlock->pDataBlock)) {
      continue;
    }

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);

    if (pBlock->pBlockAgg != NULL) {
      pColAgg = pBlock->pBlockAgg[pCol->slotId];  // TODO is agg data matched?
    }

    SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
      pkey->isNull = true;
    } else {
      pkey->isNull = false;
      char* val = colDataGetData(pColInfoData, rowIndex);
      if (pkey->type == TSDB_DATA_TYPE_JSON) {
        if (tTagIsJson(val)) {
          terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
          return;
        }
        int32_t dataLen = getJsonValueLen(val);
        memcpy(pkey->pData, val, dataLen);
      } else if (IS_VAR_DATA_TYPE(pkey->type)) {
        // validate before copying: pData only holds pkey->bytes bytes
        ASSERT(varDataTLen(val) <= pkey->bytes);
        memcpy(pkey->pData, val, varDataTLen(val));
      } else {
        memcpy(pkey->pData, val, pkey->bytes);
      }
    }
  }
}

wmmhello's avatar
wmmhello 已提交
213
static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
H
Haojun Liao 已提交
214 215 216 217 218 219 220 221 222 223 224 225
  size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);

  char* isNull = (char*)pKey;
  char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols;
  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
    if (pkey->isNull) {
      isNull[i] = 1;
      continue;
    }

    isNull[i] = 0;
wmmhello's avatar
wmmhello 已提交
226 227 228 229 230
    if (pkey->type == TSDB_DATA_TYPE_JSON) {
      int32_t dataLen = getJsonValueLen(pkey->pData);
      memcpy(pStart, (pkey->pData), dataLen);
      pStart += dataLen;
    } else if (IS_VAR_DATA_TYPE(pkey->type)) {
H
Haojun Liao 已提交
231 232
      varDataCopy(pStart, pkey->pData);
      pStart += varDataTLen(pkey->pData);
233
      ASSERT(varDataTLen(pkey->pData) <= pkey->bytes);
H
Haojun Liao 已提交
234 235 236 237 238 239
    } else {
      memcpy(pStart, pkey->pData, pkey->bytes);
      pStart += pkey->bytes;
    }
  }

240
  return (int32_t)(pStart - (char*)pKey);
H
Haojun Liao 已提交
241 242 243 244 245
}

/*
 * Assign the group keys or user input constant values if required.
 *
 * Each output whose functionId is -1 is not an aggregate; for example, `key` in
 * `select count(*), key from t group by key`. For such an output, the value at row rowIndex of its first
 * input column is copied into the result entry's buffer, or the entry is flagged as a NULL result.
 * Either way the entry is marked as holding one result.
 */
static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t totalRows, int32_t rowIndex) {
  for (int32_t i = 0; i < numOfOutput; ++i) {
    if (pCtx[i].functionId != -1) {
      continue;  // a real aggregate function: nothing to assign
    }

    SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[i]);
    SColumnInfoData*     pColInfoData = pCtx[i].input.pData[0];

    // todo OPT all/all not NULL
    if (colDataIsNull(pColInfoData, totalRows, rowIndex, NULL)) {
      pEntryInfo->isNullRes = 1;
    } else {
      char* dest = GET_ROWCELL_INTERBUF(pEntryInfo);
      char* src = colDataGetData(pColInfoData, rowIndex);

      if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
        int32_t jsonLen = getJsonValueLen(src);
        memcpy(dest, src, jsonLen);
      } else if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
        varDataCopy(dest, src);
      } else {
        memcpy(dest, src, pColInfoData->info.bytes);
      }
    }

    pEntryInfo->numOfRes = 1;
  }
}

static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SGroupbyOperatorInfo* pInfo = pOperator->info;

276
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
H
Haojun Liao 已提交
277 278 279 280 281 282
  int32_t         numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);
  //  if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
  // qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_TASKID(pRuntimeEnv));
  //    return;
  //  }

H
Haojun Liao 已提交
283
  int32_t len = 0;
wmmhello's avatar
wmmhello 已提交
284
  terrno = TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
285

H
Haojun Liao 已提交
286
  int32_t num = 0;
D
dapan1121 已提交
287
  uint64_t groupId = 0;
H
Haojun Liao 已提交
288 289 290
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    // Compare with the previous row of this column, and do not set the output buffer again if they are identical.
    if (!pInfo->isInit) {
291
      recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
wmmhello's avatar
wmmhello 已提交
292
      if (terrno != TSDB_CODE_SUCCESS) {  // group by json error
293
        T_LONG_JMP(pTaskInfo->env, terrno);
wmmhello's avatar
wmmhello 已提交
294
      }
H
Haojun Liao 已提交
295 296 297 298 299
      pInfo->isInit = true;
      num++;
      continue;
    }

H
Haojun Liao 已提交
300
    bool equal = groupKeyCompare(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
H
Haojun Liao 已提交
301 302 303 304 305
    if (equal) {
      num++;
      continue;
    }

H
Haojun Liao 已提交
306
    // The first row of a new block does not belongs to the previous existed group
307
    if (j == 0) {
H
Haojun Liao 已提交
308
      num++;
309
      recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
wmmhello's avatar
wmmhello 已提交
310
      if (terrno != TSDB_CODE_SUCCESS) {  // group by json error
311
        T_LONG_JMP(pTaskInfo->env, terrno);
wmmhello's avatar
wmmhello 已提交
312
      }
H
Haojun Liao 已提交
313 314 315
      continue;
    }

H
Haojun Liao 已提交
316
    len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
317
    int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf,
318
                                          len, pBlock->info.id.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
H
Haojun Liao 已提交
319
    if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
S
Shengliang Guan 已提交
320
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
H
Haojun Liao 已提交
321 322 323
    }

    int32_t rowIndex = j - num;
dengyihao's avatar
dengyihao 已提交
324 325
    applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows,
                                    pOperator->exprSupp.numOfExprs);
H
Haojun Liao 已提交
326 327

    // assign the group keys or user input constant values if required
328
    doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
329
    recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
H
Haojun Liao 已提交
330 331 332 333
    num = 1;
  }

  if (num > 0) {
H
Haojun Liao 已提交
334
    len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
335
    int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf,
336
                                          len, pBlock->info.id.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
H
Haojun Liao 已提交
337
    if (ret != TSDB_CODE_SUCCESS) {
S
Shengliang Guan 已提交
338
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
H
Haojun Liao 已提交
339 340 341
    }

    int32_t rowIndex = pBlock->info.rows - num;
dengyihao's avatar
dengyihao 已提交
342 343
    applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows,
                                    pOperator->exprSupp.numOfExprs);
344
    doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
H
Haojun Liao 已提交
345 346 347
  }
}

348 349 350 351
/*
 * Fill the operator's result block with the next batch of grouped result rows that pass the filter.
 *
 * Batches that the filter empties are skipped. Once no result rows remain, the operator is marked completed.
 * Returns the result block, or NULL when it ended up empty.
 */
static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
  SGroupbyOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*          pRes = pInfo->binfo.pRes;

  for (;;) {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);

    bool moreResults = hasRemainResults(&pInfo->groupResInfo);
    if (!moreResults) {
      setOperatorCompleted(pOperator);
      break;
    }
    if (pRes->info.rows > 0) {
      break;
    }
  }

  pOperator->resultInfo.totalRows += pRes->info.rows;
  if (pRes->info.rows == 0) {
    return NULL;
  }
  return pRes;
}

370
/*
 * next() of the group-by aggregate operator.
 *
 * On the first call it drains the downstream operator. Each block first gets the scalar expressions applied,
 * then is grouped and aggregated. The operator then switches to OP_RES_TO_RETURN. Every call returns the next
 * batch of grouped results, or NULL when done. Long-jumps out through pTaskInfo->env on failure.
 */
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SGroupbyOperatorInfo* pInfo = pOperator->info;
  if (pOperator->status == OP_RES_TO_RETURN) {
    return buildGroupResultDataBlock(pOperator);
  }

  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;

  int64_t        st = taosGetTimestampUs();
  SOperatorInfo* downstream = pOperator->pDownstream[0];

  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
      break;
    }

    int32_t code = getTableScanInfo(pOperator, &order, &scanFlag, false);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // the pDataBlock are always the same one, no need to call this again
    setInputDataBlock(&pOperator->exprSupp, pBlock, order, scanFlag, true);

    // there is an scalar expression that needs to be calculated right before apply the group aggregation.
    if (pInfo->scalarSup.pExprInfo != NULL) {
      pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                              pInfo->scalarSup.numOfExprs, NULL);
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }
    }

    doHashGroupbyAgg(pOperator, pBlock);
  }

  pOperator->status = OP_RES_TO_RETURN;

  initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0);

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  return buildGroupResultDataBlock(pOperator);
}

5
54liuyao 已提交
436
/*
 * Create the hash group-by aggregate operator on top of downstream, as described by pAggNode.
 *
 * Returns the new operator. On failure returns NULL with pTaskInfo->code set; state built so far is
 * released through destroyGroupOperatorInfo.
 */
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo) {
  int32_t               code = TSDB_CODE_SUCCESS;
  SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
  if (pResBlock == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }
  initBasicInfo(&pInfo->binfo, pResBlock);

  // optional scalar expressions, evaluated on every input block before the grouping
  int32_t    numOfScalarExpr = 0;
  SExprInfo* pScalarExprInfo = NULL;
  if (pAggNode->pExprs != NULL) {
    pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
  }

  pInfo->pGroupCols = extractColumnInfo(pAggNode->pGroupKeys);
  code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  initResultSizeInfo(&pOperator->resultInfo, 4096);
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str,
                    pTaskInfo->streamInfo.pState);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  initResultRowInfo(&pInfo->binfo.resultRowInfo);
  setOperatorInfo(pOperator, "GroupbyAggOperator", 0, true, OP_NOT_OPENED, pInfo, pTaskInfo);

  pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;

  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo,
                                         optrDefaultBufFn, NULL);
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  pTaskInfo->code = code;
  if (pInfo != NULL) {
    destroyGroupOperatorInfo(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  return NULL;
}

H
Haojun Liao 已提交
504 505
/*
 * Append every row of pBlock to the current page of its partition.
 *
 * For each row:
 *  - the partition key is serialized into pInfo->keyBuf;
 *  - the partition's page with free room is obtained;
 *  - the row's values are appended column by column at the offsets in pInfo->columnOffset;
 *  - the page's row counter (the first int32_t of the page) is incremented.
 *
 * Per-column page region (see setupColumnOffset):
 *  - var-length:  int32_t offset[rowCapacity] (-1 marks NULL), int32_t content length, payload
 *  - fixed-width: null bitmap, int32_t content length, payload
 *
 * Long-jumps out through pTaskInfo->env on failure.
 */
static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  SPartitionOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
  size_t                  numOfCols = pOperator->exprSupp.numOfExprs;

  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
    // Stop at the offending row instead of filing it, with stale keys, into some partition.
    if (terrno != TSDB_CODE_SUCCESS) {  // group by json error
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
    int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);

    SDataGroupInfo* pGroupInfo = NULL;
    void*           pPage = getCurrentDataGroupInfo(pInfo, &pGroupInfo, len);
    if (pPage == NULL) {
      T_LONG_JMP(pTaskInfo->env, terrno);
    }

    pGroupInfo->numOfRows += 1;

    // group id
    if (pGroupInfo->groupId == 0) {
      pGroupInfo->groupId = calcGroupId(pInfo->keyBuf, len);
    }

    // number of rows
    int32_t* rows = (int32_t*)pPage;

    for (int32_t i = 0; i < numOfCols; ++i) {
      SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i];
      int32_t    slotId = pExpr->base.pParam[0].pCol->slotId;

      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);

      int32_t bytes = pColInfoData->info.bytes;
      int32_t startOffset = pInfo->columnOffset[i];

      int32_t* columnLen = NULL;
      int32_t  contentLen = 0;

      if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
        int32_t* offset = (int32_t*)((char*)pPage + startOffset);
        columnLen = (int32_t*)((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity);
        char* data = (char*)((char*)columnLen + sizeof(int32_t));

        if (colDataIsNull_s(pColInfoData, j)) {
          offset[(*rows)] = -1;
          contentLen = 0;
        } else {
          char* src = colDataGetData(pColInfoData, j);
          // JSON values carry their own encoded length; other var-length values are length-prefixed
          contentLen =
              (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(src) : (int32_t)varDataTLen(src);

          offset[*rows] = (*columnLen);
          memcpy(data + (*columnLen), src, contentLen);
          ASSERT((data + (*columnLen) + contentLen - (char*)pPage) <= getBufPageSize(pInfo->pBuf));
        }
      } else {
        char* bitmap = (char*)pPage + startOffset;
        columnLen = (int32_t*)((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity));
        char* data = (char*)columnLen + sizeof(int32_t);

        bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j);
        if (isNull) {
          colDataSetNull_f(bitmap, (*rows));
        } else {
          memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes);
          ASSERT((data + (*columnLen) + bytes - (char*)pPage) <= getBufPageSize(pInfo->pBuf));
        }
        // NULL rows still occupy `bytes` bytes, so row r's value always starts at data + r * bytes
        contentLen = bytes;
      }

      (*columnLen) += contentLen;
    }

    (*rows) += 1;

    setBufPageDirty(pPage, true);
    releaseBufPage(pInfo->pBuf, pPage);
  }
}

/*
 * Look up, or create, the partition whose serialized key is the first len bytes of pInfo->keyBuf.
 * Returns a pinned buffer page with room for one more row of that partition and sets *pGroupInfo to the
 * partition's SDataGroupInfo.
 *
 * A fresh, fully zero-filled page is started in two cases: for a new partition, and when the partition's last
 * page already holds rowCapacity rows. Zero-filling clears the row counter, the per-column content lengths and
 * the null bitmaps, all of which are accumulated in place.
 *
 * Returns NULL on failure (terrno is set); *pGroupInfo is not written in that case.
 */
void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len) {
  SDataGroupInfo* p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);

  void* pPage = NULL;
  if (p == NULL) {  // it is a new group
    SDataGroupInfo gi = {0};
    gi.pPageList = taosArrayInit(100, sizeof(int32_t));
    if (gi.pPageList == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }

    if (taosHashPut(pInfo->pGroupSet, pInfo->keyBuf, len, &gi, sizeof(SDataGroupInfo)) != 0) {
      taosArrayDestroy(gi.pPageList);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }

    p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);

    int32_t pageId = 0;
    pPage = getNewBufPage(pInfo->pBuf, &pageId);
    if (pPage == NULL) {
      qError("failed to get new buffer, code:%s", tstrerror(terrno));
      return NULL;
    }

    if (taosArrayPush(p->pPageList, &pageId) == NULL) {
      releaseBufPage(pInfo->pBuf, pPage);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }

    // Zero the whole page, not just the row counter: the column content lengths and the null bitmaps are
    // accumulated in place, and a recycled page may still hold stale data.
    memset(pPage, 0, getBufPageSize(pInfo->pBuf));
  } else {
    int32_t* curId = taosArrayGetLast(p->pPageList);
    pPage = getBufPage(pInfo->pBuf, *curId);
    if (pPage == NULL) {
      qError("failed to get buffer, code:%s", tstrerror(terrno));
      return pPage;
    }

    int32_t* rows = (int32_t*)pPage;
    if (*rows >= pInfo->rowCapacity) {
      // release buffer
      releaseBufPage(pInfo->pBuf, pPage);

      // add a new page for current group
      int32_t pageId = 0;
      pPage = getNewBufPage(pInfo->pBuf, &pageId);
      if (pPage == NULL) {
        qError("failed to get new buffer, code:%s", tstrerror(terrno));
        return NULL;
      }

      if (taosArrayPush(p->pPageList, &pageId) == NULL) {
        releaseBufPage(pInfo->pBuf, pPage);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return NULL;
      }
      memset(pPage, 0, getBufPageSize(pInfo->pBuf));
    }
  }

  *pGroupInfo = p;
  return pPage;
}

/*
 * Derive a 64-bit group id from a serialized group key: the first 8 bytes of the MD5 digest of
 * pData[0, len).
 */
uint64_t calcGroupId(char* pData, int32_t len) {
  T_MD5_CTX md5Ctx;
  tMD5Init(&md5Ctx);
  tMD5Update(&md5Ctx, (uint8_t*)pData, len);
  tMD5Final(&md5Ctx);

  // NOTE: only the initial 8 bytes of the final digest become the id
  uint64_t groupId;
  memcpy(&groupId, md5Ctx.digest, sizeof(groupId));
  return groupId;
}

H
Haojun Liao 已提交
654
int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
655
  size_t   numOfCols = taosArrayGetSize(pBlock->pDataBlock);
656
  int32_t* offset = taosMemoryCalloc(numOfCols, sizeof(int32_t));
H
Haojun Liao 已提交
657

658 659
  offset[0] = sizeof(int32_t) +
              sizeof(uint64_t);  // the number of rows in current page, ref to SSDataBlock paged serialization format
H
Haojun Liao 已提交
660

661
  for (int32_t i = 0; i < numOfCols - 1; ++i) {
H
Haojun Liao 已提交
662 663 664 665
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);

    int32_t bytes = pColInfoData->info.bytes;
    int32_t payloadLen = bytes * rowCapacity;
666

H
Haojun Liao 已提交
667 668 669 670 671 672 673 674 675 676 677 678
    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
      // offset segment + content length + payload
      offset[i + 1] = rowCapacity * sizeof(int32_t) + sizeof(int32_t) + payloadLen + offset[i];
    } else {
      // bitmap + content length + payload
      offset[i + 1] = BitmapLen(rowCapacity) + sizeof(int32_t) + payloadLen + offset[i];
    }
  }

  return offset;
}

5
54liuyao 已提交
679
/*
 * Release the page list of every partition in sortedGroupArray, empty that array, and clear the paged
 * buffer that held the partitioned rows.
 */
static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) {
  int32_t numOfGroups = taosArrayGetSize(pInfo->sortedGroupArray);
  for (int32_t g = 0; g < numOfGroups; g++) {
    SDataGroupInfo* pGroup = taosArrayGet(pInfo->sortedGroupArray, g);
    taosArrayDestroy(pGroup->pPageList);
  }

  taosArrayClear(pInfo->sortedGroupArray);
  clearDiskbasedBuf(pInfo->pBuf);
}

689 690 691
static int compareDataGroupInfo(const void* group1, const void* group2) {
  const SDataGroupInfo* pGroupInfo1 = group1;
  const SDataGroupInfo* pGroupInfo2 = group2;
692 693 694 695 696

  if (pGroupInfo1->groupId == pGroupInfo2->groupId) {
    return 0;
  }

697
  return (pGroupInfo1->groupId < pGroupInfo2->groupId) ? -1 : 1;
698 699
}

H
Haojun Liao 已提交
700 701
/*
 * Return the next page of partitioned rows as the operator's result block.
 *
 * Partitions are visited in sortedGroupArray order, and each partition's pages in pPageList order. The block's
 * group id is set to the partition's group id. When all partitions are exhausted, the operator is marked
 * completed, its partition buffers are cleared and NULL is returned.
 * Long-jumps out through pTaskInfo->env on failure.
 */
static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
  SPartitionOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;

  SDataGroupInfo* pGroupInfo =
      (pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL;
  if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) {
    // try next group data
    ++pInfo->groupIndex;
    if (pInfo->groupIndex >= taosArrayGetSize(pInfo->sortedGroupArray)) {
      setOperatorCompleted(pOperator);
      clearPartitionOperator(pInfo);
      return NULL;
    }

    pGroupInfo = taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex);
    pInfo->pageIndex = 0;
  }

  // Make room in the result block before pinning the page, so a failure here leaves no page to release.
  int32_t code = blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->rowCapacity);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to ensure result block capacity, code:%s, %s", tstrerror(code), GET_TASKID(pTaskInfo));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex);
  void*    page = getBufPage(pInfo->pBuf, *pageId);
  if (page == NULL) {
    qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity);

  pInfo->pageIndex += 1;
  releaseBufPage(pInfo->pBuf, page);

  pInfo->binfo.pRes->info.dataLoad = 1;
  blockDataUpdateTsWindow(pInfo->binfo.pRes, 0);
  pInfo->binfo.pRes->info.id.groupId = pGroupInfo->groupId;

  pOperator->resultInfo.totalRows += pInfo->binfo.pRes->info.rows;
  return pInfo->binfo.pRes;
}

740
/**
 * Next-block entry point of the (non-stream) partition operator.
 *
 * On the first call the whole downstream input is consumed. For every block, the optional scalar
 * expressions are evaluated in place and the rows are hashed into pInfo->pGroupSet by
 * doHashPartition(). The collected groups are then copied into sortedGroupArray, sorted with
 * compareDataGroupInfo, and pGroupSet is cleared. The operator then switches to OP_RES_TO_RETURN.
 * Every call, including the first, returns one page of output via buildPartitionResult(), or
 * NULL when done.
 *
 * Errors long-jump to pTaskInfo->env.
 */
static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
  SPartitionOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*            pRes = pInfo->binfo.pRes;

  if (pOperator->status == OP_RES_TO_RETURN) {
    blockDataCleanup(pRes);
    return buildPartitionResult(pOperator);
  }

  int64_t        st = taosGetTimestampUs();
  SOperatorInfo* downstream = pOperator->pDownstream[0];

  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
      break;
    }

    // there is an scalar expression that needs to be calculated right before apply the group aggregation.
    if (pInfo->scalarSup.pExprInfo != NULL) {
      pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                              pInfo->scalarSup.numOfExprs, NULL);
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }
    }

    // doHashPartition reports failures through terrno, so reset it before each call.
    terrno = TSDB_CODE_SUCCESS;
    doHashPartition(pOperator, pBlock);
    if (terrno != TSDB_CODE_SUCCESS) {  // group by json error
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
  }

  SArray* groupArray = taosArrayInit(taosHashGetSize(pInfo->pGroupSet), sizeof(SDataGroupInfo));
  if (groupArray == NULL) {
    // Groups are still owned by pGroupSet at this point, so nothing is lost by bailing out.
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
  }

  // Move every group (shallow copy of SDataGroupInfo) out of the hash into a sortable array.
  void* pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL);
  while (pGroupIter != NULL) {
    SDataGroupInfo* pGroupInfo = pGroupIter;
    taosArrayPush(groupArray, pGroupInfo);
    pGroupIter = taosHashIterate(pInfo->pGroupSet, pGroupIter);
  }

  taosArraySort(groupArray, compareDataGroupInfo);
  pInfo->sortedGroupArray = groupArray;
  pInfo->groupIndex = -1;  // buildPartitionResult() starts by stepping to group 0
  taosHashClear(pInfo->pGroupSet);

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

  pOperator->status = OP_RES_TO_RETURN;
  blockDataEnsureCapacity(pRes, 4096);
  return buildPartitionResult(pOperator);
}

801
static void destroyPartitionOperatorInfo(void* param) {
H
Haojun Liao 已提交
802
  SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param;
803
  cleanupBasicInfo(&pInfo->binfo);
H
Haojun Liao 已提交
804
  taosArrayDestroy(pInfo->pGroupCols);
805

806
  for (int i = 0; i < taosArrayGetSize(pInfo->pGroupColVals); i++) {
wmmhello's avatar
wmmhello 已提交
807 808 809
    SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->pGroupColVals, i);
    taosMemoryFree(key.pData);
  }
810

H
Haojun Liao 已提交
811
  taosArrayDestroy(pInfo->pGroupColVals);
H
Haojun Liao 已提交
812
  taosMemoryFree(pInfo->keyBuf);
dengyihao's avatar
dengyihao 已提交
813 814 815 816 817 818

  int32_t size = taosArrayGetSize(pInfo->sortedGroupArray);
  for (int32_t i = 0; i < size; i++) {
    SDataGroupInfo* pGp = taosArrayGet(pInfo->sortedGroupArray, i);
    taosArrayDestroy(pGp->pPageList);
  }
819
  taosArrayDestroy(pInfo->sortedGroupArray);
D
dapan1121 已提交
820 821 822 823 824 825 826 827

  void* pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL);
  while (pGroupIter != NULL) {
    SDataGroupInfo* pGroupInfo = pGroupIter;
    taosArrayDestroy(pGroupInfo->pPageList);
    pGroupIter = taosHashIterate(pInfo->pGroupSet, pGroupIter);
  }

wmmhello's avatar
wmmhello 已提交
828
  taosHashCleanup(pInfo->pGroupSet);
H
Haojun Liao 已提交
829
  taosMemoryFree(pInfo->columnOffset);
830

831
  cleanupExprSupp(&pInfo->scalarSup);
H
Haojun Liao 已提交
832
  destroyDiskbasedBuf(pInfo->pBuf);
D
dapan1121 已提交
833
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
834 835
}

836 837
/**
 * Builds the (non-stream) partition operator for pPartNode on top of `downstream`.
 *
 * Sets up the partition-key columns, the optional scalar pre-projection (pPartNode->pExprs), the
 * group hash, the result block, the disk-based page buffer used to spill partitioned rows, and
 * the key buffers used to build group keys.
 *
 * On success the new operator is returned. On failure NULL is returned, terrno and
 * pTaskInfo->code hold the error code, and every partially created resource is released,
 * including the target expressions.
 */
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode,
                                           SExecTaskInfo* pTaskInfo) {
  int32_t                 code = TSDB_CODE_SUCCESS;
  SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo));
  SOperatorInfo*          pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys);

  if (pPartNode->pExprs != NULL) {
    int32_t    num = 0;
    SExprInfo* pExprInfo1 = createExprInfo(pPartNode->pExprs, NULL, &num);
    code = initExprSupp(&pInfo->scalarSup, pExprInfo1, num);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
  if (pInfo->pGroupSet == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->node.pOutputDataBlockDesc);
  if (pInfo->binfo.pRes == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // Page/buffer sizes for the spill buffer are derived from the output row width.
  uint32_t defaultPgsz = 0;
  uint32_t defaultBufsz = 0;
  getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz);

  if (!osTempSpaceAvailable()) {
    terrno = TSDB_CODE_NO_AVAIL_DISK;
    code = terrno;
    qError("Create partition operator info failed since %s", terrstr(terrno));
    goto _error;
  }

  code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, tsTempDir);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->rowCapacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, getBufPageSize(pInfo->pBuf));
  pInfo->columnOffset = setupColumnOffset(pInfo->binfo.pRes, pInfo->rowCapacity);
  code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  // Create the target expressions only now and hand them to pOperator immediately, so the
  // error path has a single owner to release them from.
  int32_t    numOfCols = 0;
  SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols);

  setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  pOperator->exprSupp.numOfExprs = numOfCols;
  pOperator->exprSupp.pExprInfo = pExprInfo;

  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, optrDefaultBufFn, NULL);

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  terrno = code;
  pTaskInfo->code = code;
  if (pInfo != NULL) {
    destroyPartitionOperatorInfo(pInfo);
  }
  if (pOperator != NULL) {
    // pOperator was calloc'ed, so exprSupp is all-zero unless the target exprs were attached.
    cleanupExprSupp(&pOperator->exprSupp);
  }
  taosMemoryFreeClear(pOperator);
  return NULL;
}

923 924 925
/**
 * Looks up (or creates) the result row keyed by pData/bytes for `groupId` in binfo's result-row
 * info via doSetResultOutBufByKey(). It then passes that row to setResultRowInitCtx() together
 * with the operator's function contexts and their row-entry offsets.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData,
                                int16_t bytes, uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup) {
  SExprSupp*  pExprSup = &pOperator->exprSupp;
  SResultRow* pRow = doSetResultOutBufByKey(pBuf, &binfo->resultRowInfo, pData, bytes, true, groupId,
                                            pOperator->pTaskInfo, false, pAggSup, false);

  setResultRowInitCtx(pRow, pExprSup->pCtx, numOfCols, pExprSup->rowEntryInfoOffset);
  return TSDB_CODE_SUCCESS;
}
935 936 937

/**
 * Computes the partition group id of row `rowId` in pBlock.
 *
 * If pExprSup carries scalar expressions, they are first evaluated over the block with pBlock as
 * both input and output. A failure there is only logged, and the id is still derived from what
 * the block then contains. The row's group-key values are then captured into
 * pParSup->pGroupColVals, serialised into pParSup->keyBuf, and hashed with calcGroupId().
 */
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId) {
  SExprInfo* pScalarExprs = pExprSup->pExprInfo;
  if (pScalarExprs != NULL) {
    int32_t code = projectApplyFunctions(pScalarExprs, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
    if (code != TSDB_CODE_SUCCESS) {
      // NOTE(review): the error is not propagated; callers only get a group id.
      qError("calaculate group id error, code:%d", code);
    }
  }

  recordNewGroupKeys(pParSup->pGroupCols, pParSup->pGroupColVals, pBlock, rowId);
  int32_t keyLen = buildGroupKeys(pParSup->keyBuf, pParSup->pGroupColVals);
  return calcGroupId(pParSup->keyBuf, keyLen);
}

950
// True while the pPartitions iterator used by buildStreamPartitionResult() has not run off the end.
static bool hasRemainPartion(SStreamPartitionOperatorInfo* pInfo) {
  return !(pInfo->parIte == NULL);
}

// True while the pPartitions iterator used by buildStreamCreateTableResult() has not run off the end.
static bool hasRemainTbName(SStreamPartitionOperatorInfo* pInfo) {
  return !(pInfo->pTbNameIte == NULL);
}
952 953 954

/**
 * Emits the rows of the partition currently under pInfo->parIte as one result block.
 *
 * Each target expression i copies column base.pParam[0].pCol->slotId of the input block into
 * column i of binfo.pRes, for every row id recorded for the partition. If a sub-table-name
 * expression exists, the partition's stored table name (if any) is attached as parTbName. The
 * partition's rowIds array is released, and parIte advances to the next partition.
 *
 * Precondition: hasRemainPartion(pInfo). Capacity failures long-jump to pTaskInfo->env.
 */
static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
  SStreamPartitionOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*                  pDest = pInfo->binfo.pRes;
  ASSERT(hasRemainPartion(pInfo));
  SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->parIte;
  blockDataCleanup(pDest);
  int32_t rows = taosArrayGetSize(pParInfo->rowIds);

  // pDest is only pre-sized once when the operator is created, but a single partition can hold
  // every row of the input block. Make room before copying so colDataSetVal stays in bounds.
  int32_t code = blockDataEnsureCapacity(pDest, rows);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }

  SSDataBlock* pSrc = pInfo->pInputDataBlock;
  for (int32_t i = 0; i < rows; i++) {
    int32_t rowIndex = *(int32_t*)taosArrayGet(pParInfo->rowIds, i);
    for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; j++) {
      int32_t          slotId = pOperator->exprSupp.pExprInfo[j].base.pParam[0].pCol->slotId;
      SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, slotId);
      SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, j);
      bool             isNull = colDataIsNull(pSrcCol, pSrc->info.rows, rowIndex, NULL);
      char*            pSrcData = colDataGetData(pSrcCol, rowIndex);
      colDataSetVal(pDestCol, pDest->info.rows, pSrcData, isNull);
    }
    pDest->info.rows++;
  }

  pDest->info.parTbName[0] = 0;
  if (pInfo->tbnameCalSup.numOfExprs > 0) {
    void* tbname = NULL;
    if (streamStateGetParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, &tbname) == 0) {
      memcpy(pDest->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
      tdbFree(tbname);
    }
  }

  // The row ids have been consumed; drop them now rather than waiting for the next hash clear.
  taosArrayDestroy(pParInfo->rowIds);
  pParInfo->rowIds = NULL;

  pDest->info.dataLoad = 1;
  blockDataUpdateTsWindow(pDest, pInfo->tsColIndex);
  pDest->info.id.groupId = pParInfo->groupId;
  pOperator->resultInfo.totalRows += pDest->info.rows;
  pInfo->parIte = taosHashIterate(pInfo->pPartitions, pInfo->parIte);
  ASSERT(pDest->info.rows > 0);
  printDataBlock(pDest, "stream partitionby");
  return pDest;
}

5
54liuyao 已提交
994
/**
 * Appends a create-child-table row for `groupId` to pDestBlock, unless a table name for the
 * group is already stored in pState.
 *
 * If no name is stored, row `rowId` of pSrcBlock is copied into a temporary block. The sub-table
 * name expression (pTableSup) and tag expressions (pTagSup) are projected into pDestBlock, a
 * computed non-NULL name is persisted with streamStatePutParName(), and the group id is written
 * to the UD_GROUPID_COLUMN_INDEX column. If a name is already stored, it is copied into
 * pSrcBlock->info.parTbName instead.
 *
 * If the temporary row copy cannot be allocated, the error is logged and no row is appended.
 */
void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, uint64_t groupId,
                          SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock) {
  void* pValue = NULL;
  if (streamStateGetParName(pState, groupId, &pValue) != 0) {
    SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId);
    if (pTmpBlock == NULL) {
      qError("failed to copy row for create-table request, code:%s", tstrerror(terrno));
      streamStateReleaseBuf(pState, NULL, pValue);
      return;
    }
    memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
    pTmpBlock->info.id.groupId = groupId;
    char* tbName = pSrcBlock->info.parTbName;
    if (pTableSup->numOfExprs > 0) {
      // The projection writes a row into pDestBlock, which is read back at rows - 1. rows is
      // then decremented so the tag projection / group id below land in the same row.
      projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL);
      SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
      memset(tbName, 0, TSDB_TABLE_NAME_LEN);
      int32_t len = 0;
      if (colDataIsNull_s(pTbCol, pDestBlock->info.rows - 1)) {
        len = 1;
        tbName[0] = 0;
      } else {
        void* pData = colDataGetData(pTbCol, pDestBlock->info.rows - 1);
        // Truncate so tbName (zeroed above) stays NUL-terminated.
        len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
        memcpy(tbName, varDataVal(pData), len);
        streamStatePutParName(pState, groupId, tbName);
      }
      memcpy(pTmpBlock->info.parTbName, tbName, len);
      pDestBlock->info.rows--;
    } else {
      void* pTbNameCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
      colDataSetNULL(pTbNameCol, pDestBlock->info.rows);
      tbName[0] = 0;
    }

    if (pTagSup->numOfExprs > 0) {
      projectApplyFunctions(pTagSup->pExprInfo, pDestBlock, pTmpBlock, pTagSup->pCtx, pTagSup->numOfExprs, NULL);
      pDestBlock->info.rows--;
    } else {
      memcpy(pDestBlock->info.parTbName, pTmpBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
    }

    void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
    colDataAppend(pGpIdCol, pDestBlock->info.rows, (const char*)&groupId, false);
    pDestBlock->info.rows++;
    blockDataDestroy(pTmpBlock);
  } else {
    memcpy(pSrcBlock->info.parTbName, pValue, TSDB_TABLE_NAME_LEN);
  }
  streamStateReleaseBuf(pState, NULL, pValue);
}

/**
 * Produces at most one create-child-table row per call, for the partition under pTbNameIte, and
 * advances pTbNameIte.
 *
 * Returns NULL when there are neither sub-table-name nor tag expressions, when no partitions
 * exist, or when the appended request produced no row (e.g. the group's name was already known).
 * Otherwise returns pCreateTbRes.
 */
static SSDataBlock* buildStreamCreateTableResult(SOperatorInfo* pOperator) {
  SStreamPartitionOperatorInfo* pInfo = pOperator->info;

  bool noCreateTableExprs = pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0;
  if (noCreateTableExprs || taosHashGetSize(pInfo->pPartitions) == 0) {
    return NULL;
  }

  SSDataBlock* pCtBlock = pInfo->pCreateTbRes;
  blockDataCleanup(pCtBlock);
  blockDataEnsureCapacity(pCtBlock, taosHashGetSize(pInfo->pPartitions));

  SPartitionDataInfo* pCurPar = (SPartitionDataInfo*)pInfo->pTbNameIte;
  if (pCurPar != NULL) {
    // The first row of the partition supplies the values for the name/tag expressions.
    int32_t firstRow = *(int32_t*)taosArrayGet(pCurPar->rowIds, 0);
    appendCreateTableRow(pOperator->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup,
                         pCurPar->groupId, pInfo->pInputDataBlock, firstRow, pCtBlock);
    pInfo->pTbNameIte = taosHashIterate(pInfo->pPartitions, pInfo->pTbNameIte);
  }

  if (pCtBlock->info.rows > 0) {
    return pCtBlock;
  }
  return NULL;
}

1061 1062 1063 1064
/**
 * Buckets every row of pBlock into pInfo->pPartitions by its serialised group key.
 *
 * Each hash entry is an SPartitionDataInfo holding the group id (calcGroupId of the key) and the
 * list of row indices (into pBlock) that belong to it. pBlock is remembered as pInputDataBlock
 * so the result builders can read the rows back.
 */
static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDataBlock* pBlock) {
  pInfo->pInputDataBlock = pBlock;
  for (int32_t i = 0; i < pBlock->info.rows; ++i) {
    recordNewGroupKeys(pInfo->partitionSup.pGroupCols, pInfo->partitionSup.pGroupColVals, pBlock, i);
    int32_t             keyLen = buildGroupKeys(pInfo->partitionSup.keyBuf, pInfo->partitionSup.pGroupColVals);
    SPartitionDataInfo* pParData =
        (SPartitionDataInfo*)taosHashGet(pInfo->pPartitions, pInfo->partitionSup.keyBuf, keyLen);
    if (pParData) {
      taosArrayPush(pParData->rowIds, &i);
    } else {
      SPartitionDataInfo newParData = {0};
      newParData.groupId = calcGroupId(pInfo->partitionSup.keyBuf, keyLen);
      newParData.rowIds = taosArrayInit(64, sizeof(int32_t));
      taosArrayPush(newParData.rowIds, &i);
      if (taosHashPut(pInfo->pPartitions, pInfo->partitionSup.keyBuf, keyLen, &newParData,
                      sizeof(SPartitionDataInfo)) != 0) {
        // The hash did not take ownership of the row-id list; release it instead of leaking it.
        taosArrayDestroy(newParData.rowIds);
      }
    }
  }
}

/**
 * Next-block entry point of the stream partition operator.
 *
 * Pending output from the previous input block is drained first: create-table rows
 * (pTbNameIte), then partition blocks (parIte). Only then is the next downstream block pulled.
 * - Normal, pull and invalid blocks are partitioned after the optional scalar pre-projection.
 * - Delete blocks are forwarded as STREAM_DELETE_RESULT.
 * - Create-child-table blocks are passed through unchanged.
 *
 * Returns NULL and marks the operator completed when downstream is exhausted. Scalar-projection
 * failures long-jump to pTaskInfo->env, matching hashPartition().
 */
static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*                pTaskInfo = pOperator->pTaskInfo;
  SStreamPartitionOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*                  pCtRes = NULL;

  if (hasRemainTbName(pInfo)) {
    pCtRes = buildStreamCreateTableResult(pOperator);
    if (pCtRes != NULL) {
      return pCtRes;
    }
  }

  if (hasRemainPartion(pInfo)) {
    return buildStreamPartitionResult(pOperator);
  }

  int64_t        st = taosGetTimestampUs();
  SOperatorInfo* downstream = pOperator->pDownstream[0];
  {
    pInfo->pInputDataBlock = NULL;
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
      setOperatorCompleted(pOperator);
      return NULL;
    }
    printDataBlock(pBlock, "stream partitionby recv");
    switch (pBlock->info.type) {
      case STREAM_NORMAL:
      case STREAM_PULL_DATA:
      case STREAM_INVALID:
        pInfo->binfo.pRes->info.type = pBlock->info.type;
        break;
      case STREAM_DELETE_DATA: {
        copyDataBlock(pInfo->pDelRes, pBlock);
        pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
        printDataBlock(pInfo->pDelRes, "stream partitionby delete");
        return pInfo->pDelRes;
      } break;
      default:
        ASSERTS(pBlock->info.type == STREAM_CREATE_CHILD_TABLE, "invalid SSDataBlock type");
        return pBlock;
    }

    // there is an scalar expression that needs to be calculated right before apply the group aggregation.
    if (pInfo->scalarSup.pExprInfo != NULL) {
      int32_t code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                           pInfo->scalarSup.numOfExprs, NULL);
      if (code != TSDB_CODE_SUCCESS) {
        // Partitioning on half-computed expression columns would silently mis-route rows.
        pTaskInfo->code = code;
        T_LONG_JMP(pTaskInfo->env, code);
      }
    }
    taosHashClear(pInfo->pPartitions);
    doStreamHashPartitionImpl(pInfo, pBlock);
  }
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

  // Two independent cursors over the same partitions: one for create-table rows, one for data.
  pInfo->parIte = taosHashIterate(pInfo->pPartitions, NULL);
  pInfo->pTbNameIte = taosHashIterate(pInfo->pPartitions, NULL);
  pCtRes = buildStreamCreateTableResult(pOperator);
  if (pCtRes != NULL) {
    return pCtRes;
  }
  return buildStreamPartitionResult(pOperator);
}

static void destroyStreamPartitionOperatorInfo(void* param) {
  SStreamPartitionOperatorInfo* pInfo = (SStreamPartitionOperatorInfo*)param;
  cleanupBasicInfo(&pInfo->binfo);
  taosArrayDestroy(pInfo->partitionSup.pGroupCols);

1151
  for (int i = 0; i < taosArrayGetSize(pInfo->partitionSup.pGroupColVals); i++) {
1152 1153 1154 1155 1156 1157 1158
    SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->partitionSup.pGroupColVals, i);
    taosMemoryFree(key.pData);
  }
  taosArrayDestroy(pInfo->partitionSup.pGroupColVals);

  taosMemoryFree(pInfo->partitionSup.keyBuf);
  cleanupExprSupp(&pInfo->scalarSup);
L
Liu Jicong 已提交
1159 1160
  cleanupExprSupp(&pInfo->tbnameCalSup);
  cleanupExprSupp(&pInfo->tagCalSup);
1161
  blockDataDestroy(pInfo->pDelRes);
1162
  taosHashCleanup(pInfo->pPartitions);
5
54liuyao 已提交
1163
  blockDataDestroy(pInfo->pCreateTbRes);
1164 1165 1166 1167 1168 1169 1170 1171 1172 1173
  taosMemoryFreeClear(param);
}

/**
 * Hands the partition-by settings to a stream-scan operator sitting directly below.
 *
 * For any other downstream operator type this is a no-op. The scan receives a by-value copy of
 * *pParSup (its pointer members are shared, not duplicated) and a pointer to pExpr. If update
 * checking is enabled and the scan has no update info yet, one is created with
 * updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0).
 */
void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup, SExprSupp* pExpr) {
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    SStreamScanInfo* pScan = downstream->info;
    pScan->partitionSup = *pParSup;
    pScan->pPartScalarSup = pExpr;

    bool needUpdateInfo = !pScan->igCheckUpdate && pScan->pUpdateInfo == NULL;
    if (needUpdateInfo) {
      pScan->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0);
    }
  }
}

5
54liuyao 已提交
1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218
/**
 * Allocates the (empty) result block used for create-child-table requests.
 *
 * Column layout:
 *   [0] sub-table name (VARCHAR, width of the tbName expression result, or 1 if there is none)
 *   [1] group id (UBIGINT)
 *   [2..] one column per tag expression, typed from its result schema
 * rowSize is the sum of the column widths. Column buffers are not allocated here.
 *
 * Returns NULL (with terrno = TSDB_CODE_OUT_OF_MEMORY) when allocation fails.
 */
SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) {
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pBlock->info.hasVarCol = false;
  pBlock->info.id.groupId = 0;
  pBlock->info.rows = 0;
  pBlock->info.type = STREAM_CREATE_CHILD_TABLE;
  pBlock->info.watermark = INT64_MIN;

  pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
  if (pBlock->pDataBlock == NULL) {
    taosMemoryFree(pBlock);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SColumnInfoData infoData = {0};
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
  if (tbName->numOfExprs > 0) {
    infoData.info.bytes = tbName->pExprInfo->base.resSchema.bytes;
  } else {
    infoData.info.bytes = 1;
  }
  pBlock->info.rowSize += infoData.info.bytes;
  // sub table name
  taosArrayPush(pBlock->pDataBlock, &infoData);

  SColumnInfoData gpIdData = {0};
  gpIdData.info.type = TSDB_DATA_TYPE_UBIGINT;
  gpIdData.info.bytes = 8;
  pBlock->info.rowSize += gpIdData.info.bytes;
  // group id
  taosArrayPush(pBlock->pDataBlock, &gpIdData);

  for (int32_t i = 0; i < tag->numOfExprs; i++) {
    SColumnInfoData tagCol = {0};
    tagCol.info.type = tag->pExprInfo[i].base.resSchema.type;
    tagCol.info.bytes = tag->pExprInfo[i].base.resSchema.bytes;
    tagCol.info.precision = tag->pExprInfo[i].base.resSchema.precision;
    // tag info
    taosArrayPush(pBlock->pDataBlock, &tagCol);
    pBlock->info.rowSize += tagCol.info.bytes;
  }

  return pBlock;
}

1219 1220
/**
 * Builds the stream partition operator for pPartNode on top of `downstream`.
 *
 * Sets up:
 * - the partition-key columns and the optional scalar pre-projection;
 * - the optional sub-table-name and tag expressions, plus the create-table result block when
 *   either exists;
 * - the group-key buffers, the output block (pre-sized to 4096 rows), the partition hash and the
 *   delete-result block.
 * The partition settings are pushed into a stream-scan downstream only after every fallible step
 * has succeeded, so a failure never leaves the downstream pointing into freed memory.
 *
 * Returns the operator, or NULL with pTaskInfo->code set. All partial state is released on
 * failure.
 */
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode,
                                                 SExecTaskInfo* pTaskInfo) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  SStreamPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamPartitionOperatorInfo));
  SOperatorInfo*                pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->partitionSup.pGroupCols = extractPartitionColInfo(pPartNode->part.pPartitionKeys);

  if (pPartNode->part.pExprs != NULL) {
    int32_t    num = 0;
    SExprInfo* pCalExprInfo = createExprInfo(pPartNode->part.pExprs, NULL, &num);
    code = initExprSupp(&pInfo->scalarSup, pCalExprInfo, num);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  pInfo->tbnameCalSup.numOfExprs = 0;
  if (pPartNode->pSubtable != NULL) {
    SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
    if (pSubTableExpr == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
    pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
    createExprFromOneNode(pSubTableExpr, pPartNode->pSubtable, 0);
    code = initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  pInfo->tagCalSup.numOfExprs = 0;
  if (pPartNode->pTags != NULL) {
    int32_t    numOfTags = 0;
    SExprInfo* pTagExpr = createExpr(pPartNode->pTags, &numOfTags);
    if (pTagExpr == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
    if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
  }

  if (pInfo->tbnameCalSup.numOfExprs != 0 || pInfo->tagCalSup.numOfExprs != 0) {
    pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
    if (pInfo->pCreateTbRes == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
  } else {
    pInfo->pCreateTbRes = NULL;
  }

  int32_t keyLen = 0;
  code = initGroupOptrInfo(&pInfo->partitionSup.pGroupColVals, &keyLen, &pInfo->partitionSup.keyBuf,
                           pInfo->partitionSup.pGroupCols);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
  pInfo->partitionSup.needCalc = true;

  pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->part.node.pOutputDataBlockDesc);
  if (pInfo->binfo.pRes == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  code = blockDataEnsureCapacity(pInfo->binfo.pRes, 4096);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->parIte = NULL;
  pInfo->pTbNameIte = NULL;
  pInfo->pInputDataBlock = NULL;

  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
  if (pInfo->pPartitions == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }
  pInfo->tsColIndex = 0;
  pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
  if (pInfo->pDelRes == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  int32_t    numOfCols = 0;
  SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols);

  setOperatorInfo(pOperator, "StreamPartitionOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
  pOperator->exprSupp.numOfExprs = numOfCols;
  pOperator->exprSupp.pExprInfo = pExprInfo;
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamHashPartition, NULL,
                                         destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL);

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  // Publish pointers into pInfo to the downstream scan only once nothing can fail any more.
  initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
  return pOperator;

_error:
  pTaskInfo->code = code;
  if (pInfo != NULL) {
    destroyStreamPartitionOperatorInfo(pInfo);
  }
  if (pOperator != NULL) {
    // pOperator was calloc'ed, so exprSupp is all-zero unless the target exprs were attached.
    cleanupExprSupp(&pOperator->exprSupp);
  }
  taosMemoryFreeClear(pOperator);
  return NULL;
}
H
Haojun Liao 已提交
1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352

/**
 * Converts a list of STargetNode into an SArray<SColumn>.
 *
 * Column targets are converted with extractColumnFromColumnNode(). Value (constant) targets get
 * an SColumn whose slotId and colId are both the target's slotId, and whose type, bytes, scale
 * and precision come from the value's result type. Targets of any other node type are skipped.
 *
 * Returns NULL with terrno = TSDB_CODE_OUT_OF_MEMORY if the array cannot be allocated.
 */
SArray* extractColumnInfo(SNodeList* pNodeList) {
  size_t  numOfCols = LIST_LENGTH(pNodeList);
  SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
  if (pList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < (int32_t)numOfCols; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);

    if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

      SColumn c = extractColumnFromColumnNode(pColNode);
      taosArrayPush(pList, &c);
    } else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) {
      SValueNode* pValNode = (SValueNode*)pNode->pExpr;
      SColumn     c = {0};
      c.slotId = pNode->slotId;
      c.colId = pNode->slotId;
      // Read the data type from resType, like bytes/scale/precision below. node.type is the
      // expression-node kind, not a TSDB_DATA_TYPE_* value.
      // NOTE(review): confirm no consumer relied on the old node-kind value here.
      c.type = pValNode->node.resType.type;
      c.bytes = pValNode->node.resType.bytes;
      c.scale = pValNode->node.resType.scale;
      c.precision = pValNode->node.resType.precision;

      taosArrayPush(pList, &c);
    }
  }

  return pList;
}