groupoperator.c 22.8 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "os.h"
#include "function.h"
#include "tname.h"

#include "tdatablock.h"
#include "tmsg.h"

#include "executorimpl.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"

H
Haojun Liao 已提交
28
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
H
Haojun Liao 已提交
29 30
static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
static uint64_t calcGroupId(char* pData, int32_t len);
H
Haojun Liao 已提交
31

H
Haojun Liao 已提交
32
static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
H
Haojun Liao 已提交
33 34 35 36 37 38 39
  SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
  doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
  taosMemoryFreeClear(pInfo->keyBuf);
  taosArrayDestroy(pInfo->pGroupCols);
  taosArrayDestroy(pInfo->pGroupColVals);
}

H
Haojun Liao 已提交
40 41 42
/**
 * Allocate the per-column key holders and the serialized key buffer used for grouping.
 *
 * @param pGroupColVals  [out] array of SGroupKeys, one entry per group column
 * @param keyLen         [in/out] the required key length is ADDED to the current value, so the caller
 *                       must pass a zero-initialized counter
 * @param keyBuf         [out] zeroed buffer of (*keyLen) bytes
 * @param pGroupColList  list of SColumn describing the group columns
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY. On failure the entries already pushed into
 *         *pGroupColVals are left in place for the caller to release.
 */
static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
  *pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys));
  if ((*pGroupColVals) == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SColumn* pCol = taosArrayGet(pGroupColList, i);
    (*keyLen) += pCol->bytes;  // payload of this column; the null flags are accounted for after the loop

    SGroupKeys key = {0};
    key.bytes  = pCol->bytes;
    key.type   = pCol->type;
    key.isNull = false;
    key.pData  = taosMemoryCalloc(1, pCol->bytes);
    if (key.pData == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // the array stores a copy of 'key'; if the copy cannot be stored nobody else references pData
    if (taosArrayPush((*pGroupColVals), &key) == NULL) {
      taosMemoryFree(key.pData);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  // one null-flag byte per group column precedes the values in the serialized key
  int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
  (*keyLen) += nullFlagSize;

  (*keyBuf) = taosMemoryCalloc(1, (*keyLen));
  if ((*keyBuf) == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
74
/**
 * Check whether row 'rowIndex' of pBlock carries the same group key as the one currently recorded
 * in pGroupColVals.
 *
 * Two NULL values are treated as equal; a NULL never equals a non-NULL value. Variable-length
 * values are compared by length and content, fixed-length values by their raw bytes.
 *
 * @return true when every group column matches the recorded key
 */
static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) {
  SColumnDataAgg* pAgg = NULL;

  for (int32_t k = 0; k < numOfGroupCols; ++k) {
    SColumn*         pColumn = taosArrayGet(pGroupCols, k);
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pColumn->slotId);
    SGroupKeys*      pSaved = taosArrayGet(pGroupColVals, k);

    if (pBlock->pBlockAgg != NULL) {
      pAgg = pBlock->pBlockAgg[pColumn->slotId];  // TODO is agg data matched?
    }

    bool rowIsNull = colDataIsNull(pColData, pBlock->info.rows, rowIndex, pAgg);
    if (rowIsNull || pSaved->isNull) {
      if (rowIsNull && pSaved->isNull) {
        continue;  // NULL matches NULL
      }
      return false;
    }

    char* pRowVal = colDataGetData(pColData, rowIndex);

    if (!IS_VAR_DATA_TYPE(pSaved->type)) {
      if (memcmp(pSaved->pData, pRowVal, pSaved->bytes) != 0) {
        return false;
      }
      continue;
    }

    int32_t rowLen = varDataLen(pRowVal);
    if (rowLen != varDataLen(pSaved->pData) || memcmp(varDataVal(pSaved->pData), varDataVal(pRowVal), rowLen) != 0) {
      return false;
    }
  }

  return true;
}

113
/**
 * Copy the group-column values of row 'rowIndex' from pBlock into the key holders in pGroupColVals,
 * making that row the "current" group key.
 *
 * @param pGroupCols     list of SColumn describing the group columns
 * @param pGroupColVals  list of SGroupKeys that receives the values (one per group column)
 * @param pBlock         source data block
 * @param rowIndex       row to read the key from
 */
static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex) {
  SColumnDataAgg* pColAgg = NULL;

  int32_t numOfGroupCols = (int32_t)taosArrayGetSize(pGroupCols);

  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SColumn*         pCol = taosArrayGet(pGroupCols, i);
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);

    if (pBlock->pBlockAgg != NULL) {
      pColAgg = pBlock->pBlockAgg[pCol->slotId];  // TODO is agg data matched?
    }

    SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
      pkey->isNull = true;
    } else {
      pkey->isNull = false;
      char* val = colDataGetData(pColInfoData, rowIndex);
      if (IS_VAR_DATA_TYPE(pkey->type)) {
        // pData holds pkey->bytes bytes; verify the value fits BEFORE writing it
        ASSERT(varDataTLen(val) <= pkey->bytes);
        memcpy(pkey->pData, val, varDataTLen(val));
      } else {
        memcpy(pkey->pData, val, pkey->bytes);
      }
    }
  }
}

H
Haojun Liao 已提交
142
/**
 * Serialize the current group key into pKey.
 *
 * Layout written: one null-flag byte per group column, followed by the values of the non-NULL
 * columns in column order (variable-length values are copied with their length header).
 *
 * @param pKey           destination buffer, must not be NULL
 * @param pGroupColVals  list of SGroupKeys holding the current key
 * @return number of bytes written to pKey
 */
static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
  ASSERT(pKey != NULL);

  char*  pBase = (char*)pKey;
  size_t numOfKeys = taosArrayGetSize(pGroupColVals);

  char* pNullFlags = pBase;
  char* pCursor = pBase + sizeof(int8_t) * numOfKeys;

  for (int32_t idx = 0; idx < numOfKeys; ++idx) {
    SGroupKeys* pCur = taosArrayGet(pGroupColVals, idx);

    pNullFlags[idx] = pCur->isNull ? 1 : 0;
    if (pCur->isNull) {
      continue;  // NULL columns contribute only their flag
    }

    if (!IS_VAR_DATA_TYPE(pCur->type)) {
      memcpy(pCursor, pCur->pData, pCur->bytes);
      pCursor += pCur->bytes;
      continue;
    }

    varDataCopy(pCursor, pCur->pData);
    pCursor += varDataTLen(pCur->pData);
    ASSERT(varDataTLen(pCur->pData) <= pCur->bytes);
  }

  return (int32_t)(pCursor - pBase);
}

/**
 * Fill the result cells of the outputs whose functionId is -1 with the value found at 'rowIndex'
 * of their first input column, e.g. the "key" in: select count(*),key from t group by key.
 *
 * @param pCtx         function contexts, one per output expression
 * @param numOfOutput  number of entries in pCtx
 * @param totalRows    number of rows in the input block (used for the NULL check)
 * @param rowIndex     row to take the value from
 */
static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t totalRows, int32_t rowIndex) {
  for (int32_t k = 0; k < numOfOutput; ++k) {
    SqlFunctionCtx* pCur = &pCtx[k];
    if (pCur->functionId != -1) {
      continue;
    }

    SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCur);
    SColumnInfoData*     pInput = pCur->input.pData[0];

    // todo OPT all/all not NULL
    if (colDataIsNull(pInput, totalRows, rowIndex, NULL)) {
      pResInfo->isNullRes = 1;  // it is a NULL value
    } else {
      char* dst = GET_ROWCELL_INTERBUF(pResInfo);
      char* src = colDataGetData(pInput, rowIndex);

      if (IS_VAR_DATA_TYPE(pInput->info.type)) {
        varDataCopy(dst, src);
      } else {
        memcpy(dst, src, pInput->info.bytes);
      }
    }

    pResInfo->numOfRes = 1;
  }
}

static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SGroupbyOperatorInfo* pInfo = pOperator->info;

  SqlFunctionCtx* pCtx = pInfo->binfo.pCtx;
  int32_t         numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);
  //  if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
  // qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_TASKID(pRuntimeEnv));
  //    return;
  //  }

  int32_t     len = 0;
  STimeWindow w = TSWINDOW_INITIALIZER;

  int32_t num = 0;
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    // Compare with the previous row of this column, and do not set the output buffer again if they are identical.
    if (!pInfo->isInit) {
213
      recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
H
Haojun Liao 已提交
214 215 216 217 218
      pInfo->isInit = true;
      num++;
      continue;
    }

H
Haojun Liao 已提交
219
    bool equal = groupKeyCompare(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
H
Haojun Liao 已提交
220 221 222 223 224
    if (equal) {
      num++;
      continue;
    }

H
Haojun Liao 已提交
225
    // The first row of a new block does not belongs to the previous existed group
226
    if (j == 0) {
H
Haojun Liao 已提交
227
      num++;
228
      recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
H
Haojun Liao 已提交
229 230 231
      continue;
    }

H
Haojun Liao 已提交
232
    len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
233
    int32_t ret = setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfExprs, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
H
Haojun Liao 已提交
234 235 236 237 238
    if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
      longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
    }

    int32_t rowIndex = j - num;
239
    doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC);
H
Haojun Liao 已提交
240 241

    // assign the group keys or user input constant values if required
242
    doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex);
243
    recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
H
Haojun Liao 已提交
244 245 246 247
    num = 1;
  }

  if (num > 0) {
H
Haojun Liao 已提交
248
    len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
H
Haojun Liao 已提交
249
    int32_t ret =
250
        setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfExprs, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len,
H
Haojun Liao 已提交
251 252 253 254 255 256
                                   0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
    if (ret != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
    }

    int32_t rowIndex = pBlock->info.rows - num;
257
    doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC);
258
    doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex);
H
Haojun Liao 已提交
259 260 261
  }
}

262
/**
 * Produce the next non-empty batch of group-by results.
 *
 * Result rows are materialized into pInfo->binfo.pRes and filtered with pInfo->pCondition. Batches
 * that are completely filtered out are skipped while more results remain. The operator is marked
 * completed once no results remain.
 *
 * @return the result block, or NULL when the batch is empty and nothing remains
 */
static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
  SGroupbyOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*          pRes = pInfo->binfo.pRes;

  while (1) {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    doFilter(pInfo->pCondition, pRes, NULL);

    bool hasRemain = hashRemainDataInGroupInfo(&pInfo->groupResInfo);
    if (!hasRemain) {
      doSetOperatorCompleted(pOperator);
      break;
    }

    if (pRes->info.rows > 0) {
      break;
    }
  }

  size_t rows = pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;

  return (rows == 0) ? NULL : pRes;
}

/**
 * getNext function of the group-by operator.
 *
 * On the first call all blocks of the downstream operator are consumed and aggregated (this is a
 * blocking operator); the first result batch is then returned. Subsequent calls return the
 * remaining batches. Every batch, not only the first one, goes through the pCondition filter.
 *
 * Errors are reported by jumping to pTaskInfo->env.
 *
 * @return the next result block, or NULL when there is nothing (more) to return
 */
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SGroupbyOperatorInfo* pInfo = pOperator->info;
  SSDataBlock* pRes = pInfo->binfo.pRes;

  if (pOperator->status == OP_RES_TO_RETURN) {
    return buildGroupResultDataBlock(pOperator);
  }

  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;

  int64_t st = taosGetTimestampUs();
  SOperatorInfo* downstream = pOperator->pDownstream[0];

  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
      break;
    }

    int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }

    // the pDataBlock are always the same one, no need to call this again
    setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, scanFlag, true);

    // there is a scalar expression that needs to be calculated right before applying the group aggregation.
    if (pInfo->pScalarExprInfo != NULL) {
      pTaskInfo->code = projectApplyFunctions(pInfo->pScalarExprInfo, pBlock, pBlock, pInfo->pScalarFuncCtx, pInfo->numOfScalarExpr, NULL);
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
        longjmp(pTaskInfo->env, pTaskInfo->code);
      }
    }

    doHashGroupbyAgg(pOperator, pBlock);
  }

  pOperator->status = OP_RES_TO_RETURN;
  closeAllResultRows(&pInfo->binfo.resultRowInfo);
  //  if (!stableQuery) { // finalize include the update of result rows
  //    finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs);
  //  } else {
  //    updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfExprs, &pInfo->binfo.resultRowInfo,
  //    pInfo->binfo.rowCellInfoOffset);
  //  }
#if 0
  if(pOperator->fpSet.encodeResultRow){
    char *result = NULL;
    int32_t length = 0;
    pOperator->fpSet.encodeResultRow(pOperator, &result, &length);
    SAggSupporter* pSup = &pInfo->aggSup;
    taosHashClear(pSup->pResultRowHashTable);
    pInfo->binfo.resultRowInfo.size = 0;
    pOperator->fpSet.decodeResultRow(pOperator, result);
    if(result){
      taosMemoryFree(result);
    }
  }
#endif
  blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
  initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0);

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

  return buildGroupResultDataBlock(pOperator);
}

363
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList,
wmmhello's avatar
wmmhello 已提交
364
    SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
365 366 367 368 369 370
  SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

371 372 373 374 375
  pInfo->pGroupCols      = pGroupColList;
  pInfo->pCondition      = pCondition;

  pInfo->pScalarExprInfo = pScalarExprInfo;
  pInfo->numOfScalarExpr = numOfScalarExpr;
376
  pInfo->pScalarFuncCtx  = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->rowCellInfoOffset);
377

H
Haojun Liao 已提交
378
  int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
H
Haojun Liao 已提交
379 380 381 382
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

383 384
  initResultSizeInfo(pOperator, 4096);
  initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, pInfo->groupKeyLen, pTaskInfo->id.str);
385 386
  initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);

H
Haojun Liao 已提交
387
  pOperator->name         = "GroupbyAggOperator";
388
  pOperator->blocking     = true;
H
Haojun Liao 已提交
389
  pOperator->status       = OP_NOT_OPENED;
390
  // pOperator->operatorType = OP_Groupby;
H
Haojun Liao 已提交
391
  pOperator->pExpr        = pExprInfo;
392
  pOperator->numOfExprs  = numOfCols;
H
Haojun Liao 已提交
393
  pOperator->info         = pInfo;
394
  pOperator->pTaskInfo    = pTaskInfo;
H
Haojun Liao 已提交
395

396
  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, NULL, destroyGroupOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
H
Haojun Liao 已提交
397 398 399 400
  code = appendDownstream(pOperator, &downstream, 1);
  return pOperator;

  _error:
H
Haojun Liao 已提交
401
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
402 403 404
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
  return NULL;
405 406
}

H
Haojun Liao 已提交
407
/**
 * Append every row of pBlock to the buffer page of the partition (group) it belongs to.
 *
 * Page layout as used by this writer: an int32_t row counter at the start of the page, then for
 * every output column i, starting at pInfo->columnOffset[i]:
 *   - variable-length column: rowCapacity int32_t offsets (-1 marks NULL) | int32_t content length | payload
 *   - fixed-length column:    null bitmap for rowCapacity rows            | int32_t content length | payload
 * The content length of a column is advanced by the bytes appended for each row.
 */
static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  SPartitionOperatorInfo* pInfo = pOperator->info;

  int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);
  for (int32_t row = 0; row < pBlock->info.rows; ++row) {
    // the group key of this row decides which group (and page) receives it
    recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, row);
    int32_t keyLen = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);

    SDataGroupInfo* pGroup = NULL;
    void*           pPage = getCurrentDataGroupInfo(pInfo, &pGroup, keyLen);

    pGroup->numOfRows += 1;
    if (pGroup->groupId == 0) {
      pGroup->groupId = calcGroupId(pInfo->keyBuf, keyLen);
    }

    // number of rows already stored in the page
    int32_t* pNumOfRows = (int32_t*)pPage;
    char*    pBase = (char*)pPage;

    size_t numOfCols = pOperator->numOfExprs;
    for (int32_t c = 0; c < numOfCols; ++c) {
      int32_t          slotId = pOperator->pExpr[c].base.pParam[0].pCol->slotId;
      SColumnInfoData* pSrcCol = taosArrayGet(pBlock->pDataBlock, slotId);

      int32_t  colOffset = pInfo->columnOffset[c];
      int32_t* pColLen = NULL;
      int32_t  written = 0;

      if (IS_VAR_DATA_TYPE(pSrcCol->info.type)) {
        int32_t* pOffsets = (int32_t*)(pBase + colOffset);
        pColLen = (int32_t*)(pBase + colOffset + sizeof(int32_t) * pInfo->rowCapacity);
        char* pPayload = (char*)pColLen + sizeof(int32_t);

        if (colDataIsNull_s(pSrcCol, row)) {
          pOffsets[(*pNumOfRows)] = -1;
        } else {
          pOffsets[(*pNumOfRows)] = (*pColLen);

          char* src = colDataGetData(pSrcCol, row);
          memcpy(pPayload + (*pColLen), src, varDataTLen(src));

          int32_t v = (pPayload + (*pColLen) + varDataTLen(src) - (char*)pPage);
          ASSERT(v > 0);

          written = varDataTLen(src);
        }
      } else {
        char* pBitmap = pBase + colOffset;
        pColLen = (int32_t*)(pBase + colOffset + BitmapLen(pInfo->rowCapacity));
        char* pPayload = (char*)pColLen + sizeof(int32_t);

        if (colDataIsNull_f(pSrcCol->nullbitmap, row)) {
          colDataSetNull_f(pBitmap, (*pNumOfRows));
        } else {
          memcpy(pPayload + (*pColLen), colDataGetData(pSrcCol, row), pSrcCol->info.bytes);
        }

        // a fixed-length cell occupies its slot even when it is NULL
        written = pSrcCol->info.bytes;
      }

      (*pColLen) += written;
      ASSERT(*pColLen >= 0);
    }

    (*pNumOfRows) += 1;

    setBufPageDirty(pPage, true);
    releaseBufPage(pInfo->pBuf, pPage);
  }
}

/**
 * Locate (or create) the group identified by the key currently stored in pInfo->keyBuf and return
 * the buffer page that should receive the next row of that group.
 *
 * A new page is allocated for a new group, or when the last page of an existing group already
 * holds pInfo->rowCapacity rows. Every freshly allocated page is zeroed completely: doHashPartition()
 * accumulates the row counter, the per-column content lengths and the null bitmaps in place and
 * therefore relies on them starting at zero.
 *
 * @param pInfo       partition operator info
 * @param pGroupInfo  [out] the group entry stored in pInfo->pGroupSet
 * @param len         length of the key in pInfo->keyBuf
 * @return the page to write to; the caller releases it with releaseBufPage()
 */
void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len) {
  SDataGroupInfo* p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);

  void* pPage = NULL;
  if (p == NULL) { // it is a new group
    SDataGroupInfo gi = {0};
    gi.pPageList = taosArrayInit(100, sizeof(int32_t));
    taosHashPut(pInfo->pGroupSet, pInfo->keyBuf, len, &gi, sizeof(SDataGroupInfo));

    // the hash table keeps its own copy of 'gi'; fetch that copy so updates are persistent
    p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);

    int32_t pageId = 0;
    pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
    taosArrayPush(p->pPageList, &pageId);

    // zero the whole page, exactly as done for the overflow pages below
    memset(pPage, 0, getBufPageSize(pInfo->pBuf));
  } else {
    int32_t* curId = taosArrayGetLast(p->pPageList);
    pPage = getBufPage(pInfo->pBuf, *curId);

    int32_t *rows = (int32_t*) pPage;
    if (*rows >= pInfo->rowCapacity) {
      // release buffer
      releaseBufPage(pInfo->pBuf, pPage);

      // add a new page for current group
      int32_t pageId = 0;
      pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
      taosArrayPush(p->pPageList, &pageId);
      memset(pPage, 0, getBufPageSize(pInfo->pBuf));
    }
  }

  *pGroupInfo = p;
  return pPage;
}

/**
 * Derive a 64-bit group id from a serialized group key.
 *
 * @param pData  key bytes
 * @param len    number of key bytes
 * @return the first 8 bytes of the MD5 digest of the key
 */
uint64_t calcGroupId(char* pData, int32_t len) {
  T_MD5_CTX md5;

  tMD5Init(&md5);
  tMD5Update(&md5, (uint8_t*)pData, len);
  tMD5Final(&md5);

  // NOTE: only the leading 8 bytes of the digest are used
  uint64_t groupId = 0;
  memcpy(&groupId, md5.digest, sizeof(groupId));
  return groupId;
}

H
Haojun Liao 已提交
534 535 536 537
/**
 * Compute, for every column of pBlock, the byte offset of that column's segment inside a buffer page
 * that stores up to rowCapacity rows.
 *
 * The first column starts right behind the page header (an int32_t plus a uint64_t). A column segment
 * consists of:
 *   - variable-length column: rowCapacity int32_t offsets + int32_t content length + payload
 *   - fixed-length column:    null bitmap                 + int32_t content length + payload
 * where the payload is sized as bytes * rowCapacity.
 *
 * @return array with one offset per column, released by the caller with taosMemoryFree();
 *         NULL when the block has no columns or the allocation fails
 */
int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
  size_t numOfCols = pBlock->info.numOfCols;
  if (numOfCols == 0) {
    return NULL;  // nothing to lay out; also keeps the loop bound below from wrapping around
  }

  int32_t* offset = taosMemoryCalloc(numOfCols, sizeof(int32_t));
  if (offset == NULL) {
    return NULL;
  }

  offset[0] = sizeof(int32_t) + sizeof(uint64_t);  // the number of rows in current page, ref to SSDataBlock paged serialization format

  // the offset of column i + 1 is derived from the segment size of column i
  for (size_t i = 0; i + 1 < numOfCols; ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);

    int32_t bytes = pColInfoData->info.bytes;
    int32_t payloadLen = bytes * rowCapacity;

    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
      // offset segment + content length + payload
      offset[i + 1] = rowCapacity * sizeof(int32_t) + sizeof(int32_t) + payloadLen + offset[i];
    } else {
      // bitmap + content length + payload
      offset[i + 1] = BitmapLen(rowCapacity) + sizeof(int32_t) + payloadLen + offset[i];
    }
  }

  return offset;
}

H
Haojun Liao 已提交
558 559 560 561 562 563 564 565
/**
 * Return the next stored page of partitioned data as a result block.
 *
 * Pages are handed out group by group: once every page of the current group has been returned, the
 * iterator over pInfo->pGroupSet moves on to the next group. The operator is marked completed when
 * no group is left.
 *
 * @return pInfo->binfo.pRes filled from one page and tagged with the group id, or NULL when done
 */
static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
  SPartitionOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*            pRes = pInfo->binfo.pRes;

  SDataGroupInfo* pCurGroup = pInfo->pGroupIter;

  bool groupExhausted = (pInfo->pGroupIter == NULL) || (pInfo->pageIndex >= taosArrayGetSize(pCurGroup->pPageList));
  if (groupExhausted) {
    // try next group data
    pInfo->pGroupIter = taosHashIterate(pInfo->pGroupSet, pInfo->pGroupIter);
    if (pInfo->pGroupIter == NULL) {
      doSetOperatorCompleted(pOperator);
      return NULL;
    }

    pCurGroup = pInfo->pGroupIter;
    pInfo->pageIndex = 0;
  }

  int32_t* pPageId = taosArrayGet(pCurGroup->pPageList, pInfo->pageIndex);
  void*    pPage = getBufPage(pInfo->pBuf, *pPageId);

  blockDataFromBuf1(pRes, pPage, pInfo->rowCapacity);

  pInfo->pageIndex += 1;
  releaseBufPage(pInfo->pBuf, pPage);

  blockDataUpdateTsWindow(pRes, 0);
  pRes->info.groupId = pCurGroup->groupId;

  pOperator->resultInfo.totalRows += pRes->info.rows;
  return pRes;
}

589
/**
 * getNext function of the partition operator.
 *
 * On the first call all blocks of the downstream operator are consumed and their rows distributed
 * into per-group buffer pages (this is a blocking operator); afterwards each call returns one page
 * of one group.
 *
 * @return the next result block, or NULL when all groups have been returned
 */
static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  // the info attached by createPartitionOperatorInfo() is an SPartitionOperatorInfo
  SPartitionOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*            pRes = pInfo->binfo.pRes;

  if (pOperator->status == OP_RES_TO_RETURN) {
    blockDataCleanup(pRes);
    return buildPartitionResult(pOperator);
  }

  int64_t st = taosGetTimestampUs();
  SOperatorInfo* downstream = pOperator->pDownstream[0];

  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
      break;
    }

    doHashPartition(pOperator, pBlock);
  }

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

  pOperator->status = OP_RES_TO_RETURN;
  blockDataEnsureCapacity(pRes, 4096);
  return buildPartitionResult(pOperator);
}

static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
H
Haojun Liao 已提交
622
  SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param;
H
Haojun Liao 已提交
623 624 625
  doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
  taosArrayDestroy(pInfo->pGroupCols);
  taosArrayDestroy(pInfo->pGroupColVals);
H
Haojun Liao 已提交
626 627
  taosMemoryFree(pInfo->keyBuf);
  taosMemoryFree(pInfo->columnOffset);
H
Haojun Liao 已提交
628 629
}

630
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList,
wmmhello's avatar
wmmhello 已提交
631
                                           SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
632 633
  SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo));
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
H
Haojun Liao 已提交
634
  if (pInfo == NULL || pOperator == NULL) {
H
Haojun Liao 已提交
635 636
    goto _error;
  }
637

H
Haojun Liao 已提交
638 639 640 641 642 643 644 645
  pInfo->pGroupCols = pGroupColList;

  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
  if (pInfo->pGroupSet == NULL) {
    goto _error;
  }

646 647 648 649 650
  uint32_t defaultPgsz  = 0;
  uint32_t defaultBufsz = 0;
  getBufferPgSize(pResultBlock->info.rowSize, &defaultPgsz, &defaultBufsz);

  int32_t code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, TD_TMP_DIR_PATH);
H
Haojun Liao 已提交
651 652 653 654
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
655 656
  pInfo->rowCapacity = blockDataGetCapacityInRow(pResultBlock, getBufPageSize(pInfo->pBuf));
  pInfo->columnOffset = setupColumnOffset(pResultBlock, pInfo->rowCapacity);
H
Haojun Liao 已提交
657 658 659 660
  code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
H
Haojun Liao 已提交
661 662

  pOperator->name         = "PartitionOperator";
663
  pOperator->blocking     = true;
664
  pOperator->status       = OP_NOT_OPENED;
H
Haojun Liao 已提交
665 666
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
  pInfo->binfo.pRes       = pResultBlock;
667
  pOperator->numOfExprs   = numOfCols;
668
  pOperator->pExpr        = pExprInfo;
H
Haojun Liao 已提交
669
  pOperator->info         = pInfo;
670
  pOperator->pTaskInfo    = pTaskInfo;
671 672 673

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, NULL, destroyPartitionOperatorInfo,
                                         NULL, NULL, NULL);
H
Haojun Liao 已提交
674

H
Haojun Liao 已提交
675
  code = appendDownstream(pOperator, &downstream, 1);
676 677 678
  return pOperator;

  _error:
H
Haojun Liao 已提交
679
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
680 681
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
682
  return NULL;
H
Haojun Liao 已提交
683
}