tsdbCacheRead.c 15.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "taoserror.h"
#include "tarray.h"
#include "tcommon.h"
#include "tsdb.h"

21
// True when every bit set in the flag mask _t is also set in _type.
#define HASTYPE(_type, _t) ((_t) == ((_type) & (_t)))
22

H
Haojun Liao 已提交
23 24
/*
 * "last" mode: write row `pBlock->info.rows` where every output column carries an SFirstLastRes record,
 * stored as var-data in pRes[col]. The block's row count only advances when at least one column is non-NULL.
 */
static void saveLastModeRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
                            void** pRes) {
  int32_t rowIndex = pBlock->info.rows;
  bool    everyColumnNull = true;

  for (int32_t col = 0; col < pReader->numOfCols; ++col) {
    SColumnInfoData* pOutCol = taosArrayGet(pBlock->pDataBlock, col);
    SFirstLastRes*   pRec = (SFirstLastRes*)varDataVal(pRes[col]);
    int32_t          slotId = slotIds[col];

    if (slotId == -1) {
      // primary timestamp: taken from the first cached column
      SLastCol* pTsCol = (SLastCol*)taosArrayGet(pRow, 0);
      pRec->ts = pTsCol->ts;
      pRec->bytes = TSDB_KEYSIZE;
      *(int64_t*)pRec->buf = pTsCol->ts;
      everyColumnNull = false;
    } else if (slotId >= taosArrayGetSize(pRow)) {
      // the column was added to the schema after this row was cached: emit NULL without a result record
      pRec->ts = 0;
      pRec->isNull = true;
      colDataSetNULL(pOutCol, rowIndex);
      continue;
    } else {
      SLastCol* pCached = (SLastCol*)taosArrayGet(pRow, slotId);
      SColVal*  pVal = &pCached->colVal;

      pRec->ts = pCached->ts;
      pRec->isNull = !COL_VAL_IS_VALUE(pVal);
      everyColumnNull = everyColumnNull && pRec->isNull;

      if (!pRec->isNull) {
        if (IS_VAR_DATA_TYPE(pVal->type)) {
          varDataSetLen(pRec->buf, pVal->value.nData);
          memcpy(varDataVal(pRec->buf), pVal->value.pData, pVal->value.nData);
          pRec->bytes = pVal->value.nData + VARSTR_HEADER_SIZE;  // binary payload plus its length header
        } else {
          int32_t width = pReader->pSchema->columns[slotId].bytes;
          memcpy(pRec->buf, &pVal->value, width);
          pRec->bytes = width;
        }
      }
    }

    pRec->hasResult = true;
    // pOutCol->info.bytes includes VARSTR_HEADER_SIZE, so subtract it to get the payload length
    varDataSetLen(pRes[col], pOutCol->info.bytes - VARSTR_HEADER_SIZE);
    colDataSetVal(pOutCol, rowIndex, (const char*)pRes[col], false);
  }

  if (!everyColumnNull) {
    pBlock->info.rows += 1;
  }
}

/*
 * "last row" mode: copy the cached values of one row directly into row `pBlock->info.rows` and always
 * advance the row count. Var-length values are staged through pReader->transferBuf[slotId].
 */
static void saveLastRowModeRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds) {
  int32_t rowIndex = pBlock->info.rows;

  for (int32_t col = 0; col < pReader->numOfCols; ++col) {
    SColumnInfoData* pOutCol = taosArrayGet(pBlock->pDataBlock, col);
    int32_t          slotId = slotIds[col];

    if (slotId == -1) {
      SLastCol* pTsCol = (SLastCol*)taosArrayGet(pRow, 0);
      colDataSetVal(pOutCol, rowIndex, (const char*)&pTsCol->ts, false);
      continue;
    }

    // the column was added to the schema after this row was cached
    if (slotId >= taosArrayGetSize(pRow)) {
      colDataSetNULL(pOutCol, rowIndex);
      continue;
    }

    SLastCol* pCached = (SLastCol*)taosArrayGet(pRow, slotId);
    SColVal*  pVal = &pCached->colVal;
    bool      isNull = !COL_VAL_IS_VALUE(pVal);

    if (!IS_VAR_DATA_TYPE(pVal->type)) {
      colDataSetVal(pOutCol, rowIndex, (const char*)&pVal->value.val, isNull);
    } else if (isNull) {
      colDataSetNULL(pOutCol, rowIndex);
    } else {
      char* pBuf = pReader->transferBuf[slotId];
      varDataSetLen(pBuf, pVal->value.nData);
      memcpy(varDataVal(pBuf), pVal->value.pData, pVal->value.nData);
      colDataSetVal(pOutCol, rowIndex, pBuf, false);
    }
  }

  pBlock->info.rows += 1;
}

/*
 * Append one cached row (pRow, an array of SLastCol indexed by schema column) to pBlock.
 *
 * slotIds maps each output column to a schema column index; -1 selects the primary timestamp.
 * pRes    holds one SFirstLastRes scratch buffer per output column (used only in "last" mode).
 * Returns TSDB_CODE_INVALID_PARA when pReader->type has neither the LAST nor the LAST_ROW flag.
 */
static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
                          void** pRes, const char* idStr) {
  if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
    saveLastModeRow(pRow, pBlock, pReader, slotIds, pRes);
    return TSDB_CODE_SUCCESS;
  }

  if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
    saveLastRowModeRow(pRow, pBlock, pReader, slotIds);
    return TSDB_CODE_SUCCESS;
  }

  tsdbError("invalid retrieve type:%d, %s", pReader->type, idStr);
  return TSDB_CODE_INVALID_PARA;
}

115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
/*
 * Load into p->pSchema the schema used to decode cached rows.
 * For a super table (suid != 0) the stable's schema is fetched. Otherwise the first table in p->pTableList
 * that still exists supplies it.
 * Returns TSDB_CODE_PAR_TABLE_NOT_EXIST when no schema can be found.
 */
static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* idstr) {
  if (suid != 0) {
    p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, suid, -1, 1);
    if (p->pSchema != NULL) {
      return TSDB_CODE_SUCCESS;
    }

    tsdbWarn("stable:%" PRIu64 " has been dropped, failed to retrieve cached rows, %s", suid, idstr);
    return TSDB_CODE_PAR_TABLE_NOT_EXIST;
  }

  for (int32_t idx = 0; idx < p->numOfTables; ++idx) {
    uint64_t tableUid = p->pTableList[idx].uid;

    p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, tableUid, -1, 1);
    if (p->pSchema != NULL) {
      return TSDB_CODE_SUCCESS;
    }

    tsdbWarn("table:%" PRIu64 " has been dropped, failed to retrieve cached rows, %s", tableUid, idstr);
  }

  // every queried table is gone (or the list was empty and no schema was set before)
  if (p->pSchema == NULL) {
    tsdbWarn("all queried tables has been dropped, try next group, %s", idstr);
    return TSDB_CODE_PAR_TABLE_NOT_EXIST;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
145 146 147 148 149 150 151 152 153 154 155 156
int32_t tsdbReuseCacherowsReader(void* reader, void* pTableIdList, int32_t numOfTables) {
  SCacheRowsReader* pReader = (SCacheRowsReader*)reader;

  pReader->pTableList = pTableIdList;
  pReader->numOfTables = numOfTables;
  pReader->lastTs = INT64_MIN;

  resetLastBlockLoadInfo(pReader->pLoadInfo);

  return TSDB_CODE_SUCCESS;
}

157
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
H
Haojun Liao 已提交
158
                                uint64_t suid, void** pReader, const char* idstr) {
159 160
  *pReader = NULL;
  SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
161 162 163 164
  if (p == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

165 166
  p->type = type;
  p->pVnode = pVnode;
167 168
  p->pTsdb = p->pVnode->pTsdb;
  p->verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX};
169
  p->numOfCols = numOfCols;
170
  p->suid = suid;
171

172
  if (numOfTables == 0) {
173 174 175 176
    *pReader = p;
    return TSDB_CODE_SUCCESS;
  }

177
  p->pTableList = pTableIdList;
178
  p->numOfTables = numOfTables;
179

180 181 182 183 184 185
  int32_t code = setTableSchema(p, suid, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbCacherowsReaderClose(p);
    return code;
  }

186
  p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
187
  if (p->transferBuf == NULL) {
H
Haojun Liao 已提交
188
    tsdbCacherowsReaderClose(p);
H
Haojun Liao 已提交
189
    return TSDB_CODE_OUT_OF_MEMORY;
190 191
  }

192
  for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
M
Minglei Jin 已提交
193
    if (IS_VAR_DATA_TYPE(p->pSchema->columns[i].type)) {
194
      p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes);
195 196 197 198
      if (p->transferBuf[i] == NULL) {
        tsdbCacherowsReaderClose(p);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
199 200
    }
  }
201

202 203 204
  SVnodeCfg* pCfg = &((SVnode*)pVnode)->config;

  int32_t numOfStt = pCfg->sttTrigger;
205
  p->pLoadInfo = tCreateLastBlockLoadInfo(p->pSchema, NULL, 0, numOfStt);
206 207 208 209 210
  if (p->pLoadInfo == NULL) {
    tsdbCacherowsReaderClose(p);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

211 212 213 214 215 216
  p->pDataIter = taosMemoryCalloc(pCfg->sttTrigger, sizeof(SLDataIter));
  if (p->pDataIter == NULL) {
    tsdbCacherowsReaderClose(p);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

217
  p->idstr = taosStrdup(idstr);
218 219
  taosThreadMutexInit(&p->readerMutex, NULL);

220 221
  p->lastTs = INT64_MIN;

222 223 224 225
  *pReader = p;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
226
void* tsdbCacherowsReaderClose(void* pReader) {
227
  SCacheRowsReader* p = pReader;
228

229 230 231 232 233 234 235
  if (p->pSchema != NULL) {
    for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
      taosMemoryFreeClear(p->transferBuf[i]);
    }

    taosMemoryFree(p->transferBuf);
    taosMemoryFree(p->pSchema);
236 237
  }

238
  taosMemoryFreeClear(p->pDataIter);
239 240
  taosMemoryFree(p->pCurrSchema);

241 242
  destroyLastBlockLoadInfo(p->pLoadInfo);

243
  taosMemoryFree((void*)p->idstr);
244 245
  taosThreadMutexDestroy(&p->readerMutex);

246
  taosMemoryFree(pReader);
H
Haojun Liao 已提交
247
  return NULL;
248 249
}

H
Haojun Liao 已提交
250
static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, SArray** pRow,
H
Haojun Liao 已提交
251
                                 LRUHandle** h) {
252
  int32_t code = TSDB_CODE_SUCCESS;
253
  *pRow = NULL;
254

H
Haojun Liao 已提交
255
  if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
256
    code = tsdbCacheGetLastrowH(lruCache, uid, pr, h);
257
  } else {
258
    code = tsdbCacheGetLastH(lruCache, uid, pr, h);
259
  }
260

261 262 263 264 265 266 267
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // no data in the table of Uid
  if (*h != NULL) {
    *pRow = (SArray*)taosLRUCacheValue(lruCache, *h);
268 269 270 271 272
  }

  return code;
}

273
static void freeItem(void* pItem) {
274
  SLastCol* pCol = (SLastCol*)pItem;
275 276 277 278 279
  if (IS_VAR_DATA_TYPE(pCol->colVal.type)) {
    taosMemoryFree(pCol->colVal.value.pData);
  }
}

280
static int32_t tsdbCacheQueryReseek(void* pQHandle) {
281 282 283
  int32_t           code = 0;
  SCacheRowsReader* pReader = pQHandle;

284 285 286 287
  code = taosThreadMutexTryLock(&pReader->readerMutex);
  if (code == 0) {
    // pause current reader's state if not paused, save ts & version for resuming
    // just wait for the big all tables' snapshot untaking for now
288

289
    code = TSDB_CODE_VND_QUERY_BUSY;
290

291 292 293 294 295 296 297 298
    taosThreadMutexUnlock(&pReader->readerMutex);

    return code;
  } else if (code == EBUSY) {
    return TSDB_CODE_VND_QUERY_BUSY;
  } else {
    return -1;
  }
299 300
}

301
int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) {
302
  if (pReader == NULL || pResBlock == NULL) {
303 304 305
    return TSDB_CODE_INVALID_PARA;
  }

306
  SCacheRowsReader* pr = pReader;
307

308
  int32_t    code = TSDB_CODE_SUCCESS;
309
  SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
310
  LRUHandle* h = NULL;
H
Haojun Liao 已提交
311
  SArray*    pRow = NULL;
312
  bool       hasRes = false;
313
  SArray*    pLastCols = NULL;
314

315
  void** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES);
316 317 318 319 320
  if (pRes == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

321
  for (int32_t j = 0; j < pr->numOfCols; ++j) {
X
Xiaoyu Wang 已提交
322 323
    pRes[j] = taosMemoryCalloc(
        1, sizeof(SFirstLastRes) + pr->pSchema->columns[-1 == slotIds[j] ? 0 : slotIds[j]].bytes + VARSTR_HEADER_SIZE);
324 325 326 327
    SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]);
    p->ts = INT64_MIN;
  }

328 329 330 331 332 333
  pLastCols = taosArrayInit(pr->pSchema->numOfCols, sizeof(SLastCol));
  if (pLastCols == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

334
  for (int32_t i = 0; i < pr->pSchema->numOfCols; ++i) {
335
    struct STColumn* pCol = &pr->pSchema->columns[i];
336
    SLastCol         p = {.ts = INT64_MIN, .colVal.type = pCol->type, .colVal.flag = CV_FLAG_NULL};
337 338 339 340 341

    if (IS_VAR_DATA_TYPE(pCol->type)) {
      p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char));
    }
    taosArrayPush(pLastCols, &p);
H
Haojun Liao 已提交
342
  }
343

344
  taosThreadMutexLock(&pr->readerMutex);
X
Xiaoyu Wang 已提交
345
  code = tsdbTakeReadSnap((STsdbReader*)pr, tsdbCacheQueryReseek, &pr->pReadSnap);
K
kailixu 已提交
346 347 348
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }
349
  pr->pDataFReader = NULL;
350
  pr->pDataFReaderLast = NULL;
351

352
  // retrieve the only one last row of all tables in the uid list.
H
Haojun Liao 已提交
353
  if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
354
    int64_t st = taosGetTimestampUs();
X
Xiaoyu Wang 已提交
355
    int64_t totalLastTs = INT64_MAX;
356 357
    for (int32_t i = 0; i < pr->numOfTables; ++i) {
      STableKeyInfo* pKeyInfo = &pr->pTableList[i];
358

359
      code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
H
Haojun Liao 已提交
360
      if (code != TSDB_CODE_SUCCESS) {
K
kailixu 已提交
361
        goto _end;
362 363
      }

364
      if (h == NULL) {
365 366
        continue;
      }
367 368
      if (taosArrayGetSize(pRow) <= 0) {
        tsdbCacheRelease(lruCache, h);
369 370 371
        continue;
      }

H
Haojun Liao 已提交
372
      {
373
        bool    hasNotNullRow = true;
X
Xiaoyu Wang 已提交
374
        int64_t singleTableLastTs = INT64_MAX;
H
Haojun Liao 已提交
375
        for (int32_t k = 0; k < pr->numOfCols; ++k) {
376 377 378 379 380
          int32_t slotId = slotIds[k];

          if (slotId == -1) {  // the primary timestamp
            SLastCol* p = taosArrayGet(pLastCols, 0);
            SLastCol* pCol = (SLastCol*)taosArrayGet(pRow, 0);
381
            if (pCol->ts > p->ts) {
382
              hasRes = true;
383 384
              p->ts = pCol->ts;
              p->colVal = pCol->colVal;
X
Xiaoyu Wang 已提交
385
              singleTableLastTs = pCol->ts;
H
Haojun Liao 已提交
386 387 388 389 390 391 392 393 394

              // only set value for last row query
              if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
                if (taosArrayGetSize(pTableUidList) == 0) {
                  taosArrayPush(pTableUidList, &pKeyInfo->uid);
                } else {
                  taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
                }
              }
H
Haojun Liao 已提交
395 396
            }
          } else {
397
            SLastCol* p = taosArrayGet(pLastCols, slotId);
H
Haojun Liao 已提交
398 399
            SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);

400
            if (pColVal->ts > p->ts) {
401
              if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
402 403 404
                if (!COL_VAL_IS_VALUE(&p->colVal)) {
                  hasNotNullRow = false;
                }
405 406 407
                continue;
              }

408
              hasRes = true;
409
              p->ts = pColVal->ts;
X
Xiaoyu Wang 已提交
410 411
              if (pColVal->ts < singleTableLastTs && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
                singleTableLastTs = pColVal->ts;
412
              }
H
Haojun Liao 已提交
413

H
Haojun Liao 已提交
414 415 416 417 418 419
              if (!IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
                p->colVal = pColVal->colVal;
              } else {
                if (COL_VAL_IS_VALUE(&pColVal->colVal)) {
                  memcpy(p->colVal.value.pData, pColVal->colVal.value.pData, pColVal->colVal.value.nData);
                }
H
Haojun Liao 已提交
420

H
Haojun Liao 已提交
421 422 423 424
                p->colVal.value.nData = pColVal->colVal.value.nData;
                p->colVal.type = pColVal->colVal.type;
                p->colVal.flag = pColVal->colVal.flag;
                p->colVal.cid = pColVal->colVal.cid;
H
Haojun Liao 已提交
425 426 427
              }
            }
          }
428
        }
429 430

        if (hasNotNullRow) {
X
Xiaoyu Wang 已提交
431
          if (INT64_MAX == totalLastTs || (INT64_MAX != singleTableLastTs && totalLastTs < singleTableLastTs)) {
X
Xiaoyu Wang 已提交
432
            totalLastTs = singleTableLastTs;
X
Xiaoyu Wang 已提交
433
          }
434 435
          double cost = (taosGetTimestampUs() - st) / 1000.0;
          if (cost > tsCacheLazyLoadThreshold) {
X
Xiaoyu Wang 已提交
436
            pr->lastTs = totalLastTs;
437
          }
438
        }
439
      }
440

441
      tsdbCacheRelease(lruCache, h);
442
    }
443 444

    if (hasRes) {
H
Haojun Liao 已提交
445
      saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes, pr->idstr);
446
    }
H
Haojun Liao 已提交
447
  } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
448 449
    for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
      STableKeyInfo* pKeyInfo = &pr->pTableList[i];
450
      code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
H
Haojun Liao 已提交
451
      if (code != TSDB_CODE_SUCCESS) {
K
kailixu 已提交
452
        goto _end;
453 454
      }

455
      if (h == NULL) {
456 457
        continue;
      }
458 459
      if (taosArrayGetSize(pRow) <= 0) {
        tsdbCacheRelease(lruCache, h);
460 461 462
        continue;
      }

H
Haojun Liao 已提交
463
      saveOneRow(pRow, pResBlock, pr, slotIds, pRes, pr->idstr);
464
      // TODO reset the pRes
465

466
      taosArrayPush(pTableUidList, &pKeyInfo->uid);
467
      tsdbCacheRelease(lruCache, h);
468

469 470
      pr->tableIndex += 1;
      if (pResBlock->info.rows >= pResBlock->info.capacity) {
471
        goto _end;
472
      }
473 474
    }
  } else {
475
    code = TSDB_CODE_INVALID_PARA;
476 477
  }

478
_end:
479
  tsdbDataFReaderClose(&pr->pDataFReaderLast);
480 481
  tsdbDataFReaderClose(&pr->pDataFReader);

482
  resetLastBlockLoadInfo(pr->pLoadInfo);
483
  tsdbUntakeReadSnap((STsdbReader*)pr, pr->pReadSnap, true);
484
  taosThreadMutexUnlock(&pr->readerMutex);
485

H
Haojun Liao 已提交
486 487 488 489
  if (pRes != NULL) {
    for (int32_t j = 0; j < pr->numOfCols; ++j) {
      taosMemoryFree(pRes[j]);
    }
490 491 492
  }

  taosMemoryFree(pRes);
493
  taosArrayDestroyEx(pLastCols, freeItem);
494
  return code;
495
}