groupoperator.c 27.6 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "os.h"
#include "function.h"
#include "tname.h"

#include "tdatablock.h"
#include "tmsg.h"

#include "executorimpl.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
wmmhello's avatar
wmmhello 已提交
27
#include "executorInt.h"
H
Haojun Liao 已提交
28

29
static void*    getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
H
Haojun Liao 已提交
30
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
31
static int32_t  setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t bytes,
32
                                        uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup);
H
Haojun Liao 已提交
33

H
Haojun Liao 已提交
34 35 36 37 38
// Element destructor for the group-key value array: releases the value buffer
// owned by one SGroupKeys entry. The entry itself is not freed here.
static void freeGroupKey(void* param) {
  taosMemoryFree(((SGroupKeys*)param)->pData);
}

H
Haojun Liao 已提交
39
static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
H
Haojun Liao 已提交
40
  SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
41
  cleanupBasicInfo(&pInfo->binfo);
H
Haojun Liao 已提交
42 43
  taosMemoryFreeClear(pInfo->keyBuf);
  taosArrayDestroy(pInfo->pGroupCols);
H
Haojun Liao 已提交
44
  taosArrayDestroyEx(pInfo->pGroupColVals, freeGroupKey);
45
  cleanupExprSupp(&pInfo->scalarSup);
H
Haojun Liao 已提交
46 47 48

  cleanupGroupResInfo(&pInfo->groupResInfo);
  cleanupAggSup(&pInfo->aggSup);
D
dapan1121 已提交
49
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
50 51
}

wmmhello's avatar
wmmhello 已提交
52
// Prepare the buffers used to track and serialize group keys.
//   pGroupColVals - [out] array with one SGroupKeys per group column, each owning a
//                   zeroed value buffer of the column's byte width.
//   keyLen        - [in/out] incremented by the sum of the column widths plus one
//                   null-flag byte per group column.
//   keyBuf        - [out] zeroed buffer of (*keyLen) bytes for the serialized key.
//   pGroupColList - list of SColumn describing the group columns.
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY. On failure everything that
// was allocated here is released again and *pGroupColVals is reset to NULL, so the
// caller has nothing left to clean up.
static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
  *pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys));
  if ((*pGroupColVals) == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SColumn* pCol = taosArrayGet(pGroupColList, i);
    (*keyLen) += pCol->bytes;  // value bytes only; the null flags are added after the loop

    SGroupKeys key = {0};
    key.bytes  = pCol->bytes;
    key.type   = pCol->type;
    key.isNull = false;
    key.pData  = taosMemoryCalloc(1, pCol->bytes);
    if (key.pData == NULL) {
      goto _oom;
    }

    // the array stores a copy of key; until the push succeeds the buffer is still ours
    if (taosArrayPush((*pGroupColVals), &key) == NULL) {
      taosMemoryFree(key.pData);
      goto _oom;
    }
  }

  int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
  (*keyLen) += nullFlagSize;

  (*keyBuf) = taosMemoryCalloc(1, (*keyLen));
  if ((*keyBuf) == NULL) {
    goto _oom;
  }

  return TSDB_CODE_SUCCESS;

_oom:
  // release the key entries created so far together with their value buffers
  taosArrayDestroyEx(*pGroupColVals, freeGroupKey);
  *pGroupColVals = NULL;
  return TSDB_CODE_OUT_OF_MEMORY;
}

H
Haojun Liao 已提交
86
// Check whether row `rowIndex` of pBlock carries the same group key as the one
// currently stored in pGroupColVals.
//   pGroupCols     - SColumn list of the group columns (slotId selects the block column).
//   pGroupColVals  - SGroupKeys list holding the stored key, one entry per group column.
//   numOfGroupCols - number of leading entries of both lists to compare.
// Two NULLs in the same column count as equal; a NULL never equals a non-NULL value.
// Returns true only if every compared column matches.
static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) {
  SColumnDataAgg* pAgg = NULL;

  for (int32_t k = 0; k < numOfGroupCols; ++k) {
    SColumn*         pColumn = taosArrayGet(pGroupCols, k);
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pColumn->slotId);
    if (pBlock->pBlockAgg != NULL) {
      pAgg = pBlock->pBlockAgg[pColumn->slotId];  // TODO is agg data matched?
    }

    bool        rowIsNull = colDataIsNull(pColData, pBlock->info.rows, rowIndex, pAgg);
    SGroupKeys* pSaved = taosArrayGet(pGroupColVals, k);

    if (rowIsNull || pSaved->isNull) {
      if (rowIsNull && pSaved->isNull) {
        continue;  // NULL on both sides: this column matches
      }
      return false;  // exactly one side is NULL
    }

    char* pRowVal = colDataGetData(pColData, rowIndex);
    bool  same = false;

    if (pSaved->type == TSDB_DATA_TYPE_JSON) {
      // the compared length is taken from the row value
      int32_t jsonLen = getJsonValueLen(pRowVal);
      same = (memcmp(pSaved->pData, pRowVal, jsonLen) == 0);
    } else if (IS_VAR_DATA_TYPE(pSaved->type)) {
      int32_t payloadLen = varDataLen(pRowVal);
      same = (payloadLen == varDataLen(pSaved->pData)) &&
             (memcmp(varDataVal(pSaved->pData), varDataVal(pRowVal), payloadLen) == 0);
    } else {
      same = (memcmp(pSaved->pData, pRowVal, pSaved->bytes) == 0);
    }

    if (!same) {
      return false;
    }
  }

  return true;
}

wmmhello's avatar
wmmhello 已提交
133
// Copy the group-column values of row `rowIndex` of pBlock into pGroupColVals, which
// then represents the "current" group key.
//   pGroupCols    - SColumn list of the group columns (slotId selects the block column).
//   pGroupColVals - SGroupKeys list to overwrite, one entry per group column.
// For a JSON column whose value satisfies tTagIsJson(), terrno is set to
// TSDB_CODE_QRY_JSON_IN_GROUP_ERROR and the function returns immediately; the entries
// of the columns processed before it have already been overwritten at that point.
static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex) {
  SColumnDataAgg* pColAgg = NULL;

  size_t numOfGroupCols = taosArrayGetSize(pGroupCols);

  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SColumn*         pCol = taosArrayGet(pGroupCols, i);
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);

    if (pBlock->pBlockAgg != NULL) {
      pColAgg = pBlock->pBlockAgg[pCol->slotId];  // TODO is agg data matched?
    }

    SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
      pkey->isNull = true;
    } else {
      pkey->isNull = false;
      char* val = colDataGetData(pColInfoData, rowIndex);
      if (pkey->type == TSDB_DATA_TYPE_JSON) {
        if(tTagIsJson(val)){
          terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
          return;
        }
        int32_t dataLen = getJsonValueLen(val);
        memcpy(pkey->pData, val, dataLen);
      } else if (IS_VAR_DATA_TYPE(pkey->type)) {
        // validate the size before writing: pData was allocated with pkey->bytes bytes
        ASSERT(varDataTLen(val) <= pkey->bytes);
        memcpy(pkey->pData, val, varDataTLen(val));
      } else {
        memcpy(pkey->pData, val, pkey->bytes);
      }
    }
  }
}

wmmhello's avatar
wmmhello 已提交
169
static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
H
Haojun Liao 已提交
170 171 172 173 174 175 176 177 178 179 180 181 182
  ASSERT(pKey != NULL);
  size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);

  char* isNull = (char*)pKey;
  char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols;
  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
    if (pkey->isNull) {
      isNull[i] = 1;
      continue;
    }

    isNull[i] = 0;
wmmhello's avatar
wmmhello 已提交
183 184 185 186 187
    if (pkey->type == TSDB_DATA_TYPE_JSON) {
      int32_t dataLen = getJsonValueLen(pkey->pData);
      memcpy(pStart, (pkey->pData), dataLen);
      pStart += dataLen;
    } else if (IS_VAR_DATA_TYPE(pkey->type)) {
H
Haojun Liao 已提交
188 189 190 191 192 193 194 195 196
      varDataCopy(pStart, pkey->pData);
      pStart += varDataTLen(pkey->pData);
      ASSERT(varDataTLen(pkey->pData) <= pkey->bytes);
    } else {
      memcpy(pStart, pkey->pData, pkey->bytes);
      pStart += pkey->bytes;
    }
  }

H
Haojun Liao 已提交
197
  return (int32_t) (pStart - (char*)pKey);
H
Haojun Liao 已提交
198 199 200 201 202
}

// Fill the result cells of the output expressions whose functionId is -1 (e.g. the
// `key` column of: select count(*),key from t group by key) with the value found at
// row `rowIndex` of their first input column.
//   pCtx        - array of numOfOutput function contexts.
//   totalRows   - row count handed to colDataIsNull for the input column.
//   rowIndex    - row whose value is copied.
// A NULL input value marks the entry as a NULL result instead of copying anything.
static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t totalRows, int32_t rowIndex) {
  for (int32_t k = 0; k < numOfOutput; ++k) {
    if (pCtx[k].functionId != -1) {
      continue;
    }

    SResultRowEntryInfo* pEntry = GET_RES_INFO(&pCtx[k]);
    SColumnInfoData*     pInput = pCtx[k].input.pData[0];

    // todo OPT all/all not NULL
    if (colDataIsNull(pInput, totalRows, rowIndex, NULL)) {
      pEntry->isNullRes = 1;  // it is a NULL value
    } else {
      char* pDst = GET_ROWCELL_INTERBUF(pEntry);
      char* pSrc = colDataGetData(pInput, rowIndex);

      if (pInput->info.type == TSDB_DATA_TYPE_JSON) {
        int32_t jsonLen = getJsonValueLen(pSrc);
        memcpy(pDst, pSrc, jsonLen);
      } else if (IS_VAR_DATA_TYPE(pInput->info.type)) {
        varDataCopy(pDst, pSrc);
      } else {
        memcpy(pDst, pSrc, pInput->info.bytes);
      }
    }

    pEntry->numOfRes = 1;
  }
}

// Aggregate one input block for the group-by operator.
// Rows are scanned in order and consecutive rows with an identical group key are
// batched: `num` counts the rows of the current run, and whenever the key changes the
// run [j - num, j) is applied to the result row of its group in a single call.
// The key of the current run is kept in pInfo->pGroupColVals across blocks, so a run
// may continue from the previous block.
// Errors are reported by longjmp(pTaskInfo->env, ...):
//   - terrno, when a JSON value cannot be used as a group key;
//   - TSDB_CODE_QRY_APP_ERROR, when the result row of a group cannot be set up.
static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SGroupbyOperatorInfo* pInfo = pOperator->info;

  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  int32_t         numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);

  int32_t     len = 0;
  STimeWindow w = TSWINDOW_INITIALIZER;

  terrno = TSDB_CODE_SUCCESS;
  int32_t num = 0;
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    // Compare with the previous row of this column, and do not set the output buffer again if they are identical.
    if (!pInfo->isInit) {
      recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
      if (terrno != TSDB_CODE_SUCCESS) {  // group by json error
        longjmp(pTaskInfo->env, terrno);
      }
      pInfo->isInit = true;
      num++;
      continue;
    }

    bool equal = groupKeyCompare(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
    if (equal) {
      num++;
      continue;
    }

    // The first row of a new block does not belong to the previously existing group
    if (j == 0) {
      num++;
      recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
      if (terrno != TSDB_CODE_SUCCESS) {  // group by json error
        longjmp(pTaskInfo->env, terrno);
      }
      continue;
    }

    // the key changed: flush the finished run [j - num, j) into the result row of its group
    len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
    int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, len, pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
    if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
      longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
    }

    int32_t rowIndex = j - num;
    doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->exprSupp.numOfExprs, TSDB_ORDER_ASC);

    // assign the group keys or user input constant values if required
    doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);

    // row j starts the next run. Only the JSON error is tested here because terrno may
    // have been touched by the calls above since it was reset at function entry.
    recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
    if (terrno == TSDB_CODE_QRY_JSON_IN_GROUP_ERROR) {  // group by json error
      longjmp(pTaskInfo->env, terrno);
    }
    num = 1;
  }

  // flush the run that reaches the end of the block
  if (num > 0) {
    len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
    int32_t ret =
        setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, len,
                                pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
    if (ret != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
    }

    int32_t rowIndex = pBlock->info.rows - num;
    doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->exprSupp.numOfExprs, TSDB_ORDER_ASC);
    doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
  }
}

303 304 305 306 307 308
// Produce the next output block of the group-by operator.
// Result blocks are built and filtered with pInfo->pCondition repeatedly until one
// still has rows after filtering or no grouped results remain; in the latter case the
// operator is marked as completed.
// Returns the operator's result block, or NULL when it ended up with zero rows.
static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
  SGroupbyOperatorInfo* pGroupby = pOperator->info;
  SSDataBlock*          pOut = pGroupby->binfo.pRes;

  bool exhausted = false;
  do {
    doBuildResultDatablock(pOperator, &pGroupby->binfo, &pGroupby->groupResInfo, pGroupby->aggSup.pResultBuf);
    doFilter(pGroupby->pCondition, pOut, NULL);

    exhausted = !hasRemainResults(&pGroupby->groupResInfo);
    if (exhausted) {
      doSetOperatorCompleted(pOperator);
    }
  } while (!exhausted && !(pOut->info.rows > 0));

  pOperator->resultInfo.totalRows += pOut->info.rows;
  if (pOut->info.rows == 0) {
    return NULL;
  }

  return pOut;
}

325
// getNext function of the group-by operator.
// On the first call every block of the downstream operator is consumed and aggregated;
// the operator then switches to OP_RES_TO_RETURN and this call, like all later ones,
// hands out result blocks through buildGroupResultDataBlock().
// Returns NULL once the operator is in state OP_EXEC_DONE.
// Failures of getTableScanInfo / projectApplyFunctions are raised with
// longjmp(pTaskInfo->env, code).
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SGroupbyOperatorInfo* pGroupby = pOperator->info;
  SSDataBlock*          pOut = pGroupby->binfo.pRes;

  // the input has been aggregated already: only hand out the remaining results
  if (pOperator->status == OP_RES_TO_RETURN) {
    return buildGroupResultDataBlock(pOperator);
  }

  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;

  int64_t        startUs = taosGetTimestampUs();
  SOperatorInfo* pChild = pOperator->pDownstream[0];

  SSDataBlock* pInput = NULL;
  while ((pInput = pChild->fpSet.getNextFn(pChild)) != NULL) {
    int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }

    // the pDataBlock are always the same one, no need to call this again
    setInputDataBlock(pOperator, pOperator->exprSupp.pCtx, pInput, order, scanFlag, true);

    // scalar expressions, if any, are evaluated in place on the input block before it is grouped
    if (pGroupby->scalarSup.pExprInfo != NULL) {
      pTaskInfo->code = projectApplyFunctions(pGroupby->scalarSup.pExprInfo, pInput, pInput, pGroupby->scalarSup.pCtx, pGroupby->scalarSup.numOfExprs, NULL);
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
        longjmp(pTaskInfo->env, pTaskInfo->code);
      }
    }

    doHashGroupbyAgg(pOperator, pInput);
  }

  pOperator->status = OP_RES_TO_RETURN;

#if 0
  if(pOperator->fpSet.encodeResultRow){
    char *result = NULL;
    int32_t length = 0;
    pOperator->fpSet.encodeResultRow(pOperator, &result, &length);
    SAggSupporter* pSup = &pInfo->aggSup;
    taosHashClear(pSup->pResultRowHashTable);
    pInfo->binfo.resultRowInfo.size = 0;
    pOperator->fpSet.decodeResultRow(pOperator, result);
    if(result){
      taosMemoryFree(result);
    }
  }
#endif
  blockDataEnsureCapacity(pOut, pOperator->resultInfo.capacity);
  initGroupedResultInfo(&pGroupby->groupResInfo, pGroupby->aggSup.pResultRowHashTable, 0);

  // elapsed time of the consume-and-aggregate phase, converted from microseconds
  pOperator->cost.openCost = (taosGetTimestampUs() - startUs) / 1000.0;
  return buildGroupResultDataBlock(pOperator);
}

H
Haojun Liao 已提交
393 394 395
// Create a group-by aggregate operator on top of `downstream`.
//   pExprInfo / numOfCols             - output expressions of the operator.
//   pResultBlock                      - block that receives the results.
//   pGroupColList                     - SColumn list of the group columns; stored in the operator info.
//   pCondition                        - filter applied to the result blocks.
//   pScalarExprInfo / numOfScalarExpr - scalar expressions evaluated on every input block.
// Returns the new operator, or NULL on failure with pTaskInfo->code set to
// TSDB_CODE_OUT_OF_MEMORY.
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
                                       SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
                                       SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

  SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  pInfo->pGroupCols      = pGroupColList;
  pInfo->pCondition      = pCondition;

  code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  initResultSizeInfo(&pOperator->resultInfo, 4096);
  initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pInfo->groupKeyLen, pTaskInfo->id.str);
  initBasicInfo(&pInfo->binfo, pResultBlock);
  initResultRowInfo(&pInfo->binfo.resultRowInfo);

  pOperator->name         = "GroupbyAggOperator";
  pOperator->blocking     = true;
  pOperator->status       = OP_NOT_OPENED;
  // pOperator->operatorType = OP_Groupby;
  pOperator->info         = pInfo;
  pOperator->pTaskInfo    = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, NULL, destroyGroupOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);

  // an operator without its downstream cannot run: treat this like any other setup failure
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  if (pInfo != NULL) {
    // release what initGroupOptrInfo() allocated on behalf of this operator.
    // NOTE(review): the scalar/aggregate support state and the caller-provided inputs
    // are not released here; their ownership on failure is not visible in this file.
    taosMemoryFreeClear(pInfo->keyBuf);
    if (pInfo->pGroupColVals != NULL) {
      taosArrayDestroyEx(pInfo->pGroupColVals, freeGroupKey);
    }
  }
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
  return NULL;
}

H
Haojun Liao 已提交
438
// Distribute the rows of pBlock into the buffer pages of their partitions.
// For every row the partition key is built, the page currently being filled for that
// key is fetched through getCurrentDataGroupInfo(), and the row is appended to it
// column by column.
// Page layout as used here: the first int32_t of a page is its row count; the area of
// output column i starts at pInfo->columnOffset[i] and consists of
//   variable-length types: rowCapacity int32_t offsets (-1 = NULL) | int32_t content length | data
//   other types:           null bitmap of BitmapLen(rowCapacity)   | int32_t content length | data
// If a JSON value cannot be used as a partition key, terrno holds
// TSDB_CODE_QRY_JSON_IN_GROUP_ERROR and the function returns at once without touching
// the remaining rows; the caller is expected to check terrno.
static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  SPartitionOperatorInfo* pInfo = pOperator->info;

  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
    if (terrno == TSDB_CODE_QRY_JSON_IN_GROUP_ERROR) {
      // the key values are only partially updated: do not file the row under a bogus key
      return;
    }

    int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);

    SDataGroupInfo* pGroupInfo = NULL;
    void *pPage = getCurrentDataGroupInfo(pInfo, &pGroupInfo, len);

    pGroupInfo->numOfRows += 1;

    // group id
    if (pGroupInfo->groupId == 0) {
      pGroupInfo->groupId = calcGroupId(pInfo->keyBuf, len);
    }

    // number of rows
    int32_t* rows = (int32_t*) pPage;

    size_t numOfCols = pOperator->exprSupp.numOfExprs;
    for(int32_t i = 0; i < numOfCols; ++i) {
      SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i];
      int32_t slotId = pExpr->base.pParam[0].pCol->slotId;

      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);

      int32_t bytes = pColInfoData->info.bytes;
      int32_t startOffset = pInfo->columnOffset[i];

      int32_t* columnLen  = NULL;
      int32_t  contentLen = 0;

      if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
        int32_t* offset = (int32_t*)((char*)pPage + startOffset);
        columnLen       = (int32_t*) ((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity);
        char*    data   = (char*)((char*) columnLen + sizeof(int32_t));

        if (colDataIsNull_s(pColInfoData, j)) {
          offset[(*rows)] = -1;
          contentLen = 0;
        } else if(pColInfoData->info.type == TSDB_DATA_TYPE_JSON){
          offset[*rows] = (*columnLen);
          char* src = colDataGetData(pColInfoData, j);
          int32_t dataLen = getJsonValueLen(src);

          memcpy(data + (*columnLen), src, dataLen);
          int32_t v = (data + (*columnLen) + dataLen - (char*)pPage);
          ASSERT(v > 0);

          contentLen = dataLen;
        } else {
          offset[*rows] = (*columnLen);
          char* src = colDataGetData(pColInfoData, j);
          memcpy(data + (*columnLen), src, varDataTLen(src));
          int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage);
          ASSERT(v > 0);

          contentLen = varDataTLen(src);
        }
      } else {
        char* bitmap = (char*)pPage + startOffset;
        columnLen    = (int32_t*) ((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity));
        char* data   = (char*) columnLen + sizeof(int32_t);

        bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j);
        if (isNull) {
          colDataSetNull_f(bitmap, (*rows));
        } else {
          memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes);
          ASSERT((data + (*columnLen) + bytes - (char*)pPage) <= getBufPageSize(pInfo->pBuf));
        }
        // a NULL still occupies a full slot, so the content length grows either way
        contentLen = bytes;
      }

      (*columnLen) += contentLen;
      ASSERT(*columnLen >= 0);
    }

    (*rows) += 1;

    setBufPageDirty(pPage, true);
    releaseBufPage(pInfo->pBuf, pPage);
  }
}

// Look up (or create) the partition whose serialized key is the first `len` bytes of
// pInfo->keyBuf and return the buffer page that the next row of it has to be written to.
//   pGroupInfo - [out] the partition's entry inside pInfo->pGroupSet.
// A partition seen for the first time is inserted and gets its first page. For a known
// partition the last page of its page list is used, unless that page already holds
// pInfo->rowCapacity rows, in which case it is released and a new page is appended.
// Every newly obtained page is zeroed completely: the row counter, the null bitmaps and
// the per-column content lengths are all read back by the code that appends rows.
// NOTE(review): the results of taosArrayInit / taosHashPut / getNewBufPage are used
// unchecked, as in the rest of this operator.
void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len) {
  SDataGroupInfo* p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);

  void* pPage = NULL;
  if (p == NULL) { // it is a new group
    SDataGroupInfo gi = {0};
    gi.pPageList = taosArrayInit(100, sizeof(int32_t));
    taosHashPut(pInfo->pGroupSet, pInfo->keyBuf, len, &gi, sizeof(SDataGroupInfo));

    // the set stores a copy of gi: fetch the stored entry
    p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);

    int32_t pageId = 0;
    pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
    taosArrayPush(p->pPageList, &pageId);

    // same treatment as for the follow-up pages below; this also sets the row count to 0
    memset(pPage, 0, getBufPageSize(pInfo->pBuf));
  } else {
    int32_t* curId = taosArrayGetLast(p->pPageList);
    pPage = getBufPage(pInfo->pBuf, *curId);

    int32_t *rows = (int32_t*) pPage;
    if (*rows >= pInfo->rowCapacity) {
      // release buffer
      releaseBufPage(pInfo->pBuf, pPage);

      // add a new page for current group
      int32_t pageId = 0;
      pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
      taosArrayPush(p->pPageList, &pageId);
      memset(pPage, 0, getBufPageSize(pInfo->pBuf));
    }
  }

  *pGroupInfo = p;
  return pPage;
}

// Derive a 64-bit group id from a serialized group key.
//   pData / len - the key bytes.
// The id is formed from the leading sizeof(uint64_t) bytes of the MD5 digest of the key.
uint64_t calcGroupId(char* pData, int32_t len) {
  T_MD5_CTX md5;
  tMD5Init(&md5);
  tMD5Update(&md5, (uint8_t*)pData, len);
  tMD5Final(&md5);

  // NOTE: only the initial 8 bytes of the final MD5 digest are used
  uint64_t groupId = 0;
  memcpy(&groupId, md5.digest, sizeof(uint64_t));
  return groupId;
}

H
Haojun Liao 已提交
575
// Compute the byte offset at which the area of every column of pBlock starts inside a
// buffer page that holds up to rowCapacity rows.
// Returns an array with one offset per column, or NULL if it cannot be allocated.
// The array has to be released with taosMemoryFree().
// The first column starts behind an int32_t plus a uint64_t page header. Each column
// area consists of
//   variable-length types: rowCapacity int32_t offsets         | int32_t content length | payload
//   other types:           null bitmap BitmapLen(rowCapacity)  | int32_t content length | payload
// with the payload sized bytes * rowCapacity.
int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

  // keep at least one slot so that the write to offset[0] is valid for a block without columns
  int32_t* offset = taosMemoryCalloc((numOfCols > 0) ? numOfCols : 1, sizeof(int32_t));
  if (offset == NULL) {
    return NULL;
  }

  offset[0] = sizeof(int32_t) + sizeof(uint64_t);  // the number of rows in current page, ref to SSDataBlock paged serialization format

  // the area of column i starts where the area of column i - 1 ends; starting at 1
  // also avoids the unsigned wrap-around of "numOfCols - 1" for an empty block
  for (size_t i = 1; i < numOfCols; ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i - 1);

    int32_t bytes = pColInfoData->info.bytes;
    int32_t payloadLen = bytes * rowCapacity;

    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
      // offset segment + content length + payload
      offset[i] = rowCapacity * sizeof(int32_t) + sizeof(int32_t) + payloadLen + offset[i - 1];
    } else {
      // bitmap + content length + payload
      offset[i] = BitmapLen(rowCapacity) + sizeof(int32_t) + payloadLen + offset[i - 1];
    }
  }

  return offset;
}

5
54liuyao 已提交
599 600 601 602 603
// Drop all partition data once the operator has returned its last block: the page
// list of every partition, the sorted partition array and the content of the page buffer.
// The page lists are taken from pInfo->sortedGroupArray. hashPartition() copies every
// partition entry into that array and clears pInfo->pGroupSet afterwards, so the array
// is the only place that still references the lists when this function runs.
static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) {
  int32_t numOfGroups = taosArrayGetSize(pInfo->sortedGroupArray);
  for (int32_t i = 0; i < numOfGroups; ++i) {
    SDataGroupInfo* pGroup = taosArrayGet(pInfo->sortedGroupArray, i);
    taosArrayDestroy(pGroup->pPageList);
  }

  taosArrayClear(pInfo->sortedGroupArray);
  clearDiskbasedBuf(pInfo->pBuf);
}

608 609 610
static int compareDataGroupInfo(const void* group1, const void* group2) {
  const SDataGroupInfo* pGroupInfo1 = group1;
  const SDataGroupInfo* pGroupInfo2 = group2;
611 612 613 614 615 616 617

  if (pGroupInfo1->groupId == pGroupInfo2->groupId) {
    ASSERT(0);
    return 0;
  }

  return (pGroupInfo1->groupId < pGroupInfo2->groupId)? -1:1;
618 619
}

H
Haojun Liao 已提交
620 621 622
// Return the next buffered page of partitioned data as a result block.
// pInfo->groupIndex / pInfo->pageIndex form the read cursor: pages of one partition are
// returned one after another, then the cursor moves on to the next entry of
// pInfo->sortedGroupArray (groupIndex == -1 means nothing has been returned yet).
// When all partitions have been returned the operator is marked as completed, the
// partition data is cleared and NULL is returned.
static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
  SPartitionOperatorInfo* pPart = pOperator->info;

  SDataGroupInfo* pGroup = NULL;
  bool            moveToNextGroup = true;
  if (pPart->groupIndex != -1) {
    pGroup = taosArrayGet(pPart->sortedGroupArray, pPart->groupIndex);
    moveToNextGroup = (pPart->pageIndex >= taosArrayGetSize(pGroup->pPageList));
  }

  if (moveToNextGroup) {
    // try next group data
    ++pPart->groupIndex;
    if (pPart->groupIndex >= taosArrayGetSize(pPart->sortedGroupArray)) {
      doSetOperatorCompleted(pOperator);
      clearPartitionOperator(pPart);
      return NULL;
    }

    pGroup = taosArrayGet(pPart->sortedGroupArray, pPart->groupIndex);
    pPart->pageIndex = 0;
  }

  int32_t* pPageId = taosArrayGet(pGroup->pPageList, pPart->pageIndex);
  void*    pPage = getBufPage(pPart->pBuf, *pPageId);

  SSDataBlock* pOut = pPart->binfo.pRes;
  blockDataEnsureCapacity(pOut, pPart->rowCapacity);
  blockDataFromBuf1(pOut, pPage, pPart->rowCapacity);

  pPart->pageIndex += 1;
  releaseBufPage(pPart->pBuf, pPage);

  blockDataUpdateTsWindow(pOut, 0);
  pOut->info.groupId = pGroup->groupId;

  pOperator->resultInfo.totalRows += pOut->info.rows;
  return pOut;
}

653
// getNext function of the partition operator.
// On the first call all blocks of the downstream operator are consumed and their rows
// are filed into per-partition buffer pages. The partitions are then copied into an
// array sorted by group id and the hash set is emptied. This call and all later ones
// return the buffered pages through buildPartitionResult().
// Returns NULL once the operator is in state OP_EXEC_DONE.
// A failing scalar expression or a JSON partition-key error is raised with
// longjmp(pTaskInfo->env, code).
static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
  SPartitionOperatorInfo* pPart = pOperator->info;
  SSDataBlock*            pOut = pPart->binfo.pRes;

  // the input has been partitioned already: continue handing out pages
  if (pOperator->status == OP_RES_TO_RETURN) {
    blockDataCleanup(pOut);
    return buildPartitionResult(pOperator);
  }

  int64_t        startUs = taosGetTimestampUs();
  SOperatorInfo* pChild = pOperator->pDownstream[0];

  SSDataBlock* pInput = NULL;
  while ((pInput = pChild->fpSet.getNextFn(pChild)) != NULL) {
    // scalar expressions, if any, are evaluated in place on the input block before it is partitioned
    if (pPart->scalarSup.pExprInfo != NULL) {
      pTaskInfo->code = projectApplyFunctions(pPart->scalarSup.pExprInfo, pInput, pInput, pPart->scalarSup.pCtx, pPart->scalarSup.numOfExprs, NULL);
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
        longjmp(pTaskInfo->env, pTaskInfo->code);
      }
    }

    terrno = TSDB_CODE_SUCCESS;
    doHashPartition(pOperator, pInput);
    if (terrno != TSDB_CODE_SUCCESS) {  // group by json error
      longjmp(pTaskInfo->env, terrno);
    }
  }

  // snapshot every partition entry into an array and order it by group id
  SArray* pSorted = taosArrayInit(taosHashGetSize(pPart->pGroupSet), sizeof(SDataGroupInfo));
  for (void* pIter = taosHashIterate(pPart->pGroupSet, NULL); pIter != NULL;
       pIter = taosHashIterate(pPart->pGroupSet, pIter)) {
    SDataGroupInfo* pEntry = pIter;
    taosArrayPush(pSorted, pEntry);
  }

  taosArraySort(pSorted, compareDataGroupInfo);
  pPart->sortedGroupArray = pSorted;
  pPart->groupIndex = -1;  // nothing returned yet
  taosHashClear(pPart->pGroupSet);

  // elapsed time of the consume-and-partition phase, converted from microseconds
  pOperator->cost.openCost = (taosGetTimestampUs() - startUs) / 1000.0;

  pOperator->status = OP_RES_TO_RETURN;
  blockDataEnsureCapacity(pOut, 4096);
  return buildPartitionResult(pOperator);
}

static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
H
Haojun Liao 已提交
714
  SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param;
715
  cleanupBasicInfo(&pInfo->binfo);
H
Haojun Liao 已提交
716
  taosArrayDestroy(pInfo->pGroupCols);
717

wmmhello's avatar
wmmhello 已提交
718 719 720 721
  for(int i = 0; i < taosArrayGetSize(pInfo->pGroupColVals); i++){
    SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->pGroupColVals, i);
    taosMemoryFree(key.pData);
  }
722

H
Haojun Liao 已提交
723
  taosArrayDestroy(pInfo->pGroupColVals);
H
Haojun Liao 已提交
724
  taosMemoryFree(pInfo->keyBuf);
725
  taosArrayDestroy(pInfo->sortedGroupArray);
wmmhello's avatar
wmmhello 已提交
726
  taosHashCleanup(pInfo->pGroupSet);
H
Haojun Liao 已提交
727
  taosMemoryFree(pInfo->columnOffset);
728

729
  cleanupExprSupp(&pInfo->scalarSup);
D
dapan1121 已提交
730 731
  
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
732 733
}

734
// Create a partition operator on top of `downstream` from the physical plan node.
// The output block and output expressions come from the node's output descriptor and
// targets, the partition columns from pPartitionKeys, and optional scalar expressions
// from pExprs.
// Returns the new operator, or NULL on failure with pTaskInfo->code set to
// TSDB_CODE_OUT_OF_MEMORY.
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

  SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo));
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SSDataBlock* pResBlock = createResDataBlock(pPartNode->node.pOutputDataBlockDesc);
  if (pResBlock == NULL) {  // dereferenced below to size the page buffer
    goto _error;
  }

  int32_t numOfCols = 0;
  SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols);

  pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys);

  if (pPartNode->pExprs != NULL) {
    int32_t num = 0;
    SExprInfo* pExprInfo1 = createExprInfo(pPartNode->pExprs, NULL, &num);
    code = initExprSupp(&pInfo->scalarSup, pExprInfo1, num);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
  if (pInfo->pGroupSet == NULL) {
    goto _error;
  }

  uint32_t defaultPgsz  = 0;
  uint32_t defaultBufsz = 0;
  getBufferPgSize(pResBlock->info.rowSize, &defaultPgsz, &defaultBufsz);

  code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, TD_TMP_DIR_PATH);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->rowCapacity = blockDataGetCapacityInRow(pResBlock, getBufPageSize(pInfo->pBuf));
  pInfo->columnOffset = setupColumnOffset(pResBlock, pInfo->rowCapacity);
  if (pInfo->columnOffset == NULL) {  // indexed for every row that gets partitioned
    goto _error;
  }

  code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pOperator->name         = "PartitionOperator";
  pOperator->blocking     = true;
  pOperator->status       = OP_NOT_OPENED;
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
  pInfo->binfo.pRes       = pResBlock;
  pOperator->exprSupp.numOfExprs   = numOfCols;
  pOperator->exprSupp.pExprInfo    = pExprInfo;
  pOperator->info         = pInfo;
  pOperator->pTaskInfo    = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, NULL, destroyPartitionOperatorInfo,
                                         NULL, NULL, NULL);

  // an operator without its downstream cannot run: treat this like any other setup failure
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  if (pInfo != NULL) {
    // release the members that were created above and are owned by pInfo.
    // NOTE(review): the result block, the expression infos, the scalar support state and
    // the page buffer are not released here - confirm the matching destructors.
    if (pInfo->pGroupColVals != NULL) {
      taosArrayDestroyEx(pInfo->pGroupColVals, freeGroupKey);
    }
    taosMemoryFreeClear(pInfo->keyBuf);
    taosMemoryFreeClear(pInfo->columnOffset);
    if (pInfo->pGroupSet != NULL) {
      taosHashCleanup(pInfo->pGroupSet);
    }
    if (pInfo->pGroupCols != NULL) {
      taosArrayDestroy(pInfo->pGroupCols);
    }
  }
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
  return NULL;
}

802
// Select the result row that belongs to a group key and bind the operator's function
// contexts to it.
//   binfo         - basic info whose resultRowInfo is used for the lookup.
//   numOfCols     - number of function contexts to initialise.
//   pData / bytes - serialized group key and its length.
//   groupId       - group id passed on to the result-row lookup.
//   pBuf, pAggSup - page buffer and aggregate support state used for the lookup.
// Always returns TSDB_CODE_SUCCESS; a missing result row trips the assert.
int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t bytes,
                                uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup) {
  SExecTaskInfo*  pTask = pOperator->pTaskInfo;
  SqlFunctionCtx* pFuncCtx = pOperator->exprSupp.pCtx;
  SResultRowInfo* pRowInfo = &binfo->resultRowInfo;

  SResultRow* pRow = doSetResultOutBufByKey(pBuf, pRowInfo, (char*)pData, bytes, true, groupId, pTask, false, pAggSup);
  assert(pRow != NULL);

  setResultRowInitCtx(pRow, pFuncCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset);
  return TSDB_CODE_SUCCESS;
}