/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "taoserror.h"
#include "tarray.h"
#include "tcommon.h"
#include "tsdb.h"

// True when every bit set in `mask` is also set in `flags`.
// Note: `mask` is expanded twice, so do not pass an expression with side effects.
#define HASTYPE(flags, mask) (((flags) & (mask)) == (mask))
/*
 * Append one result row to pBlock, taking the per-column values from pRow.
 *
 * pRow     array of SLastCol; entry 0 is read for the primary timestamp
 *          (slot id -1), entry slotIds[i] for every other output column.
 * pBlock   destination block; the row is written at index pBlock->info.rows.
 * pReader  supplies the retrieve type, the output column count, the schema
 *          and the transfer buffers used for var-length last_row values.
 * slotIds  one slot id per output column.
 * pRes     one scratch buffer per output column (used by the "last" path):
 *          a var-data buffer whose payload is an SFirstLastRes.
 * idStr    identifier used in the error log only.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_PARA when the reader type
 * carries neither CACHESCAN_RETRIEVE_LAST nor CACHESCAN_RETRIEVE_LAST_ROW.
 * For the "last" path the row counter is only advanced if at least one
 * column (or the timestamp) carried a value.
 */
static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
                          void** pRes, const char* idStr) {
  const int32_t rowIndex = pBlock->info.rows;
  const bool    retrieveLast = HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST);

  if (!retrieveLast && !HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
    tsdbError("invalid retrieve type:%d, %s", pReader->type, idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  if (!retrieveLast) {
    // last_row: copy the raw values straight into the output columns
    for (int32_t col = 0; col < pReader->numOfCols; ++col) {
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, col);
      const int32_t    slotId = slotIds[col];

      if (slotId == -1) {  // the primary timestamp
        SLastCol* pTsCol = (SLastCol*)taosArrayGet(pRow, 0);
        colDataAppend(pDst, rowIndex, (const char*)&pTsCol->ts, false);
        continue;
      }

      SLastCol* pSrc = (SLastCol*)taosArrayGet(pRow, slotId);
      SColVal*  pVal = &pSrc->colVal;

      if (!IS_VAR_DATA_TYPE(pVal->type)) {
        colDataAppend(pDst, rowIndex, (const char*)&pVal->value.val, !COL_VAL_IS_VALUE(pVal));
      } else if (COL_VAL_IS_VALUE(pVal)) {
        // var-length values are re-packed with a length header in the transfer buffer
        varDataSetLen(pReader->transferBuf[slotId], pVal->value.nData);
        memcpy(varDataVal(pReader->transferBuf[slotId]), pVal->value.pData, pVal->value.nData);
        colDataAppend(pDst, rowIndex, pReader->transferBuf[slotId], false);
      } else {
        colDataAppendNULL(pDst, rowIndex);
      }
    }

    pBlock->info.rows += 1;
    return TSDB_CODE_SUCCESS;
  }

  // last: every output cell is an SFirstLastRes wrapped in a var-data buffer
  bool anyValue = false;

  for (int32_t col = 0; col < pReader->numOfCols; ++col) {
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, col);
    SFirstLastRes*   pInfo = (SFirstLastRes*)varDataVal(pRes[col]);
    const int32_t    slotId = slotIds[col];

    if (slotId == -1) {  // the primary timestamp
      SLastCol* pTsCol = (SLastCol*)taosArrayGet(pRow, 0);
      pInfo->ts = pTsCol->ts;
      pInfo->bytes = TSDB_KEYSIZE;
      *(int64_t*)pInfo->buf = pTsCol->ts;
      anyValue = true;
    } else {
      SLastCol* pSrc = (SLastCol*)taosArrayGet(pRow, slotId);
      SColVal*  pVal = &pSrc->colVal;

      pInfo->ts = pSrc->ts;
      pInfo->isNull = !COL_VAL_IS_VALUE(pVal);

      if (!pInfo->isNull) {
        anyValue = true;

        if (IS_VAR_DATA_TYPE(pVal->type)) {
          varDataSetLen(pInfo->buf, pVal->value.nData);
          memcpy(varDataVal(pInfo->buf), pVal->value.pData, pVal->value.nData);
          pInfo->bytes = pVal->value.nData + VARSTR_HEADER_SIZE;  // binary needs to add the header size
        } else {
          const int32_t bytes = pReader->pSchema->columns[slotId].bytes;
          memcpy(pInfo->buf, &pVal->value, bytes);
          pInfo->bytes = bytes;
        }
      }
    }

    pInfo->hasResult = true;
    // pDst->info.bytes includes the VARSTR_HEADER_SIZE, so subtract it
    varDataSetLen(pRes[col], pDst->info.bytes - VARSTR_HEADER_SIZE);
    colDataAppend(pDst, rowIndex, (const char*)pRes[col], false);
  }

  if (anyValue) {
    pBlock->info.rows += 1;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Look up the schema the reader will use and store it in p->pSchema.
 *
 * When suid is non-zero the schema is fetched for that uid; otherwise the
 * tables in p->pTableList are tried in order and the first schema found is
 * used.
 *
 * Returns TSDB_CODE_SUCCESS when a schema was found, otherwise
 * TSDB_CODE_PAR_TABLE_NOT_EXIST (p->pSchema is NULL in that case).
 *
 * Ownership: p stays owned by the caller on every path. It must NOT be freed
 * here: tsdbCacherowsReaderOpen() releases it through
 * tsdbCacherowsReaderClose() when this function fails, so freeing it here
 * would make that call a use-after-free followed by a double free.
 */
static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* idstr) {
  if (suid != 0) {
    p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, suid, -1, 1);
    if (p->pSchema == NULL) {
      tsdbWarn("stable:%" PRIu64 " has been dropped, failed to retrieve cached rows, %s", suid, idstr);
      return TSDB_CODE_PAR_TABLE_NOT_EXIST;
    }

    return TSDB_CODE_SUCCESS;
  }

  int32_t numOfTables = p->numOfTables;
  for (int32_t i = 0; i < numOfTables; ++i) {
    uint64_t uid = p->pTableList[i].uid;
    p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, uid, -1, 1);
    if (p->pSchema != NULL) {
      break;
    }

    tsdbWarn("table:%" PRIu64 " has been dropped, failed to retrieve cached rows, %s", uid, idstr);
  }

  // all queried tables have been dropped already, return immediately.
  if (p->pSchema == NULL) {
    tsdbWarn("all queried tables has been dropped, try next group, %s", idstr);
    return TSDB_CODE_PAR_TABLE_NOT_EXIST;
  }

  return TSDB_CODE_SUCCESS;
}

int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
H
Haojun Liao 已提交
135
                                uint64_t suid, void** pReader, const char* idstr) {
136 137
  *pReader = NULL;
  SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
138 139 140 141
  if (p == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

142 143 144
  p->type = type;
  p->pVnode = pVnode;
  p->numOfCols = numOfCols;
145
  p->suid = suid;
146

147
  if (numOfTables == 0) {
148 149 150 151
    *pReader = p;
    return TSDB_CODE_SUCCESS;
  }

152
  p->pTableList = pTableIdList;
153
  p->numOfTables = numOfTables;
154

155 156 157 158 159 160
  int32_t code = setTableSchema(p, suid, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbCacherowsReaderClose(p);
    return code;
  }

161
  p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
162
  if (p->transferBuf == NULL) {
H
Haojun Liao 已提交
163
    tsdbCacherowsReaderClose(p);
H
Haojun Liao 已提交
164
    return TSDB_CODE_OUT_OF_MEMORY;
165 166
  }

167
  for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
M
Minglei Jin 已提交
168
    if (IS_VAR_DATA_TYPE(p->pSchema->columns[i].type)) {
169
      p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes);
170 171 172 173
      if (p->transferBuf[i] == NULL) {
        tsdbCacherowsReaderClose(p);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
174 175
    }
  }
176

177 178 179 180 181 182
  p->pLoadInfo = tCreateLastBlockLoadInfo(p->pSchema, NULL, 0);
  if (p->pLoadInfo == NULL) {
    tsdbCacherowsReaderClose(p);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

H
Haojun Liao 已提交
183 184
  p->idstr = taosMemoryStrDup(idstr);

185 186 187 188
  *pReader = p;
  return TSDB_CODE_SUCCESS;
}

void* tsdbCacherowsReaderClose(void* pReader) {
190
  SCacheRowsReader* p = pReader;
191

192 193 194 195 196 197 198
  if (p->pSchema != NULL) {
    for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
      taosMemoryFreeClear(p->transferBuf[i]);
    }

    taosMemoryFree(p->transferBuf);
    taosMemoryFree(p->pSchema);
199 200
  }

201 202
  destroyLastBlockLoadInfo(p->pLoadInfo);

K
kailixu 已提交
203
  taosMemoryFree((void*)p->idstr);
204
  taosMemoryFree(pReader);
H
Haojun Liao 已提交
205
  return NULL;
206 207
}

static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, SArray** pRow,
H
Haojun Liao 已提交
209
                                 LRUHandle** h) {
210
  int32_t code = TSDB_CODE_SUCCESS;
211
  *pRow = NULL;
212

H
Haojun Liao 已提交
213
  if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
214
    code = tsdbCacheGetLastrowH(lruCache, uid, pr, h);
215
  } else {
216
    code = tsdbCacheGetLastH(lruCache, uid, pr, h);
217
  }
218

219 220 221 222 223 224 225
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // no data in the table of Uid
  if (*h != NULL) {
    *pRow = (SArray*)taosLRUCacheValue(lruCache, *h);
226 227 228 229 230
  }

  return code;
}

static void freeItem(void* pItem) {
232
  SLastCol* pCol = (SLastCol*)pItem;
233 234 235 236 237
  if (IS_VAR_DATA_TYPE(pCol->colVal.type)) {
    taosMemoryFree(pCol->colVal.value.pData);
  }
}

/*
 * Read cached last/last_row values for the reader's tables into pResBlock.
 *
 * pReader        reader created by tsdbCacherowsReaderOpen().
 * pResBlock      destination block; rows are appended through saveOneRow().
 * slotIds        one slot id per output column; -1 denotes the primary
 *                timestamp, which is read from entry 0 of a cached row.
 * pTableUidList  receives the uid(s) of the table(s) that produced rows.
 *
 * CACHESCAN_RETRIEVE_TYPE_SINGLE: merges all tables into one row, keeping
 * for every column the entry with the greatest timestamp.
 * CACHESCAN_RETRIEVE_TYPE_ALL: emits one row per table, starting at
 * pr->tableIndex and stopping once the block reaches its capacity.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA (NULL argument or
 * unknown retrieve type), TSDB_CODE_OUT_OF_MEMORY, or the error of the
 * snapshot / cache lookup / saveOneRow() call that failed.
 */
int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) {
  if (pReader == NULL || pResBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SCacheRowsReader* pr = pReader;

  // tsdbCacherowsReaderOpen() returns a reader without schema when it was
  // given zero tables; there is nothing to retrieve then.
  // NOTE(review): returning success with no rows is assumed to be what the
  // caller expects for an empty table list - confirm.
  if (pr->pSchema == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t    code = TSDB_CODE_SUCCESS;
  SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
  LRUHandle* h = NULL;
  SArray*    pRow = NULL;
  SArray*    pLastCols = NULL;
  bool       hasRes = false;
  bool       snapTaken = false;  // cleanup must only undo a snapshot that was really taken

  void** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES);
  if (pRes == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  for (int32_t j = 0; j < pr->numOfCols; ++j) {
    // slot -1 (primary timestamp) maps to entry 0, as in the row accesses below;
    // indexing the schema with -1 would read out of bounds.
    int32_t colIndex = (slotIds[j] == -1) ? 0 : slotIds[j];

    pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[colIndex].bytes + VARSTR_HEADER_SIZE);
    if (pRes[j] == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }

    SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]);
    p->ts = INT64_MIN;
  }

  // per-column accumulator for the merged row, one entry per schema column
  pLastCols = taosArrayInit(pr->pSchema->numOfCols, sizeof(SLastCol));
  if (pLastCols == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  for (int32_t i = 0; i < pr->pSchema->numOfCols; ++i) {
    struct STColumn* pCol = &pr->pSchema->columns[i];
    SLastCol         p = {.ts = INT64_MIN, .colVal.type = pCol->type};

    if (IS_VAR_DATA_TYPE(pCol->type)) {
      p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char));
      if (p.colVal.value.pData == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pLastCols, &p) == NULL) {
      freeItem(&p);  // not owned by the array yet
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  code = tsdbTakeReadSnap(pr->pVnode->pTsdb, &pr->pReadSnap, "cache-l");
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }
  snapTaken = true;

  pr->pDataFReader = NULL;
  pr->pDataFReaderLast = NULL;

  if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
    // retrieve the only one last row of all tables in the uid list.
    for (int32_t i = 0; i < pr->numOfTables; ++i) {
      STableKeyInfo* pKeyInfo = &pr->pTableList[i];

      code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      if (h == NULL) {
        continue;
      }

      for (int32_t k = 0; k < pr->numOfCols; ++k) {
        int32_t slotId = slotIds[k];

        if (slotId == -1) {  // the primary timestamp
          SLastCol* p = taosArrayGet(pLastCols, 0);
          SLastCol* pCol = (SLastCol*)taosArrayGet(pRow, 0);
          if (pCol->ts > p->ts) {
            hasRes = true;
            p->ts = pCol->ts;
            p->colVal = pCol->colVal;

            // only set value for last row query
            if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
              if (taosArrayGetSize(pTableUidList) == 0) {
                taosArrayPush(pTableUidList, &pKeyInfo->uid);
              } else {
                taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
              }
            }
          }
        } else {
          SLastCol* p = taosArrayGet(pLastCols, slotId);
          SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);

          if (pColVal->ts > p->ts) {
            // a "last" query skips entries that carry no value
            if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
              continue;
            }

            hasRes = true;
            p->ts = pColVal->ts;

            if (!IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
              p->colVal = pColVal->colVal;
            } else {
              // keep the accumulator's own buffer: copy the bytes, never the pointer
              if (COL_VAL_IS_VALUE(&pColVal->colVal)) {
                memcpy(p->colVal.value.pData, pColVal->colVal.value.pData, pColVal->colVal.value.nData);
              }

              p->colVal.value.nData = pColVal->colVal.value.nData;
              p->colVal.type = pColVal->colVal.type;
              p->colVal.flag = pColVal->colVal.flag;
              p->colVal.cid = pColVal->colVal.cid;
            }
          }
        }
      }

      tsdbCacheRelease(lruCache, h);
    }

    if (hasRes) {
      code = saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes, pr->idstr);
    }
  } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
    for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
      STableKeyInfo* pKeyInfo = &pr->pTableList[i];
      code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      // Table i is consumed whether or not it has cached data. tableIndex is
      // the position the next call resumes from, so it must also move past
      // tables that were skipped; otherwise a resumed scan restarts too early
      // and emits rows a second time.
      pr->tableIndex = i + 1;

      if (h == NULL) {
        continue;
      }

      code = saveOneRow(pRow, pResBlock, pr, slotIds, pRes, pr->idstr);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbCacheRelease(lruCache, h);
        goto _end;
      }
      // TODO reset the pRes

      taosArrayPush(pTableUidList, &pKeyInfo->uid);
      tsdbCacheRelease(lruCache, h);

      if (pResBlock->info.rows >= pResBlock->info.capacity) {
        goto _end;
      }
    }
  } else {
    code = TSDB_CODE_INVALID_PARA;
  }

_end:
  if (snapTaken) {
    tsdbDataFReaderClose(&pr->pDataFReaderLast);
    tsdbDataFReaderClose(&pr->pDataFReader);

    tsdbUntakeReadSnap(pr->pVnode->pTsdb, pr->pReadSnap, "cache-l");
  }
  resetLastBlockLoadInfo(pr->pLoadInfo);

  if (pRes != NULL) {
    for (int32_t j = 0; j < pr->numOfCols; ++j) {
      taosMemoryFree(pRes[j]);
    }

    taosMemoryFree(pRes);
  }

  taosArrayDestroyEx(pLastCols, freeItem);
  return code;
}