tsdbCacheRead.c 6.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "taoserror.h"
#include "tarray.h"
#include "tcommon.h"
#include "tsdb.h"

21
typedef struct SCacheRowsReader {
22 23 24
  SVnode*   pVnode;
  STSchema* pSchema;
  uint64_t  uid;
25 26 27 28 29
  char**    transferBuf;  // todo remove it soon
  int32_t   numOfCols;
  int32_t   type;
  int32_t   tableIndex;  // currently returned result tables
  SArray*   pTableList;  // table id list
30
} SCacheRowsReader;
31

32
static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds) {
33
  ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock));
34 35 36
  int32_t numOfRows = pBlock->info.rows;

  SColVal colVal = {0};
37
  for (int32_t i = 0; i < pReader->numOfCols; ++i) {
38 39
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);

40 41 42
    if (slotIds[i] == -1) {
      colDataAppend(pColInfoData, numOfRows, (const char*)&pRow->ts, false);
    } else {
43 44 45
      int32_t slotId = slotIds[i];

      tTSRowGetVal(pRow, pReader->pSchema, slotId, &colVal);
46 47

      if (IS_VAR_DATA_TYPE(colVal.type)) {
48
        if (colVal.isNull || colVal.isNone) {
49 50
          colDataAppendNULL(pColInfoData, numOfRows);
        } else {
51 52 53
          varDataSetLen(pReader->transferBuf[slotId], colVal.value.nData);
          memcpy(varDataVal(pReader->transferBuf[slotId]), colVal.value.pData, colVal.value.nData);
          colDataAppend(pColInfoData, numOfRows, pReader->transferBuf[slotId], false);
54 55
        }
      } else {
56
        colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, colVal.isNull || colVal.isNone);
57 58
      }
    }
59 60 61 62 63
  }

  pBlock->info.rows += 1;
}

64 65 66 67
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, void** pReader) {
  *pReader = NULL;

  SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
68 69 70 71
  if (p == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

72 73 74
  p->type = type;
  p->pVnode = pVnode;
  p->numOfCols = numOfCols;
75

76 77 78 79 80
  if (taosArrayGetSize(pTableIdList) == 0) {
    *pReader = p;
    return TSDB_CODE_SUCCESS;
  }

81
  STableKeyInfo* pKeyInfo = taosArrayGet(pTableIdList, 0);
82 83
  p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1);
  p->pTableList = pTableIdList;
84

85
  p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
86 87 88 89
  if (p->transferBuf == NULL) {
    return  TSDB_CODE_OUT_OF_MEMORY;
  }

90
  for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
M
Minglei Jin 已提交
91
    if (IS_VAR_DATA_TYPE(p->pSchema->columns[i].type)) {
92
      p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes);
93 94 95 96
      if (p->transferBuf[i] == NULL) {
        tsdbCacherowsReaderClose(p);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
97 98
    }
  }
99

100 101 102 103
  *pReader = p;
  return TSDB_CODE_SUCCESS;
}

104 105
int32_t tsdbCacherowsReaderClose(void* pReader) {
  SCacheRowsReader* p = pReader;
106

107 108 109 110 111 112 113
  if (p->pSchema != NULL) {
    for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
      taosMemoryFreeClear(p->transferBuf[i]);
    }

    taosMemoryFree(p->transferBuf);
    taosMemoryFree(p->pSchema);
114 115 116 117 118 119
  }

  taosMemoryFree(pReader);
  return TSDB_CODE_SUCCESS;
}

120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, STSRow** pRow, LRUHandle** h) {
  int32_t code = TSDB_CODE_SUCCESS;
  if ((pr->type & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW) {
    code = tsdbCacheGetLastrowH(lruCache, uid, pr->pVnode->pTsdb, h);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // no data in the table of Uid
    if (*h != NULL) {
      *pRow = (STSRow*)taosLRUCacheValue(lruCache, *h);
    }
  } else {
    code = tsdbCacheGetLastH(lruCache, uid, pr->pVnode->pTsdb, h);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // no data in the table of Uid
    if (*h != NULL) {
      SArray* pLast = (SArray*)taosLRUCacheValue(lruCache, *h);
      tsdbCacheLastArray2Row(pLast, pRow, pr->pSchema);
    }
  }

  return code;
}

int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) {
149
  if (pReader == NULL || pResBlock == NULL) {
150 151 152
    return TSDB_CODE_INVALID_PARA;
  }

153
  SCacheRowsReader* pr = pReader;
154

155
  int32_t    code = TSDB_CODE_SUCCESS;
156
  SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
157 158 159
  LRUHandle* h = NULL;
  STSRow*    pRow = NULL;
  size_t     numOfTables = taosArrayGetSize(pr->pTableList);
160 161

  // retrieve the only one last row of all tables in the uid list.
162
  if ((pr->type & CACHESCAN_RETRIEVE_TYPE_SINGLE) == CACHESCAN_RETRIEVE_TYPE_SINGLE) {
163 164 165
    int64_t lastKey = INT64_MIN;
    bool    internalResult = false;
    for (int32_t i = 0; i < numOfTables; ++i) {
166
      STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
167

168 169
      code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
      if (code != TSDB_CODE_SUCCESS)  {
170 171 172
        return code;
      }

173
      if (h == NULL) {
174 175 176 177 178 179 180 181
        continue;
      }

      if (pRow->ts > lastKey) {
        // Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already
        // appended or not.
        if (internalResult) {
          pResBlock->info.rows -= 1;
182
          taosArrayClear(pTableUidList);
183 184
        }

185
        saveOneRow(pRow, pResBlock, pr, slotIds);
186
        taosArrayPush(pTableUidList, &pKeyInfo->uid);
187 188 189
        internalResult = true;
        lastKey = pRow->ts;
      }
190

191
      tsdbCacheRelease(lruCache, h);
192
    }
193
  } else if ((pr->type & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) {
194 195
    for (int32_t i = pr->tableIndex; i < numOfTables; ++i) {
      STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
196 197
      code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
      if (code != TSDB_CODE_SUCCESS)  {
198 199 200
        return code;
      }

201
      if (h == NULL) {
202 203 204
        continue;
      }

205
      saveOneRow(pRow, pResBlock, pr, slotIds);
206
      taosArrayPush(pTableUidList, &pKeyInfo->uid);
207

208
      tsdbCacheRelease(lruCache, h);
209

210 211 212 213
      pr->tableIndex += 1;
      if (pResBlock->info.rows >= pResBlock->info.capacity) {
        return TSDB_CODE_SUCCESS;
      }
214 215 216 217 218 219 220
    }
  } else {
    return TSDB_CODE_INVALID_PARA;
  }

  return TSDB_CODE_SUCCESS;
}