tsdbCacheRead.c 6.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "taoserror.h"
#include "tarray.h"
#include "tcommon.h"
#include "tsdb.h"

21
typedef struct SCacheRowsReader {
22 23 24
  SVnode*   pVnode;
  STSchema* pSchema;
  uint64_t  uid;
25 26 27 28 29
  char**    transferBuf;  // todo remove it soon
  int32_t   numOfCols;
  int32_t   type;
  int32_t   tableIndex;  // currently returned result tables
  SArray*   pTableList;  // table id list
30
} SCacheRowsReader;
31

32
static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds) {
33
  ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock));
34 35 36
  int32_t numOfRows = pBlock->info.rows;

  SColVal colVal = {0};
37
  for (int32_t i = 0; i < pReader->numOfCols; ++i) {
38 39
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);

40 41 42
    if (slotIds[i] == -1) {
      colDataAppend(pColInfoData, numOfRows, (const char*)&pRow->ts, false);
    } else {
43 44 45
      int32_t slotId = slotIds[i];

      tTSRowGetVal(pRow, pReader->pSchema, slotId, &colVal);
46 47

      if (IS_VAR_DATA_TYPE(colVal.type)) {
H
Haojun Liao 已提交
48
        if (!COL_VAL_IS_VALUE(&colVal)) {
49 50
          colDataAppendNULL(pColInfoData, numOfRows);
        } else {
51 52 53
          varDataSetLen(pReader->transferBuf[slotId], colVal.value.nData);
          memcpy(varDataVal(pReader->transferBuf[slotId]), colVal.value.pData, colVal.value.nData);
          colDataAppend(pColInfoData, numOfRows, pReader->transferBuf[slotId], false);
54 55
        }
      } else {
H
Haojun Liao 已提交
56
        colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, !COL_VAL_IS_VALUE(&colVal));
57 58
      }
    }
59 60 61 62 63
  }

  pBlock->info.rows += 1;
}

64 65 66 67
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, void** pReader) {
  *pReader = NULL;

  SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
68 69 70 71
  if (p == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

72 73 74
  p->type = type;
  p->pVnode = pVnode;
  p->numOfCols = numOfCols;
75

76 77 78 79 80
  if (taosArrayGetSize(pTableIdList) == 0) {
    *pReader = p;
    return TSDB_CODE_SUCCESS;
  }

81
  STableKeyInfo* pKeyInfo = taosArrayGet(pTableIdList, 0);
82
  p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1, 1);
83
  p->pTableList = pTableIdList;
84

85
  p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
86
  if (p->transferBuf == NULL) {
H
Haojun Liao 已提交
87
    tsdbCacherowsReaderClose(p);
H
Haojun Liao 已提交
88
    return TSDB_CODE_OUT_OF_MEMORY;
89 90
  }

91
  for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
M
Minglei Jin 已提交
92
    if (IS_VAR_DATA_TYPE(p->pSchema->columns[i].type)) {
93
      p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes);
94 95 96 97
      if (p->transferBuf[i] == NULL) {
        tsdbCacherowsReaderClose(p);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
98 99
    }
  }
100

101 102 103 104
  *pReader = p;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
105
void* tsdbCacherowsReaderClose(void* pReader) {
106
  SCacheRowsReader* p = pReader;
107

108 109 110 111 112 113 114
  if (p->pSchema != NULL) {
    for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
      taosMemoryFreeClear(p->transferBuf[i]);
    }

    taosMemoryFree(p->transferBuf);
    taosMemoryFree(p->pSchema);
115 116 117
  }

  taosMemoryFree(pReader);
H
Haojun Liao 已提交
118
  return NULL;
119 120
}

H
Haojun Liao 已提交
121 122
static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, STSRow** pRow,
                                 LRUHandle** h) {
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
  int32_t code = TSDB_CODE_SUCCESS;
  if ((pr->type & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW) {
    code = tsdbCacheGetLastrowH(lruCache, uid, pr->pVnode->pTsdb, h);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // no data in the table of Uid
    if (*h != NULL) {
      *pRow = (STSRow*)taosLRUCacheValue(lruCache, *h);
    }
  } else {
    code = tsdbCacheGetLastH(lruCache, uid, pr->pVnode->pTsdb, h);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // no data in the table of Uid
    if (*h != NULL) {
      SArray* pLast = (SArray*)taosLRUCacheValue(lruCache, *h);
      tsdbCacheLastArray2Row(pLast, pRow, pr->pSchema);
    }
  }

  return code;
}

int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) {
151
  if (pReader == NULL || pResBlock == NULL) {
152 153 154
    return TSDB_CODE_INVALID_PARA;
  }

155
  SCacheRowsReader* pr = pReader;
156

157
  int32_t    code = TSDB_CODE_SUCCESS;
158
  SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
159 160 161
  LRUHandle* h = NULL;
  STSRow*    pRow = NULL;
  size_t     numOfTables = taosArrayGetSize(pr->pTableList);
162 163

  // retrieve the only one last row of all tables in the uid list.
164
  if ((pr->type & CACHESCAN_RETRIEVE_TYPE_SINGLE) == CACHESCAN_RETRIEVE_TYPE_SINGLE) {
165 166 167
    int64_t lastKey = INT64_MIN;
    bool    internalResult = false;
    for (int32_t i = 0; i < numOfTables; ++i) {
168
      STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
169

170
      code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
H
Haojun Liao 已提交
171
      if (code != TSDB_CODE_SUCCESS) {
172 173 174
        return code;
      }

175
      if (h == NULL) {
176 177 178 179 180 181 182 183
        continue;
      }

      if (pRow->ts > lastKey) {
        // Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already
        // appended or not.
        if (internalResult) {
          pResBlock->info.rows -= 1;
184
          taosArrayClear(pTableUidList);
185 186
        }

187
        saveOneRow(pRow, pResBlock, pr, slotIds);
188
        taosArrayPush(pTableUidList, &pKeyInfo->uid);
189 190 191
        internalResult = true;
        lastKey = pRow->ts;
      }
192

193
      tsdbCacheRelease(lruCache, h);
194
    }
195
  } else if ((pr->type & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) {
196 197
    for (int32_t i = pr->tableIndex; i < numOfTables; ++i) {
      STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
198
      code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
H
Haojun Liao 已提交
199
      if (code != TSDB_CODE_SUCCESS) {
200 201 202
        return code;
      }

203
      if (h == NULL) {
204 205 206
        continue;
      }

207
      saveOneRow(pRow, pResBlock, pr, slotIds);
208
      taosArrayPush(pTableUidList, &pKeyInfo->uid);
209

210
      tsdbCacheRelease(lruCache, h);
211

212 213 214 215
      pr->tableIndex += 1;
      if (pResBlock->info.rows >= pResBlock->info.capacity) {
        return TSDB_CODE_SUCCESS;
      }
216 217 218 219 220 221 222
    }
  } else {
    return TSDB_CODE_INVALID_PARA;
  }

  return TSDB_CODE_SUCCESS;
}