/* tsdbCacheRead.c */
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "taoserror.h"
#include "tarray.h"
#include "tcommon.h"
#include "tsdb.h"

/* True when every bit of flag `_t` is also set in `_type`, i.e. `_type` includes the flag(s) `_t`. */
#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t))

/*
 * Write one result row built from `pRow` into `pBlock` at row index pBlock->info.rows.
 *
 * pRow    - SArray of SLastCol. Element 0 supplies the primary timestamp and element `slotId` the cached
 *           value of schema column `slotId`. It can be shorter than the schema when columns were added
 *           after the row was cached; such columns are emitted as NULL.
 * slotIds - schema column index for each of the pReader->numOfCols output columns; -1 selects the primary
 *           timestamp.
 * pRes    - per-output-column scratch buffers (a varstr header followed by an SFirstLastRes); only used
 *           for CACHESCAN_RETRIEVE_LAST.
 * idStr   - query id used in log messages.
 *
 * CACHESCAN_RETRIEVE_LAST: every output column receives an SFirstLastRes. The row count is advanced unless
 *   every output column turned out NULL (selecting the primary timestamp counts as non-NULL).
 * CACHESCAN_RETRIEVE_LAST_ROW: plain column values are written and the row count always advances.
 *
 * Returns TSDB_CODE_INVALID_PARA when pReader->type contains neither flag, TSDB_CODE_SUCCESS otherwise.
 */
static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
                          void** pRes, const char* idStr) {
  int32_t numOfRows = pBlock->info.rows;

  if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
    bool allNullRow = true;

    for (int32_t i = 0; i < pReader->numOfCols; ++i) {
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
      SFirstLastRes*   p = (SFirstLastRes*)varDataVal(pRes[i]);

      if (slotIds[i] == -1) {  // the primary timestamp
        SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
        p->ts = pColVal->ts;
        p->bytes = TSDB_KEYSIZE;
        // p->buf is a byte buffer at an arbitrary offset: copy instead of storing through an int64_t*
        memcpy(p->buf, &pColVal->ts, sizeof(pColVal->ts));
        allNullRow = false;
      } else {
        int32_t slotId = slotIds[i];
        // the cached row predates a schema change that added this column: report it as NULL
        if (slotId >= taosArrayGetSize(pRow)) {
          p->ts = 0;
          p->isNull = true;
          colDataSetNULL(pColInfoData, numOfRows);
          continue;
        }

        SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);

        p->ts = pColVal->ts;
        p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal);
        allNullRow = allNullRow && p->isNull;

        if (!p->isNull) {
          if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
            varDataSetLen(p->buf, pColVal->colVal.value.nData);
            memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData);
            p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE;  // binary needs to plus the header size
          } else {
            memcpy(p->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes);
            p->bytes = pReader->pSchema->columns[slotId].bytes;
          }
        }
      }

      // pColInfoData->info.bytes includes VARSTR_HEADER_SIZE, so subtract it to get the payload length
      p->hasResult = true;
      varDataSetLen(pRes[i], pColInfoData->info.bytes - VARSTR_HEADER_SIZE);
      colDataSetVal(pColInfoData, numOfRows, (const char*)pRes[i], false);
    }

    pBlock->info.rows += allNullRow ? 0 : 1;
  } else if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
    for (int32_t i = 0; i < pReader->numOfCols; ++i) {
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);

      if (slotIds[i] == -1) {  // the primary timestamp
        SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
        colDataSetVal(pColInfoData, numOfRows, (const char*)&pColVal->ts, false);
      } else {
        int32_t slotId = slotIds[i];
        // the cached row predates a schema change that added this column: report it as NULL
        if (slotId >= taosArrayGetSize(pRow)) {
          colDataSetNULL(pColInfoData, numOfRows);
          continue;
        }

        SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
        SColVal*  pVal = &pColVal->colVal;

        if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
          if (!COL_VAL_IS_VALUE(&pColVal->colVal)) {
            colDataSetNULL(pColInfoData, numOfRows);
          } else {
            // build a varstr (header + payload) in the reader's per-column transfer buffer
            varDataSetLen(pReader->transferBuf[slotId], pVal->value.nData);
            memcpy(varDataVal(pReader->transferBuf[slotId]), pVal->value.pData, pVal->value.nData);
            colDataSetVal(pColInfoData, numOfRows, pReader->transferBuf[slotId], false);
          }
        } else {
          colDataSetVal(pColInfoData, numOfRows, (const char*)&pVal->value.val, !COL_VAL_IS_VALUE(pVal));
        }
      }
    }

    pBlock->info.rows += 1;
  } else {
    tsdbError("invalid retrieve type:%d, %s", pReader->type, idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Load the schema the reader decodes cached rows with into p->pSchema.
 *
 * With a non-zero `suid` the super table's schema is fetched. Otherwise the tables in p->pTableList are
 * tried in order and the schema of the first one that still exists is used; every missing table is logged.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_PAR_TABLE_NOT_EXIST when no schema could be found.
 */
static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* idstr) {
  SMeta* pMeta = p->pVnode->pMeta;

  if (suid != 0) {
    p->pSchema = metaGetTbTSchema(pMeta, suid, -1, 1);
    if (p->pSchema != NULL) {
      return TSDB_CODE_SUCCESS;
    }

    tsdbWarn("stable:%" PRIu64 " has been dropped, failed to retrieve cached rows, %s", suid, idstr);
    return TSDB_CODE_PAR_TABLE_NOT_EXIST;
  }

  int32_t idx = 0;
  while (idx < p->numOfTables) {
    uint64_t tableUid = p->pTableList[idx].uid;

    p->pSchema = metaGetTbTSchema(pMeta, tableUid, -1, 1);
    if (p->pSchema != NULL) {
      return TSDB_CODE_SUCCESS;
    }

    tsdbWarn("table:%" PRIu64 " has been dropped, failed to retrieve cached rows, %s", tableUid, idstr);
    ++idx;
  }

  // every queried table is gone (or there was none to try and no schema was set beforehand)
  if (p->pSchema == NULL) {
    tsdbWarn("all queried tables has been dropped, try next group, %s", idstr);
    return TSDB_CODE_PAR_TABLE_NOT_EXIST;
  }

  return TSDB_CODE_SUCCESS;
}

145
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
H
Haojun Liao 已提交
146
                                uint64_t suid, void** pReader, const char* idstr) {
147 148
  *pReader = NULL;
  SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
149 150 151 152
  if (p == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

153 154
  p->type = type;
  p->pVnode = pVnode;
155 156
  p->pTsdb = p->pVnode->pTsdb;
  p->verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX};
157
  p->numOfCols = numOfCols;
158
  p->suid = suid;
159

160
  if (numOfTables == 0) {
161 162 163 164
    *pReader = p;
    return TSDB_CODE_SUCCESS;
  }

165
  p->pTableList = pTableIdList;
166
  p->numOfTables = numOfTables;
167

168 169 170 171 172 173
  int32_t code = setTableSchema(p, suid, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbCacherowsReaderClose(p);
    return code;
  }

174
  p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
175
  if (p->transferBuf == NULL) {
H
Haojun Liao 已提交
176
    tsdbCacherowsReaderClose(p);
H
Haojun Liao 已提交
177
    return TSDB_CODE_OUT_OF_MEMORY;
178 179
  }

180
  for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
M
Minglei Jin 已提交
181
    if (IS_VAR_DATA_TYPE(p->pSchema->columns[i].type)) {
182
      p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes);
183 184 185 186
      if (p->transferBuf[i] == NULL) {
        tsdbCacherowsReaderClose(p);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
187 188
    }
  }
189

190 191
  int32_t numOfStt = ((SVnode*)pVnode)->config.sttTrigger;
  p->pLoadInfo = tCreateLastBlockLoadInfo(p->pSchema, NULL, 0, numOfStt);
192 193 194 195 196
  if (p->pLoadInfo == NULL) {
    tsdbCacherowsReaderClose(p);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

197
  p->idstr = taosStrdup(idstr);
198 199
  taosThreadMutexInit(&p->readerMutex, NULL);

200 201
  p->lastTs = INT64_MIN;

202 203 204 205
  *pReader = p;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
206
void* tsdbCacherowsReaderClose(void* pReader) {
207
  SCacheRowsReader* p = pReader;
208

209 210 211 212 213 214 215
  if (p->pSchema != NULL) {
    for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
      taosMemoryFreeClear(p->transferBuf[i]);
    }

    taosMemoryFree(p->transferBuf);
    taosMemoryFree(p->pSchema);
216 217
  }

218 219
  taosMemoryFree(p->pCurrSchema);

220 221
  destroyLastBlockLoadInfo(p->pLoadInfo);

222
  taosMemoryFree((void*)p->idstr);
223 224
  taosThreadMutexDestroy(&p->readerMutex);

225
  taosMemoryFree(pReader);
H
Haojun Liao 已提交
226
  return NULL;
227 228
}

H
Haojun Liao 已提交
229
static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, SArray** pRow,
H
Haojun Liao 已提交
230
                                 LRUHandle** h) {
231
  int32_t code = TSDB_CODE_SUCCESS;
232
  *pRow = NULL;
233

H
Haojun Liao 已提交
234
  if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
235
    code = tsdbCacheGetLastrowH(lruCache, uid, pr, h);
236
  } else {
237
    code = tsdbCacheGetLastH(lruCache, uid, pr, h);
238
  }
239

240 241 242 243 244 245 246
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // no data in the table of Uid
  if (*h != NULL) {
    *pRow = (SArray*)taosLRUCacheValue(lruCache, *h);
247 248 249 250 251
  }

  return code;
}

252
static void freeItem(void* pItem) {
253
  SLastCol* pCol = (SLastCol*)pItem;
254 255 256 257 258
  if (IS_VAR_DATA_TYPE(pCol->colVal.type)) {
    taosMemoryFree(pCol->colVal.value.pData);
  }
}

259
static int32_t tsdbCacheQueryReseek(void* pQHandle) {
260 261 262
  int32_t           code = 0;
  SCacheRowsReader* pReader = pQHandle;

263 264 265 266
  code = taosThreadMutexTryLock(&pReader->readerMutex);
  if (code == 0) {
    // pause current reader's state if not paused, save ts & version for resuming
    // just wait for the big all tables' snapshot untaking for now
267

268
    code = TSDB_CODE_VND_QUERY_BUSY;
269

270 271 272 273 274 275 276 277
    taosThreadMutexUnlock(&pReader->readerMutex);

    return code;
  } else if (code == EBUSY) {
    return TSDB_CODE_VND_QUERY_BUSY;
  } else {
    return -1;
  }
278 279
}

280
int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) {
281
  if (pReader == NULL || pResBlock == NULL) {
282 283 284
    return TSDB_CODE_INVALID_PARA;
  }

285
  SCacheRowsReader* pr = pReader;
286

287
  int32_t    code = TSDB_CODE_SUCCESS;
288
  SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
289
  LRUHandle* h = NULL;
H
Haojun Liao 已提交
290
  SArray*    pRow = NULL;
291
  bool       hasRes = false;
292
  SArray*    pLastCols = NULL;
293

294
  void** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES);
295 296 297 298 299
  if (pRes == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

300
  for (int32_t j = 0; j < pr->numOfCols; ++j) {
X
Xiaoyu Wang 已提交
301 302
    pRes[j] = taosMemoryCalloc(
        1, sizeof(SFirstLastRes) + pr->pSchema->columns[-1 == slotIds[j] ? 0 : slotIds[j]].bytes + VARSTR_HEADER_SIZE);
303 304 305 306
    SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]);
    p->ts = INT64_MIN;
  }

307 308 309 310 311 312
  pLastCols = taosArrayInit(pr->pSchema->numOfCols, sizeof(SLastCol));
  if (pLastCols == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

313
  for (int32_t i = 0; i < pr->pSchema->numOfCols; ++i) {
314
    struct STColumn* pCol = &pr->pSchema->columns[i];
315
    SLastCol         p = {.ts = INT64_MIN, .colVal.type = pCol->type, .colVal.flag = CV_FLAG_NULL};
316 317 318 319 320

    if (IS_VAR_DATA_TYPE(pCol->type)) {
      p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char));
    }
    taosArrayPush(pLastCols, &p);
H
Haojun Liao 已提交
321
  }
322

323
  taosThreadMutexLock(&pr->readerMutex);
X
Xiaoyu Wang 已提交
324
  code = tsdbTakeReadSnap((STsdbReader*)pr, tsdbCacheQueryReseek, &pr->pReadSnap);
K
kailixu 已提交
325 326 327
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }
328
  pr->pDataFReader = NULL;
329
  pr->pDataFReaderLast = NULL;
330

331
  // retrieve the only one last row of all tables in the uid list.
H
Haojun Liao 已提交
332
  if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
333
    int64_t st = taosGetTimestampUs();
X
Xiaoyu Wang 已提交
334
    int64_t totalLastTs = INT64_MAX;
335 336
    for (int32_t i = 0; i < pr->numOfTables; ++i) {
      STableKeyInfo* pKeyInfo = &pr->pTableList[i];
337

338
      code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
H
Haojun Liao 已提交
339
      if (code != TSDB_CODE_SUCCESS) {
K
kailixu 已提交
340
        goto _end;
341 342
      }

343
      if (h == NULL) {
344 345
        continue;
      }
346 347
      if (taosArrayGetSize(pRow) <= 0) {
        tsdbCacheRelease(lruCache, h);
348 349 350
        continue;
      }

H
Haojun Liao 已提交
351
      {
352
        bool    hasNotNullRow = true;
X
Xiaoyu Wang 已提交
353
        int64_t singleTableLastTs = INT64_MAX;
H
Haojun Liao 已提交
354
        for (int32_t k = 0; k < pr->numOfCols; ++k) {
355 356 357 358 359
          int32_t slotId = slotIds[k];

          if (slotId == -1) {  // the primary timestamp
            SLastCol* p = taosArrayGet(pLastCols, 0);
            SLastCol* pCol = (SLastCol*)taosArrayGet(pRow, 0);
360
            if (pCol->ts > p->ts) {
361
              hasRes = true;
362 363
              p->ts = pCol->ts;
              p->colVal = pCol->colVal;
X
Xiaoyu Wang 已提交
364
              singleTableLastTs = pCol->ts;
H
Haojun Liao 已提交
365 366 367 368 369 370 371 372 373

              // only set value for last row query
              if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
                if (taosArrayGetSize(pTableUidList) == 0) {
                  taosArrayPush(pTableUidList, &pKeyInfo->uid);
                } else {
                  taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
                }
              }
H
Haojun Liao 已提交
374 375
            }
          } else {
376
            SLastCol* p = taosArrayGet(pLastCols, slotId);
H
Haojun Liao 已提交
377 378
            SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);

379
            if (pColVal->ts > p->ts) {
380
              if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
381 382 383
                if (!COL_VAL_IS_VALUE(&p->colVal)) {
                  hasNotNullRow = false;
                }
384 385 386
                continue;
              }

387
              hasRes = true;
388
              p->ts = pColVal->ts;
X
Xiaoyu Wang 已提交
389 390
              if (pColVal->ts < singleTableLastTs && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
                singleTableLastTs = pColVal->ts;
391
              }
H
Haojun Liao 已提交
392

H
Haojun Liao 已提交
393 394 395 396 397 398
              if (!IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
                p->colVal = pColVal->colVal;
              } else {
                if (COL_VAL_IS_VALUE(&pColVal->colVal)) {
                  memcpy(p->colVal.value.pData, pColVal->colVal.value.pData, pColVal->colVal.value.nData);
                }
H
Haojun Liao 已提交
399

H
Haojun Liao 已提交
400 401 402 403
                p->colVal.value.nData = pColVal->colVal.value.nData;
                p->colVal.type = pColVal->colVal.type;
                p->colVal.flag = pColVal->colVal.flag;
                p->colVal.cid = pColVal->colVal.cid;
H
Haojun Liao 已提交
404 405 406
              }
            }
          }
407
        }
408 409

        if (hasNotNullRow) {
X
Xiaoyu Wang 已提交
410
          if (INT64_MAX == totalLastTs || (INT64_MAX != singleTableLastTs && totalLastTs < singleTableLastTs)) {
X
Xiaoyu Wang 已提交
411
            totalLastTs = singleTableLastTs;
X
Xiaoyu Wang 已提交
412
          }
413 414
          double cost = (taosGetTimestampUs() - st) / 1000.0;
          if (cost > tsCacheLazyLoadThreshold) {
X
Xiaoyu Wang 已提交
415
            pr->lastTs = totalLastTs;
416
          }
417
        }
418
      }
419

420
      tsdbCacheRelease(lruCache, h);
421
    }
422 423

    if (hasRes) {
H
Haojun Liao 已提交
424
      saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes, pr->idstr);
425
    }
H
Haojun Liao 已提交
426
  } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
427 428
    for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
      STableKeyInfo* pKeyInfo = &pr->pTableList[i];
429
      code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
H
Haojun Liao 已提交
430
      if (code != TSDB_CODE_SUCCESS) {
K
kailixu 已提交
431
        goto _end;
432 433
      }

434
      if (h == NULL) {
435 436
        continue;
      }
437 438
      if (taosArrayGetSize(pRow) <= 0) {
        tsdbCacheRelease(lruCache, h);
439 440 441
        continue;
      }

H
Haojun Liao 已提交
442
      saveOneRow(pRow, pResBlock, pr, slotIds, pRes, pr->idstr);
443
      // TODO reset the pRes
444

445
      taosArrayPush(pTableUidList, &pKeyInfo->uid);
446
      tsdbCacheRelease(lruCache, h);
447

448 449
      pr->tableIndex += 1;
      if (pResBlock->info.rows >= pResBlock->info.capacity) {
450
        goto _end;
451
      }
452 453
    }
  } else {
454
    code = TSDB_CODE_INVALID_PARA;
455 456
  }

457
_end:
458
  tsdbDataFReaderClose(&pr->pDataFReaderLast);
459 460
  tsdbDataFReaderClose(&pr->pDataFReader);

461
  resetLastBlockLoadInfo(pr->pLoadInfo);
462
  tsdbUntakeReadSnap((STsdbReader*)pr, pr->pReadSnap, true);
463
  taosThreadMutexUnlock(&pr->readerMutex);
464

H
Haojun Liao 已提交
465 466 467 468
  if (pRes != NULL) {
    for (int32_t j = 0; j < pr->numOfCols; ++j) {
      taosMemoryFree(pRes[j]);
    }
469 470 471
  }

  taosMemoryFree(pRes);
472
  taosArrayDestroyEx(pLastCols, freeItem);
473
  return code;
474
}