cachescanoperator.c 9.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "os.h"
#include "function.h"
#include "tname.h"

#include "tdatablock.h"
#include "tmsg.h"

#include "executorimpl.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"

28
static SSDataBlock* doScanCache(SOperatorInfo* pOperator);
29
static void destroyLastrowScanOperator(void* param);
30 31
static int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds);

32 33 34
/**
 * Create the operator that answers last-row queries from the tsdb row cache.
 *
 * The retrieve mode is chosen from the task's table list: when the number of groups equals the number of
 * tables (partition by tbname) a single reader over all tables is opened here and its output is buffered in
 * pBufferredRes; otherwise (grouping by tags) a reader is opened per group later, inside doScanCache.
 *
 * @param pScanNode  physical plan node; supplies the scan columns, the optional pseudo columns and the
 *                   output data block descriptor.
 * @param readHandle read handle, copied by value into the operator's private info.
 * @param pTaskInfo  owning task; on failure its `code` field receives the error code.
 * @return the new operator, or NULL on failure (everything allocated here has been released).
 */
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
                                           SExecTaskInfo* pTaskInfo) {
  int32_t           code = TSDB_CODE_SUCCESS;
  SLastrowScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SLastrowScanInfo));
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->readHandle = *readHandle;
  pInfo->pRes       = createResDataBlock(pScanNode->scan.node.pOutputDataBlockDesc);
  if (pInfo->pRes == NULL) {
    // pRes is dereferenced below, so a failed allocation must not get past this point.
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  int32_t numOfCols = 0;
  pInfo->pColMatchInfo = extractColMatchInfo(pScanNode->scan.pScanCols, pScanNode->scan.node.pOutputDataBlockDesc,
                                             &numOfCols, COL_MATCH_FROM_COL_ID);
  code = extractTargetSlotId(pInfo->pColMatchInfo, pTaskInfo, &pInfo->pSlotIds);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  STableListInfo* pTableList = &pTaskInfo->tableqinfoList;

  initResultSizeInfo(&pOperator->resultInfo, 4096);
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

  pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
  if (pInfo->pUidList == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // partition by tbname
  if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) {
    pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL|CACHESCAN_RETRIEVE_LAST_ROW;
    code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList,
                                   taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }

    // Rows of all tables are fetched into this buffer and then handed out one at a time by doScanCache.
    pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false);
    if (pInfo->pBufferredRes == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
    blockDataEnsureCapacity(pInfo->pBufferredRes, pOperator->resultInfo.capacity);
  } else { // by tags
    pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE|CACHESCAN_RETRIEVE_LAST_ROW;
  }

  if (pScanNode->scan.pScanPseudoCols != NULL) {
    SExprSupp* pPseudoExpr = &pInfo->pseudoExprSup;

    pPseudoExpr->pExprInfo = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &pPseudoExpr->numOfExprs);
    pPseudoExpr->pCtx =
        createSqlFunctionCtx(pPseudoExpr->pExprInfo, pPseudoExpr->numOfExprs, &pPseudoExpr->rowEntryInfoOffset);
  }

  pOperator->name         = "LastrowScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN;
  pOperator->blocking     = false;
  pOperator->status       = OP_NOT_OPENED;
  pOperator->info         = pInfo;
  pOperator->pTaskInfo    = pTaskInfo;
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doScanCache, NULL, NULL, destroyLastrowScanOperator, NULL, NULL, NULL);

  pOperator->cost.openCost = 0;
  return pOperator;

_error:
  pTaskInfo->code = code;
  // pInfo is NULL when its own allocation failed; the destroy routine dereferences its argument.
  if (pInfo != NULL) {
    destroyLastrowScanOperator(pInfo);
  }
  taosMemoryFree(pOperator);
  return NULL;
}

102
/**
 * Produce the next result block of the cache scan.
 *
 * Two modes, selected by pInfo->retrieveType:
 *  - CACHESCAN_RETRIEVE_TYPE_ALL: rows are fetched into pBufferredRes in one batch and returned one row per
 *    call, each tagged with the uid and group id of its table.
 *  - otherwise: each call advances through the remaining groups, opening a reader per group, until a group
 *    yields at least one row; that block is returned.
 *
 * @param pOperator the cache scan operator.
 * @return pInfo->pRes holding the next result, or NULL when the scan is finished or filling the pseudo
 *         columns failed (pTaskInfo->code is set in that case). Reader failures jump out through
 *         T_LONG_JMP(pTaskInfo->env, code).
 */
SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SLastrowScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
  STableListInfo*   pTableList = &pTaskInfo->tableqinfoList;
  int32_t           size = taosArrayGetSize(pTableList->pTableList);
  if (size == 0) {
    doSetOperatorCompleted(pOperator);
    return NULL;
  }

  blockDataCleanup(pInfo->pRes);

  // check if it is a group by tbname
  if ((pInfo->retrieveType & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) {
    // Refill the buffer once every buffered row has been handed out.
    if (pInfo->indexOfBufferedRes >= pInfo->pBufferredRes->info.rows) {
      blockDataCleanup(pInfo->pBufferredRes);
      taosArrayClear(pInfo->pUidList);

      int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, pInfo->pUidList);
      if (code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, code);
      }

      // check for tag values
      int32_t resultRows = pInfo->pBufferredRes->info.rows;
      ASSERT(resultRows == taosArrayGetSize(pInfo->pUidList));
      pInfo->indexOfBufferedRes = 0;
    }

    if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) {
      // Copy the current buffered row into row 0 of the output block, column by column.
      int32_t numOfMatched = taosArrayGetSize(pInfo->pColMatchInfo);
      for (int32_t i = 0; i < numOfMatched; ++i) {
        SColMatchInfo* pMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i);
        int32_t slotId = pMatchInfo->targetSlotId;

        SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId);
        SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, slotId);

        char* p = colDataGetData(pSrc, pInfo->indexOfBufferedRes);
        bool isNull = colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes);
        colDataAppend(pDst, 0, p, isNull);
      }

      pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes);
      pInfo->pRes->info.rows = 1;

      if (pInfo->pseudoExprSup.numOfExprs > 0) {
        SExprSupp* pSup = &pInfo->pseudoExprSup;
        int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
                                              GET_TASKID(pTaskInfo));
        if (code != TSDB_CODE_SUCCESS) {
          pTaskInfo->code = code;
          return NULL;
        }
      }

      if (pTableList->map != NULL) {
        // NOTE(review): the lookup result is dereferenced unchecked; this assumes every uid returned by the
        // reader is present in the map - confirm.
        int64_t* groupId = taosHashGet(pTableList->map, &pInfo->pRes->info.uid, sizeof(int64_t));
        pInfo->pRes->info.groupId = *groupId;
      } else {
        ASSERT(taosArrayGetSize(pTableList->pTableList) == 1);
        STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, 0);
        pInfo->pRes->info.groupId = pKeyInfo->groupId;
      }

      pInfo->indexOfBufferedRes += 1;
      return pInfo->pRes;
    } else {
      doSetOperatorCompleted(pOperator);
      return NULL;
    }
  } else {
    size_t totalGroups = taosArrayGetSize(pTableList->pGroupList);

    while (pInfo->currentGroupIndex < totalGroups) {
      SArray* pGroupTableList = taosArrayGetP(pTableList->pGroupList, pInfo->currentGroupIndex);

      // A failed open must not be followed by a retrieve on the reader.
      int32_t code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList,
                                             taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader);
      if (code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, code);
      }

      taosArrayClear(pInfo->pUidList);

      code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);

      // The reader serves exactly one retrieval per group. Close it right away so it is released on every
      // path, including groups that return no rows (otherwise the next open overwrites the pointer).
      pInfo->pLastrowReader = tsdbCacherowsReaderClose(pInfo->pLastrowReader);

      if (code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, code);
      }

      pInfo->currentGroupIndex += 1;

      // check for tag values
      if (pInfo->pRes->info.rows > 0) {
        if (pInfo->pseudoExprSup.numOfExprs > 0) {
          SExprSupp* pSup = &pInfo->pseudoExprSup;
          pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0);

          STableKeyInfo* pKeyInfo = taosArrayGet(pGroupTableList, 0);
          pInfo->pRes->info.groupId = pKeyInfo->groupId;

          code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
                                        GET_TASKID(pTaskInfo));
          if (code != TSDB_CODE_SUCCESS) {
            pTaskInfo->code = code;
            return NULL;
          }
        }

        return pInfo->pRes;
      }
    }

    doSetOperatorCompleted(pOperator);
    return NULL;
  }
}

220
void destroyLastrowScanOperator(void* param) {
221 222
  SLastrowScanInfo* pInfo = (SLastrowScanInfo*)param;
  blockDataDestroy(pInfo->pRes);
H
Haojun Liao 已提交
223 224 225 226 227 228 229 230 231
  blockDataDestroy(pInfo->pBufferredRes);
  taosMemoryFree(pInfo->pSlotIds);
  taosArrayDestroy(pInfo->pUidList);
  taosArrayDestroy(pInfo->pColMatchInfo);

  if (pInfo->pLastrowReader != NULL) {
    pInfo->pLastrowReader = tsdbCacherowsReaderClose(pInfo->pLastrowReader);
  }

232 233 234 235 236 237 238 239 240 241 242 243 244
  taosMemoryFreeClear(param);
}

/**
 * Build the slot id table handed to the cache reader.
 *
 * For every entry of pColMatchInfo the task's schema is searched for a column with the same colId. On a
 * match, the table entry at index targetSlotId is set to the schema column index, or to -1 when the column
 * is the primary key timestamp column. Entries whose colId is not found in the schema are left unwritten.
 *
 * @param pColMatchInfo array of SColMatchInfo describing the matched columns.
 * @param pTaskInfo     task whose schemaInfo.sw supplies the table schema.
 * @param pSlotIds      out: newly allocated table of numOfCols int32_t values; the caller owns it.
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the table could not be allocated.
 *
 * NOTE(review): the table has one element per match entry but is indexed by targetSlotId - this assumes
 * targetSlotId < number of match entries; confirm.
 */
int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds) {
  size_t numOfCols = taosArrayGetSize(pColMatchInfo);

  int32_t* pIds = taosMemoryMalloc(numOfCols * sizeof(int32_t));
  *pSlotIds = pIds;
  if (pIds == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColMatchInfo* pColMatch = taosArrayGet(pColMatchInfo, i);

    // Locate the first schema column carrying the same column id.
    int32_t j = 0;
    while (j < pTaskInfo->schemaInfo.sw->nCols && pColMatch->colId != pTaskInfo->schemaInfo.sw->pSchema[j].colId) {
      ++j;
    }

    if (j < pTaskInfo->schemaInfo.sw->nCols) {
      // The primary timestamp column is flagged with -1 instead of its schema position.
      pIds[pColMatch->targetSlotId] = (pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) ? -1 : j;
    }
  }

  return TSDB_CODE_SUCCESS;
}