groupoperator.c 40.0 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "filter.h"
H
Haojun Liao 已提交
17
#include "function.h"
18
#include "os.h"
H
Haojun Liao 已提交
19 20 21 22 23
#include "tname.h"

#include "tdatablock.h"
#include "tmsg.h"

24
#include "executorInt.h"
H
Haojun Liao 已提交
25 26 27 28 29
#include "executorimpl.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"

30
static void*    getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
H
Haojun Liao 已提交
31
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
32 33
static int32_t  setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData,
                                        int16_t bytes, uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup);
H
Haojun Liao 已提交
34
static SArray*  extractColumnInfo(SNodeList* pNodeList);
H
Haojun Liao 已提交
35

H
Haojun Liao 已提交
36
static void freeGroupKey(void* param) {
37
  SGroupKeys* pKey = (SGroupKeys*)param;
H
Haojun Liao 已提交
38 39 40
  taosMemoryFree(pKey->pData);
}

41
static void destroyGroupOperatorInfo(void* param) {
H
Haojun Liao 已提交
42
  SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
43 44 45 46
  if (pInfo == NULL) {
    return;
  }

47
  cleanupBasicInfo(&pInfo->binfo);
H
Haojun Liao 已提交
48 49
  taosMemoryFreeClear(pInfo->keyBuf);
  taosArrayDestroy(pInfo->pGroupCols);
H
Haojun Liao 已提交
50
  taosArrayDestroyEx(pInfo->pGroupColVals, freeGroupKey);
51
  cleanupExprSupp(&pInfo->scalarSup);
H
Haojun Liao 已提交
52 53 54

  cleanupGroupResInfo(&pInfo->groupResInfo);
  cleanupAggSup(&pInfo->aggSup);
D
dapan1121 已提交
55
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
56 57
}

wmmhello's avatar
wmmhello 已提交
58
/**
 * Prepare the per-column "last seen key" slots and the scratch buffer used to serialize a composite group key.
 *
 * @param pGroupColVals  [out] new array of SGroupKeys, one per group column; each entry owns a zero-filled pData of
 *                       pCol->bytes bytes.
 * @param keyLen         [in/out] incremented by the sum of the column widths plus one int8 null flag per column
 *                       (the layout written by buildGroupKeys()).
 * @param keyBuf         [out] zero-filled buffer of *keyLen bytes.
 * @param pGroupColList  array of SColumn describing the group-by columns.
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY on any allocation failure. On failure, the entries already
 *         pushed into *pGroupColVals stay there (with their pData) for the caller to release.
 */
static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
  *pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys));
  if ((*pGroupColVals) == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SColumn* pCol = (SColumn*)taosArrayGet(pGroupColList, i);
    (*keyLen) += pCol->bytes;  // payload width only; the null-flag bytes are added after the loop

    SGroupKeys key = {0};
    key.bytes = pCol->bytes;
    key.type = pCol->type;
    key.isNull = false;
    key.pData = taosMemoryCalloc(1, pCol->bytes);
    if (key.pData == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // If the push fails the array never took ownership of key.pData, so release it here instead of leaking it.
    if (taosArrayPush((*pGroupColVals), &key) == NULL) {
      taosMemoryFree(key.pData);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  // one int8 null flag per group column precedes the serialized payloads
  int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
  (*keyLen) += nullFlagSize;

  (*keyBuf) = taosMemoryCalloc(1, (*keyLen));
  if ((*keyBuf) == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

92 93
/**
 * Check whether row `rowIndex` of pBlock carries the same group key as the one currently stored in pGroupColVals.
 *
 * NULL equals NULL; NULL never equals a non-NULL value. JSON values are compared over getJsonValueLen() bytes of the
 * row value, other var-length types by payload length and payload bytes, fixed-width types over pkey->bytes.
 *
 * @return true when all of the first numOfGroupCols columns match.
 */
static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex,
                            int32_t numOfGroupCols) {
  SColumnDataAgg* pColAgg = NULL;

  for (int32_t col = 0; col < numOfGroupCols; ++col) {
    SColumn*         pCol = taosArrayGet(pGroupCols, col);
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
    if (pBlock->pBlockAgg != NULL) {
      pColAgg = pBlock->pBlockAgg[pCol->slotId];  // TODO is agg data matched?
    }

    bool        rowIsNull = colDataIsNull(pColData, pBlock->info.rows, rowIndex, pColAgg);
    SGroupKeys* pPrev = taosArrayGet(pGroupColVals, col);

    if (rowIsNull || pPrev->isNull) {
      if (rowIsNull && pPrev->isNull) {
        continue;  // both NULL: this column matches
      }
      return false;
    }

    char* pVal = colDataGetData(pColData, rowIndex);
    bool  same;
    if (pPrev->type == TSDB_DATA_TYPE_JSON) {
      int32_t jsonLen = getJsonValueLen(pVal);
      same = (memcmp(pPrev->pData, pVal, jsonLen) == 0);
    } else if (IS_VAR_DATA_TYPE(pPrev->type)) {
      int32_t valLen = varDataLen(pVal);
      same = (valLen == varDataLen(pPrev->pData)) && (memcmp(varDataVal(pPrev->pData), varDataVal(pVal), valLen) == 0);
    } else {
      same = (memcmp(pPrev->pData, pVal, pPrev->bytes) == 0);
    }

    if (!same) {
      return false;
    }
  }

  return true;
}

wmmhello's avatar
wmmhello 已提交
140
/**
 * Copy the group key of row `rowIndex` of pBlock into the per-column slots of pGroupColVals, making it the current
 * key that later rows are compared against (see groupKeyCompare()).
 *
 * Errors are reported only through terrno; the function then returns early, leaving the remaining columns
 * untouched, so callers must check terrno afterwards:
 *   - TSDB_CODE_QRY_JSON_IN_GROUP_ERROR when a JSON column holds a whole JSON tag (tTagIsJson()) rather than a value;
 *   - TSDB_CODE_QRY_APP_ERROR when a var-length value would not fit in the pkey->bytes slot allocated for it.
 */
static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex) {
  SColumnDataAgg* pColAgg = NULL;

  int32_t numOfGroupCols = (int32_t)taosArrayGetSize(pGroupCols);
  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SColumn*         pCol = taosArrayGet(pGroupCols, i);
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);

    if (pBlock->pBlockAgg != NULL) {
      pColAgg = pBlock->pBlockAgg[pCol->slotId];  // TODO is agg data matched?
    }

    SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
      pkey->isNull = true;
      continue;
    }

    pkey->isNull = false;
    char* val = colDataGetData(pColInfoData, rowIndex);
    if (pkey->type == TSDB_DATA_TYPE_JSON) {
      if (tTagIsJson(val)) {
        terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
        return;
      }
      int32_t dataLen = getJsonValueLen(val);
      memcpy(pkey->pData, val, dataLen);
    } else if (IS_VAR_DATA_TYPE(pkey->type)) {
      // Validate the size *before* copying: pkey->pData holds exactly pkey->bytes bytes, so checking afterwards
      // (as before) could only report a heap overflow that had already happened.
      int32_t totalLen = varDataTLen(val);
      ASSERT(totalLen <= pkey->bytes);
      if (totalLen > pkey->bytes) {
        terrno = TSDB_CODE_QRY_APP_ERROR;
        return;
      }
      memcpy(pkey->pData, val, totalLen);
    } else {
      memcpy(pkey->pData, val, pkey->bytes);
    }
  }
}

wmmhello's avatar
wmmhello 已提交
176
static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
H
Haojun Liao 已提交
177 178 179 180 181 182 183 184 185 186 187 188 189
  ASSERT(pKey != NULL);
  size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);

  char* isNull = (char*)pKey;
  char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols;
  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
    if (pkey->isNull) {
      isNull[i] = 1;
      continue;
    }

    isNull[i] = 0;
wmmhello's avatar
wmmhello 已提交
190 191 192 193 194
    if (pkey->type == TSDB_DATA_TYPE_JSON) {
      int32_t dataLen = getJsonValueLen(pkey->pData);
      memcpy(pStart, (pkey->pData), dataLen);
      pStart += dataLen;
    } else if (IS_VAR_DATA_TYPE(pkey->type)) {
H
Haojun Liao 已提交
195 196 197 198 199 200 201 202 203
      varDataCopy(pStart, pkey->pData);
      pStart += varDataTLen(pkey->pData);
      ASSERT(varDataTLen(pkey->pData) <= pkey->bytes);
    } else {
      memcpy(pStart, pkey->pData, pkey->bytes);
      pStart += pkey->bytes;
    }
  }

204
  return (int32_t)(pStart - (char*)pKey);
H
Haojun Liao 已提交
205 206 207 208 209
}

/**
 * Assign the group keys or user input constant values if required.
 *
 * For every output context whose functionId is -1 (a plain column in the select list, e.g. `key` in
 * "select count(*), key from t group by key"), copy row `rowIndex` of its first input column into the context's
 * result buffer, or mark the result as NULL when that value is NULL. Each such entry ends with numOfRes = 1.
 *
 * @param totalRows  row count of the input columns, used by the NULL check.
 */
static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t totalRows, int32_t rowIndex) {
  for (int32_t k = 0; k < numOfOutput; ++k) {
    SqlFunctionCtx* pOneCtx = &pCtx[k];
    if (pOneCtx->functionId != -1) {
      continue;
    }

    SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pOneCtx);
    SColumnInfoData*     pInput = pOneCtx->input.pData[0];

    // todo OPT all/all not NULL
    if (colDataIsNull(pInput, totalRows, rowIndex, NULL)) {
      pEntryInfo->isNullRes = 1;
    } else {
      char* pDst = GET_ROWCELL_INTERBUF(pEntryInfo);
      char* pSrc = colDataGetData(pInput, rowIndex);

      if (pInput->info.type == TSDB_DATA_TYPE_JSON) {
        int32_t jsonLen = getJsonValueLen(pSrc);
        memcpy(pDst, pSrc, jsonLen);
      } else if (IS_VAR_DATA_TYPE(pInput->info.type)) {
        varDataCopy(pDst, pSrc);
      } else {
        memcpy(pDst, pSrc, pInput->info.bytes);
      }
    }

    pEntryInfo->numOfRes = 1;
  }
}

static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SGroupbyOperatorInfo* pInfo = pOperator->info;

240
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
H
Haojun Liao 已提交
241 242 243 244 245 246 247 248 249
  int32_t         numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);
  //  if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
  // qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_TASKID(pRuntimeEnv));
  //    return;
  //  }

  int32_t     len = 0;
  STimeWindow w = TSWINDOW_INITIALIZER;

wmmhello's avatar
wmmhello 已提交
250
  terrno = TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
251 252 253 254
  int32_t num = 0;
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    // Compare with the previous row of this column, and do not set the output buffer again if they are identical.
    if (!pInfo->isInit) {
255
      recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
wmmhello's avatar
wmmhello 已提交
256
      if (terrno != TSDB_CODE_SUCCESS) {  // group by json error
257
        T_LONG_JMP(pTaskInfo->env, terrno);
wmmhello's avatar
wmmhello 已提交
258
      }
H
Haojun Liao 已提交
259 260 261 262 263
      pInfo->isInit = true;
      num++;
      continue;
    }

H
Haojun Liao 已提交
264
    bool equal = groupKeyCompare(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
H
Haojun Liao 已提交
265 266 267 268 269
    if (equal) {
      num++;
      continue;
    }

H
Haojun Liao 已提交
270
    // The first row of a new block does not belongs to the previous existed group
271
    if (j == 0) {
H
Haojun Liao 已提交
272
      num++;
273
      recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
wmmhello's avatar
wmmhello 已提交
274
      if (terrno != TSDB_CODE_SUCCESS) {  // group by json error
275
        T_LONG_JMP(pTaskInfo->env, terrno);
wmmhello's avatar
wmmhello 已提交
276
      }
H
Haojun Liao 已提交
277 278 279
      continue;
    }

H
Haojun Liao 已提交
280
    len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
281 282
    int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf,
                                          len, pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
H
Haojun Liao 已提交
283
    if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
284
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
H
Haojun Liao 已提交
285 286 287
    }

    int32_t rowIndex = j - num;
288
    doApplyFunctions(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs);
H
Haojun Liao 已提交
289 290

    // assign the group keys or user input constant values if required
291
    doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
292
    recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
H
Haojun Liao 已提交
293 294 295 296
    num = 1;
  }

  if (num > 0) {
H
Haojun Liao 已提交
297
    len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
298 299
    int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf,
                                          len, pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
H
Haojun Liao 已提交
300
    if (ret != TSDB_CODE_SUCCESS) {
301
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
H
Haojun Liao 已提交
302 303 304
    }

    int32_t rowIndex = pBlock->info.rows - num;
305
    doApplyFunctions(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs);
306
    doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
H
Haojun Liao 已提交
307 308 309
  }
}

310 311 312 313
/**
 * Build the next non-empty result block of the group-by operator from the accumulated group results, applying the
 * operator's filter. Marks the operator completed once groupResInfo has no remaining results.
 *
 * @return binfo.pRes holding at least one row, or NULL when no rows remain.
 */
static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
  SGroupbyOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*          pRes = pInfo->binfo.pRes;

  for (;;) {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);

    bool exhausted = !hasRemainResults(&pInfo->groupResInfo);
    if (exhausted) {
      setOperatorCompleted(pOperator);
    }

    // stop once everything is consumed, or as soon as the filter left some rows in this block
    if (exhausted || pRes->info.rows > 0) {
      break;
    }
  }

  pOperator->resultInfo.totalRows += pRes->info.rows;
  if (pRes->info.rows == 0) {
    return NULL;
  }
  return pRes;
}

332
/**
 * Next-block function of the hash group-by operator.
 *
 * On the first call it drains the downstream operator: for each block it fetches the scan order/flag, binds the
 * block to the aggregate contexts, evaluates the optional scalar pre-expressions, and hashes the rows into groups.
 * It then switches to OP_RES_TO_RETURN. Every call returns the next result block from
 * buildGroupResultDataBlock(); NULL once the operator is done. Errors abort the task via T_LONG_JMP.
 */
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }
  if (pOperator->status == OP_RES_TO_RETURN) {
    return buildGroupResultDataBlock(pOperator);
  }

  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SGroupbyOperatorInfo* pInfo = pOperator->info;
  SOperatorInfo*        downstream = pOperator->pDownstream[0];

  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;
  int64_t st = taosGetTimestampUs();

  SSDataBlock* pBlock = NULL;
  while ((pBlock = downstream->fpSet.getNextFn(downstream)) != NULL) {
    int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // the pDataBlock are always the same one, no need to call this again
    setInputDataBlock(&pOperator->exprSupp, pBlock, order, scanFlag, true);

    // scalar expressions that must be evaluated right before the group aggregation is applied
    if (pInfo->scalarSup.pExprInfo != NULL) {
      pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                              pInfo->scalarSup.numOfExprs, NULL);
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }
    }

    doHashGroupbyAgg(pOperator, pBlock);
  }

  pOperator->status = OP_RES_TO_RETURN;
  initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0);

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  return buildGroupResultDataBlock(pOperator);
}

5
54liuyao 已提交
398
/**
 * Create the hash group-by aggregate operator.
 *
 * @param downstream  child operator supplying input blocks.
 * @param pAggNode    physical plan node: output block description, optional scalar pre-expressions (pExprs),
 *                    group keys, aggregate functions and filter conditions.
 * @param pTaskInfo   owning task; on failure its `code` is set to the error that occurred.
 * @return the new operator, or NULL on failure (all partially built state is released).
 */
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo) {
  // Declared before the first `goto` so that _error always reports the real failure code.
  int32_t               code = TSDB_CODE_OUT_OF_MEMORY;
  SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SSDataBlock* pResBlock = createResDataBlock(pAggNode->node.pOutputDataBlockDesc);
  if (pResBlock == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }
  initBasicInfo(&pInfo->binfo, pResBlock);

  // optional scalar expressions evaluated on every input block before grouping (see hashGroupbyAggregate())
  int32_t    numOfScalarExpr = 0;
  SExprInfo* pScalarExprInfo = NULL;
  if (pAggNode->pExprs != NULL) {
    pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
  }

  pInfo->pGroupCols = extractColumnInfo(pAggNode->pGroupKeys);
  code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  initResultSizeInfo(&pOperator->resultInfo, 4096);
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
  code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  initResultRowInfo(&pInfo->binfo.resultRowInfo);
  setOperatorInfo(pOperator, "GroupbyAggOperator", 0, true, OP_NOT_OPENED, pInfo, pTaskInfo);

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo, NULL);
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  pTaskInfo->code = code;
  if (pInfo != NULL) {
    destroyGroupOperatorInfo(pInfo);
  }
  if (pOperator != NULL) {
    // The operator's own exprSupp (aggregate expressions, function contexts, filter) is not released by
    // destroyGroupOperatorInfo(), which only cleans up members of pInfo.
    cleanupExprSupp(&pOperator->exprSupp);
  }
  taosMemoryFreeClear(pOperator);
  return NULL;
}

H
Haojun Liao 已提交
461 462 463 464
/**
 * Append every row of pBlock to the paged buffer of its partition group.
 *
 * Per row:
 *   1. record and serialize the row's partition key;
 *   2. fetch the group's current page;
 *   3. assign the group id from calcGroupId() if it is still 0;
 *   4. append each output column's value to the page.
 *
 * The first int32 of a page is its row count. Each column section starts at pInfo->columnOffset[i]:
 *   - var-length types: one int32 offset per row (-1 marks NULL), an int32 content length, then the payload;
 *   - fixed-width types: a null bitmap, an int32 content length, then `bytes` per row (reserved even for NULL rows).
 *
 * Errors are reported via terrno (hashPartition() checks it after this returns). Processing stops at the first row
 * whose key cannot be recorded or whose page cannot be obtained, so such a row is never filed under a stale key.
 */
static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  SPartitionOperatorInfo* pInfo = pOperator->info;
  size_t                  numOfCols = pOperator->exprSupp.numOfExprs;

  // the per-row checks below rely on terrno, so make sure no stale value is left over
  terrno = TSDB_CODE_SUCCESS;

  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
    if (terrno != TSDB_CODE_SUCCESS) {  // e.g. group by json error
      return;
    }

    int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);

    SDataGroupInfo* pGroupInfo = NULL;
    void*           pPage = getCurrentDataGroupInfo(pInfo, &pGroupInfo, len);
    if (pPage == NULL) {
      if (terrno == TSDB_CODE_SUCCESS) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
      }
      return;
    }

    pGroupInfo->numOfRows += 1;

    // group id
    if (pGroupInfo->groupId == 0) {
      pGroupInfo->groupId = calcGroupId(pInfo->keyBuf, len);
    }

    // number of rows already stored in this page
    int32_t* rows = (int32_t*)pPage;

    for (int32_t i = 0; i < numOfCols; ++i) {
      SExprInfo*       pExpr = &pOperator->exprSupp.pExprInfo[i];
      int32_t          slotId = pExpr->base.pParam[0].pCol->slotId;
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);

      int32_t bytes = pColInfoData->info.bytes;
      int32_t startOffset = pInfo->columnOffset[i];

      int32_t* columnLen = NULL;
      int32_t  contentLen = 0;

      if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
        int32_t* offset = (int32_t*)((char*)pPage + startOffset);
        columnLen = (int32_t*)((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity);
        char* data = (char*)((char*)columnLen + sizeof(int32_t));

        if (colDataIsNull_s(pColInfoData, j)) {
          offset[(*rows)] = -1;  // NULL: no payload is stored
          contentLen = 0;
        } else if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
          offset[*rows] = (*columnLen);
          char*   src = colDataGetData(pColInfoData, j);
          int32_t dataLen = getJsonValueLen(src);

          memcpy(data + (*columnLen), src, dataLen);
          int32_t v = (data + (*columnLen) + dataLen - (char*)pPage);
          ASSERT(v > 0);

          contentLen = dataLen;
        } else {
          offset[*rows] = (*columnLen);
          char* src = colDataGetData(pColInfoData, j);
          memcpy(data + (*columnLen), src, varDataTLen(src));
          int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage);
          ASSERT(v > 0);

          contentLen = varDataTLen(src);
        }
      } else {
        char* bitmap = (char*)pPage + startOffset;
        columnLen = (int32_t*)((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity));
        char* data = (char*)columnLen + sizeof(int32_t);

        bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j);
        if (isNull) {
          colDataSetNull_f(bitmap, (*rows));
        } else {
          memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes);
          ASSERT((data + (*columnLen) + bytes - (char*)pPage) <= getBufPageSize(pInfo->pBuf));
        }
        contentLen = bytes;  // fixed-width slots are reserved for NULL rows too
      }

      (*columnLen) += contentLen;
      ASSERT(*columnLen >= 0);
    }

    (*rows) += 1;

    setBufPageDirty(pPage, true);
    releaseBufPage(pInfo->pBuf, pPage);
  }
}

/**
 * Find the group whose serialized key is pInfo->keyBuf[0..len) and return a page with room for one more row.
 *
 * A new key creates a group entry in pInfo->pGroupSet with a fresh page list. When the group's last page already
 * holds pInfo->rowCapacity rows, that page is released and a new one is appended. Every newly obtained page is
 * zero-filled on both paths. Its leading row count and every per-column content length and null bitmap therefore
 * start from a clean state, whatever getNewBufPage() returned. Previously only the rollover path cleared the page.
 *
 * @param pGroupInfo [out] the group's SDataGroupInfo (stored inside pInfo->pGroupSet); set only on success.
 * @return the page to append to (the caller releases it with releaseBufPage()), or NULL on failure, with terrno set
 *         by the failing call or to TSDB_CODE_OUT_OF_MEMORY.
 */
void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len) {
  SDataGroupInfo* p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);

  void*   pPage = NULL;
  int32_t pageId = 0;

  if (p == NULL) {  // it is a new group
    SDataGroupInfo gi = {0};
    gi.pPageList = taosArrayInit(100, sizeof(int32_t));
    if (gi.pPageList == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }

    if (taosHashPut(pInfo->pGroupSet, pInfo->keyBuf, len, &gi, sizeof(SDataGroupInfo)) != 0) {
      taosArrayDestroy(gi.pPageList);  // the hash table did not take ownership
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }

    p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);

    pPage = getNewBufPage(pInfo->pBuf, &pageId);
    if (pPage == NULL) {
      return NULL;
    }
  } else {
    int32_t* curId = taosArrayGetLast(p->pPageList);
    pPage = getBufPage(pInfo->pBuf, *curId);
    if (pPage == NULL) {
      return NULL;
    }

    int32_t* rows = (int32_t*)pPage;
    if (*rows < pInfo->rowCapacity) {
      *pGroupInfo = p;
      return pPage;  // the current page still has room
    }

    // release buffer
    releaseBufPage(pInfo->pBuf, pPage);

    // add a new page for current group
    pPage = getNewBufPage(pInfo->pBuf, &pageId);
    if (pPage == NULL) {
      return NULL;
    }
  }

  if (taosArrayPush(p->pPageList, &pageId) == NULL) {
    releaseBufPage(pInfo->pBuf, pPage);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  memset(pPage, 0, getBufPageSize(pInfo->pBuf));

  *pGroupInfo = p;
  return pPage;
}

/**
 * Derive a 64-bit group id from a serialized group key.
 * Hashes pData[0..len) with MD5 and keeps only the first 8 bytes of the digest, reinterpreted as a uint64_t in host
 * byte order.
 */
uint64_t calcGroupId(char* pData, int32_t len) {
  T_MD5_CTX md5Ctx;
  tMD5Init(&md5Ctx);
  tMD5Update(&md5Ctx, (uint8_t*)pData, len);
  tMD5Final(&md5Ctx);

  uint64_t groupId = 0;
  memcpy(&groupId, md5Ctx.digest, sizeof(groupId));
  return groupId;
}

H
Haojun Liao 已提交
596
int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
597
  size_t   numOfCols = taosArrayGetSize(pBlock->pDataBlock);
598
  int32_t* offset = taosMemoryCalloc(numOfCols, sizeof(int32_t));
H
Haojun Liao 已提交
599

600 601
  offset[0] = sizeof(int32_t) +
              sizeof(uint64_t);  // the number of rows in current page, ref to SSDataBlock paged serialization format
H
Haojun Liao 已提交
602

603
  for (int32_t i = 0; i < numOfCols - 1; ++i) {
H
Haojun Liao 已提交
604 605 606 607
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);

    int32_t bytes = pColInfoData->info.bytes;
    int32_t payloadLen = bytes * rowCapacity;
608

H
Haojun Liao 已提交
609 610 611 612 613 614 615 616 617 618 619 620
    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
      // offset segment + content length + payload
      offset[i + 1] = rowCapacity * sizeof(int32_t) + sizeof(int32_t) + payloadLen + offset[i];
    } else {
      // bitmap + content length + payload
      offset[i + 1] = BitmapLen(rowCapacity) + sizeof(int32_t) + payloadLen + offset[i];
    }
  }

  return offset;
}

5
54liuyao 已提交
621
/**
 * Release the page-id list of every group in sortedGroupArray, empty that array, and clear the disk-based page
 * buffer. Invoked by buildPartitionResult() once the last group has been returned.
 */
static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) {
  SArray* pGroups = pInfo->sortedGroupArray;
  int32_t numOfGroups = taosArrayGetSize(pGroups);

  int32_t idx = 0;
  while (idx < numOfGroups) {
    SDataGroupInfo* pGroup = taosArrayGet(pGroups, idx);
    taosArrayDestroy(pGroup->pPageList);
    ++idx;
  }

  taosArrayClear(pGroups);
  clearDiskbasedBuf(pInfo->pBuf);
}

631 632 633
static int compareDataGroupInfo(const void* group1, const void* group2) {
  const SDataGroupInfo* pGroupInfo1 = group1;
  const SDataGroupInfo* pGroupInfo2 = group2;
634 635 636 637 638 639

  if (pGroupInfo1->groupId == pGroupInfo2->groupId) {
    ASSERT(0);
    return 0;
  }

640
  return (pGroupInfo1->groupId < pGroupInfo2->groupId) ? -1 : 1;
641 642
}

H
Haojun Liao 已提交
643 644 645
/**
 * Return the next page of partitioned data, decoded into pInfo->binfo.pRes and tagged with its group id.
 *
 * groupIndex/pageIndex form a cursor over sortedGroupArray: pages of one group are emitted in order before moving
 * on to the next group. After the last group the operator is marked completed, the groups are released through
 * clearPartitionOperator(), and NULL is returned.
 */
static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
  SPartitionOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*            pRes = pInfo->binfo.pRes;

  SDataGroupInfo* pGroupInfo = NULL;
  if (pInfo->groupIndex != -1) {
    pGroupInfo = taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex);
  }

  bool groupDone = (pInfo->groupIndex == -1) || (pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList));
  if (groupDone) {
    // try next group data
    pInfo->groupIndex += 1;
    if (pInfo->groupIndex >= taosArrayGetSize(pInfo->sortedGroupArray)) {
      setOperatorCompleted(pOperator);
      clearPartitionOperator(pInfo);
      return NULL;
    }

    pGroupInfo = taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex);
    pInfo->pageIndex = 0;
  }

  int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex);
  void*    page = getBufPage(pInfo->pBuf, *pageId);

  blockDataEnsureCapacity(pRes, pInfo->rowCapacity);
  blockDataFromBuf1(pRes, page, pInfo->rowCapacity);

  pInfo->pageIndex += 1;
  releaseBufPage(pInfo->pBuf, page);

  blockDataUpdateTsWindow(pRes, 0);
  pRes->info.groupId = pGroupInfo->groupId;

  pOperator->resultInfo.totalRows += pRes->info.rows;
  return pRes;
}

677
/**
 * Next-block function of the partition operator.
 *
 * On the first call it drains the downstream operator. For each block it evaluates the optional scalar
 * pre-expressions and distributes the rows into per-group pages (doHashPartition()). It then snapshots the groups
 * into sortedGroupArray, ordered by group id, clears the group hash table, and switches to OP_RES_TO_RETURN.
 * Every call returns the next page from buildPartitionResult(); NULL once the operator is done.
 * Errors (including allocation failures while building the sorted array) abort the task via T_LONG_JMP.
 */
static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SPartitionOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*            pRes = pInfo->binfo.pRes;

  if (pOperator->status == OP_RES_TO_RETURN) {
    blockDataCleanup(pRes);
    return buildPartitionResult(pOperator);
  }

  int64_t        st = taosGetTimestampUs();
  SOperatorInfo* downstream = pOperator->pDownstream[0];

  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
      break;
    }

    // there is an scalar expression that needs to be calculated right before apply the group aggregation.
    if (pInfo->scalarSup.pExprInfo != NULL) {
      pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                              pInfo->scalarSup.numOfExprs, NULL);
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }
    }

    terrno = TSDB_CODE_SUCCESS;
    doHashPartition(pOperator, pBlock);
    if (terrno != TSDB_CODE_SUCCESS) {  // group by json error
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
  }

  // Copy every group into an array sorted by group id. The copies share each group's pPageList; the hash table is
  // cleared afterwards, leaving sortedGroupArray as the owner of those lists.
  SArray* groupArray = taosArrayInit(taosHashGetSize(pInfo->pGroupSet), sizeof(SDataGroupInfo));
  if (groupArray == NULL) {
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
  }

  void* pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL);
  while (pGroupIter != NULL) {
    SDataGroupInfo* pGroupInfo = pGroupIter;
    if (taosArrayPush(groupArray, pGroupInfo) == NULL) {
      // The hash table still owns every page list, so only the partial copy array is dropped here.
      taosHashCancelIterate(pInfo->pGroupSet, pGroupIter);
      taosArrayDestroy(groupArray);
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }
    pGroupIter = taosHashIterate(pInfo->pGroupSet, pGroupIter);
  }

  taosArraySort(groupArray, compareDataGroupInfo);
  pInfo->sortedGroupArray = groupArray;
  pInfo->groupIndex = -1;
  taosHashClear(pInfo->pGroupSet);

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

  pOperator->status = OP_RES_TO_RETURN;
  blockDataEnsureCapacity(pRes, 4096);
  return buildPartitionResult(pOperator);
}

738
static void destroyPartitionOperatorInfo(void* param) {
H
Haojun Liao 已提交
739
  SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param;
740
  cleanupBasicInfo(&pInfo->binfo);
H
Haojun Liao 已提交
741
  taosArrayDestroy(pInfo->pGroupCols);
742

743
  for (int i = 0; i < taosArrayGetSize(pInfo->pGroupColVals); i++) {
wmmhello's avatar
wmmhello 已提交
744 745 746
    SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->pGroupColVals, i);
    taosMemoryFree(key.pData);
  }
747

H
Haojun Liao 已提交
748
  taosArrayDestroy(pInfo->pGroupColVals);
H
Haojun Liao 已提交
749
  taosMemoryFree(pInfo->keyBuf);
750
  taosArrayDestroy(pInfo->sortedGroupArray);
D
dapan1121 已提交
751 752 753 754 755 756 757 758

  void* pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL);
  while (pGroupIter != NULL) {
    SDataGroupInfo* pGroupInfo = pGroupIter;
    taosArrayDestroy(pGroupInfo->pPageList);
    pGroupIter = taosHashIterate(pInfo->pGroupSet, pGroupIter);
  }

wmmhello's avatar
wmmhello 已提交
759
  taosHashCleanup(pInfo->pGroupSet);
H
Haojun Liao 已提交
760
  taosMemoryFree(pInfo->columnOffset);
761

762
  cleanupExprSupp(&pInfo->scalarSup);
H
Haojun Liao 已提交
763
  destroyDiskbasedBuf(pInfo->pBuf);
D
dapan1121 已提交
764
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
765 766
}

767 768
/*
 * Create the (non-stream) PARTITION BY operator on top of `downstream`.
 *
 * Returns the new operator, or NULL on failure. On failure everything allocated so far
 * is released and pTaskInfo->code receives the reason: the code reported by the failing
 * step (e.g. TSDB_CODE_NO_AVAIL_DISK), or TSDB_CODE_OUT_OF_MEMORY when none was reported.
 */
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode,
                                           SExecTaskInfo* pTaskInfo) {
  int32_t                 code = TSDB_CODE_SUCCESS;
  SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo));
  SOperatorInfo*          pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys);

  // optional scalar expressions, applied to each input block before partitioning
  if (pPartNode->pExprs != NULL) {
    int32_t    num = 0;
    SExprInfo* pExprInfo1 = createExprInfo(pPartNode->pExprs, NULL, &num);
    code = initExprSupp(&pInfo->scalarSup, pExprInfo1, num);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
  if (pInfo->pGroupSet == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->binfo.pRes = createResDataBlock(pPartNode->node.pOutputDataBlockDesc);
  if (pInfo->binfo.pRes == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  uint32_t defaultPgsz = 0;
  uint32_t defaultBufsz = 0;
  getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz);

  if (!osTempSpaceAvailable()) {
    code = TSDB_CODE_NO_AVAIL_DISK;
    terrno = code;
    qError("Create partition operator info failed since %s", terrstr(terrno));
    goto _error;
  }

  code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, tsTempDir);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->rowCapacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, getBufPageSize(pInfo->pBuf));
  pInfo->columnOffset = setupColumnOffset(pInfo->binfo.pRes, pInfo->rowCapacity);
  code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, NULL);

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  // The output expressions are built only after the last fallible step, so no error
  // path above has to release them (the operator is freed without them on _error).
  int32_t    numOfCols = 0;
  SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols);
  pOperator->exprSupp.numOfExprs = numOfCols;
  pOperator->exprSupp.pExprInfo = pExprInfo;
  return pOperator;

_error:
  // keep the specific failure reason rather than always reporting out-of-memory
  pTaskInfo->code = (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_OUT_OF_MEMORY;
  if (pInfo != NULL) {
    destroyPartitionOperatorInfo(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  return NULL;
}

838 839 840
/*
 * Look up (creating it if needed) the result row keyed by `pData`/`bytes` for `groupId`,
 * then initialise the operator's function contexts against that row.
 *
 * Returns TSDB_CODE_SUCCESS, or an error code when no result row could be obtained.
 */
int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData,
                                int16_t bytes, uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup) {
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SResultRowInfo* pResultRowInfo = &binfo->resultRowInfo;
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;

  SResultRow* pResultRow =
      doSetResultOutBufByKey(pBuf, pResultRowInfo, pData, bytes, true, groupId, pTaskInfo, false, pAggSup);
  if (pResultRow == NULL) {
    // an assert() here compiles away under NDEBUG and the NULL row would be dereferenced
    // below; report the failure through the return code instead.
    return (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
  }

  setResultRowInitCtx(pResultRow, pCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset);
  return TSDB_CODE_SUCCESS;
}
851 852 853

/*
 * Compute the group id of row `rowId` in `pBlock`.
 *
 * When `pExprSup` carries scalar expressions, they are first evaluated over the whole
 * block in place; a failure there is only logged. The row's group-by values are then
 * recorded, serialised into pParSup->keyBuf and hashed into the returned id.
 */
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId) {
  if (pExprSup->pExprInfo != NULL) {
    int32_t ret = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs,
                                        NULL);
    if (ret != TSDB_CODE_SUCCESS) {
      qError("calaculate group id error, code:%d", ret);
    }
  }

  recordNewGroupKeys(pParSup->pGroupCols, pParSup->pGroupColVals, pBlock, rowId);
  int32_t keyLen = buildGroupKeys(pParSup->keyBuf, pParSup->pGroupColVals);
  return calcGroupId(pParSup->keyBuf, keyLen);
}

866
/* True while the partition-hash iterator still points at a partition that has not been emitted. */
static bool hasRemainPartion(SStreamPartitionOperatorInfo* pInfo) {
  bool pending = (pInfo->parIte != NULL);
  return pending;
}
867 868 869

/*
 * Emit the partition currently pointed to by pInfo->parIte as one result block, then
 * advance the iterator to the next partition.
 *
 * The partition's rows are copied out of pInfo->pInputDataBlock using the row indexes
 * recorded for it. The consumed row-index list is freed here. When a sub-table name
 * expression exists, it is evaluated on the partition's first row and the result is
 * stored in pDest->info.parTbName.
 */
static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
  SStreamPartitionOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*                  pDest = pInfo->binfo.pRes;
  ASSERT(hasRemainPartion(pInfo));
  SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->parIte;
  blockDataCleanup(pDest);

  int32_t      rows = taosArrayGetSize(pParInfo->rowIds);
  SSDataBlock* pSrc = pInfo->pInputDataBlock;

  // pRes is only pre-sized to 4096 rows when the operator is created, while one partition
  // may contain every row of the input block: make room for all rows before appending.
  int32_t code = blockDataEnsureCapacity(pDest, rows);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }

  // gather this partition's rows, column by column, from the input block
  for (int32_t i = 0; i < rows; i++) {
    int32_t rowIndex = *(int32_t*)taosArrayGet(pParInfo->rowIds, i);
    for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; j++) {
      int32_t          slotId = pOperator->exprSupp.pExprInfo[j].base.pParam[0].pCol->slotId;
      SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, slotId);
      SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, j);
      bool             isNull = colDataIsNull(pSrcCol, pSrc->info.rows, rowIndex, NULL);
      char*            pSrcData = colDataGetData(pSrcCol, rowIndex);
      colDataAppend(pDestCol, pDest->info.rows, pSrcData, isNull);
    }
    pDest->info.rows++;
  }

  // the sub-table name is computed once per partition, from its first row
  if (pInfo->tbnameCalSup.numOfExprs > 0 && rows > 0) {
    int32_t      firstRow = *(int32_t*)taosArrayGet(pParInfo->rowIds, 0);
    SSDataBlock* pTmpBlock = blockCopyOneRow(pSrc, firstRow);
    SSDataBlock* pResBlock = createDataBlock();
    pResBlock->info.rowSize = TSDB_TABLE_NAME_LEN;
    SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0);
    taosArrayPush(pResBlock->pDataBlock, &data);
    blockDataEnsureCapacity(pResBlock, 1);
    projectApplyFunctions(pInfo->tbnameCalSup.pExprInfo, pResBlock, pTmpBlock, pInfo->tbnameCalSup.pCtx, 1, NULL);
    ASSERT(pResBlock->info.rows == 1);
    ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1);
    SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0);
    ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR);
    void* pData = colDataGetVarData(pCol, 0);
    // TODO check tbname validity
    if (pData != (void*)-1) {
      // zero-fill first and copy at most TSDB_TABLE_NAME_LEN - 1 bytes, so the name stays NUL-terminated
      memset(pDest->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
      int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
      memcpy(pDest->info.parTbName, varDataVal(pData), len);
    } else {
      pDest->info.parTbName[0] = 0;
    }
    blockDataDestroy(pTmpBlock);
    blockDataDestroy(pResBlock);
  }

  taosArrayDestroy(pParInfo->rowIds);
  pParInfo->rowIds = NULL;
  blockDataUpdateTsWindow(pDest, pInfo->tsColIndex);
  pDest->info.groupId = pParInfo->groupId;
  pOperator->resultInfo.totalRows += pDest->info.rows;
  pInfo->parIte = taosHashIterate(pInfo->pPartitions, pInfo->parIte);
  ASSERT(pDest->info.rows > 0);
  printDataBlock(pDest, "stream partitionby");
  return pDest;
}

/*
 * Group the rows of `pBlock` by partition key.
 *
 * pInfo->pPartitions maps each serialised key (built in pInfo->partitionSup.keyBuf) to a
 * SPartitionDataInfo. That entry holds the key's group id and the indexes, in row order,
 * of the block rows carrying that key. `pBlock` is remembered in pInfo->pInputDataBlock
 * so the rows can be copied out later.
 */
static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDataBlock* pBlock) {
  SPartitionBySupporter* pSup = &pInfo->partitionSup;
  pInfo->pInputDataBlock = pBlock;

  for (int32_t row = 0; row < pBlock->info.rows; ++row) {
    recordNewGroupKeys(pSup->pGroupCols, pSup->pGroupColVals, pBlock, row);
    int32_t keyLen = buildGroupKeys(pSup->keyBuf, pSup->pGroupColVals);

    SPartitionDataInfo* pExisting = (SPartitionDataInfo*)taosHashGet(pInfo->pPartitions, pSup->keyBuf, keyLen);
    if (pExisting != NULL) {
      taosArrayPush(pExisting->rowIds, &row);
      continue;
    }

    // first row with this key: open a new partition entry
    SPartitionDataInfo fresh = {0};
    fresh.groupId = calcGroupId(pSup->keyBuf, keyLen);
    fresh.rowIds = taosArrayInit(64, sizeof(int32_t));
    taosArrayPush(fresh.rowIds, &row);
    taosHashPut(pInfo->pPartitions, pSup->keyBuf, keyLen, &fresh, sizeof(SPartitionDataInfo));
  }
}

/*
 * next() of the stream PARTITION BY operator.
 *
 * Partitions left over from the previous input block are drained first, one result block
 * per call. Otherwise one block is pulled from downstream:
 *  - NULL: the operator is marked completed and NULL is returned;
 *  - STREAM_DELETE_DATA: copied into pInfo->pDelRes and returned as STREAM_DELETE_RESULT;
 *  - any type other than NORMAL / PULL_DATA / INVALID: returned unchanged;
 *  - otherwise the optional scalar expressions are applied, the rows are hashed into
 *    partitions and the first partition is returned.
 */
static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*                pTaskInfo = pOperator->pTaskInfo;
  SStreamPartitionOperatorInfo* pInfo = pOperator->info;
  if (hasRemainPartion(pInfo)) {
    return buildStreamPartitionResult(pOperator);
  }

  int64_t        st = taosGetTimestampUs();
  SOperatorInfo* downstream = pOperator->pDownstream[0];

  pInfo->pInputDataBlock = NULL;
  SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
  if (pBlock == NULL) {
    setOperatorCompleted(pOperator);
    return NULL;
  }
  printDataBlock(pBlock, "stream partitionby recv");

  switch (pBlock->info.type) {
    case STREAM_NORMAL:
    case STREAM_PULL_DATA:
    case STREAM_INVALID:
      pInfo->binfo.pRes->info.type = pBlock->info.type;
      break;
    case STREAM_DELETE_DATA:
      copyDataBlock(pInfo->pDelRes, pBlock);
      pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
      printDataBlock(pInfo->pDelRes, "stream partitionby delete");
      return pInfo->pDelRes;
    default:
      return pBlock;
  }

  // scalar expressions that must be calculated right before partitioning is applied
  if (pInfo->scalarSup.pExprInfo != NULL) {
    pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                            pInfo->scalarSup.numOfExprs, NULL);
    if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
      // same error-escape macro as the rest of this file
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }
  }
  taosHashClear(pInfo->pPartitions);
  doStreamHashPartitionImpl(pInfo, pBlock);
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

  // NOTE(review): a data block with zero rows creates no partition, so parIte stays NULL
  // and buildStreamPartitionResult is entered with only an ASSERT guarding it. Confirm
  // upstream never delivers empty data blocks.
  pInfo->parIte = taosHashIterate(pInfo->pPartitions, NULL);
  return buildStreamPartitionResult(pOperator);
}

static void destroyStreamPartitionOperatorInfo(void* param) {
  SStreamPartitionOperatorInfo* pInfo = (SStreamPartitionOperatorInfo*)param;
  cleanupBasicInfo(&pInfo->binfo);
  taosArrayDestroy(pInfo->partitionSup.pGroupCols);

1003
  for (int i = 0; i < taosArrayGetSize(pInfo->partitionSup.pGroupColVals); i++) {
1004 1005 1006 1007 1008 1009 1010
    SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->partitionSup.pGroupColVals, i);
    taosMemoryFree(key.pData);
  }
  taosArrayDestroy(pInfo->partitionSup.pGroupColVals);

  taosMemoryFree(pInfo->partitionSup.keyBuf);
  cleanupExprSupp(&pInfo->scalarSup);
L
Liu Jicong 已提交
1011 1012
  cleanupExprSupp(&pInfo->tbnameCalSup);
  cleanupExprSupp(&pInfo->tagCalSup);
1013
  blockDataDestroy(pInfo->pDelRes);
1014
  taosHashCleanup(pInfo->pPartitions);
1015 1016 1017 1018 1019 1020 1021 1022 1023 1024
  taosMemoryFreeClear(param);
}

/*
 * If `downstream` is a stream scan, give it the partition-by supporter and the scalar
 * expressions, and create its update-info tracker when it does not have one yet.
 * Any other kind of downstream operator is left untouched.
 */
void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup, SExprSupp* pExpr) {
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    SStreamScanInfo* pScanInfo = downstream->info;
    // struct copy: the scan shares the arrays/buffers referenced by *pParSup
    pScanInfo->partitionSup = *pParSup;
    pScanInfo->pPartScalarSup = pExpr;

    if (pScanInfo->pUpdateInfo == NULL) {
      pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0);
    }
  }
}

1030 1031
/*
 * Create the stream PARTITION BY operator on top of `downstream`.
 *
 * Returns the new operator, or NULL on failure. On failure everything allocated so far is
 * released and pTaskInfo->code receives the failing step's code (TSDB_CODE_OUT_OF_MEMORY
 * when none was reported). The downstream stream scan is handed pointers into the new
 * operator's state only after every fallible step has succeeded, so it never keeps
 * references to freed memory.
 */
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode,
                                                 SExecTaskInfo* pTaskInfo) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  SStreamPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamPartitionOperatorInfo));
  SOperatorInfo*                pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->partitionSup.pGroupCols = extractPartitionColInfo(pPartNode->part.pPartitionKeys);

  // optional scalar expressions, applied to each input block before partitioning
  if (pPartNode->part.pExprs != NULL) {
    int32_t    num = 0;
    SExprInfo* pCalExprInfo = createExprInfo(pPartNode->part.pExprs, NULL, &num);
    code = initExprSupp(&pInfo->scalarSup, pCalExprInfo, num);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  // optional expression producing the sub-table name of each partition
  if (pPartNode->pSubtable != NULL) {
    SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
    if (pSubTableExpr == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
    pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
    createExprFromOneNode(pSubTableExpr, pPartNode->pSubtable, 0);
    code = initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  if (pPartNode->pTags != NULL) {
    int32_t    numOfTags = 0;
    SExprInfo* pTagExpr = createExprInfo(pPartNode->pTags, NULL, &numOfTags);
    if (pTagExpr == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      terrno = code;
      goto _error;
    }
    code = initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      goto _error;
    }
  }

  int32_t keyLen = 0;
  code = initGroupOptrInfo(&pInfo->partitionSup.pGroupColVals, &keyLen, &pInfo->partitionSup.keyBuf,
                           pInfo->partitionSup.pGroupCols);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
  pInfo->partitionSup.needCalc = true;

  pInfo->binfo.pRes = createResDataBlock(pPartNode->part.node.pOutputDataBlockDesc);
  if (pInfo->binfo.pRes == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, 4096);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->parIte = NULL;
  pInfo->pInputDataBlock = NULL;

  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
  if (pInfo->pPartitions == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }
  pInfo->tsColIndex = 0;
  pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
  if (pInfo->pDelRes == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  setOperatorInfo(pOperator, "StreamPartitionOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL, destroyStreamPartitionOperatorInfo, NULL);

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  // nothing below can fail: build the output expressions and wire up the downstream scan
  int32_t    numOfCols = 0;
  SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols);
  pOperator->exprSupp.numOfExprs = numOfCols;
  pOperator->exprSupp.pExprInfo = pExprInfo;

  initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
  return pOperator;

_error:
  // keep the specific failure reason rather than always reporting out-of-memory
  pTaskInfo->code = (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_OUT_OF_MEMORY;
  if (pInfo != NULL) {
    destroyStreamPartitionOperatorInfo(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  return NULL;
}
H
Haojun Liao 已提交
1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151

/*
 * Build an array of SColumn describing the target list `pNodeList`.
 *
 * Column targets are converted with extractColumnFromColumnNode. Constant (value) targets
 * get a synthetic descriptor: slotId and colId are the target's slot id, and the type,
 * bytes, scale and precision come from the value's result type. Targets of any other
 * node type are skipped.
 *
 * Returns the new array, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY if it cannot be allocated.
 */
SArray* extractColumnInfo(SNodeList* pNodeList) {
  size_t  numOfCols = LIST_LENGTH(pNodeList);
  SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
  if (pList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < (int32_t)numOfCols; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);

    if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
      SColumn c = extractColumnFromColumnNode((SColumnNode*)pNode->pExpr);
      taosArrayPush(pList, &c);
    } else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) {
      SValueNode* pValNode = (SValueNode*)pNode->pExpr;
      SColumn     c = {0};
      c.slotId = pNode->slotId;
      c.colId = pNode->slotId;
      // the data type lives in resType; node.type is the node kind (QUERY_NODE_VALUE)
      c.type = pValNode->node.resType.type;
      c.bytes = pValNode->node.resType.bytes;
      c.scale = pValNode->node.resType.scale;
      c.precision = pValNode->node.resType.precision;

      taosArrayPush(pList, &c);
    }
  }

  return pList;
}